parallelism - python new process
python subclassing multiprocessing.Process (4)
Subclassing multiprocessing.Process
:
Sin embargo, no puedo recuperar los valores, ¿cómo puedo usar las colas de esta manera?
El proceso necesita una Queue()
para recibir los resultados ... Un ejemplo de cómo subclasificar el multiprocessing.Process
sigue ...
from multiprocessing import Process, Queue
class Processor(Process):
def __init__(self, queue, idx, **kwargs):
super(Processor, self).__init__()
self.queue = queue
self.idx = idx
self.kwargs = kwargs
def run(self):
"""Build some CPU-intensive tasks to run via multiprocessing here."""
hash(self.kwargs) # Shameless usage of CPU for no gain...
## Return some information back through multiprocessing.Queue
## NOTE: self.name is an attribute of multiprocessing.Process
self.queue.put("Process idx={0} is called ''{1}''".format(self.idx, self.name))
if __name__ == "__main__":
NUMBER_OF_PROCESSES = 5
## Create a list to hold running Processor object instances...
processes = list()
q = Queue() # Build a single queue to send to all process objects...
for i in range(0, NUMBER_OF_PROCESSES):
p=Processor(queue=q, idx=i)
p.start()
processes.append(p)
# Incorporating ideas from this answer, below...
# https://stackoverflow.com/a/42137966/667301
[proc.join() for proc in processes]
while not q.empty():
print "RESULT: {0}".format(q.get()) # get results from the queue...
En mi máquina, esto resulta en ...
$ python test.py
RESULT: Process idx=0 is called ''Processor-1''
RESULT: Process idx=4 is called ''Processor-5''
RESULT: Process idx=3 is called ''Processor-4''
RESULT: Process idx=1 is called ''Processor-2''
RESULT: Process idx=2 is called ''Processor-3''
$
Utilizando multiprocessing.Pool
:
FWIW, una desventaja que he encontrado para crear subclases de multiprocessing.Process
es que no se puede aprovechar todas las bondades integradas del multiprocessing.Pool
. multiprocessing.Pool
; Pool
le ofrece una muy buena API si no necesita su código de productor y consumidor para hablar entre sí a través de una cola.
Puede hacer mucho solo con algunos valores de retorno creativos ... en el siguiente ejemplo, uso un dict()
para encapsular los valores de entrada y salida de pool_job()
...
from multiprocessing import Pool
def pool_job(input_val=0):
# FYI, multiprocessing.Pool can''t guarantee that it keeps inputs ordered correctly
# dict format is {input: output}...
return {''pool_job(input_val={0})''.format(input_val): int(input_val)*12}
pool = Pool(5) # Use 5 multiprocessing processes to handle jobs...
results = pool.map(pool_job, xrange(0, 12)) # map xrange(0, 12) into pool_job()
print results
Esto resulta en:
[
{''pool_job(input_val=0)'': 0},
{''pool_job(input_val=1)'': 12},
{''pool_job(input_val=2)'': 24},
{''pool_job(input_val=3)'': 36},
{''pool_job(input_val=4)'': 48},
{''pool_job(input_val=5)'': 60},
{''pool_job(input_val=6)'': 72},
{''pool_job(input_val=7)'': 84},
{''pool_job(input_val=8)'': 96},
{''pool_job(input_val=9)'': 108},
{''pool_job(input_val=10)'': 120},
{''pool_job(input_val=11)'': 132}
]
Obviamente, hay muchas otras mejoras que se pueden hacer en pool_job()
, como el manejo de errores, pero esto ilustra lo esencial. FYI, esta respuesta proporciona otro ejemplo de cómo usar multiprocessing.Pool
. multiprocessing.Pool
.
Soy nuevo en python orientado a objetos y estoy reescribiendo mi aplicación existente como una versión orientada a objetos, porque ahora los desarrolladores están aumentando y mi código se está volviendo inmanejable.
Normalmente utilizo colas de multiprocesamiento pero encontré en este ejemplo http://www.doughellmann.com/PyMOTW/multiprocessing/basics.html que puedo subclasificar el multiprocessing.Process
así que creo que es una buena idea y escribí una clase para probar como esta:
código:
from multiprocessing import Process
class Processor(Process):
def return_name(self):
return "Process %s" % self.name
def run(self):
return self.return_name()
processes = []
if __name__ == "__main__":
for i in range(0,5):
p=Processor()
processes.append(p)
p.start()
for p in processes:
p.join()
Sin embargo, no puedo recuperar los valores, ¿cómo puedo usar las colas de esta manera?
EDITAR: Quiero obtener el valor de retorno y pensar dónde colocar Queues()
.
El valor de retorno de Process.run
no va a ninguna parte. Debe enviarlos de vuelta al proceso padre, por ejemplo, utilizando un multiprocessing.Queue
( aquí los documentos ).
Muchas gracias a todos.
Ahora aquí es cómo lo hice :)
En este ejemplo, uso múltiples queus ya que no quiero comunicarme entre cada ohter, pero solo con el proceso principal.
from multiprocessing import Process,Queue
class Processor(Process):
def __init__(self,queue):
Process.__init__(self)
self.que=queue
def get_name(self):
return "Process %s" % self.name
def run(self):
self.que.put(self.get_name())
if __name__ == "__main__":
processes = []
for i in range(0,5):
p=Processor(Queue())
processes.append(p)
p.start()
for p in processes:
p.join()
print p.que.get()
La respuesta de Mike es la mejor, pero para completar, quiero mencionar que prefiero cosechar la cola de contextos de join
para que el último bit se vea así:
[proc.join() for proc in processes] # 1. join
while not q.empty(): # 2. get the results
print "RESULT: %s" % q.get()