python io subprocess nonblocking

Lectura sin bloqueo en un subproceso. PIPE en python



io subprocess (26)

¿Por qué molestar hilo y cola? a diferencia de readline (), BufferedReader.read1 () no bloqueará la espera de / r / n, devuelve CUANTO ANTES si llega alguna salida.

#!/usr/bin/python from subprocess import Popen, PIPE, STDOUT import io def __main__(): try: p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT ) except: print("Popen failed"); quit() sout = io.open(p.stdout.fileno(), ''rb'', closefd=False) while True: buf = sout.read1(1024) if len(buf) == 0: break print buf, if __name__ == ''__main__'': __main__()

Estoy usando el módulo de subproceso para iniciar un subproceso y conectarme a su flujo de salida (stdout). Quiero poder ejecutar lecturas sin bloqueo en su stdout. ¿Hay una manera de hacer que .readline no bloquee o para verificar si hay datos en la transmisión antes de invocar .readline ? Me gustaría que esto sea portátil o al menos funcione bajo Windows y Linux.

Así es como lo hago por ahora (está bloqueando en la línea de .readline si no hay datos disponibles):

p = subprocess.Popen(''myprogram.exe'', stdout = subprocess.PIPE) output_str = p.stdout.readline()


Agregando esta respuesta aquí, ya que proporciona la capacidad de establecer canalizaciones sin bloqueo en Windows y Unix.

Todos los detalles de los ctypes son gracias a la respuesta de @techtonik .

Hay una versión ligeramente modificada para ser usada tanto en sistemas Unix como Windows.

  • Compatible con Python3 (solo se necesitan cambios menores) .
  • Incluye la versión de posix, y define la excepción a usar para cualquiera.

De esta manera puede usar la misma función y excepción para Unix y el código de Windows.

# pipe_non_blocking.py (module) """ Example use: p = subprocess.Popen( command, stdout=subprocess.PIPE, ) pipe_non_blocking_set(p.stdout.fileno()) try: data = os.read(p.stdout.fileno(), 1) except PortableBlockingIOError as ex: if not pipe_non_blocking_is_error_blocking(ex): raise ex """ __all__ = ( "pipe_non_blocking_set", "pipe_non_blocking_is_error_blocking", "PortableBlockingIOError", ) import os if os.name == "nt": def pipe_non_blocking_set(fd): # Constant could define globally but avoid polluting the name-space # thanks to: https://.com/questions/34504970 import msvcrt from ctypes import windll, byref, wintypes, WinError, POINTER from ctypes.wintypes import HANDLE, DWORD, BOOL LPDWORD = POINTER(DWORD) PIPE_NOWAIT = wintypes.DWORD(0x00000001) def pipe_no_wait(pipefd): SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD] SetNamedPipeHandleState.restype = BOOL h = msvcrt.get_osfhandle(pipefd) res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None) if res == 0: print(WinError()) return False return True return pipe_no_wait(fd) def pipe_non_blocking_is_error_blocking(ex): if not isinstance(ex, PortableBlockingIOError): return False from ctypes import GetLastError ERROR_NO_DATA = 232 return (GetLastError() == ERROR_NO_DATA) PortableBlockingIOError = OSError else: def pipe_non_blocking_set(fd): import fcntl fl = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) return True def pipe_non_blocking_is_error_blocking(ex): if not isinstance(ex, PortableBlockingIOError): return False return True PortableBlockingIOError = BlockingIOError

Para evitar leer datos incompletos, terminé escribiendo mi propio generador de línea de lectura (que devuelve la cadena de bytes para cada línea).

Es un generador para que puedas por ejemplo ...

def non_blocking_readlines(f, chunk=1024): """ Iterate over lines, yielding b'''' when nothings left or when new data is not yet available. stdout_iter = iter(non_blocking_readlines(process.stdout)) line = next(stdout_iter) # will be a line or b''''. """ import os from .pipe_non_blocking import ( pipe_non_blocking_set, pipe_non_blocking_is_error_blocking, PortableBlockingIOError, ) fd = f.fileno() pipe_non_blocking_set(fd) blocks = [] while True: try: data = os.read(fd, chunk) if not data: # case were reading finishes with no trailing newline yield b''''.join(blocks) blocks.clear() except PortableBlockingIOError as ex: if not pipe_non_blocking_is_error_blocking(ex): raise ex yield b'''' continue while True: n = data.find(b''/n'') if n == -1: break yield b''''.join(blocks) + data[:n + 1] data = data[n + 1:] blocks.clear() blocks.append(data)


