set_start_method - python multiprocessing windows
¿Cómo puedo recuperar el valor de retorno de una función pasada a multiprocesamiento.Proceso? (8)
Creo que el enfoque sugerido por @sega_sai es el mejor. Pero realmente necesita un ejemplo de código, así que aquí va:
import multiprocessing
from os import getpid
def worker(procnum):
print ''I am number %d in process %d'' % (procnum, getpid())
return getpid()
if __name__ == ''__main__'':
pool = multiprocessing.Pool(processes = 3)
print pool.map(worker, range(5))
Que imprimirá los valores de retorno:
I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]
Si está familiarizado con el map
(el Python 2 incorporado) esto no debería ser demasiado difícil. De lo contrario, eche un vistazo al enlace de sega_Sai .
Tenga en cuenta qué tan poco código se necesita. (También tenga en cuenta cómo se reutilizan los procesos).
En el siguiente código de ejemplo, me gustaría recuperar el valor de retorno del worker
de la función. ¿Cómo puedo hacer esto? ¿Dónde está almacenado este valor?
Código de ejemplo:
import multiprocessing
def worker(procnum):
''''''worker function''''''
print str(procnum) + '' represent!''
return procnum
if __name__ == ''__main__'':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print jobs
Salida:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
Parece que no puedo encontrar el atributo relevante en los objetos almacenados en los jobs
.
Gracias de antemano, blz
Este ejemplo muestra cómo usar una lista de instancias multiprocessing.Pipe para devolver cadenas de un número arbitrario de procesos:
import multiprocessing
def worker(procnum, send_end):
''''''worker function''''''
result = str(procnum) + '' represent!''
print result
send_end.send(result)
def main():
jobs = []
pipe_list = []
for i in range(5):
recv_end, send_end = multiprocessing.Pipe(False)
p = multiprocessing.Process(target=worker, args=(i, send_end))
jobs.append(p)
pipe_list.append(recv_end)
p.start()
for proc in jobs:
proc.join()
result_list = [x.recv() for x in pipe_list]
print result_list
if __name__ == ''__main__'':
main()
Salida:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[''0 represent!'', ''1 represent!'', ''2 represent!'', ''3 represent!'', ''4 represent!'']
Esta solución usa menos recursos que un multiprocessing.Queue utiliza
- un tubo
- al menos un bloqueo
- un amortiguador
- un hilo
o un multiprocessing.SimpleQueue que utiliza
- un tubo
- al menos un bloqueo
Es muy instructivo mirar la fuente de cada uno de estos tipos.
Modifiqué un poco la respuesta de Vartec porque necesitaba obtener los códigos de error de la función. (Gracias vertec !!! es un truco increíble)
Esto también se puede hacer con manager.list
pero creo que es mejor tenerlo en un dict y almacenar una lista dentro de él. De esa forma, conservamos la función y los resultados, ya que no podemos estar seguros del orden en que se completará la lista.
from multiprocessing import Process
import time
import datetime
import multiprocessing
def func1(fn, m_list):
print ''func1: starting''
time.sleep(1)
m_list[fn] = "this is the first function"
print ''func1: finishing''
# return "func1" # no need for return since Multiprocess doesnt return it =(
def func2(fn, m_list):
print ''func2: starting''
time.sleep(3)
m_list[fn] = "this is function 2"
print ''func2: finishing''
# return "func2"
def func3(fn, m_list):
print ''func3: starting''
time.sleep(9)
# if fail wont join the rest because it never populate the dict
# or do a try/except to get something in return.
raise ValueError("failed here")
# if we want to get the error in the manager dict we can catch the error
try:
raise ValueError("failed here")
m_list[fn] = "this is third"
except:
m_list[fn] = "this is third and it fail horrible"
# print ''func3: finishing''
# return "func3"
def runInParallel(*fns): # * is to accept any input in list
start_time = datetime.datetime.now()
proc = []
manager = multiprocessing.Manager()
m_list = manager.dict()
for fn in fns:
# print fn
# print dir(fn)
p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
p.start()
proc.append(p)
for p in proc:
p.join() # 5 is the time out
print datetime.datetime.now() - start_time
return m_list, proc
if __name__ == ''__main__'':
manager, proc = runInParallel(func1, func2, func3)
# print dir(proc[0])
# print proc[0]._name
# print proc[0].name
# print proc[0].exitcode
# here you can check what did fail
for i in proc:
print i.name, i.exitcode # name was set up in the Process line 53
# here will only show the function that worked and where able to populate the
# manager dict
for i, j in manager.items():
print dir(i) # things you can do to the function
print i, j
Para cualquier otra persona que esté buscando cómo obtener un valor de un Process
utilizando Queue
:
import multiprocessing
ret = {''foo'': False}
def worker(queue):
ret = queue.get()
ret[''foo''] = True
queue.put(ret)
if __name__ == ''__main__'':
queue = multiprocessing.Queue()
queue.put(ret)
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
print queue.get() # Prints {"foo": True}
p.join()
Parece que debes utilizar la clase multiprocesamiento.Pool en su lugar y usar los métodos .apply () .apply_async (), map ()
http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult
Por alguna razón, no pude encontrar un ejemplo general de cómo hacer esto con Queue
cualquier lugar (incluso los ejemplos de doc de Python no generan múltiples procesos), así que esto es lo que obtuve trabajando después de 10 intentos:
def add_helper(queue, arg1, arg2): # the func called in child processes
ret = arg1 + arg2
queue.put(ret)
def multi_add(): # spawns child processes
q = Queue()
processes = []
rets = []
for _ in range(0, 100):
p = Process(target=add_helper, args=(q, 1, 2))
processes.append(p)
p.start()
for p in processes:
ret = q.get() # will block
rets.append(ret)
for p in processes:
p.join()
return rets
Queue
es una Queue
bloqueo y de seguridad de subprocesos que puede usar para almacenar los valores de retorno de los procesos secundarios. Entonces debes pasar la cola a cada proceso. Algo menos obvio aquí es que debes get()
de la cola antes de join
al Process
o, de lo contrario, la cola se llena y bloquea todo.
Actualización para aquellos que están orientados a objetos (probados en Python 3.4):
from multiprocessing import Process, Queue
class Multiprocessor():
def __init__(self):
self.processes = []
self.queue = Queue()
@staticmethod
def _wrapper(func, queue, args, kwargs):
ret = func(*args, **kwargs)
queue.put(ret)
def run(self, func, *args, **kwargs):
args2 = [func, self.queue, args, kwargs]
p = Process(target=self._wrapper, args=args2)
self.processes.append(p)
p.start()
def wait(self):
rets = []
for p in self.processes:
ret = self.queue.get()
rets.append(ret)
for p in self.processes:
p.join()
return rets
# tester
if __name__ == "__main__":
mp = Multiprocessor()
num_proc = 64
for _ in range(num_proc): # queue up multiple tasks running `sum`
mp.run(sum, [1, 2, 3, 4, 5])
ret = mp.wait() # get all results
print(ret)
assert len(ret) == num_proc and all(r == 15 for r in ret)
Puede usar la exit
incorporada para establecer el código de salida de un proceso. Se puede obtener del atributo de exitcode
de exitcode
del proceso:
import multiprocessing
def worker(procnum):
print str(procnum) + '' represent!''
exit(procnum)
if __name__ == ''__main__'':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
result = []
for proc in jobs:
proc.join()
result.append(proc.exitcode)
print result
Salida:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
Use una variable compartida para comunicarse. Por ejemplo, así:
import multiprocessing
def worker(procnum, return_dict):
''''''worker function''''''
print str(procnum) + '' represent!''
return_dict[procnum] = procnum
if __name__ == ''__main__'':
manager = multiprocessing.Manager()
return_dict = manager.dict()
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,return_dict))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print return_dict.values()