procesos multitarea multiprocesamiento crear concurrentes con python multiprocessing generator

python - multitarea - multiprocesamiento con grandes datos



multitarea en python (3)

Esto suena como un caso de uso ideal para una cola: http://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes

Simplemente introduzca sus resultados en la cola de los trabajadores agrupados e ingiéralos en el maestro.

Tenga en cuenta que todavía puede tener problemas de presión de memoria a menos que drene la cola casi tan rápido como los trabajadores la están llenando. Podría limitar el tamaño de la cola (el número máximo de objetos que cabrán en la cola), en cuyo caso los trabajadores agrupados bloquearían las declaraciones queue.put hasta que haya espacio disponible en la cola. Esto pondría un techo al uso de la memoria. Pero si está haciendo esto, puede ser el momento de reconsiderar si necesita agruparse y / o si tiene sentido usar menos trabajadores.

Estoy usando multiprocessing.Pool() para paralelizar algunos cálculos pesados.

La función de destino devuelve una gran cantidad de datos (una lista enorme). Me estoy quedando sin memoria RAM.

Sin el multiprocessing , solo cambiaría la función objetivo en un generador, generando los elementos resultantes uno tras otro, a medida que se calculan.

Entiendo que el multiprocesamiento no es compatible con los generadores: espera toda la salida y la devuelve de inmediato, ¿verdad? No ceder. ¿Hay alguna manera de hacer que los empleados del Pool produzcan datos tan pronto como estén disponibles, sin construir la matriz de resultados completa en la RAM?

Ejemplo simple:

def target_fnc(arg): result = [] for i in xrange(1000000): result.append(''dvsdbdfbngd'') # <== would like to just use yield! return result def process_args(some_args): pool = Pool(16) for result in pool.imap_unordered(target_fnc, some_args): for element in result: yield element

Esto es Python 2.7.


Según su descripción, parece que no está tan interesado en procesar los datos como entran, sino en evitar pasar una list millones de elementos.

Hay una forma más sencilla de hacerlo: simplemente coloque los datos en un archivo. Por ejemplo:

def target_fnc(arg): fd, path = tempfile.mkstemp(text=True) with os.fdopen(fd) as f: for i in xrange(1000000): f.write(''dvsdbdfbngd/n'') return path def process_args(some_args): pool = Pool(16) for result in pool.imap_unordered(target_fnc, some_args): with open(result) as f: for element in f: yield element

Obviamente, si sus resultados pueden contener nuevas líneas, o no son cadenas, etc., querrá usar un archivo csv , un numpy , etc. en lugar de un archivo de texto simple, pero la idea es la misma.

Dicho esto, incluso si esto es más simple, por lo general hay beneficios al procesar los datos un trozo a la vez, por lo que dividir sus tareas o usar una Queue (como sugieren las otras dos respuestas) puede ser mejor, si las desventajas (respectivamente) , que necesitan una manera de dividir las tareas, o tener que poder consumir los datos tan rápido como se producen) no son un factor decisivo.


Si sus tareas pueden devolver datos en fragmentos ... ¿se pueden dividir en tareas más pequeñas, cada una de las cuales devuelve un solo fragmento? Obviamente, esto no siempre es posible. Cuando no es así, tienes que usar algún otro mecanismo (como una Queue , como sugiere Loren Abrams). Pero cuando lo es , probablemente sea una mejor solución por otras razones, así como para resolver este problema.

Con tu ejemplo, esto es ciertamente factible. Por ejemplo:

def target_fnc(arg, low, high): result = [] for i in xrange(low, high): result.append(''dvsdbdfbngd'') # <== would like to just use yield! return result def process_args(some_args): pool = Pool(16) pool_args = [] for low in in range(0, 1000000, 10000): pool_args.extend(args + [low, low+10000] for args in some_args) for result in pool.imap_unordered(target_fnc, pool_args): for element in result: yield element

(Por supuesto, puede reemplazar el bucle con una comprensión anidada, o con una zip y flatten , si lo prefiere).

Entonces, si some_args es [1, 2, 3] , obtendrás 300 tareas— [[1, 0, 10000], [2, 0, 10000], [3, 0, 10000], [1, 10000, 20000], …] , cada uno de los cuales solo devuelve 10000 elementos en lugar de 1000000.