process io rust blocking pty

process - ¿Cómo leo la salida de un proceso secundario sin bloquear en Rust?



blocking pty (2)

Estoy haciendo una pequeña aplicación ncurses en Rust que necesita comunicarse con un proceso secundario. Ya tengo un prototipo escrito en Common Lisp. Estoy tratando de reescribirlo porque CL usa una gran cantidad de memoria para una herramienta tan pequeña.

Tengo algunos problemas para descubrir cómo interactuar con el subproceso.

Lo que estoy haciendo actualmente es más o menos esto:

  1. Crea el proceso:

    let mut program = match Command::new(command) .args(arguments) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() { Ok(child) => child, Err(_) => { println!("Cannot run program ''{}''.", command); return; } };

  2. Pásalo a un bucle infinito (hasta que el usuario salga), que lee y maneja la entrada y escucha la salida como esta (y la escribe en la pantalla):

    fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) { match program.stdout { Some(ref mut out) => { let mut buf_string = String::new(); match out.read_to_string(&mut buf_string) { Ok(_) => output_viewer.append_string(buf_string), Err(_) => return, }; } None => return, }; }

read_to_string embargo, la llamada a read_to_string bloquea el programa hasta que read_to_string el proceso. Por lo que puedo ver, read_to_end y read también parecen bloquearse. Si intento ejecutar algo como ls que sale de inmediato, funciona, pero con algo que no sale como python o sbcl solo continúa una vez que sbcl el subproceso manualmente.

Basado en esta respuesta , cambié el código para usar BufReader :

fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) { match program.stdout.as_mut() { Some(out) => { let buf_reader = BufReader::new(out); for line in buf_reader.lines() { match line { Ok(l) => { output_viewer.append_string(l); } Err(_) => return, }; } } None => return, } }

Sin embargo, el problema sigue siendo el mismo. Leerá todas las líneas disponibles y luego las bloqueará. Como se supone que la herramienta funciona con cualquier programa, no hay forma de adivinar cuándo terminará la salida, antes de intentar leer. Tampoco parece haber una manera de establecer un tiempo de espera para BufReader .


Proceso de Tokio

Aquí hay un ejemplo del uso de tokio y tokio-process .

use std::{ io::BufReader, process::{Command, Stdio}, }; use tokio::{io, prelude::*, runtime::Runtime}; // 0.1.18 use tokio_process::CommandExt; // 0.2.3 fn main() { let mut cmd = Command::new("/tmp/slow.bash") .stdout(Stdio::piped()) .spawn_async() .expect("cannot spawn"); let stdout = cmd.stdout().take().expect("no stdout"); let mut runtime = Runtime::new().expect("Unable to start the runtime"); let result = runtime.block_on({ io::lines(BufReader::new(stdout)) .inspect(|s| println!("> {}", s)) .collect() }); println!("All the lines: {:?}", result); }

Tokio-Threadpool

Aquí hay un ejemplo del uso de tokio y tokio-threadpool . Comenzamos el proceso en un hilo usando la función de blocking . Lo convertimos en una secuencia con stream::poll_fn

use std::process::{Command, Stdio}; use tokio::{prelude::*, runtime::Runtime}; // 0.1.18 use tokio_threadpool; // 0.1.13 fn stream_command_output( mut command: Command, ) -> impl Stream<Item = Vec<u8>, Error = tokio_threadpool::BlockingError> { // Ensure that the output is available to read from and start the process let mut child = command .stdout(Stdio::piped()) .spawn() .expect("cannot spawn"); let mut stdout = child.stdout.take().expect("no stdout"); // Create a stream of data stream::poll_fn(move || { // Perform blocking IO tokio_threadpool::blocking(|| { // Allocate some space to store anything read let mut data = vec![0; 128]; // Read 1-128 bytes of data let n_bytes_read = stdout.read(&mut data).expect("cannot read"); if n_bytes_read == 0 { // Stdout is done None } else { // Only return as many bytes as we read data.truncate(n_bytes_read); Some(data) } }) }) } fn main() { let output_stream = stream_command_output(Command::new("/tmp/slow.bash")); let mut runtime = Runtime::new().expect("Unable to start the runtime"); let result = runtime.block_on({ output_stream .map(|d| String::from_utf8(d).expect("Not UTF-8")) .fold(Vec::new(), |mut v, s| { print!("> {}", s); v.push(s); Ok(v) }) }); println!("All the lines: {:?}", result); }

