python multithreading timeout subprocess

subplot python title



Usando el módulo ''subproceso'' con timeout (29)

Aquí está la solución de Alex Martelli como un módulo con el proceso correcto de matar. Los otros enfoques no funcionan porque no usan proc.communicate (). Entonces, si tiene un proceso que produce mucha salida, llenará su búfer de salida y luego se bloqueará hasta que lea algo de él.

from os import kill from signal import alarm, signal, SIGALRM, SIGKILL from subprocess import PIPE, Popen def run(args, cwd = None, shell = False, kill_tree = True, timeout = -1, env = None): '''''' Run a command with a timeout after which it will be forcibly killed. '''''' class Alarm(Exception): pass def alarm_handler(signum, frame): raise Alarm p = Popen(args, shell = shell, cwd = cwd, stdout = PIPE, stderr = PIPE, env = env) if timeout != -1: signal(SIGALRM, alarm_handler) alarm(timeout) try: stdout, stderr = p.communicate() if timeout != -1: alarm(0) except Alarm: pids = [p.pid] if kill_tree: pids.extend(get_process_children(p.pid)) for pid in pids: # process might have died before getting to this line # so wrap to avoid OSError: no such process try: kill(pid, SIGKILL) except OSError: pass return -9, '''', '''' return p.returncode, stdout, stderr def get_process_children(pid): p = Popen(''ps --no-headers -o pid --ppid %d'' % pid, shell = True, stdout = PIPE, stderr = PIPE) stdout, stderr = p.communicate() return [int(p) for p in stdout.split()] if __name__ == ''__main__'': print run(''find /'', shell = True, timeout = 3) print run(''find'', shell = True)

Aquí está el código de Python para ejecutar un comando arbitrario que devuelve sus datos de salida stdout , o generar una excepción en los códigos de salida que no sean cero:

proc = subprocess.Popen( cmd, stderr=subprocess.STDOUT, # Merge stdout and stderr stdout=subprocess.PIPE, shell=True)

Se usa communicate para esperar a que salga el proceso:

stdoutdata, stderrdata = proc.communicate()

El módulo de subprocess no admite el tiempo de espera (capacidad para detener un proceso que se ejecuta durante más de X segundos), por lo tanto, la communicate puede tardar una eternidad en ejecutarse.

¿Cuál es la forma más sencilla de implementar los tiempos de espera en un programa de Python destinado a ejecutarse en Windows y Linux?


Aquí está mi solución, estaba usando Thread y Evento:

import subprocess from threading import Thread, Event def kill_on_timeout(done, timeout, proc): if not done.wait(timeout): proc.kill() def exec_command(command, timeout): done = Event() proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) watcher = Thread(target=kill_on_timeout, args=(done, timeout, proc)) watcher.daemon = True watcher.start() data, stderr = proc.communicate() done.set() return data, stderr, proc.returncode

En acción:

