performance rust future

performance - ¿Cuál es el mejor enfoque para encapsular el bloqueo de E/S en futuros rs?



rust future (1)

Idealmente, una tarea independiente realizaría la E / S y el futuro asociado sondearía el hilo de E / S para el estado de finalización.

Sí, esto es lo que recomienda Tokio y para lo que se crearon cajas como futures-cpupool . ¡Tenga en cuenta que esto no está restringido a E / S , sino que es válido para cualquier tarea síncrona de larga duración!

En este caso, programa un cierre para que se ejecute en el grupo. El grupo en sí realiza el trabajo para verificar si el cierre de bloqueo se ha completado todavía y cumple con el rasgo Future .

use futures::{future, Future}; // 0.1.27 use futures_cpupool::CpuPool; // 0.1.8 use std::thread; use std::time::Duration; fn main() { let pool = CpuPool::new(8); let a = pool.spawn_fn(|| { thread::sleep(Duration::from_secs(3)); future::ok::<_, ()>(3) }); let b = pool.spawn_fn(|| { thread::sleep(Duration::from_secs(1)); future::ok::<_, ()>(1) }); let c = a.join(b).map(|(a, b)| a + b); let result = c.wait(); println!("{:?}", result); }

Tenga en cuenta que esta no es una forma eficiente de dormir, es solo un marcador de posición para algunas operaciones de bloqueo. Si realmente necesita dormir, use algo como futures-timer o el futures-timer tokio-timer . Vea ¿Por qué Future :: select elige primero el futuro con un período de sueño más largo? para más detalles

Puede ver que el tiempo total es de solo 3 segundos:

$ time ./target/debug/example Ok(4) real 0m3.021s user 0m0.008s sys 0m0.009s

¿Qué pasa con tokio-threadpool?

Puede parecer que tokio-threadpool se puede usar para el mismo resultado:

use std::{thread, time::Duration}; use tokio::{prelude::*, runtime::Runtime}; // 0.1.20 use tokio_threadpool; // 0.1.14 fn delay_for(seconds: u64) -> impl Future<Item = u64, Error = tokio_threadpool::BlockingError> { future::poll_fn(move || { tokio_threadpool::blocking(|| { thread::sleep(Duration::from_secs(seconds)); seconds }) }) } fn main() { let a = delay_for(3); let b = delay_for(1); let sum = a.join(b).map(|(a, b)| a + b); let mut runtime = Runtime::new().expect("Unable to start the runtime"); let result = runtime.block_on(sum); println!("{:?}", result); }

Sin embargo, ejecutar este código muestra que lleva 4 segundos:

$ time ./target/debug/example Ok(4) real 0m4.033s user 0m0.015s sys 0m0.012s

Esto es tocado por la documentación para el blocking (énfasis mío):

Toda la tarea que se llama blocking se bloquea cada vez que se bloquea el cierre suministrado, incluso si ha utilizado futuros combinadores como select - los otros futuros en esta tarea no avanzarán hasta que regrese el cierre. Si esto no se desea, asegúrese de que el blocking ejecute en su propia tarea (por ejemplo, utilizando futures::sync::oneshot::spawn ).

Esto podría verse más o menos así:

use futures; // 0.1.27 use std::{thread, time::Duration}; use tokio::{executor::DefaultExecutor, prelude::*, runtime::Runtime}; // 0.1.20 use tokio_threadpool; // 0.1.14 fn delay_for(seconds: u64) -> impl Future<Item = u64, Error = tokio_threadpool::BlockingError> { futures::lazy(move || { let f = future::poll_fn(move || { tokio_threadpool::blocking(|| { thread::sleep(Duration::from_secs(seconds)); seconds }) }); futures::sync::oneshot::spawn(f, &DefaultExecutor::current()) }) } fn main() { let a = delay_for(3); let b = delay_for(1); let sum = a.join(b).map(|(a, b)| a + b); let mut runtime = Runtime::new().expect("Unable to start the runtime"); let result = runtime.block_on(sum); println!("{:?}", result); }

Puntos adicionales

ninguna de las soluciones es óptima y no obtiene la ventaja completa del modelo de hilos verdes

Eso es correcto, ¡porque no tienes algo que sea asíncrono! Estás tratando de combinar dos metodologías diferentes y tiene que haber algo feo en algún lugar para traducir entre ellas.

segundo no pase por el ejecutor proporcionado por el marco del reactor

No estoy seguro de lo que quieres decir aquí. Solo hay un ejecutor en el ejemplo anterior; el creado implícitamente por wait . El grupo de subprocesos tiene cierta lógica interna que verifica si se ha realizado un subproceso, pero eso solo debe activarse cuando el ejecutor del usuario lo sondea.

Leí la documentación de tokio y me pregunto cuál es el mejor enfoque para encapsular costosas E / S síncronas en el futuro.

Con el marco del reactor, obtenemos la ventaja de un modelo de subprocesamiento verde: unos pocos subprocesos del sistema operativo manejan muchas tareas concurrentes a través de un ejecutor.

El modelo futuro de tokio está impulsado por la demanda, lo que significa que el futuro mismo sondeará su estado interno para proporcionar información sobre su finalización; permitiendo capacidades de contrapresión y cancelación. Según tengo entendido, la fase de sondeo del futuro no debe ser bloqueante para que funcione bien.

El I / OI que quiere encapsular puede verse como una operación atómica y costosa larga. Idealmente, una tarea independiente realizaría la E / S y el futuro asociado sondearía el hilo de E / S para el estado de finalización.

Las dos únicas opciones que veo son:

  • Incluya el bloqueo de E / S en la función de poll del futuro.
  • generar un hilo del sistema operativo para realizar la E / S y utilizar el mecanismo futuro para sondear su estado, como se muestra en la documentación

Según tengo entendido, ninguna de las soluciones es óptima y no aprovecha al máximo el modelo de subprocesamiento verde (el primero no se recomienda en la documentación y el segundo no pasa por el ejecutor proporcionado por el marco del reactor). ¿Hay otra solución?