lock async python parallel-processing

async - python multiprocessing windows



¿Cómo puedo paralelizar un simple bucle de Python? (7)

¿Cuál es la forma más fácil de paralelizar este código?

Me gusta concurrent.futures para esto, disponible en Python3 desde la versión 3.2 - y a través de backport a 2.6 y 2.7 en PyPi .

Puede usar hilos o procesos y usar exactamente la misma interfaz.

Multiprocesamiento

Pon esto en un archivo - futuretest.py:

import concurrent.futures import time, random # add some random sleep time offset = 2 # you don''t supply these so def calc_stuff(parameter=None): # these are examples. sleep_time = random.choice([0, 1, 2, 3, 4, 5]) time.sleep(sleep_time) return parameter / 2, sleep_time, parameter * parameter def procedure(j): # just factoring out the parameter = j * offset # procedure # call the calculation return calc_stuff(parameter=parameter) def main(): output1 = list() output2 = list() output3 = list() start = time.time() # let''s see how long this takes # we can swap out ProcessPoolExecutor for ThreadPoolExecutor with concurrent.futures.ProcessPoolExecutor() as executor: for out1, out2, out3 in executor.map(procedure, range(0, 10)): # put results into correct output list output1.append(out1) output2.append(out2) output3.append(out3) finish = time.time() # these kinds of format strings are only available on Python 3.6: # time to upgrade! print(f''original inputs: {repr(output1)}'') print(f''total time to execute {sum(output2)} = sum({repr(output2)})'') print(f''time saved by parallelizing: {sum(output2) - (finish-start)}'') print(f''returned in order given: {repr(output3)}'') if __name__ == ''__main__'': main()

Y aquí está la salida:

$ python3 -m futuretest original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4]) time saved by parallellizing: 27.68999981880188 returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Multihilo

Ahora cambie ProcessPoolExecutor a ThreadPoolExecutor y ThreadPoolExecutor ejecutar el módulo:

$ python3 -m futuretest original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1]) time saved by parallellizing: 13.992000102996826 returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

¡Ahora has hecho tanto multihilo como multiprocesamiento!

Tenga en cuenta el rendimiento y el uso de ambos juntos.

El muestreo es demasiado pequeño para comparar los resultados.

Sin embargo, sospecho que el multihilo será más rápido que el multiprocesamiento en general, especialmente en Windows, ya que Windows no admite bifurcaciones, por lo que cada nuevo proceso debe tomarse un tiempo para su lanzamiento. En Linux o Mac, probablemente estarán más cerca.

Puede anidar varios subprocesos en varios procesos, pero se recomienda no utilizar varios subprocesos para derivar procesos múltiples.

Esta es probablemente una pregunta trivial, pero ¿cómo puedo paralelizar el siguiente ciclo en Python?

# setup output lists output1 = list() output2 = list() output3 = list() for j in range(0, 10): # calc individual parameter value parameter = j * offset # call the calculation out1, out2, out3 = calc_stuff(parameter = parameter) # put results into correct output list output1.append(out1) output2.append(out2) output3.append(out3)

Sé cómo iniciar subprocesos individuales en Python, pero no sé cómo "recopilar" los resultados.

Múltiples procesos también estarían bien, lo que sea más fácil para este caso. Estoy usando actualmente Linux, pero el código debería ejecutarse en Windows y Mac como ... bueno.

¿Cuál es la forma más fácil de paralelizar este código?


¿por qué no usas hilos y un mutex para proteger una lista global?

import os import re import time import sys import thread from threading import Thread class thread_it(Thread): def __init__ (self,param): Thread.__init__(self) self.param = param def run(self): mutex.acquire() output.append(calc_stuff(self.param)) mutex.release() threads = [] output = [] mutex = thread.allocate_lock() for j in range(0, 10): current = thread_it(j * offset) threads.append(current) current.start() for t in threads: t.join() #here you have output list filled with data

ten en cuenta que serás tan rápido como tu hilo más lento


Echa un vistazo a esto;

http://docs.python.org/library/queue.html

Esta podría no ser la manera correcta de hacerlo, pero haría algo así como;

Código real;

from multiprocessing import Process, JoinableQueue as Queue class CustomWorker(Process): def __init__(self,workQueue, out1,out2,out3): Process.__init__(self) self.input=workQueue self.out1=out1 self.out2=out2 self.out3=out3 def run(self): while True: try: value = self.input.get() #value modifier temp1,temp2,temp3 = self.calc_stuff(value) self.out1.put(temp1) self.out2.put(temp2) self.out3.put(temp3) self.input.task_done() except Queue.Empty: return #Catch things better here def calc_stuff(self,param): out1 = param * 2 out2 = param * 4 out3 = param * 8 return out1,out2,out3 def Main(): inputQueue = Queue() for i in range(10): inputQueue.put(i) out1 = Queue() out2 = Queue() out3 = Queue() processes = [] for x in range(2): p = CustomWorker(inputQueue,out1,out2,out3) p.daemon = True p.start() processes.append(p) inputQueue.join() while(not out1.empty()): print out1.get() print out2.get() print out3.get() if __name__ == ''__main__'': Main()

Espero que ayude.


Esto podría ser útil al implementar multiprocesamiento y computación distribuida / paralela en Python.

Tutorial de YouTube sobre el uso del paquete techila

Techila es un middleware de informática distribuida, que se integra directamente con Python utilizando el paquete techila. La función de melocotón en el paquete puede ser útil para paralelizar estructuras de bucle. (El siguiente fragmento de código es de los foros de la comunidad de Techila )

techila.peach(funcname = ''theheavyalgorithm'', # Function that will be called on the compute nodes/ Workers files = ''theheavyalgorithm.py'', # Python-file that will be sourced on Workers jobs = jobcount # Number of Jobs in the Project )


Para paralelizar un bucle for simple, joblib aporta un gran valor al uso crudo de multiprocesamiento. No solo la breve sintaxis, sino también elementos como el agrupamiento transparente de iteraciones cuando son muy rápidas (para eliminar la sobrecarga) o la captura del rastreo del proceso secundario, para tener un mejor informe de errores.

Descargo de responsabilidad: soy el autor original de joblib.


Usar múltiples hilos en CPython no le dará un mejor rendimiento para el código Python puro debido al bloqueo de intérprete global (GIL). Sugiero usar el módulo de multiprocessing lugar:

pool = multiprocessing.Pool(4) out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Tenga en cuenta que esto no funcionará en el intérprete interactivo.

Para evitar el FUD usual alrededor del GIL: de todos modos, no habría ninguna ventaja al usar subprocesos para este ejemplo. Desea utilizar procesos aquí, no hilos, ya que evitan un montón de problemas.


ejemplo muy simple de procesamiento paralelo es

from multiprocessing import Process output1 = list() output2 = list() output3 = list() def yourfunction(): for j in range(0, 10): # calc individual parameter value parameter = j * offset # call the calculation out1, out2, out3 = calc_stuff(parameter = parameter) # put results into correct output list output1.append(out1) output2.append(out2) output3.append(out3) if __name__ == ''__main__'': p = Process(target=pa.yourfunction, args=(''bob'',)) p.start() p.join()