In [2]: exec_command([''sleep'', ''10''], 5) Out[2]: ('''', '''', -9) In [3]: exec_command([''sleep'', ''10''], 11) Out[3]: ('''', '''', 0)


Aunque no lo he visto mucho, este decorator que encontré en ActiveState parece ser muy útil para este tipo de cosas. Junto con subprocess.Popen(..., close_fds=True) , al menos estoy listo para shell-scripting en Python.


Desafortunadamente, estoy sujeto a políticas muy estrictas sobre la divulgación del código fuente por parte de mi empleador, por lo que no puedo proporcionar el código real. Pero para mi gusto, la mejor solución es crear una subclase que Popen.wait() para sondear en lugar de esperar indefinidamente, y Popen.__init__ para aceptar un parámetro de tiempo de espera. Una vez que haga eso, todos los otros métodos de Popen (que llaman a la wait ) funcionarán como se espera, incluyendo la communicate .


En Python 3.3+:

from subprocess import STDOUT, check_output output = check_output(cmd, stderr=STDOUT, timeout=seconds)

output es una cadena de bytes que contiene datos stdout combinados, stderr del comando.

Este código genera CalledProcessError en un estado de salida distinto de cero como se especifica en el texto de la pregunta, a diferencia del método proc.communicate() .

He eliminado shell=True porque a menudo se usa innecesariamente. Siempre puedes volver a agregarlo si cmd lo requiere. Si agrega shell=True , es decir, si el proceso hijo genera sus propios descendientes; check_output() puede devolver mucho más tarde de lo que indica el tiempo de espera, consulte Fallo de tiempo de espera de subproceso .

La función de tiempo de espera está disponible en Python 2.x a través del backport subprocess32 del módulo de subproceso 3.2+.


Esta solución elimina el árbol de procesos en caso de que shell = True, pase parámetros al proceso (o no), tenga un tiempo de espera y obtenga la salida estándar, stderr y proceso de la devolución de llamada (usa psutil para kill_proc_tree). Esto se basó en varias soluciones publicadas en SO incluyendo jcollado''s. Publicación en respuesta a los comentarios de Anson y jradice en la respuesta de jcollado. Probado en Windows Srvr 2012 y Ubuntu 14.04. Tenga en cuenta que para Ubuntu necesita cambiar la llamada parent.children (...) a parent.get_children (...).

def kill_proc_tree(pid, including_parent=True): parent = psutil.Process(pid) children = parent.children(recursive=True) for child in children: child.kill() psutil.wait_procs(children, timeout=5) if including_parent: parent.kill() parent.wait(5) def run_with_timeout(cmd, current_dir, cmd_parms, timeout): def target(): process = subprocess.Popen(cmd, cwd=current_dir, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) # wait for the process to terminate if (cmd_parms == ""): out, err = process.communicate() else: out, err = process.communicate(cmd_parms) errcode = process.returncode thread = Thread(target=target) thread.start() thread.join(timeout) if thread.is_alive(): me = os.getpid() kill_proc_tree(me, including_parent=False) thread.join()


Estaba tratando de escribir algo más simple.

#!/usr/bin/python from subprocess import Popen, PIPE import datetime import time popen = Popen(["/bin/sleep", "10"]); pid = popen.pid sttime = time.time(); waittime = 3 print "Start time %s"%(sttime) while True: popen.poll(); time.sleep(1) rcode = popen.returncode now = time.time(); if [ rcode is None ] and [ now > (sttime + waittime) ] : print "Killing it now" popen.kill()


Hay una idea para subclasificar la clase Popen y extenderla con algunos decoradores de métodos simples. Llamémoslo ExpirablePopen.

from logging import error from subprocess import Popen from threading import Event from threading import Thread class ExpirablePopen(Popen): def __init__(self, *args, **kwargs): self.timeout = kwargs.pop(''timeout'', 0) self.timer = None self.done = Event() Popen.__init__(self, *args, **kwargs) def __tkill(self): timeout = self.timeout if not self.done.wait(timeout): error(''Terminating process {} by timeout of {} secs.''.format(self.pid, timeout)) self.kill() def expirable(func): def wrapper(self, *args, **kwargs): # zero timeout means call of parent method if self.timeout == 0: return func(self, *args, **kwargs) # if timer is None, need to start it if self.timer is None: self.timer = thr = Thread(target=self.__tkill) thr.daemon = True thr.start() result = func(self, *args, **kwargs) self.done.set() return result return wrapper wait = expirable(Popen.wait) communicate = expirable(Popen.communicate) if __name__ == ''__main__'': from subprocess import PIPE print ExpirablePopen(''ssh -T [email protected]'', stdout=PIPE, timeout=1).communicate()


He implementado lo que podría reunir de algunos de estos. Esto funciona en Windows, y como se trata de una wiki de la comunidad, creo que también compartiría mi código:

class Command(threading.Thread): def __init__(self, cmd, outFile, errFile, timeout): threading.Thread.__init__(self) self.cmd = cmd self.process = None self.outFile = outFile self.errFile = errFile self.timed_out = False self.timeout = timeout def run(self): self.process = subprocess.Popen(self.cmd, stdout = self.outFile, / stderr = self.errFile) while (self.process.poll() is None and self.timeout > 0): time.sleep(1) self.timeout -= 1 if not self.timeout > 0: self.process.terminate() self.timed_out = True else: self.timed_out = False

Luego de otra clase o archivo:

outFile = tempfile.SpooledTemporaryFile() errFile = tempfile.SpooledTemporaryFile() executor = command.Command(c, outFile, errFile, timeout) executor.daemon = True executor.start() executor.join() if executor.timed_out: out = ''timed out'' else: outFile.seek(0) errFile.seek(0) out = outFile.read() err = errFile.read() outFile.close() errFile.close()


He modificado la respuesta de sussudio . Ahora la función devuelve: ( returncode , stdout , stderr , timeout ) - stdout y stderr se decodifican en la cadena utf-8

def kill_proc(proc, timeout): timeout["value"] = True proc.kill() def run(cmd, timeout_sec): proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE) timeout = {"value": False} timer = Timer(timeout_sec, kill_proc, [proc, timeout]) timer.start() stdout, stderr = proc.communicate() timer.cancel() return proc.returncode, stdout.decode("utf-8"), stderr.decode("utf-8"), timeout["value"]



