procesos libreria ejemplos crear concurrentes con python multiprocessing pool

python - libreria - ¿Los grupos de multiprocesamiento otorgan a cada proceso la misma cantidad de tareas o están asignados como disponibles?



multiprocessing python ejemplos (3)

Así que dado este código de sugerencia no probado; si hay 4 procesos en el grupo, a cada proceso se le asignan 25 tareas para hacer, o si las 100 cosas se seleccionan una por una por los procesos que buscan cosas que hacer para que cada proceso pueda hacer un número diferente de cosas, por ejemplo, 30 , 26, 24, 20.

Bueno, la respuesta obvia es probarlo.

Tal como está, es posible que la prueba no le diga mucho, porque los trabajos se terminarán lo antes posible, y es posible que las cosas terminen distribuidas uniformemente incluso si los procesos agrupados capturan los trabajos a medida que están listos. Pero hay una manera fácil de arreglar eso:

import collections import multiprocessing import os import random import time def generate_stuff(): for foo in range(100): yield foo def process(moo): #print moo time.sleep(random.randint(0, 50) / 10.) return os.getpid() pool = multiprocessing.Pool() pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1) pool.close() print collections.Counter(pids)

Si los números son "irregulares", sabe que los procesos agrupados deben estar tomando nuevos trabajos como están listos. (Establecí explícitamente chunksize a 1 para asegurarme de que los trozos no sean tan grandes que cada uno solo obtenga un trozo en primer lugar).

Cuando lo ejecuto en una máquina de 8 núcleos:

Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})

Entonces, parece que los procesos están consiguiendo nuevos trabajos sobre la marcha.

Ya que usted preguntó específicamente sobre 4 trabajadores, cambié Pool() a Pool(4) y obtuve esto:

Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})

Sin embargo, hay una manera aún mejor de averiguarlo que mediante las pruebas: leer la fuente .

Como puede ver, map simplemente llama map_async , que crea un montón de lotes y los coloca en un objeto self._taskqueue (una instancia de Queue.Queue ). Si sigue leyendo, esta cola no se comparte con los otros procesos directamente, pero hay un hilo de administrador de grupo que, cada vez que un proceso finaliza y devuelve un resultado, saca el siguiente trabajo de la cola y lo envía de nuevo al proceso.

Esta es también la forma en que puede averiguar cuál es el tamaño predeterminado para el map . La implementación 2.7 vinculada anteriormente muestra que es solo len(iterable) / (len(self._pool) * 4) redondeado (un poco más detallado que para evitar la aritmética fraccional), o dicho de otra manera, lo suficientemente grande para aproximadamente 4 trozos por proceso. Pero realmente no deberías confiar en esto; La documentación, de manera vaga e indirecta, implica que va a utilizar algún tipo de heurística, pero no le da ninguna garantía en cuanto a lo que será. Entonces, si realmente necesita "aproximadamente 4 trozos por proceso", calcúlelo explícitamente. De manera más realista, si alguna vez necesita algo que no sea el predeterminado, probablemente necesite un valor específico de dominio que va a calcular (mediante cálculo, adivinación o creación de perfiles).

Cuando se asigna un iterable a un multiprocessing.Pool ¿Las iteraciones se dividen en una cola para cada proceso en el grupo al comienzo, o hay una cola común de la que se toma una tarea cuando un proceso se libera?

def generate_stuff(): for foo in range(100): yield foo def process(moo): print moo pool = multiprocessing.Pool() pool.map(func=process, iterable=generate_stuff()) pool.close()

Así que dado este código de sugerencia no probado; si hay 4 procesos en el grupo, a cada proceso se le asignan 25 tareas para hacer, o si las 100 cosas se seleccionan una por una por los procesos que buscan cosas que hacer para que cada proceso pueda hacer un número diferente de cosas, por ejemplo, 30 , 26, 24, 20.


Para estimar el chunksize utilizado por una implementación de Python sin mirar el código fuente de su módulo de multiprocessing , ejecute:

#!/usr/bin/env python import multiprocessing as mp from itertools import groupby def work(index): mp.get_logger().info(index) return index, mp.current_process().name if __name__ == "__main__": import logging import sys logger = mp.log_to_stderr() # process cmdline args try: sys.argv.remove(''--verbose'') except ValueError: pass # not verbose else: logger.setLevel(logging.INFO) # verbose nprocesses, nitems = int(sys.argv.pop(1)), int(sys.argv.pop(1)) # choices: ''map'', ''imap'', ''imap_unordered'' map_name = sys.argv[1] if len(sys.argv) > 1 else ''map'' kwargs = dict(chunksize=int(sys.argv[2])) if len(sys.argv) > 2 else {} # estimate chunksize used max_chunksize = 0 map_func = getattr(mp.Pool(nprocesses), map_name) for _, group in groupby(sorted(map_func(work, range(nitems), **kwargs), key=lambda x: x[0]), # sort by index key=lambda x: x[1]): # group by process name max_chunksize = max(max_chunksize, len(list(group))) print("%s: max_chunksize %d" % (map_name, max_chunksize))

Muestra que imap , imap_unordered usa chunksize=1 por defecto y max_chunksize para el map depende de nprocesses , nitem (la cantidad de trozos por proceso no es fija) y max_chunksize depende de la versión de python. Todas las funciones *map* tienen en cuenta el parámetro chunksize si se especifica.

Uso

$ ./estimate_chunksize.py nprocesses nitems [map_name [chunksize]] [--verbose]

Para ver cómo se distribuyen los trabajos individuales; especifique el parámetro --verbose .


http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.map

map(func, iterable[, chunksize])

Este método divide el iterable en varios fragmentos que envía al grupo de procesos como tareas separadas. El tamaño (aproximado) de estos trozos se puede especificar estableciendo chunksize en un entero positivo.

Supongo que un proceso retira el siguiente fragmento de una cola cuando se realiza con el anterior.

El chunksize predeterminado de chunksize depende de la longitud de iterable y se elige de modo que el número de trozos sea aproximadamente cuatro veces el número de procesos. (source)