Lectura sin bloqueo en un subproceso. PIPE en python
io subprocess (26)
¿Por qué molestar hilo y cola? a diferencia de readline (), BufferedReader.read1 () no bloqueará la espera de / r / n, devuelve CUANTO ANTES si llega alguna salida.
#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io
def __main__():
try:
p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
except: print("Popen failed"); quit()
sout = io.open(p.stdout.fileno(), ''rb'', closefd=False)
while True:
buf = sout.read1(1024)
if len(buf) == 0: break
print buf,
if __name__ == ''__main__'':
__main__()
Estoy usando el módulo de subproceso para iniciar un subproceso y conectarme a su flujo de salida (stdout). Quiero poder ejecutar lecturas sin bloqueo en su stdout. ¿Hay una manera de hacer que .readline no bloquee o para verificar si hay datos en la transmisión antes de invocar .readline
? Me gustaría que esto sea portátil o al menos funcione bajo Windows y Linux.
Así es como lo hago por ahora (está bloqueando en la línea de .readline
si no hay datos disponibles):
p = subprocess.Popen(''myprogram.exe'', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Agregando esta respuesta aquí, ya que proporciona la capacidad de establecer canalizaciones sin bloqueo en Windows y Unix.
Todos los detalles de los ctypes
son gracias a la respuesta de @techtonik .
Hay una versión ligeramente modificada para ser usada tanto en sistemas Unix como Windows.
- Compatible con Python3 (solo se necesitan cambios menores) .
- Incluye la versión de posix, y define la excepción a usar para cualquiera.
De esta manera puede usar la misma función y excepción para Unix y el código de Windows.
# pipe_non_blocking.py (module)
"""
Example use:
p = subprocess.Popen(
command,
stdout=subprocess.PIPE,
)
pipe_non_blocking_set(p.stdout.fileno())
try:
data = os.read(p.stdout.fileno(), 1)
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
"""
__all__ = (
"pipe_non_blocking_set",
"pipe_non_blocking_is_error_blocking",
"PortableBlockingIOError",
)
import os
if os.name == "nt":
def pipe_non_blocking_set(fd):
# Constant could define globally but avoid polluting the name-space
# thanks to: https://.com/questions/34504970
import msvcrt
from ctypes import windll, byref, wintypes, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL
LPDWORD = POINTER(DWORD)
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
def pipe_no_wait(pipefd):
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
SetNamedPipeHandleState.restype = BOOL
h = msvcrt.get_osfhandle(pipefd)
res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
if res == 0:
print(WinError())
return False
return True
return pipe_no_wait(fd)
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
from ctypes import GetLastError
ERROR_NO_DATA = 232
return (GetLastError() == ERROR_NO_DATA)
PortableBlockingIOError = OSError
else:
def pipe_non_blocking_set(fd):
import fcntl
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return True
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
return True
PortableBlockingIOError = BlockingIOError
Para evitar leer datos incompletos, terminé escribiendo mi propio generador de línea de lectura (que devuelve la cadena de bytes para cada línea).
Es un generador para que puedas por ejemplo ...
def non_blocking_readlines(f, chunk=1024):
"""
Iterate over lines, yielding b'''' when nothings left
or when new data is not yet available.
stdout_iter = iter(non_blocking_readlines(process.stdout))
line = next(stdout_iter) # will be a line or b''''.
"""
import os
from .pipe_non_blocking import (
pipe_non_blocking_set,
pipe_non_blocking_is_error_blocking,
PortableBlockingIOError,
)
fd = f.fileno()
pipe_non_blocking_set(fd)
blocks = []
while True:
try:
data = os.read(fd, chunk)
if not data:
# case were reading finishes with no trailing newline
yield b''''.join(blocks)
blocks.clear()
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
yield b''''
continue
while True:
n = data.find(b''/n'')
if n == -1:
break
yield b''''.join(blocks) + data[:n + 1]
data = data[n + 1:]
blocks.clear()
blocks.append(data)
Agrego este problema para leer algunos subprocess.Popen stdout. Aquí está mi solución de lectura no bloqueante:
import fcntl
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return ""
# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()
# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
''test/n''
Aquí está mi código, que se usa para capturar todas las salidas del subproceso lo antes posible, incluidas las líneas parciales. Bombea al mismo tiempo y stdout y stderr en orden casi correcto.
Probado y correctamente trabajado en Python 2.7 Linux y Windows.
#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
if (len(argv) > 1) and (argv[-1] == "-sub-"):
import time, sys
print "Application runned!"
time.sleep(2)
print "Slept 2 second"
time.sleep(1)
print "Slept 1 additional second",
time.sleep(2)
sys.stderr.write("Stderr output after 5 seconds")
print "Eol on stdin"
sys.stderr.write("Eol on stderr/n")
time.sleep(1)
print "Wow, we have end of work!",
else:
os.environ["PYTHONUNBUFFERED"]="1"
try:
p = Popen( argv + ["-sub-"],
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
except WindowsError, W:
if W.winerror==193:
p = Popen( argv + ["-sub-"],
shell=True, # Try to run via shell
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
else:
raise
inp = Queue.Queue()
sout = io.open(p.stdout.fileno(), ''rb'', closefd=False)
serr = io.open(p.stderr.fileno(), ''rb'', closefd=False)
def Pump(stream, category):
queue = Queue.Queue()
def rdr():
while True:
buf = stream.read1(8192)
if len(buf)>0:
queue.put( buf )
else:
queue.put( None )
return
def clct():
active = True
while active:
r = queue.get()
try:
while True:
r1 = queue.get(timeout=0.005)
if r1 is None:
active = False
break
else:
r += r1
except Queue.Empty:
pass
inp.put( (category, r) )
for tgt in [rdr, clct]:
th = Thread(target=tgt)
th.setDaemon(True)
th.start()
Pump(sout, ''stdout'')
Pump(serr, ''stderr'')
while p.poll() is None:
# App still working
try:
chan,line = inp.get(timeout = 1.0)
if chan==''stdout'':
print "STDOUT>>", line, "<?<"
elif chan==''stderr'':
print " ERROR==", line, "=?="
except Queue.Empty:
pass
print "Finish"
if __name__ == ''__main__'':
__main__()
Descargo de responsabilidad: esto funciona sólo para tornado
Puede hacer esto configurando el fd para que no esté bloqueado y luego use ioloop para registrar devoluciones de llamada. He empaquetado esto en un huevo llamado tornado_subprocess y puedes instalarlo a través de PyPI:
easy_install tornado_subprocess
Ahora puedes hacer algo como esto:
import tornado_subprocess
import tornado.ioloop
def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr
t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
También puedes usarlo con un RequestHandler.
class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()
@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
EDITAR: Esta implementación aún bloquea. Usa la answer de JFSebastian en su lugar.
Probé la answer , pero el riesgo adicional y el mantenimiento del código de subproceso eran preocupantes.
Mirando a través del módulo io (y estando limitado a 2.6), encontré BufferedReader. Esta es mi solución sin hilos, sin bloqueo.
import io
from subprocess import PIPE, Popen
p = Popen([''myprogram.exe''], stdout=PIPE)
SLEEP_DELAY = 0.001
# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), ''rb'', closefd=False) as buffer:
while p.poll() == None:
time.sleep(SLEEP_DELAY)
while ''/n'' in bufferedStdout.peek(bufferedStdout.buffer_size):
line = buffer.readline()
# do stuff with the line
# Handle any remaining output after the process has ended
while buffer.peek():
line = buffer.readline()
# do stuff with the line
El módulo de select ayuda a determinar dónde está la siguiente entrada útil.
Sin embargo, casi siempre eres más feliz con hilos separados. Uno hace un bloqueo, lee el stdin, otro hace lo que sea que no quieras bloquear.
Esta versión de lectura no bloqueante no requiere módulos especiales y funcionará de forma inmediata en la mayoría de las distribuciones de Linux.
import os
import sys
import time
import fcntl
import subprocess
def async_read(fd):
# set non-blocking flag while preserving old flags
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# read char until EOF hit
while True:
try:
ch = os.read(fd.fileno(), 1)
# EOF
if not ch: break
sys.stdout.write(ch)
except OSError:
# waiting for data be available on fd
pass
def shell(args, async=True):
# merge stderr and stdout
proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if async: async_read(proc.stdout)
sout, serr = proc.communicate()
return (sout, serr)
if __name__ == ''__main__'':
cmd = ''ping 8.8.8.8''
sout, serr = shell(cmd.split())
He creado una biblioteca basada en la solución de JF Sebastian . Puedes usarlo.
Las soluciones existentes no funcionaron para mí (detalles a continuación). Lo que finalmente funcionó fue implementar readline utilizando read (1) (basado en esta respuesta ). Este último no bloquea:
from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
nextline = None
buf = ''''
while True:
#--- extract line using read(1)
out = myprocess.stdout.read(1)
if out == '''' and myprocess.poll() != None: break
if out != '''':
buf += out
if out == ''/n'':
nextline = buf
buf = ''''
if not nextline: continue
line = nextline
nextline = None
#--- do whatever you want with line here
print ''Line is:'', line
myprocess.stdout.close()
myprocess = Popen(''myprogram.exe'', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()
#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
myprocess.kill()
myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
p1.join()
Por qué las soluciones existentes no funcionaron:
- Las soluciones que requieren readline (incluidas las basadas en cola) siempre se bloquean. Es difícil (¿imposible?) Matar el hilo que ejecuta readline. Solo se elimina cuando finaliza el proceso que lo creó, pero no cuando se elimina el proceso de producción de salida.
- La mezcla de fcntl de bajo nivel con las llamadas de readline de alto nivel puede no funcionar correctamente, como lo ha señalado anonnn.
- El uso de select.poll () está limpio, pero no funciona en Windows de acuerdo con los documentos de Python.
- El uso de bibliotecas de terceros parece excesivo para esta tarea y agrega dependencias adicionales.
Muchas veces he tenido un problema similar; Los programas de Python que escribo con frecuencia deben tener la capacidad de ejecutar alguna funcionalidad principal y, al mismo tiempo, aceptar la entrada del usuario desde la línea de comandos (stdin). El simple hecho de colocar la funcionalidad de manejo de entrada del usuario en otro hilo no resuelve el problema porque readline()
bloquea y no tiene tiempo de espera. Si la funcionalidad principal está completa y ya no hay necesidad de esperar por más comentarios del usuario, normalmente quiero que mi programa se cierre, pero no puedo porque readline()
todavía está bloqueando en el otro hilo que espera una línea. Una solución que he encontrado para este problema es hacer que stdin sea un archivo no bloqueante usando el módulo fcntl:
import fcntl
import os
import sys
# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# user input handling thread
while mainThreadIsRunning:
try: input = sys.stdin.readline()
except: continue
handleInput(input)
En mi opinión, esto es un poco más limpio que usar los módulos de selección o señal para resolver este problema, pero nuevamente solo funciona en UNIX ...
Prueba el módulo asyncproc . Por ejemplo:
import os
from asyncproc import Process
myProc = Process("myprogram.app")
while True:
# check to see if process has ended
poll = myProc.wait(os.WNOHANG)
if poll != None:
break
# print any new output
out = myProc.read()
if out != "":
print out
El módulo se encarga de todos los subprocesos como lo sugiere S.Lott.
Puedes hacerlo muy fácilmente en Twisted . Dependiendo de su base de código existente, esto podría no ser tan fácil de usar, pero si está creando una aplicación torcida, las cosas como estas se vuelven casi triviales. Crea una clase ProcessProtocol
y reemplaza el método outReceived()
. Retorcido (dependiendo del reactor utilizado) generalmente es solo un gran bucle select()
con devoluciones de llamada instaladas para manejar datos de diferentes descriptores de archivos (a menudo sockets de red). Por lo tanto, el método outReceived()
es simplemente instalar una devolución de llamada para manejar los datos provenientes de STDOUT
. Un ejemplo simple que demuestra este comportamiento es el siguiente:
from twisted.internet import protocol, reactor
class MyProcessProtocol(protocol.ProcessProtocol):
def outReceived(self, data):
print data
proc = MyProcessProtocol()
reactor.spawnProcess(proc, ''./myprogram'', [''./myprogram'', ''arg1'', ''arg2'', ''arg3''])
reactor.run()
La documentación retorcida tiene buena información sobre esto.
Si creas toda tu aplicación alrededor de Twisted, hace que la comunicación asíncrona con otros procesos, locales o remotos, sea realmente elegante. Por otro lado, si tu programa no está construido sobre Twisted, esto no va a ser tan útil. Esperamos que esto pueda ser útil para otros lectores, incluso si no es aplicable para su aplicación en particular.
Python 3.4 introduce una nueva API provisional para el módulo asíncrono IO - asyncio
.
El enfoque es similar a la respuesta basada en trenzas de @Bryan Ward : defina un protocolo y sus métodos se llaman tan pronto como los datos estén listos:
#!/usr/bin/env python3
import asyncio
import os
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
print(data)
def connection_lost(self, exc):
loop.stop() # end loop.run_forever()
if os.name == ''nt'':
loop = asyncio.ProactorEventLoop() # for subprocess'' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
"myprogram.exe", "arg1", "arg2"))
loop.run_forever()
finally:
loop.close()
Consulte "Subproceso" en los documentos .
Hay una interfaz de alto nivel asyncio.create_subprocess_exec()
que devuelve objetos de Process
que permite leer una línea de forma asincrónica utilizando StreamReader.readline()
coroutine (con async
/ await Python 3.5+ syntax ):
#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing
async def readline_and_kill(*args):
# start child process
process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
# read line (sequence of bytes ending with b''/n'') asynchronously
async for line in process.stdout:
print("got line:", line.decode(locale.getpreferredencoding(False)))
break
process.kill()
return await process.wait() # wait for the child process to exit
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
with closing(loop):
sys.exit(loop.run_until_complete(readline_and_kill(
"myprogram.exe", "arg1", "arg2")))
readline_and_kill()
realiza las siguientes tareas:
- iniciar subproceso, redirigir su stdout a una tubería
- leer una línea del subproceso ''stdout asincrónicamente
- matar subproceso
- espera a que salga
Cada paso podría estar limitado por segundos de tiempo de espera si es necesario.
Recientemente me topé con el mismo problema que necesito para leer una línea a la vez desde el flujo (ejecución en subproceso) en el modo de no bloqueo. Quería evitar los siguientes problemas: no grabar la CPU, no leer el flujo de un byte ( como hizo readline), etc.
Aquí está mi implementación https://gist.github.com/grubberr/5501e1a9760c3eab5e0a no es compatible con Windows (encuesta), no maneja EOF, pero funciona bien para mí
Trabajando a partir de la respuesta de JF Sebastian, y de varias otras fuentes, reuní a un simple administrador de subproceso. Proporciona la lectura sin bloqueo de la solicitud, así como la ejecución de varios procesos en paralelo. No utiliza ninguna llamada específica del sistema operativo (que yo sepa) y, por lo tanto, debería funcionar en cualquier lugar.
Está disponible desde pypi, así que solo pip install shelljob
. Consulte la página del proyecto para ver ejemplos y documentos completos.
Una solución es hacer otro proceso para realizar su lectura del proceso, o hacer un hilo del proceso con un tiempo de espera.
Aquí está la versión roscada de una función de tiempo de espera:
http://code.activestate.com/recipes/473878/
Sin embargo, ¿necesitas leer la salida estándar a medida que va llegando? Otra solución puede ser volcar la salida en un archivo y esperar a que el proceso termine de usar p.wait () .
f = open(''myprogram_output.txt'',''w'')
p = subprocess.Popen(''myprogram.exe'', stdout=f)
p.wait()
f.close()
str = open(''myprogram_output.txt'',''r'').read()
Utilice seleccionar y leer (1).
import subprocess #no new requirements
def readAllSoFar(proc, retVal=''''):
while (select.select([proc.stdout],[],[],0)[0]!=[]):
retVal+=proc.stdout.read(1)
return retVal
p = subprocess.Popen([''/bin/ls''], stdout=subprocess.PIPE)
while not p.poll():
print (readAllSoFar(p))
Para readline () - como
lines = ['''']
while not p.poll():
lines = readAllSoFar(p, lines[-1]).split(''/n'')
for a in range(len(lines)-1):
print a
lines = readAllSoFar(p, lines[-1]).split(''/n'')
for a in range(len(lines)-1):
print a
fcntl
, select
, asyncproc
no ayudará en este caso.
Una forma confiable de leer un flujo sin bloquear independientemente del sistema operativo es usar Queue.get_nowait()
:
import sys
from subprocess import PIPE, Popen
from threading import Thread
try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty # python 2.x
ON_POSIX = ''posix'' in sys.builtin_module_names
def enqueue_output(out, queue):
for line in iter(out.readline, b''''):
queue.put(line)
out.close()
p = Popen([''myprogram.exe''], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()
# ... do other things here
# read line without blocking
try: line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
print(''no output yet'')
else: # got line
# ... do something with line
Aquí hay un módulo que admite lecturas sin bloqueo y escrituras en segundo plano en python:
https://pypi.python.org/pypi/python-nonblock
Proporciona una función,
nonblock_read que leerá los datos de la secuencia, si están disponibles, de lo contrario devolverá una cadena vacía (o Ninguno si la secuencia está cerrada en el otro lado y se han leído todos los datos posibles)
También puede considerar el módulo python-subprocess2,
https://pypi.python.org/pypi/python-subprocess2
que se suma al módulo de subproceso. Así que en el objeto devuelto de "subprocess.Popen" se agrega un método adicional, runInBackground. Esto inicia un hilo y devuelve un objeto que se rellenará automáticamente a medida que las cosas se escriben en stdout / stderr, sin bloquear su hilo principal.
¡Disfrutar!
En mi caso, necesitaba un módulo de registro que capte el resultado de las aplicaciones en segundo plano y lo aumente (agregando sellos de tiempo, colores, etc.).
Terminé con un hilo de fondo que hace la E / S real. El siguiente código es solo para plataformas POSIX. Me quité las partes no esenciales.
Si alguien va a usar esta bestia para carreras largas, considere administrar descriptores abiertos. En mi caso no fue un gran problema.
# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess
class Logger(threading.Thread):
def __init__(self, *modules):
threading.Thread.__init__(self)
try:
from select import epoll, EPOLLIN
self.__poll = epoll()
self.__evt = EPOLLIN
self.__to = -1
except:
from select import poll, POLLIN
print ''epoll is not available''
self.__poll = poll()
self.__evt = POLLIN
self.__to = 100
self.__fds = {}
self.daemon = True
self.start()
def run(self):
while True:
events = self.__poll.poll(self.__to)
for fd, ev in events:
if (ev&self.__evt) != self.__evt:
continue
try:
self.__fds[fd].run()
except Exception, e:
print e
def add(self, fd, log):
assert not self.__fds.has_key(fd)
self.__fds[fd] = log
self.__poll.register(fd, self.__evt)
class log:
logger = Logger()
def __init__(self, name):
self.__name = name
self.__piped = False
def fileno(self):
if self.__piped:
return self.write
self.read, self.write = os.pipe()
fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self.fdRead = os.fdopen(self.read)
self.logger.add(self.read, self)
self.__piped = True
return self.write
def __run(self, line):
self.chat(line, nl=False)
def run(self):
while True:
try: line = self.fdRead.readline()
except IOError, exc:
if exc.errno == errno.EAGAIN:
return
raise
self.__run(line)
def chat(self, line, nl=True):
if nl: nl = ''/n''
else: nl = ''''
sys.stdout.write(''[%s] %s%s'' % (self.__name, line, nl))
def system(command, param=[], cwd=None, env=None, input=None, output=None):
args = [command] + param
p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
p.wait()
ls = log(''ls'')
ls.chat(''go'')
system("ls", [''-l'', ''/''], output=ls)
date = log(''date'')
date.chat(''go'')
system("date", output=date)
Mi problema es un poco diferente, ya que quería recopilar tanto stdout como stderr de un proceso en ejecución, pero en última instancia es el mismo, ya que quería generar la salida en un widget como se genera.
No quería recurrir a muchas de las soluciones propuestas mediante el uso de colas o subprocesos adicionales, ya que no deberían ser necesarios para realizar una tarea tan común como ejecutar otro script y recopilar su salida.
Después de leer las soluciones propuestas y los documentos de Python, resolví mi problema con la implementación a continuación. Sí, solo funciona para POSIX ya que estoy usando la select
función call.
Estoy de acuerdo en que los documentos son confusos y la implementación es incómoda para una tarea de scripting tan común. Creo que las versiones anteriores de python tienen diferentes valores predeterminados Popen
y diferentes explicaciones, por lo que crearon mucha confusión. Esto parece funcionar bien tanto para Python 2.7.12 como para 3.5.2.
La clave fue establecer el bufsize=1
búfer de línea y luego universal_newlines=True
procesarlo como un archivo de texto en lugar de un binario que parece ser el predeterminado cuando se configura bufsize=1
.
class workerThread(QThread):
def __init__(self, cmd):
QThread.__init__(self)
self.cmd = cmd
self.result = None ## return code
self.error = None ## flag indicates an error
self.errorstr = "" ## info message about the error
def __del__(self):
self.wait()
DEBUG("Thread removed")
def run(self):
cmd_list = self.cmd.split(" ")
try:
cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
, universal_newlines=True
, stderr=subprocess.PIPE
, stdout=subprocess.PIPE)
except OSError:
self.error = 1
self.errorstr = "Failed to execute " + self.cmd
ERROR(self.errorstr)
finally:
VERBOSE("task started...")
import select
while True:
try:
r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
if cmd.stderr in r:
line = cmd.stderr.readline()
if line != "":
line = line.strip()
self.emit(SIGNAL("update_error(QString)"), line)
if cmd.stdout in r:
line = cmd.stdout.readline()
if line == "":
break
line = line.strip()
self.emit(SIGNAL("update_output(QString)"), line)
except IOError:
pass
cmd.wait()
self.result = cmd.returncode
if self.result < 0:
self.error = 1
self.errorstr = "Task terminated by signal " + str(self.result)
ERROR(self.errorstr)
return
if self.result:
self.error = 1
self.errorstr = "exit code " + str(self.result)
ERROR(self.errorstr)
return
return
ERROR, DEBUG y VERBOSE son simplemente macros que imprimen la salida al terminal.
Esta solución es IMHO 99.99% efectiva ya que aún utiliza la readline
función de bloqueo , por lo que asumimos que el subproceso es bueno y genera líneas completas.
Doy la bienvenida a los comentarios para mejorar la solución, ya que todavía soy nuevo en Python.
Esta solución utiliza el select
módulo para "leer los datos disponibles" de una secuencia de IO. Esta función se bloquea inicialmente hasta que los datos estén disponibles, pero luego solo lee los datos que están disponibles y no bloquea más.
Dado que utiliza el select
módulo, esto solo funciona en Unix.
El código es totalmente compatible con PEP8.
import select
def read_available(input_stream, max_bytes=None):
"""
Blocks until any data is available, then all available data is then read and returned.
This function returns an empty string when end of stream is reached.
Args:
input_stream: The stream to read from.
max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.
Returns:
str
"""
# Prepare local variables
input_streams = [input_stream]
empty_list = []
read_buffer = ""
# Initially block for input using ''select''
if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:
# Poll read-readiness using ''select''
def select_func():
return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0
# Create while function based on parameters
if max_bytes is not None:
def while_func():
return (len(read_buffer) < max_bytes) and select_func()
else:
while_func = select_func
while True:
# Read single byte at a time
read_data = input_stream.read(1)
if len(read_data) == 0:
# End of stream
break
# Append byte to string buffer
read_buffer += read_data
# Check if more data is available
if not while_func():
break
# Return read buffer
return read_buffer
Este es un ejemplo para ejecutar un comando interactivo en subproceso, y la salida estándar es interactiva mediante el uso de pseudo terminal. Puede consultar: https://.com/a/43012138/3555925
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen
command = ''bash''
# command = ''docker run -it --rm centos /bin/bash''.split()
# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()
# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
preexec_fn=os.setsid,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
universal_newlines=True)
while p.poll() is None:
r, w, e = select.select([sys.stdin, master_fd], [], [])
if sys.stdin in r:
d = os.read(sys.stdin.fileno(), 10240)
os.write(master_fd, d)
elif master_fd in r:
o = os.read(master_fd, 10240)
if o:
os.write(sys.stdout.fileno(), o)
# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
También enfrenté el problema descrito por y lo resolví utilizando "seleccionar" como lo hicieron , y otros, pero en un modo de bloqueo para evitar un ciclo de ocupado. Utiliza un tubo ficticio como un falso stdin. Seleccione los bloques y espere a que esté lista la entrada estándar o la tubería. Cuando se presiona una tecla, stdin desbloquea la selección y el valor de la tecla se puede recuperar con lectura (1). Cuando un hilo diferente escribe en la tubería, la tubería desbloquea la selección y puede tomarse como una indicación de que la necesidad de la entrada estándar ha terminado. Aquí hay un código de referencia:
import sys
import os
from select import select
# -------------------------------------------------------------------------
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")
# -------------------------------------------------------------------------
def getKey():
# Wait for stdin or pipe (fake stdin) to be ready
dr,dw,de = select([sys.__stdin__, readFile], [], [])
# If stdin is the one ready then read it and return value
if sys.__stdin__ in dr:
return sys.__stdin__.read(1) # For Windows use ----> getch() from module msvcrt
# Must finish
else:
return None
# -------------------------------------------------------------------------
def breakStdinRead():
writeFile.write('' '')
writeFile.flush()
# -------------------------------------------------------------------------
# MAIN CODE
# Get key stroke
key = getKey()
# Keyboard input
if key:
# ... do your stuff with the key value
# Faked keystroke
else:
# ... use of stdin finished
# -------------------------------------------------------------------------
# OTHER THREAD CODE
breakStdinRead()
Tengo el problema del interrogador original, pero no deseo invocar hilos. Mezclé la solución de Jesse con una lectura directa () de la tubería y mi propio controlador de búfer para las lecturas de línea (sin embargo, mi subproceso, ping, siempre escribía líneas completas <un tamaño de página del sistema). Evito la espera ocupada solo leyendo en un reloj registrado en gobject. Estos días generalmente ejecuto código dentro de un objeto MainLoop para evitar subprocesos.
def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen([''/bin/ping'', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down
El observador es
def watch(f, *other):
print ''reading'',f.read()
return True
Y el programa principal configura un ping y luego llama a bucle de correo de objeto.
def main():
set_up_ping(''192.168.1.8'', watch)
# discard gid as unused here
gobject.MainLoop().run()
Cualquier otro trabajo se adjunta a devoluciones de llamada en gobject.