La respuesta de jcollado se puede simplificar usando la clase threading.Timer :

import shlex from subprocess import Popen, PIPE from threading import Timer def run(cmd, timeout_sec): proc = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE) timer = Timer(timeout_sec, proc.kill) try: timer.start() stdout, stderr = proc.communicate() finally: timer.cancel() # Examples: both take 1 second run("sleep 1", 5) # process ends normally at 1 second run("sleep 5", 1) # timeout happens at 1 second



No sé mucho sobre los detalles de bajo nivel; pero, dado que en Python 2.6, la API ofrece la posibilidad de esperar subprocesos y finalizar procesos, ¿qué hay de ejecutar el proceso en un subproceso separado?

import subprocess, threading class Command(object): def __init__(self, cmd): self.cmd = cmd self.process = None def run(self, timeout): def target(): print ''Thread started'' self.process = subprocess.Popen(self.cmd, shell=True) self.process.communicate() print ''Thread finished'' thread = threading.Thread(target=target) thread.start() thread.join(timeout) if thread.is_alive(): print ''Terminating process'' self.process.terminate() thread.join() print self.process.returncode command = Command("echo ''Process started''; sleep 2; echo ''Process finished''") command.run(timeout=3) command.run(timeout=1)

La salida de este fragmento en mi máquina es:

Thread started Process started Process finished Thread finished 0 Thread started Process started Terminating process Thread finished -15

donde se puede ver que, en la primera ejecución, el proceso finalizó correctamente (código de retorno 0), mientras que en la segunda se terminó el proceso (código de retorno -15).

No he probado en windows; pero, aparte de actualizar el comando de ejemplo, creo que debería funcionar ya que no he encontrado en la documentación nada que diga que thread.join o process.terminate no sea compatible.


No sé por qué no se menciona, pero desde Python 3.5, hay un nuevo comando universal subprocess.run (que reemplaza a check_call , check_output ...) y que también tiene el parámetro timeout .

subprocess.run (args, *, stdin = None, input = None, stdout = None, stderr = None, shell = False, cwd = None, timeout = None, check = False, encoding = None, errors = None)

Run the command described by args. Wait for command to complete, then return a CompletedProcess instance.

Provoca una excepción subprocess.TimeoutExpired cuando el tiempo de espera ha caducado.


Otra opción es escribir en un archivo temporal para evitar el bloqueo de la salida estándar en lugar de tener que sondear con comunicarse (). Esto funcionó para mí donde las otras respuestas no lo hicieron; por ejemplo en windows.