Hay numerosas posibles compensaciones que se pueden hacer aquí. Por ejemplo, siempre asignar 128 bytes no es ideal, pero es fácil de implementar.

Apoyo

Como referencia, aquí está slow.bash :

#!/usr/bin/env bash set -eu val=0 while [[ $val -lt 10 ]]; do echo $val val=$(($val + 1)) sleep 1 done

Ver también:

  • ¿Cómo devuelvo sincrónicamente un valor calculado en un futuro asíncrono en óxido estable?

Las transmisiones están bloqueadas por defecto. Las secuencias TCP / IP, las secuencias del sistema de archivos, las secuencias de tuberías, todas están bloqueando. Cuando le dice a una secuencia que le dé una porción de bytes, se detendrá y esperará hasta que tenga la cantidad de bytes dada o hasta que ocurra algo más (una interrupt , un final de secuencia, un error).

Los sistemas operativos están ansiosos por devolver los datos al proceso de lectura, por lo que si todo lo que desea es esperar a la siguiente línea y manejarla tan pronto como llegue, entonces el método sugerido por Shepmaster en No se puede canalizar hacia o desde el niño generado procesar más de una vez funciona. (En teoría, no tiene que hacerlo, porque un sistema operativo puede hacer que el BufReader espere más datos en read , pero en la práctica los sistemas operativos prefieren las primeras "lecturas cortas" a esperar).

Este enfoque simple basado en BufReader deja de funcionar cuando necesita manejar múltiples flujos (como stdout y stderr de un proceso secundario) o múltiples procesos. Por ejemplo, el BufReader basado en BufReader podría llegar a un punto muerto cuando un proceso secundario espera a que usted drene su tubería stderr mientras su proceso está bloqueado esperando su salida BufReader vacía.

Del mismo modo, no puede usar BufReader cuando no desea que su programa espere el proceso secundario indefinidamente. Tal vez desee mostrar una barra de progreso o un temporizador mientras el niño todavía está trabajando y no le da salida.

No puede utilizar el BufReader basado en BufReader si su sistema operativo no está ansioso por devolver los datos al proceso (prefiere "lecturas completas" a "lecturas cortas") porque en ese caso algunas últimas líneas impresas por el proceso secundario podría terminar en una zona gris: el sistema operativo los consiguió, pero no son lo suficientemente grandes como para llenar el BufReader del BufReader .

BufReader está limitado a lo que la interfaz de Read permite hacer con la transmisión, no es menos bloqueante que la transmisión subyacente. Para ser eficiente, read la entrada en fragmentos y le indicará al sistema operativo que llene la mayor cantidad de búfer que tenga disponible.

Tal vez se pregunte por qué es tan importante leer datos en fragmentos aquí, por qué BufReader no puede leer los datos byte a byte. El problema es que para leer los datos de una transmisión necesitamos la ayuda del sistema operativo. Por otro lado, no somos el sistema operativo, trabajamos aislados de él, para no meternos con él si algo sale mal en nuestro proceso. Entonces, para llamar al sistema operativo, debe haber una transición al "modo kernel", que también podría incurrir en un "cambio de contexto". Es por eso que llamar al sistema operativo para leer cada byte es costoso. Queremos la menor cantidad posible de llamadas al sistema operativo, por lo que obtenemos los datos de la secuencia en lotes.

Para esperar en una transmisión sin bloquear, necesitará una transmisión sin bloqueo. MIO promete tener el soporte de flujo sin bloqueo requerido para tuberías , probablemente con PipeReader , pero hasta ahora no lo he comprobado.

La naturaleza sin bloqueo de una secuencia debería permitir leer datos en fragmentos independientemente de si el sistema operativo prefiere las "lecturas cortas" o no. Porque la secuencia sin bloqueo nunca bloquea. Si no hay datos en la secuencia, simplemente se lo indica.

En ausencia de una secuencia sin bloqueo, tendrá que recurrir a subprocesos de generación para que las lecturas de bloqueo se realicen en un subproceso separado y, por lo tanto, no bloqueen su subproceso principal. También es posible que desee leer el flujo byte a byte para reaccionar al separador de línea de inmediato en caso de que el sistema operativo no prefiera las "lecturas cortas". Aquí hay un ejemplo de trabajo: https://gist.github.com/ArtemGr/db40ae04b431a95f2b78 .