Agrego este problema para leer algunos subprocess.Popen stdout. Aquí está mi solución de lectura no bloqueante:

import fcntl def non_block_read(output): fd = output.fileno() fl = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) try: return output.read() except: return "" # Use example from subprocess import * sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE) sb.kill() # sb.stdout.read() # <-- This will block non_block_read(sb.stdout) ''test/n''


Aquí está mi código, que se usa para capturar todas las salidas del subproceso lo antes posible, incluidas las líneas parciales. Bombea al mismo tiempo y stdout y stderr en orden casi correcto.

Probado y correctamente trabajado en Python 2.7 Linux y Windows.

#!/usr/bin/python # # Runner with stdout/stderr catcher # from sys import argv from subprocess import Popen, PIPE import os, io from threading import Thread import Queue def __main__(): if (len(argv) > 1) and (argv[-1] == "-sub-"): import time, sys print "Application runned!" time.sleep(2) print "Slept 2 second" time.sleep(1) print "Slept 1 additional second", time.sleep(2) sys.stderr.write("Stderr output after 5 seconds") print "Eol on stdin" sys.stderr.write("Eol on stderr/n") time.sleep(1) print "Wow, we have end of work!", else: os.environ["PYTHONUNBUFFERED"]="1" try: p = Popen( argv + ["-sub-"], bufsize=0, # line-buffered stdin=PIPE, stdout=PIPE, stderr=PIPE ) except WindowsError, W: if W.winerror==193: p = Popen( argv + ["-sub-"], shell=True, # Try to run via shell bufsize=0, # line-buffered stdin=PIPE, stdout=PIPE, stderr=PIPE ) else: raise inp = Queue.Queue() sout = io.open(p.stdout.fileno(), ''rb'', closefd=False) serr = io.open(p.stderr.fileno(), ''rb'', closefd=False) def Pump(stream, category): queue = Queue.Queue() def rdr(): while True: buf = stream.read1(8192) if len(buf)>0: queue.put( buf ) else: queue.put( None ) return def clct(): active = True while active: r = queue.get() try: while True: r1 = queue.get(timeout=0.005) if r1 is None: active = False break else: r += r1 except Queue.Empty: pass inp.put( (category, r) ) for tgt in [rdr, clct]: th = Thread(target=tgt) th.setDaemon(True) th.start() Pump(sout, ''stdout'') Pump(serr, ''stderr'') while p.poll() is None: # App still working try: chan,line = inp.get(timeout = 1.0) if chan==''stdout'': print "STDOUT>>", line, "<?<" elif chan==''stderr'': print " ERROR==", line, "=?=" except Queue.Empty: pass print "Finish" if __name__ == ''__main__'': __main__()


Descargo de responsabilidad: esto funciona sólo para tornado

Puede hacer esto configurando el fd para que no esté bloqueado y luego use ioloop para registrar devoluciones de llamada. He empaquetado esto en un huevo llamado tornado_subprocess y puedes instalarlo a través de PyPI:

easy_install tornado_subprocess

Ahora puedes hacer algo como esto:

import tornado_subprocess import tornado.ioloop def print_res( status, stdout, stderr ) : print status, stdout, stderr if status == 0: print "OK:" print stdout else: print "ERROR:" print stderr t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] ) t.start() tornado.ioloop.IOLoop.instance().start()

También puedes usarlo con un RequestHandler.

class MyHandler(tornado.web.RequestHandler): def on_done(self, status, stdout, stderr): self.write( stdout ) self.finish() @tornado.web.asynchronous def get(self): t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] ) t.start()


EDITAR: Esta implementación aún bloquea. Usa la answer de JFSebastian en su lugar.

Probé la answer , pero el riesgo adicional y el mantenimiento del código de subproceso eran preocupantes.

Mirando a través del módulo io (y estando limitado a 2.6), encontré BufferedReader. Esta es mi solución sin hilos, sin bloqueo.

import io from subprocess import PIPE, Popen p = Popen([''myprogram.exe''], stdout=PIPE) SLEEP_DELAY = 0.001 # Create an io.BufferedReader on the file descriptor for stdout with io.open(p.stdout.fileno(), ''rb'', closefd=False) as buffer: while p.poll() == None: time.sleep(SLEEP_DELAY) while ''/n'' in bufferedStdout.peek(bufferedStdout.buffer_size): line = buffer.readline() # do stuff with the line # Handle any remaining output after the process has ended while buffer.peek(): line = buffer.readline() # do stuff with the line