outFile = tempfile.SpooledTemporaryFile() errFile = tempfile.SpooledTemporaryFile() proc = subprocess.Popen(args, stderr=errFile, stdout=outFile, universal_newlines=False) wait_remaining_sec = timeout while proc.poll() is None and wait_remaining_sec > 0: time.sleep(1) wait_remaining_sec -= 1 if wait_remaining_sec <= 0: killProc(proc.pid) raise ProcessIncompleteError(proc, timeout) # read temp streams from start outFile.seek(0); errFile.seek(0); out = outFile.read() err = errFile.read() outFile.close() errFile.close()


Preponer el timeout comando de Linux no es una mala solución y funcionó para mí.

cmd = "timeout 20 "+ cmd subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) (output, err) = p.communicate()


Puedes hacerlo usando select

import subprocess from datetime import datetime from select import select def call_with_timeout(cmd, timeout): started = datetime.now() sp = subprocess.Popen(cmd, stdout=subprocess.PIPE) while True: p = select([sp.stdout], [], [], timeout) if p[0]: p[0][0].read() ret = sp.poll() if ret is not None: return ret if (datetime.now()-started).total_seconds() > timeout: sp.kill() return None


Si está usando Python 2, inténtelo.

import subprocess32 try: output = subprocess32.check_output(command, shell=True, timeout=3) except subprocess32.TimeoutExpired as e: print e


Si estás en Unix,

import signal ... class Alarm(Exception): pass def alarm_handler(signum, frame): raise Alarm signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(5*60) # 5 minutes try: stdoutdata, stderrdata = proc.communicate() signal.alarm(0) # reset the alarm except Alarm: print "Oops, taking too long!" # whatever else


Tuve el problema de que quería terminar un subproceso de subprocesos múltiples si tomaba más tiempo que un tiempo de espera determinado. Quería establecer un tiempo de espera en Popen() , pero no funcionó. Entonces, me di cuenta de que Popen().wait() es igual a call() y, por lo tanto, tuve la idea de establecer un tiempo de espera dentro del .wait(timeout=xxx) , que finalmente funcionó. Así, lo resolví de esta manera:

import os import sys import signal import subprocess from multiprocessing import Pool cores_for_parallelization = 4 timeout_time = 15 # seconds def main(): jobs = [...YOUR_JOB_LIST...] with Pool(cores_for_parallelization) as p: p.map(run_parallel_jobs, jobs) def run_parallel_jobs(args): # Define the arguments including the paths initial_terminal_command = ''C://Python34//python.exe'' # Python executable function_to_start = ''C://temp//xyz.py'' # The multithreading script final_list = [initial_terminal_command, function_to_start] final_list.extend(args) # Start the subprocess and determine the process PID subp = subprocess.Popen(final_list) # starts the process pid = subp.pid # Wait until the return code returns from the function by considering the timeout. # If not, terminate the process. try: returncode = subp.wait(timeout=timeout_time) # should be zero if accomplished except subprocess.TimeoutExpired: # Distinguish between Linux and Windows and terminate the process if # the timeout has been expired if sys.platform == ''linux2'': os.kill(pid, signal.SIGTERM) elif sys.platform == ''win32'': subp.terminate() if __name__ == ''__main__'': main()


Una vez que comprenda el proceso completo ejecutando maquinaria en * unix, encontrará fácilmente una solución más simple:

Considere este sencillo ejemplo de cómo hacer que el método de tiempo de comunicación () se pueda usar usando select.select () (disponible al menos en cada momento en * nix en la actualidad). Esto también se puede escribir con epoll / poll / kqueue, pero la variante select.select () podría ser un buen ejemplo para usted. Y las principales limitaciones de select.select () (velocidad y 1024 fds máx.) No son aplicables para su tarea.

