performance - ¿Cuál es el mejor enfoque para encapsular el bloqueo de E/S en futuros rs?
rust future (1)
Idealmente, una tarea independiente realizaría la E / S y el futuro asociado sondearía el hilo de E / S para el estado de finalización.
Sí, esto es lo que recomienda Tokio y para lo que se crearon cajas como futures-cpupool . ¡Tenga en cuenta que esto no está restringido a E / S , sino que es válido para cualquier tarea síncrona de larga duración!
En este caso, programa un cierre para que se ejecute en el grupo.
El grupo en sí realiza el trabajo para verificar si el cierre de bloqueo se ha completado todavía y cumple con el rasgo
Future
.
use futures::{future, Future}; // 0.1.27
use futures_cpupool::CpuPool; // 0.1.8
use std::thread;
use std::time::Duration;
fn main() {
let pool = CpuPool::new(8);
let a = pool.spawn_fn(|| {
thread::sleep(Duration::from_secs(3));
future::ok::<_, ()>(3)
});
let b = pool.spawn_fn(|| {
thread::sleep(Duration::from_secs(1));
future::ok::<_, ()>(1)
});
let c = a.join(b).map(|(a, b)| a + b);
let result = c.wait();
println!("{:?}", result);
}
Tenga en cuenta que esta no es una forma eficiente de dormir, es solo un marcador de posición para algunas operaciones de bloqueo. Si realmente necesita dormir, use algo como futures-timer o el futures-timer tokio-timer . Vea ¿Por qué Future :: select elige primero el futuro con un período de sueño más largo? para más detalles
Puede ver que el tiempo total es de solo 3 segundos:
$ time ./target/debug/example
Ok(4)
real 0m3.021s
user 0m0.008s
sys 0m0.009s
¿Qué pasa con tokio-threadpool?
Puede parecer que tokio-threadpool se puede usar para el mismo resultado:
use std::{thread, time::Duration};
use tokio::{prelude::*, runtime::Runtime}; // 0.1.20
use tokio_threadpool; // 0.1.14
fn delay_for(seconds: u64) -> impl Future<Item = u64, Error = tokio_threadpool::BlockingError> {
future::poll_fn(move || {
tokio_threadpool::blocking(|| {
thread::sleep(Duration::from_secs(seconds));
seconds
})
})
}
fn main() {
let a = delay_for(3);
let b = delay_for(1);
let sum = a.join(b).map(|(a, b)| a + b);
let mut runtime = Runtime::new().expect("Unable to start the runtime");
let result = runtime.block_on(sum);
println!("{:?}", result);
}
Sin embargo, ejecutar este código muestra que lleva 4 segundos:
$ time ./target/debug/example
Ok(4)
real 0m4.033s
user 0m0.015s
sys 0m0.012s
Esto es tocado por
la documentación para el
blocking
(énfasis mío):
Toda la tarea que se llama
blocking
se bloquea cada vez que se bloquea el cierre suministrado, incluso si ha utilizado futuros combinadores comoselect
- los otros futuros en esta tarea no avanzarán hasta que regrese el cierre. Si esto no se desea, asegúrese de que elblocking
ejecute en su propia tarea (por ejemplo, utilizandofutures::sync::oneshot::spawn
).
Esto podría verse más o menos así:
use futures; // 0.1.27
use std::{thread, time::Duration};
use tokio::{executor::DefaultExecutor, prelude::*, runtime::Runtime}; // 0.1.20
use tokio_threadpool; // 0.1.14
fn delay_for(seconds: u64) -> impl Future<Item = u64, Error = tokio_threadpool::BlockingError> {
futures::lazy(move || {
let f = future::poll_fn(move || {
tokio_threadpool::blocking(|| {
thread::sleep(Duration::from_secs(seconds));
seconds
})
});
futures::sync::oneshot::spawn(f, &DefaultExecutor::current())
})
}
fn main() {
let a = delay_for(3);
let b = delay_for(1);
let sum = a.join(b).map(|(a, b)| a + b);
let mut runtime = Runtime::new().expect("Unable to start the runtime");
let result = runtime.block_on(sum);
println!("{:?}", result);
}
Puntos adicionales
ninguna de las soluciones es óptima y no obtiene la ventaja completa del modelo de hilos verdes
Eso es correcto, ¡porque no tienes algo que sea asíncrono! Estás tratando de combinar dos metodologías diferentes y tiene que haber algo feo en algún lugar para traducir entre ellas.
segundo no pase por el ejecutor proporcionado por el marco del reactor
No estoy seguro de lo que quieres decir aquí.
Solo hay un ejecutor en el ejemplo anterior;
el creado implícitamente por
wait
.
El grupo de subprocesos tiene cierta lógica interna que verifica si se ha realizado un subproceso, pero eso solo debe activarse cuando el ejecutor del usuario lo sondea.
Leí la documentación de tokio y me pregunto cuál es el mejor enfoque para encapsular costosas E / S síncronas en el futuro.
Con el marco del reactor, obtenemos la ventaja de un modelo de subprocesamiento verde: unos pocos subprocesos del sistema operativo manejan muchas tareas concurrentes a través de un ejecutor.
El modelo futuro de tokio está impulsado por la demanda, lo que significa que el futuro mismo sondeará su estado interno para proporcionar información sobre su finalización; permitiendo capacidades de contrapresión y cancelación. Según tengo entendido, la fase de sondeo del futuro no debe ser bloqueante para que funcione bien.
El I / OI que quiere encapsular puede verse como una operación atómica y costosa larga. Idealmente, una tarea independiente realizaría la E / S y el futuro asociado sondearía el hilo de E / S para el estado de finalización.
Las dos únicas opciones que veo son:
-
Incluya el bloqueo de E / S en la función de
poll
del futuro. - generar un hilo del sistema operativo para realizar la E / S y utilizar el mecanismo futuro para sondear su estado, como se muestra en la documentación
Según tengo entendido, ninguna de las soluciones es óptima y no aprovecha al máximo el modelo de subprocesamiento verde (el primero no se recomienda en la documentación y el segundo no pasa por el ejecutor proporcionado por el marco del reactor). ¿Hay otra solución?