async - python multiprocessing windows
¿Cómo puedo paralelizar un simple bucle de Python? (7)
¿Cuál es la forma más fácil de paralelizar este código?
Me gusta concurrent.futures
para esto, disponible en Python3 desde la versión 3.2 - y a través de backport a 2.6 y 2.7 en PyPi .
Puede usar hilos o procesos y usar exactamente la misma interfaz.
Multiprocesamiento
Pon esto en un archivo - futuretest.py:
import concurrent.futures
import time, random # add some random sleep time
offset = 2 # you don''t supply these so
def calc_stuff(parameter=None): # these are examples.
sleep_time = random.choice([0, 1, 2, 3, 4, 5])
time.sleep(sleep_time)
return parameter / 2, sleep_time, parameter * parameter
def procedure(j): # just factoring out the
parameter = j * offset # procedure
# call the calculation
return calc_stuff(parameter=parameter)
def main():
output1 = list()
output2 = list()
output3 = list()
start = time.time() # let''s see how long this takes
# we can swap out ProcessPoolExecutor for ThreadPoolExecutor
with concurrent.futures.ProcessPoolExecutor() as executor:
for out1, out2, out3 in executor.map(procedure, range(0, 10)):
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
finish = time.time()
# these kinds of format strings are only available on Python 3.6:
# time to upgrade!
print(f''original inputs: {repr(output1)}'')
print(f''total time to execute {sum(output2)} = sum({repr(output2)})'')
print(f''time saved by parallelizing: {sum(output2) - (finish-start)}'')
print(f''returned in order given: {repr(output3)}'')
if __name__ == ''__main__'':
main()
Y aquí está la salida:
$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallellizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
Multihilo
Ahora cambie ProcessPoolExecutor
a ThreadPoolExecutor
y ThreadPoolExecutor
ejecutar el módulo:
$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallellizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
¡Ahora has hecho tanto multihilo como multiprocesamiento!
Tenga en cuenta el rendimiento y el uso de ambos juntos.
El muestreo es demasiado pequeño para comparar los resultados.
Sin embargo, sospecho que el multihilo será más rápido que el multiprocesamiento en general, especialmente en Windows, ya que Windows no admite bifurcaciones, por lo que cada nuevo proceso debe tomarse un tiempo para su lanzamiento. En Linux o Mac, probablemente estarán más cerca.
Puede anidar varios subprocesos en varios procesos, pero se recomienda no utilizar varios subprocesos para derivar procesos múltiples.
Esta es probablemente una pregunta trivial, pero ¿cómo puedo paralelizar el siguiente ciclo en Python?
# setup output lists
output1 = list()
output2 = list()
output3 = list()
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
Sé cómo iniciar subprocesos individuales en Python, pero no sé cómo "recopilar" los resultados.
Múltiples procesos también estarían bien, lo que sea más fácil para este caso. Estoy usando actualmente Linux, pero el código debería ejecutarse en Windows y Mac como ... bueno.
¿Cuál es la forma más fácil de paralelizar este código?
¿por qué no usas hilos y un mutex para proteger una lista global?
import os
import re
import time
import sys
import thread
from threading import Thread
class thread_it(Thread):
def __init__ (self,param):
Thread.__init__(self)
self.param = param
def run(self):
mutex.acquire()
output.append(calc_stuff(self.param))
mutex.release()
threads = []
output = []
mutex = thread.allocate_lock()
for j in range(0, 10):
current = thread_it(j * offset)
threads.append(current)
current.start()
for t in threads:
t.join()
#here you have output list filled with data
ten en cuenta que serás tan rápido como tu hilo más lento
Echa un vistazo a esto;
http://docs.python.org/library/queue.html
Esta podría no ser la manera correcta de hacerlo, pero haría algo así como;
Código real;
from multiprocessing import Process, JoinableQueue as Queue
class CustomWorker(Process):
def __init__(self,workQueue, out1,out2,out3):
Process.__init__(self)
self.input=workQueue
self.out1=out1
self.out2=out2
self.out3=out3
def run(self):
while True:
try:
value = self.input.get()
#value modifier
temp1,temp2,temp3 = self.calc_stuff(value)
self.out1.put(temp1)
self.out2.put(temp2)
self.out3.put(temp3)
self.input.task_done()
except Queue.Empty:
return
#Catch things better here
def calc_stuff(self,param):
out1 = param * 2
out2 = param * 4
out3 = param * 8
return out1,out2,out3
def Main():
inputQueue = Queue()
for i in range(10):
inputQueue.put(i)
out1 = Queue()
out2 = Queue()
out3 = Queue()
processes = []
for x in range(2):
p = CustomWorker(inputQueue,out1,out2,out3)
p.daemon = True
p.start()
processes.append(p)
inputQueue.join()
while(not out1.empty()):
print out1.get()
print out2.get()
print out3.get()
if __name__ == ''__main__'':
Main()
Espero que ayude.
Esto podría ser útil al implementar multiprocesamiento y computación distribuida / paralela en Python.
Tutorial de YouTube sobre el uso del paquete techila
Techila es un middleware de informática distribuida, que se integra directamente con Python utilizando el paquete techila. La función de melocotón en el paquete puede ser útil para paralelizar estructuras de bucle. (El siguiente fragmento de código es de los foros de la comunidad de Techila )
techila.peach(funcname = ''theheavyalgorithm'', # Function that will be called on the compute nodes/ Workers
files = ''theheavyalgorithm.py'', # Python-file that will be sourced on Workers
jobs = jobcount # Number of Jobs in the Project
)
Para paralelizar un bucle for simple, joblib aporta un gran valor al uso crudo de multiprocesamiento. No solo la breve sintaxis, sino también elementos como el agrupamiento transparente de iteraciones cuando son muy rápidas (para eliminar la sobrecarga) o la captura del rastreo del proceso secundario, para tener un mejor informe de errores.
Descargo de responsabilidad: soy el autor original de joblib.
Usar múltiples hilos en CPython no le dará un mejor rendimiento para el código Python puro debido al bloqueo de intérprete global (GIL). Sugiero usar el módulo de multiprocessing
lugar:
pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
Tenga en cuenta que esto no funcionará en el intérprete interactivo.
Para evitar el FUD usual alrededor del GIL: de todos modos, no habría ninguna ventaja al usar subprocesos para este ejemplo. Desea utilizar procesos aquí, no hilos, ya que evitan un montón de problemas.
ejemplo muy simple de procesamiento paralelo es
from multiprocessing import Process
output1 = list()
output2 = list()
output3 = list()
def yourfunction():
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
if __name__ == ''__main__'':
p = Process(target=pa.yourfunction, args=(''bob'',))
p.start()
p.join()