Esto funciona bajo * nix, no crea subprocesos, no usa señales, puede ser lanzado desde cualquier subproceso (no solo principal), y lo suficientemente rápido para leer 250 mb / s de datos de stdout en mi máquina (i5 2.3ghz).

Hay un problema en unirse a stdout / stderr al final de la comunicación. Si tiene una salida de programa enorme, esto podría llevar a un gran uso de memoria. Pero puede llamar a se comunique () varias veces con tiempos de espera más pequeños.

class Popen(subprocess.Popen): def communicate(self, input=None, timeout=None): if timeout is None: return subprocess.Popen.communicate(self, input) if self.stdin: # Flush stdio buffer, this might block if user # has been writing to .stdin in an uncontrolled # fashion. self.stdin.flush() if not input: self.stdin.close() read_set, write_set = [], [] stdout = stderr = None if self.stdin and input: write_set.append(self.stdin) if self.stdout: read_set.append(self.stdout) stdout = [] if self.stderr: read_set.append(self.stderr) stderr = [] input_offset = 0 deadline = time.time() + timeout while read_set or write_set: try: rlist, wlist, xlist = select.select(read_set, write_set, [], max(0, deadline - time.time())) except select.error as ex: if ex.args[0] == errno.EINTR: continue raise if not (rlist or wlist): # Just break if timeout # Since we do not close stdout/stderr/stdin, we can call # communicate() several times reading data by smaller pieces. break if self.stdin in wlist: chunk = input[input_offset:input_offset + subprocess._PIPE_BUF] try: bytes_written = os.write(self.stdin.fileno(), chunk) except OSError as ex: if ex.errno == errno.EPIPE: self.stdin.close() write_set.remove(self.stdin) else: raise else: input_offset += bytes_written if input_offset >= len(input): self.stdin.close() write_set.remove(self.stdin) # Read stdout / stderr by 1024 bytes for fn, tgt in ( (self.stdout, stdout), (self.stderr, stderr), ): if fn in rlist: data = os.read(fn.fileno(), 1024) if data == '''': fn.close() read_set.remove(fn) tgt.append(data) if stdout is not None: stdout = ''''.join(stdout) if stderr is not None: stderr = ''''.join(stderr) return (stdout, stderr)


para python 2.6+, usa gevent

from gevent.subprocess import Popen, PIPE, STDOUT def call_sys(cmd, timeout): p= Popen(cmd, shell=True, stdout=PIPE) output, _ = p.communicate(timeout=timeout) assert p.returncode == 0, p. returncode return output call_sys(''./t.sh'', 2) # t.sh example sleep 5 echo done exit 1


python 2.7

import time import subprocess def run_command(cmd, timeout=0): start_time = time.time() df = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) while timeout and df.poll() == None: if time.time()-start_time >= timeout: df.kill() return -1, "" output = ''/n''.join(df.communicate()).strip() return df.returncode, output


sorprendido nadie mencionó usar el timeout

timeout 5 ping -c 3 somehost

Obviamente, esto no funcionará para cada caso de uso, pero si se trata de un script simple, es difícil de superar.

También disponible como gtimeout en coreutils a través de homebrew para usuarios de mac.


timeout ahora es compatible con call() y se communicate() en el módulo de subproceso (a partir de Python3.3):

import subprocess subprocess.call("command", timeout=20, shell=True)

Esto llamará al comando y levantará la excepción.

subprocess.TimeoutExpired

Si el comando no termina después de 20 segundos.

Luego puede manejar la excepción para continuar su código, algo como:

try: subprocess.call("command", timeout=20, shell=True) except subprocess.TimeoutExpired: # insert code here

Espero que esto ayude.


https://pypi.python.org/pypi/python-subprocess2 proporciona extensiones al módulo de subproceso que le permiten esperar hasta un cierto período de tiempo; de lo contrario, finalice.

Por lo tanto, esperar hasta 10 segundos para que finalice el proceso, de lo contrario, elimine:

pipe = subprocess.Popen(''...'') timeout = 10 results = pipe.waitOrTerminate(timeout)

