stop - python multithreading espera hasta que todos los hilos terminen
programa de hilos en python (8)
Esto puede haberse preguntado en un contexto similar, pero no pude encontrar una respuesta después de unos 20 minutos de búsqueda, así que lo preguntaré.
He escrito un script de Python (digamos: scriptA.py) y un script (digamos scriptB.py)
En scriptB quiero llamar a scriptA varias veces con diferentes argumentos, cada vez toma alrededor de una hora en ejecutarse (es un script enorme, hace muchas cosas ... no se preocupe) y quiero poder ejecutar el script. scriptA con todos los diferentes argumentos al mismo tiempo, pero tengo que esperar hasta que TODOS hayan terminado antes de continuar; mi código:
import subprocess
#setup
do_setup()
#run scriptA
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)
#finish
do_finish()
Quiero ejecutar todo el subprocess.call()
al mismo tiempo, y luego esperar a que todo esté listo, ¿cómo debo hacer esto?
Intenté usar el enhebrado como el ejemplo here :
from threading import Thread
import subprocess
def call_script(args)
subprocess.call(args)
#run scriptA
t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))
t1.start()
t2.start()
t3.start()
Pero no creo que esto sea correcto.
¿Cómo sé que todos han terminado de ejecutarse antes de ir a mi do_finish()
?
Coloque los hilos en una lista y luego use el método Join
threads = []
t = Thread(...)
threads.append(t)
...repeat as often as necessary...
# Start all threads
for x in threads:
x.start()
# Wait for all of them to finish
for x in threads:
x.join()
De la documentación del módulo de threading
Hay un objeto "hilo principal"; esto corresponde al hilo de control inicial en el programa Python. No es un hilo demoníaco.
Existe la posibilidad de que se creen "objetos ficticios". Estos son objetos de subprocesos correspondientes a "hilos de origen", que son hilos de control que se inician fuera del módulo de subprocesamiento, como por ejemplo directamente desde el código C. Los objetos de subprocesos tienen una funcionalidad limitada; siempre se consideran vivos y demoníacos, y no se pueden
join()
ed. Nunca se eliminan, ya que es imposible detectar la terminación de los hilos extraterrestres.
Entonces, para atrapar esos dos casos cuando no esté interesado en mantener una lista de los hilos que crea:
import threading as thrd
def alter_data(data, index):
data[index] *= 2
data = [0, 2, 6, 20]
for i, value in enumerate(data):
thrd.Thread(target=alter_data, args=[data, i]).start()
for thread in thrd.enumerate():
if thread.daemon:
continue
try:
thread.join()
except RuntimeError as err:
if ''cannot join current thread'' in err.args[0]:
# catchs main thread
continue
else:
raise
Después de lo cual:
>>> print(data)
[0, 4, 12, 40]
Debe utilizar el método de join del objeto Thread
al final del script.
t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
Por lo tanto, el hilo principal esperará hasta que t1
, t2
y t3
finalicen la ejecución.
En Python3, desde Python 3.2 hay un nuevo enfoque para alcanzar el mismo resultado, que personalmente prefiero al paquete tradicional de creación de hilos / inicio / unión, concurrent.futures
: https://docs.python.org/3/library/concurrent.futures.html
Usando un ThreadPoolExecutor
el código sería:
from concurrent.futures.thread import ThreadPoolExecutor
def call_script(arg)
subprocess.call(scriptA + arg)
args = [argumentsA, argumentsB, argumentsC]
with ThreadPoolExecutor(max_workers=2) as executor:
for arg in args:
executor.submit(call_script, arg)
print(''All tasks has been finished'')
Una de las ventajas es que puede controlar la configuración de rendimiento de los trabajadores simultáneos máximos.
Me encontré con el mismo problema en el que necesitaba esperar a todos los hilos creados con el ciclo for. Simplemente probé la siguiente pieza de código. Puede que no sea la solución perfecta, pero pensé que sería una solución simple. Probar:
for t in threading.enumerate():
try:
t.join()
except RuntimeError as err:
if ''cannot join current thread'' in err:
continue
else:
raise
Prefiero usar la lista de comprensión basada en una lista de entrada:
inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
threads = [Thread(target=call_script, args=(i)) for i in inputs]
[t.start() for t in threads]
[t.join() for t in threads]
Puede tener una clase similar a la siguiente a partir de la cual puede agregar ''n'' número de funciones o console_scripts que desea ejecutar en paralelo passion e iniciar la ejecución y esperar a que se completen todos los trabajos.
from multiprocessing import Process
class ProcessParallel(object):
"""
To Process the functions parallely
"""
def __init__(self, *jobs):
"""
"""
self.jobs = jobs
self.processes = []
def fork_processes(self):
"""
Creates the process objects for given function deligates
"""
for job in self.jobs:
proc = Process(target=job)
self.processes.append(proc)
def start_all(self):
"""
Starts the functions process all together.
"""
for proc in self.processes:
proc.start()
def join_all(self):
"""
Waits untill all the functions executed.
"""
for proc in self.processes:
proc.join()
def two_sum(a=2, b=2):
return a + b
def multiply(a=2, b=2):
return a * b
#How to run:
if __name__ == ''__main__'':
#note: two_sum, multiply can be replace with any python console scripts which
#you wanted to run parallel..
procs = ProcessParallel(two_sum, multiply)
#Add all the process in list
procs.fork_processes()
#starts process execution
procs.start_all()
#wait until all the process got executed
procs.join_all()
Tal vez, algo como
for t in threading.enumerate():
if t.daemon:
t.join()