process - ¿Cómo leo la salida de un proceso secundario sin bloquear en Rust?
blocking pty (2)
Estoy haciendo una pequeña aplicación ncurses en Rust que necesita comunicarse con un proceso secundario. Ya tengo un prototipo escrito en Common Lisp. Estoy tratando de reescribirlo porque CL usa una gran cantidad de memoria para una herramienta tan pequeña.
Tengo algunos problemas para descubrir cómo interactuar con el subproceso.
Lo que estoy haciendo actualmente es más o menos esto:
-
Crea el proceso:
let mut program = match Command::new(command) .args(arguments) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() { Ok(child) => child, Err(_) => { println!("Cannot run program ''{}''.", command); return; } };
-
Pásalo a un bucle infinito (hasta que el usuario salga), que lee y maneja la entrada y escucha la salida como esta (y la escribe en la pantalla):
fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) { match program.stdout { Some(ref mut out) => { let mut buf_string = String::new(); match out.read_to_string(&mut buf_string) { Ok(_) => output_viewer.append_string(buf_string), Err(_) => return, }; } None => return, }; }
read_to_string
embargo, la llamada a
read_to_string
bloquea el programa hasta que
read_to_string
el proceso.
Por lo que puedo ver,
read_to_end
y
read
también parecen bloquearse.
Si intento ejecutar algo como
ls
que sale de inmediato, funciona, pero con algo que no sale como
python
o
sbcl
solo continúa una vez que
sbcl
el subproceso manualmente.
Basado en
esta respuesta
, cambié el código para usar
BufReader
:
fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) {
match program.stdout.as_mut() {
Some(out) => {
let buf_reader = BufReader::new(out);
for line in buf_reader.lines() {
match line {
Ok(l) => {
output_viewer.append_string(l);
}
Err(_) => return,
};
}
}
None => return,
}
}
Sin embargo, el problema sigue siendo el mismo.
Leerá todas las líneas disponibles y luego las bloqueará.
Como se supone que la herramienta funciona con cualquier programa, no hay forma de adivinar cuándo terminará la salida, antes de intentar leer.
Tampoco parece haber una manera de establecer un tiempo de espera para
BufReader
.
Proceso de Tokio
Aquí hay un ejemplo del uso de tokio y tokio-process .
use std::{
io::BufReader,
process::{Command, Stdio},
};
use tokio::{io, prelude::*, runtime::Runtime}; // 0.1.18
use tokio_process::CommandExt; // 0.2.3
fn main() {
let mut cmd = Command::new("/tmp/slow.bash")
.stdout(Stdio::piped())
.spawn_async()
.expect("cannot spawn");
let stdout = cmd.stdout().take().expect("no stdout");
let mut runtime = Runtime::new().expect("Unable to start the runtime");
let result = runtime.block_on({
io::lines(BufReader::new(stdout))
.inspect(|s| println!("> {}", s))
.collect()
});
println!("All the lines: {:?}", result);
}
Tokio-Threadpool
Aquí hay un ejemplo del uso de
tokio
y
tokio-threadpool
.
Comenzamos el proceso en un hilo usando la función de
blocking
.
Lo convertimos en una secuencia con
stream::poll_fn
use std::process::{Command, Stdio};
use tokio::{prelude::*, runtime::Runtime}; // 0.1.18
use tokio_threadpool; // 0.1.13
fn stream_command_output(
mut command: Command,
) -> impl Stream<Item = Vec<u8>, Error = tokio_threadpool::BlockingError> {
// Ensure that the output is available to read from and start the process
let mut child = command
.stdout(Stdio::piped())
.spawn()
.expect("cannot spawn");
let mut stdout = child.stdout.take().expect("no stdout");
// Create a stream of data
stream::poll_fn(move || {
// Perform blocking IO
tokio_threadpool::blocking(|| {
// Allocate some space to store anything read
let mut data = vec![0; 128];
// Read 1-128 bytes of data
let n_bytes_read = stdout.read(&mut data).expect("cannot read");
if n_bytes_read == 0 {
// Stdout is done
None
} else {
// Only return as many bytes as we read
data.truncate(n_bytes_read);
Some(data)
}
})
})
}
fn main() {
let output_stream = stream_command_output(Command::new("/tmp/slow.bash"));
let mut runtime = Runtime::new().expect("Unable to start the runtime");
let result = runtime.block_on({
output_stream
.map(|d| String::from_utf8(d).expect("Not UTF-8"))
.fold(Vec::new(), |mut v, s| {
print!("> {}", s);
v.push(s);
Ok(v)
})
});
println!("All the lines: {:?}", result);
}
Hay numerosas posibles compensaciones que se pueden hacer aquí. Por ejemplo, siempre asignar 128 bytes no es ideal, pero es fácil de implementar.
Apoyo
Como referencia, aquí está slow.bash :
#!/usr/bin/env bash
set -eu
val=0
while [[ $val -lt 10 ]]; do
echo $val
val=$(($val + 1))
sleep 1
done
Ver también:
- ¿Cómo devuelvo sincrónicamente un valor calculado en un futuro asíncrono en óxido estable?
Las transmisiones están bloqueadas por defecto. Las secuencias TCP / IP, las secuencias del sistema de archivos, las secuencias de tuberías, todas están bloqueando. Cuando le dice a una secuencia que le dé una porción de bytes, se detendrá y esperará hasta que tenga la cantidad de bytes dada o hasta que ocurra algo más (una interrupt , un final de secuencia, un error).
Los sistemas operativos están ansiosos por devolver los datos al proceso de lectura, por lo que si todo lo que desea es esperar a la siguiente línea y manejarla tan pronto como llegue, entonces el método sugerido por Shepmaster en
No se puede canalizar hacia o desde el niño generado procesar más de una vez
funciona.
(En teoría, no tiene que hacerlo, porque un sistema operativo puede hacer que el
BufReader
espere más datos en
read
, pero en la práctica los sistemas operativos prefieren las primeras "lecturas cortas" a esperar).
Este enfoque simple basado en
BufReader
deja de funcionar cuando necesita manejar múltiples flujos (como
stdout
y
stderr
de un proceso secundario) o múltiples procesos.
Por ejemplo, el
BufReader
basado en
BufReader
podría llegar a un punto muerto cuando un proceso secundario espera a que usted drene su tubería
stderr
mientras su proceso está bloqueado esperando su salida
BufReader
vacía.
Del mismo modo, no puede usar
BufReader
cuando no desea que su programa espere el proceso secundario indefinidamente.
Tal vez desee mostrar una barra de progreso o un temporizador mientras el niño todavía está trabajando y no le da salida.
No puede utilizar el
BufReader
basado en
BufReader
si su sistema operativo no está ansioso por devolver los datos al proceso (prefiere "lecturas completas" a "lecturas cortas") porque en ese caso algunas últimas líneas impresas por el proceso secundario podría terminar en una zona gris: el sistema operativo los consiguió, pero no son lo suficientemente grandes como para llenar el
BufReader
del
BufReader
.
BufReader
está limitado a lo que la interfaz de
Read
permite hacer con la transmisión, no es menos bloqueante que la transmisión subyacente.
Para ser eficiente,
read
la entrada en fragmentos y le indicará al sistema operativo que llene la mayor cantidad de búfer que tenga disponible.
Tal vez se pregunte por qué es tan importante leer datos en fragmentos aquí, por qué
BufReader
no puede leer los datos byte a byte.
El problema es que para leer los datos de una transmisión necesitamos la ayuda del sistema operativo.
Por otro lado, no somos el sistema operativo, trabajamos aislados de él, para no meternos con él si algo sale mal en nuestro proceso.
Entonces, para llamar al sistema operativo, debe haber una transición al "modo kernel", que también podría incurrir en un "cambio de contexto".
Es por eso que llamar al sistema operativo para leer cada byte es costoso.
Queremos la menor cantidad posible de llamadas al sistema operativo, por lo que obtenemos los datos de la secuencia en lotes.
Para esperar en una transmisión sin bloquear, necesitará una transmisión sin bloqueo. MIO promete tener el soporte de flujo sin bloqueo requerido para tuberías , probablemente con PipeReader , pero hasta ahora no lo he comprobado.
La naturaleza sin bloqueo de una secuencia debería permitir leer datos en fragmentos independientemente de si el sistema operativo prefiere las "lecturas cortas" o no. Porque la secuencia sin bloqueo nunca bloquea. Si no hay datos en la secuencia, simplemente se lo indica.
En ausencia de una secuencia sin bloqueo, tendrá que recurrir a subprocesos de generación para que las lecturas de bloqueo se realicen en un subproceso separado y, por lo tanto, no bloqueen su subproceso principal. También es posible que desee leer el flujo byte a byte para reaccionar al separador de línea de inmediato en caso de que el sistema operativo no prefiera las "lecturas cortas". Aquí hay un ejemplo de trabajo: https://gist.github.com/ArtemGr/db40ae04b431a95f2b78 .