Esto es compatible con Windows y Unix. "resultados" es un diccionario, contiene "returnCode", que es el retorno de la aplicación (o Ninguno si se tuvo que eliminar), así como "actionTaken". que será "SUBPROCESS2_PROCESS_COMPLETED" si el proceso se completó normalmente, o una máscara de "SUBPROCESS2_PROCESS_TERMINATED" y SUBPROCESS2_PROCESS_KILLED según la acción tomada (consulte la información completa)


jcollado la solución con subprocesos desde jcollado a mi módulo de Python easyprocess .

Instalar:

pip install easyprocess

Ejemplo:

from easyprocess import Proc # shell is not supported! stdout=Proc(''ping localhost'').call(timeout=1.5).stdout print stdout


import subprocess, optparse, os, sys, re, datetime, threading, time, glob, shutil, xml.dom.minidom, traceback class OutputManager: def __init__(self, filename, mode, console, logonly): self.con = console self.logtoconsole = True self.logtofile = False if filename: try: self.f = open(filename, mode) self.logtofile = True if logonly == True: self.logtoconsole = False except IOError: print (sys.exc_value) print ("Switching to console only output.../n") self.logtofile = False self.logtoconsole = True def write(self, data): if self.logtoconsole == True: self.con.write(data) if self.logtofile == True: self.f.write(data) sys.stdout.flush() def getTimeString(): return time.strftime("%Y-%m-%d", time.gmtime()) def runCommand(command): '''''' Execute a command in new thread and return the stdout and stderr content of it. '''''' try: Output = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True).communicate()[0] except Exception as e: print ("runCommand failed :%s" % (command)) print (str(e)) sys.stdout.flush() return None return Output def GetOs(): Os = "" if sys.platform.startswith(''win32''): Os = "win" elif sys.platform.startswith(''linux''): Os = "linux" elif sys.platform.startswith(''darwin''): Os = "mac" return Os def check_output(*popenargs, **kwargs): try: if ''stdout'' in kwargs: raise ValueError(''stdout argument not allowed, it will be overridden.'') # Get start time. startTime = datetime.datetime.now() timeoutValue=3600 cmd = popenargs[0] if sys.platform.startswith(''win32''): process = subprocess.Popen( cmd, stdout=subprocess.PIPE, shell=True) elif sys.platform.startswith(''linux''): process = subprocess.Popen( cmd , stdout=subprocess.PIPE, shell=True ) elif sys.platform.startswith(''darwin''): process = subprocess.Popen( cmd , stdout=subprocess.PIPE, shell=True ) stdoutdata, stderrdata = process.communicate( timeout = timeoutValue ) retcode = process.poll() #################################### # Catch crash error and log it. #################################### OutputHandle = None try: if retcode >= 1: OutputHandle = OutputManager( ''CrashJob_'' + getTimeString() + ''.txt'', ''a+'', sys.stdout, False) OutputHandle.write( cmd ) print (stdoutdata) print (stderrdata) sys.stdout.flush() except Exception as e: print (str(e)) except subprocess.TimeoutExpired: #################################### # Catch time out error and log it. #################################### Os = GetOs() if Os == ''win'': killCmd = "taskkill /FI /"IMAGENAME eq {0}/" /T /F" elif Os == ''linux'': killCmd = "pkill {0)" elif Os == ''mac'': # Linux, Mac OS killCmd = "killall -KILL {0}" runCommand(killCmd.format("java")) runCommand(killCmd.format("YouApp")) OutputHandle = None try: OutputHandle = OutputManager( ''KillJob_'' + getTimeString() + ''.txt'', ''a+'', sys.stdout, False) OutputHandle.write( cmd ) except Exception as e: print (str(e)) except Exception as e: for frame in traceback.extract_tb(sys.exc_info()[2]): fname,lineno,fn,text = frame print "Error in %s on line %d" % (fname, lineno)