El módulo de select ayuda a determinar dónde está la siguiente entrada útil.

Sin embargo, casi siempre eres más feliz con hilos separados. Uno hace un bloqueo, lee el stdin, otro hace lo que sea que no quieras bloquear.


Esta versión de lectura no bloqueante no requiere módulos especiales y funcionará de forma inmediata en la mayoría de las distribuciones de Linux.

import os import sys import time import fcntl import subprocess def async_read(fd): # set non-blocking flag while preserving old flags fl = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) # read char until EOF hit while True: try: ch = os.read(fd.fileno(), 1) # EOF if not ch: break sys.stdout.write(ch) except OSError: # waiting for data be available on fd pass def shell(args, async=True): # merge stderr and stdout proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) if async: async_read(proc.stdout) sout, serr = proc.communicate() return (sout, serr) if __name__ == ''__main__'': cmd = ''ping 8.8.8.8'' sout, serr = shell(cmd.split())



Las soluciones existentes no funcionaron para mí (detalles a continuación). Lo que finalmente funcionó fue implementar readline utilizando read (1) (basado en esta respuesta ). Este último no bloquea:

from subprocess import Popen, PIPE from threading import Thread def process_output(myprocess): #output-consuming thread nextline = None buf = '''' while True: #--- extract line using read(1) out = myprocess.stdout.read(1) if out == '''' and myprocess.poll() != None: break if out != '''': buf += out if out == ''/n'': nextline = buf buf = '''' if not nextline: continue line = nextline nextline = None #--- do whatever you want with line here print ''Line is:'', line myprocess.stdout.close() myprocess = Popen(''myprogram.exe'', stdout=PIPE) #output-producing process p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread p1.daemon = True p1.start() #--- do whatever here and then kill process and thread if needed if myprocess.poll() == None: #kill process; will automatically stop thread myprocess.kill() myprocess.wait() if p1 and p1.is_alive(): #wait for thread to finish p1.join()

Por qué las soluciones existentes no funcionaron:

  1. Las soluciones que requieren readline (incluidas las basadas en cola) siempre se bloquean. Es difícil (¿imposible?) Matar el hilo que ejecuta readline. Solo se elimina cuando finaliza el proceso que lo creó, pero no cuando se elimina el proceso de producción de salida.
  2. La mezcla de fcntl de bajo nivel con las llamadas de readline de alto nivel puede no funcionar correctamente, como lo ha señalado anonnn.
  3. El uso de select.poll () está limpio, pero no funciona en Windows de acuerdo con los documentos de Python.
  4. El uso de bibliotecas de terceros parece excesivo para esta tarea y agrega dependencias adicionales.

Muchas veces he tenido un problema similar; Los programas de Python que escribo con frecuencia deben tener la capacidad de ejecutar alguna funcionalidad principal y, al mismo tiempo, aceptar la entrada del usuario desde la línea de comandos (stdin). El simple hecho de colocar la funcionalidad de manejo de entrada del usuario en otro hilo no resuelve el problema porque readline() bloquea y no tiene tiempo de espera. Si la funcionalidad principal está completa y ya no hay necesidad de esperar por más comentarios del usuario, normalmente quiero que mi programa se cierre, pero no puedo porque readline() todavía está bloqueando en el otro hilo que espera una línea. Una solución que he encontrado para este problema es hacer que stdin sea un archivo no bloqueante usando el módulo fcntl:

import fcntl import os import sys # make stdin a non-blocking file fd = sys.stdin.fileno() fl = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) # user input handling thread while mainThreadIsRunning: try: input = sys.stdin.readline() except: continue handleInput(input)

En mi opinión, esto es un poco más limpio que usar los módulos de selección o señal para resolver este problema, pero nuevamente solo funciona en UNIX ...


Prueba el módulo asyncproc . Por ejemplo:

import os from asyncproc import Process myProc = Process("myprogram.app") while True: # check to see if process has ended poll = myProc.wait(os.WNOHANG) if poll != None: break # print any new output out = myProc.read() if out != "": print out

El módulo se encarga de todos los subprocesos como lo sugiere S.Lott.


