rust - socket - servidor varios clientes java
Reconexión asÃncrona de un cliente a un servidor en un bucle infinito (1)
La pregunta clave parece ser: ¿cómo implemento un bucle infinito usando Tokio ? Al responder a esta pregunta, podemos abordar el problema de reconectarnos infinitamente después de la desconexión. Desde mi experiencia escribiendo código asíncrono, la recursión parece ser una solución directa a este problema.
ACTUALIZACIÓN : como lo señaló Shepmaster (y la gente de Tokio Gitter), mi respuesta original pierde memoria ya que construimos una cadena de futuros que crece en cada iteración. Aquí sigue una nueva:
Respuesta actualizada: usar loop_fn
Hay una función en la caja de futures
que hace exactamente lo que necesita. Se llama loop_fn
. Puedes usarlo cambiando tu función principal a lo siguiente:
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = future::loop_fn((), |_| {
// Run the get_connection function and loop again regardless of its result
get_connection(&handle).map(|_| -> Loop<(), ()> {
Loop::Continue(())
})
});
core.run(client).unwrap();
}
La función se asemeja a un bucle for, que puede continuar o interrumpirse dependiendo del resultado de get_connection
(consulte la documentación para la enumeración de Loop
). En este caso, optamos por continuar siempre, por lo que se seguirá reconectando infinitamente.
Tenga en cuenta que su versión de get_connection
entrará en pánico si se produce un error (por ejemplo, si el cliente no puede conectarse al servidor). Si también desea volver a intentarlo después de un error, ¡debe eliminar la llamada al panic!
.
Antigua respuesta: usar la recursividad.
Aquí sigue mi antigua respuesta, en caso de que a alguien le resulte interesante.
ADVERTENCIA : el uso del código a continuación produce un crecimiento ilimitado de la memoria.
Haciendo bucle get_connection
infinitamente
Queremos llamar a la función get_connection
cada vez que el cliente se desconecta, así que eso es exactamente lo que vamos a hacer (mire el comentario después de reader.and_then
):
fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
let remote_addr = "127.0.0.1:9876".parse().unwrap();
let tcp = TcpStream::connect(&remote_addr, handle);
let handle_clone = handle.clone();
let client = tcp.and_then(|stream| {
let (sink, from_server) = stream.framed(LineCodec).split();
let reader = from_server.for_each(|message| {
println!("{}", message);
Ok(())
});
reader.and_then(move |_| {
println!("CLIENT DISCONNECTED");
// Attempt to reconnect in the future
get_connection(&handle_clone)
})
});
let client = client.map_err(|_| { panic!()});
Box::new(client)
}
Recuerda que get_connection
no es bloqueante. Simplemente construye una Box<Future>
. Esto significa que cuando lo llamamos recursivamente, todavía no bloqueamos. En cambio, obtenemos un nuevo futuro, que podemos vincular al anterior mediante el uso de and_then
. Como puede ver, esto es diferente a la recursión normal ya que la pila no crece en cada iteración.
Tenga en cuenta que necesitamos clonar el handle
(ver handle_clone
), y moverlo al cierre que se le pasa al reader.and_then
. Esto es necesario porque el cierre durará más tiempo que la función (estará contenido en el futuro al que regresaremos).
Errores de manejo
El código que proporcionó no controla el caso en el que el cliente no puede conectarse al servidor (ni ningún otro error). Siguiendo el mismo principio que se muestra arriba, podemos manejar los errores cambiando el final de get_connection
a lo siguiente:
let handle_clone = handle.clone();
let client = client.or_else(move |err| {
// Note: this code will infinitely retry, but you could pattern match on the error
// to retry only on certain kinds of error
println!("Error connecting to server: {}", err);
get_connection(&handle_clone)
});
Box::new(client)
Tenga en cuenta que or_else
es como and_then
, pero funciona con el error producido por el futuro.
Eliminando código innecesario de main
Por último, no es necesario utilizar and_then
en la función main
. Puedes reemplazar tu main
por el siguiente código:
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = get_connection(&handle);
core.run(client).unwrap();
}
No puedo crear un cliente que intente conectarse a un servidor y:
- Si el servidor está inactivo, tiene que volver a intentarlo en un bucle infinito.
- Si el servidor está activo y la conexión es exitosa, cuando se pierde la conexión (es decir, el servidor desconecta al cliente), el cliente debe reiniciar el bucle infinito para intentar conectarse al servidor.
Aquí está el código para conectarse a un servidor; Actualmente, cuando se pierde la conexión, el programa sale. No estoy seguro de cuál es la mejor manera de implementarlo; tal vez tengo que crear un Future
con un bucle infinito?
extern crate tokio_line;
use tokio_line::LineCodec;
fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
let remote_addr = "127.0.0.1:9876".parse().unwrap();
let tcp = TcpStream::connect(&remote_addr, handle);
let client = tcp.and_then(|stream| {
let (sink, from_server) = stream.framed(LineCodec).split();
let reader = from_server.for_each(|message| {
println!("{}", message);
Ok(())
});
reader.map(|_| {
println!("CLIENT DISCONNECTED");
()
}).map_err(|err| err)
});
let client = client.map_err(|_| { panic!()});
Box::new(client)
}
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = get_connection(&handle);
let client = client.and_then(|c| {
println!("Try to reconnect");
get_connection(&handle);
Ok(())
});
core.run(client).unwrap();
}
Agregue la caja tokio-line con:
tokio-line = { git = "https://github.com/tokio-rs/tokio-line" }