Puedes hacerlo muy fácilmente en Twisted . Dependiendo de su base de código existente, esto podría no ser tan fácil de usar, pero si está creando una aplicación torcida, las cosas como estas se vuelven casi triviales. Crea una clase ProcessProtocol y reemplaza el método outReceived() . Retorcido (dependiendo del reactor utilizado) generalmente es solo un gran bucle select() con devoluciones de llamada instaladas para manejar datos de diferentes descriptores de archivos (a menudo sockets de red). Por lo tanto, el método outReceived() es simplemente instalar una devolución de llamada para manejar los datos provenientes de STDOUT . Un ejemplo simple que demuestra este comportamiento es el siguiente:

from twisted.internet import protocol, reactor class MyProcessProtocol(protocol.ProcessProtocol): def outReceived(self, data): print data proc = MyProcessProtocol() reactor.spawnProcess(proc, ''./myprogram'', [''./myprogram'', ''arg1'', ''arg2'', ''arg3'']) reactor.run()

La documentación retorcida tiene buena información sobre esto.

Si creas toda tu aplicación alrededor de Twisted, hace que la comunicación asíncrona con otros procesos, locales o remotos, sea realmente elegante. Por otro lado, si tu programa no está construido sobre Twisted, esto no va a ser tan útil. Esperamos que esto pueda ser útil para otros lectores, incluso si no es aplicable para su aplicación en particular.


Python 3.4 introduce una nueva API provisional para el módulo asíncrono IO - asyncio .

El enfoque es similar a la respuesta basada en trenzas de @Bryan Ward : defina un protocolo y sus métodos se llaman tan pronto como los datos estén listos:

#!/usr/bin/env python3 import asyncio import os class SubprocessProtocol(asyncio.SubprocessProtocol): def pipe_data_received(self, fd, data): if fd == 1: # got stdout data (bytes) print(data) def connection_lost(self, exc): loop.stop() # end loop.run_forever() if os.name == ''nt'': loop = asyncio.ProactorEventLoop() # for subprocess'' pipes on Windows asyncio.set_event_loop(loop) else: loop = asyncio.get_event_loop() try: loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, "myprogram.exe", "arg1", "arg2")) loop.run_forever() finally: loop.close()

Consulte "Subproceso" en los documentos .

Hay una interfaz de alto nivel asyncio.create_subprocess_exec() que devuelve objetos de Process que permite leer una línea de forma asincrónica utilizando StreamReader.readline() coroutine (con async / await Python 3.5+ syntax ):

#!/usr/bin/env python3.5 import asyncio import locale import sys from asyncio.subprocess import PIPE from contextlib import closing async def readline_and_kill(*args): # start child process process = await asyncio.create_subprocess_exec(*args, stdout=PIPE) # read line (sequence of bytes ending with b''/n'') asynchronously async for line in process.stdout: print("got line:", line.decode(locale.getpreferredencoding(False))) break process.kill() return await process.wait() # wait for the child process to exit if sys.platform == "win32": loop = asyncio.ProactorEventLoop() asyncio.set_event_loop(loop) else: loop = asyncio.get_event_loop() with closing(loop): sys.exit(loop.run_until_complete(readline_and_kill( "myprogram.exe", "arg1", "arg2")))

readline_and_kill() realiza las siguientes tareas:

  • iniciar subproceso, redirigir su stdout a una tubería
  • leer una línea del subproceso ''stdout asincrónicamente
  • matar subproceso
  • espera a que salga

Cada paso podría estar limitado por segundos de tiempo de espera si es necesario.


Recientemente me topé con el mismo problema que necesito para leer una línea a la vez desde el flujo (ejecución en subproceso) en el modo de no bloqueo. Quería evitar los siguientes problemas: no grabar la CPU, no leer el flujo de un byte ( como hizo readline), etc.

Aquí está mi implementación https://gist.github.com/grubberr/5501e1a9760c3eab5e0a no es compatible con Windows (encuesta), no maneja EOF, pero funciona bien para mí


Trabajando a partir de la respuesta de JF Sebastian, y de varias otras fuentes, reuní a un simple administrador de subproceso. Proporciona la lectura sin bloqueo de la solicitud, así como la ejecución de varios procesos en paralelo. No utiliza ninguna llamada específica del sistema operativo (que yo sepa) y, por lo tanto, debería funcionar en cualquier lugar.

Está disponible desde pypi, así que solo pip install shelljob . Consulte la página del proyecto para ver ejemplos y documentos completos.


Una solución es hacer otro proceso para realizar su lectura del proceso, o hacer un hilo del proceso con un tiempo de espera.

Aquí está la versión roscada de una función de tiempo de espera:

http://code.activestate.com/recipes/473878/

Sin embargo, ¿necesitas leer la salida estándar a medida que va llegando? Otra solución puede ser volcar la salida en un archivo y esperar a que el proceso termine de usar p.wait () .

f = open(''myprogram_output.txt'',''w'') p = subprocess.Popen(''myprogram.exe'', stdout=f) p.wait() f.close() str = open(''myprogram_output.txt'',''r'').read()


Utilice seleccionar y leer (1).

import subprocess #no new requirements def readAllSoFar(proc, retVal=''''): while (select.select([proc.stdout],[],[],0)[0]!=[]): retVal+=proc.stdout.read(1) return retVal p = subprocess.Popen([''/bin/ls''], stdout=subprocess.PIPE) while not p.poll(): print (readAllSoFar(p))

Para readline () - como

lines = [''''] while not p.poll(): lines = readAllSoFar(p, lines[-1]).split(''/n'') for a in range(len(lines)-1): print a lines = readAllSoFar(p, lines[-1]).split(''/n'') for a in range(len(lines)-1): print a


fcntl , select , asyncproc no ayudará en este caso.

Una forma confiable de leer un flujo sin bloquear independientemente del sistema operativo es usar Queue.get_nowait() :

import sys from subprocess import PIPE, Popen from threading import Thread try: from queue import Queue, Empty except ImportError: from Queue import Queue, Empty # python 2.x ON_POSIX = ''posix'' in sys.builtin_module_names def enqueue_output(out, queue): for line in iter(out.readline, b''''): queue.put(line) out.close() p = Popen([''myprogram.exe''], stdout=PIPE, bufsize=1, close_fds=ON_POSIX) q = Queue() t = Thread(target=enqueue_output, args=(p.stdout, q)) t.daemon = True # thread dies with the program t.start() # ... do other things here # read line without blocking try: line = q.get_nowait() # or q.get(timeout=.1) except Empty: print(''no output yet'') else: # got line # ... do something with line


Aquí hay un módulo que admite lecturas sin bloqueo y escrituras en segundo plano en python:

https://pypi.python.org/pypi/python-nonblock

Proporciona una función,

nonblock_read que leerá los datos de la secuencia, si están disponibles, de lo contrario devolverá una cadena vacía (o Ninguno si la secuencia está cerrada en el otro lado y se han leído todos los datos posibles)

También puede considerar el módulo python-subprocess2,

https://pypi.python.org/pypi/python-subprocess2

que se suma al módulo de subproceso. Así que en el objeto devuelto de "subprocess.Popen" se agrega un método adicional, runInBackground. Esto inicia un hilo y devuelve un objeto que se rellenará automáticamente a medida que las cosas se escriben en stdout / stderr, sin bloquear su hilo principal.

¡Disfrutar!


En mi caso, necesitaba un módulo de registro que capte el resultado de las aplicaciones en segundo plano y lo aumente (agregando sellos de tiempo, colores, etc.).

Terminé con un hilo de fondo que hace la E / S real. El siguiente código es solo para plataformas POSIX. Me quité las partes no esenciales.

Si alguien va a usar esta bestia para carreras largas, considere administrar descriptores abiertos. En mi caso no fue un gran problema.

# -*- python -*- import fcntl import threading import sys, os, errno import subprocess class Logger(threading.Thread): def __init__(self, *modules): threading.Thread.__init__(self) try: from select import epoll, EPOLLIN self.__poll = epoll() self.__evt = EPOLLIN self.__to = -1 except: from select import poll, POLLIN print ''epoll is not available'' self.__poll = poll() self.__evt = POLLIN self.__to = 100 self.__fds = {} self.daemon = True self.start() def run(self): while True: events = self.__poll.poll(self.__to) for fd, ev in events: if (ev&self.__evt) != self.__evt: continue try: self.__fds[fd].run() except Exception, e: print e def add(self, fd, log): assert not self.__fds.has_key(fd) self.__fds[fd] = log self.__poll.register(fd, self.__evt) class log: logger = Logger() def __init__(self, name): self.__name = name self.__piped = False def fileno(self): if self.__piped: return self.write self.read, self.write = os.pipe() fl = fcntl.fcntl(self.read, fcntl.F_GETFL) fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK) self.fdRead = os.fdopen(self.read) self.logger.add(self.read, self) self.__piped = True return self.write def __run(self, line): self.chat(line, nl=False) def run(self): while True: try: line = self.fdRead.readline() except IOError, exc: if exc.errno == errno.EAGAIN: return raise self.__run(line) def chat(self, line, nl=True): if nl: nl = ''/n'' else: nl = '''' sys.stdout.write(''[%s] %s%s'' % (self.__name, line, nl)) def system(command, param=[], cwd=None, env=None, input=None, output=None): args = [command] + param p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0) p.wait() ls = log(''ls'') ls.chat(''go'') system("ls", [''-l'', ''/''], output=ls) date = log(''date'') date.chat(''go'') system("date", output=date)


Mi problema es un poco diferente, ya que quería recopilar tanto stdout como stderr de un proceso en ejecución, pero en última instancia es el mismo, ya que quería generar la salida en un widget como se genera.

No quería recurrir a muchas de las soluciones propuestas mediante el uso de colas o subprocesos adicionales, ya que no deberían ser necesarios para realizar una tarea tan común como ejecutar otro script y recopilar su salida.

Después de leer las soluciones propuestas y los documentos de Python, resolví mi problema con la implementación a continuación. Sí, solo funciona para POSIX ya que estoy usando la selectfunción call.

Estoy de acuerdo en que los documentos son confusos y la implementación es incómoda para una tarea de scripting tan común. Creo que las versiones anteriores de python tienen diferentes valores predeterminados Popeny diferentes explicaciones, por lo que crearon mucha confusión. Esto parece funcionar bien tanto para Python 2.7.12 como para 3.5.2.

La clave fue establecer el bufsize=1búfer de línea y luego universal_newlines=Trueprocesarlo como un archivo de texto en lugar de un binario que parece ser el predeterminado cuando se configura bufsize=1.

class workerThread(QThread): def __init__(self, cmd): QThread.__init__(self) self.cmd = cmd self.result = None ## return code self.error = None ## flag indicates an error self.errorstr = "" ## info message about the error def __del__(self): self.wait() DEBUG("Thread removed") def run(self): cmd_list = self.cmd.split(" ") try: cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None , universal_newlines=True , stderr=subprocess.PIPE , stdout=subprocess.PIPE) except OSError: self.error = 1 self.errorstr = "Failed to execute " + self.cmd ERROR(self.errorstr) finally: VERBOSE("task started...") import select while True: try: r,w,x = select.select([cmd.stdout, cmd.stderr],[],[]) if cmd.stderr in r: line = cmd.stderr.readline() if line != "": line = line.strip() self.emit(SIGNAL("update_error(QString)"), line) if cmd.stdout in r: line = cmd.stdout.readline() if line == "": break line = line.strip() self.emit(SIGNAL("update_output(QString)"), line) except IOError: pass cmd.wait() self.result = cmd.returncode if self.result < 0: self.error = 1 self.errorstr = "Task terminated by signal " + str(self.result) ERROR(self.errorstr) return if self.result: self.error = 1 self.errorstr = "exit code " + str(self.result) ERROR(self.errorstr) return return

ERROR, DEBUG y VERBOSE son simplemente macros que imprimen la salida al terminal.

Esta solución es IMHO 99.99% efectiva ya que aún utiliza la readlinefunción de bloqueo , por lo que asumimos que el subproceso es bueno y genera líneas completas.

Doy la bienvenida a los comentarios para mejorar la solución, ya que todavía soy nuevo en Python.


Esta solución utiliza el selectmódulo para "leer los datos disponibles" de una secuencia de IO. Esta función se bloquea inicialmente hasta que los datos estén disponibles, pero luego solo lee los datos que están disponibles y no bloquea más.

Dado que utiliza el selectmódulo, esto solo funciona en Unix.

El código es totalmente compatible con PEP8.

import select def read_available(input_stream, max_bytes=None): """ Blocks until any data is available, then all available data is then read and returned. This function returns an empty string when end of stream is reached. Args: input_stream: The stream to read from. max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this. Returns: str """ # Prepare local variables input_streams = [input_stream] empty_list = [] read_buffer = "" # Initially block for input using ''select'' if len(select.select(input_streams, empty_list, empty_list)[0]) > 0: # Poll read-readiness using ''select'' def select_func(): return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0 # Create while function based on parameters if max_bytes is not None: def while_func(): return (len(read_buffer) < max_bytes) and select_func() else: while_func = select_func while True: # Read single byte at a time read_data = input_stream.read(1) if len(read_data) == 0: # End of stream break # Append byte to string buffer read_buffer += read_data # Check if more data is available if not while_func(): break # Return read buffer return read_buffer


Este es un ejemplo para ejecutar un comando interactivo en subproceso, y la salida estándar es interactiva mediante el uso de pseudo terminal. Puede consultar: https://.com/a/43012138/3555925

#!/usr/bin/env python # -*- coding: utf-8 -*- import os import sys import select import termios import tty import pty from subprocess import Popen command = ''bash'' # command = ''docker run -it --rm centos /bin/bash''.split() # save original tty setting then set it to raw mode old_tty = termios.tcgetattr(sys.stdin) tty.setraw(sys.stdin.fileno()) # open pseudo-terminal to interact with subprocess master_fd, slave_fd = pty.openpty() # use os.setsid() make it run in a new process group, or bash job control will not be enabled p = Popen(command, preexec_fn=os.setsid, stdin=slave_fd, stdout=slave_fd, stderr=slave_fd, universal_newlines=True) while p.poll() is None: r, w, e = select.select([sys.stdin, master_fd], [], []) if sys.stdin in r: d = os.read(sys.stdin.fileno(), 10240) os.write(master_fd, d) elif master_fd in r: o = os.read(master_fd, 10240) if o: os.write(sys.stdout.fileno(), o) # restore tty settings back termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)


También enfrenté el problema descrito por y lo resolví utilizando "seleccionar" como lo hicieron , y otros, pero en un modo de bloqueo para evitar un ciclo de ocupado. Utiliza un tubo ficticio como un falso stdin. Seleccione los bloques y espere a que esté lista la entrada estándar o la tubería. Cuando se presiona una tecla, stdin desbloquea la selección y el valor de la tecla se puede recuperar con lectura (1). Cuando un hilo diferente escribe en la tubería, la tubería desbloquea la selección y puede tomarse como una indicación de que la necesidad de la entrada estándar ha terminado. Aquí hay un código de referencia:

import sys import os from select import select # ------------------------------------------------------------------------- # Set the pipe (fake stdin) to simulate a final key stroke # which will unblock the select statement readEnd, writeEnd = os.pipe() readFile = os.fdopen(readEnd) writeFile = os.fdopen(writeEnd, "w") # ------------------------------------------------------------------------- def getKey(): # Wait for stdin or pipe (fake stdin) to be ready dr,dw,de = select([sys.__stdin__, readFile], [], []) # If stdin is the one ready then read it and return value if sys.__stdin__ in dr: return sys.__stdin__.read(1) # For Windows use ----> getch() from module msvcrt # Must finish else: return None # ------------------------------------------------------------------------- def breakStdinRead(): writeFile.write('' '') writeFile.flush() # ------------------------------------------------------------------------- # MAIN CODE # Get key stroke key = getKey() # Keyboard input if key: # ... do your stuff with the key value # Faked keystroke else: # ... use of stdin finished # ------------------------------------------------------------------------- # OTHER THREAD CODE breakStdinRead()


Tengo el problema del interrogador original, pero no deseo invocar hilos. Mezclé la solución de Jesse con una lectura directa () de la tubería y mi propio controlador de búfer para las lecturas de línea (sin embargo, mi subproceso, ping, siempre escribía líneas completas <un tamaño de página del sistema). Evito la espera ocupada solo leyendo en un reloj registrado en gobject. Estos días generalmente ejecuto código dentro de un objeto MainLoop para evitar subprocesos.

def set_up_ping(ip, w): # run the sub-process # watch the resultant pipe p = subprocess.Popen([''/bin/ping'', ip], stdout=subprocess.PIPE) # make stdout a non-blocking file fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL) fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK) stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w) return stdout_gid # for shutting down

El observador es

def watch(f, *other): print ''reading'',f.read() return True

Y el programa principal configura un ping y luego llama a bucle de correo de objeto.

def main(): set_up_ping(''192.168.1.8'', watch) # discard gid as unused here gobject.MainLoop().run()

Cualquier otro trabajo se adjunta a devoluciones de llamada en gobject.