python - libreria - ¿Los grupos de multiprocesamiento otorgan a cada proceso la misma cantidad de tareas o están asignados como disponibles?
multiprocessing python ejemplos (3)
Así que dado este código de sugerencia no probado; si hay 4 procesos en el grupo, a cada proceso se le asignan 25 tareas para hacer, o si las 100 cosas se seleccionan una por una por los procesos que buscan cosas que hacer para que cada proceso pueda hacer un número diferente de cosas, por ejemplo, 30 , 26, 24, 20.
Bueno, la respuesta obvia es probarlo.
Tal como está, es posible que la prueba no le diga mucho, porque los trabajos se terminarán lo antes posible, y es posible que las cosas terminen distribuidas uniformemente incluso si los procesos agrupados capturan los trabajos a medida que están listos. Pero hay una manera fácil de arreglar eso:
import collections
import multiprocessing
import os
import random
import time
def generate_stuff():
for foo in range(100):
yield foo
def process(moo):
#print moo
time.sleep(random.randint(0, 50) / 10.)
return os.getpid()
pool = multiprocessing.Pool()
pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
pool.close()
print collections.Counter(pids)
Si los números son "irregulares", sabe que los procesos agrupados deben estar tomando nuevos trabajos como están listos. (Establecí explícitamente chunksize
a 1 para asegurarme de que los trozos no sean tan grandes que cada uno solo obtenga un trozo en primer lugar).
Cuando lo ejecuto en una máquina de 8 núcleos:
Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})
Entonces, parece que los procesos están consiguiendo nuevos trabajos sobre la marcha.
Ya que usted preguntó específicamente sobre 4 trabajadores, cambié Pool()
a Pool(4)
y obtuve esto:
Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})
Sin embargo, hay una manera aún mejor de averiguarlo que mediante las pruebas: leer la fuente .
Como puede ver, map
simplemente llama map_async
, que crea un montón de lotes y los coloca en un objeto self._taskqueue
(una instancia de Queue.Queue
). Si sigue leyendo, esta cola no se comparte con los otros procesos directamente, pero hay un hilo de administrador de grupo que, cada vez que un proceso finaliza y devuelve un resultado, saca el siguiente trabajo de la cola y lo envía de nuevo al proceso.
Esta es también la forma en que puede averiguar cuál es el tamaño predeterminado para el map
. La implementación 2.7 vinculada anteriormente muestra que es solo len(iterable) / (len(self._pool) * 4)
redondeado (un poco más detallado que para evitar la aritmética fraccional), o dicho de otra manera, lo suficientemente grande para aproximadamente 4 trozos por proceso. Pero realmente no deberías confiar en esto; La documentación, de manera vaga e indirecta, implica que va a utilizar algún tipo de heurística, pero no le da ninguna garantía en cuanto a lo que será. Entonces, si realmente necesita "aproximadamente 4 trozos por proceso", calcúlelo explícitamente. De manera más realista, si alguna vez necesita algo que no sea el predeterminado, probablemente necesite un valor específico de dominio que va a calcular (mediante cálculo, adivinación o creación de perfiles).
Cuando se asigna un iterable a un multiprocessing.Pool
¿Las iteraciones se dividen en una cola para cada proceso en el grupo al comienzo, o hay una cola común de la que se toma una tarea cuando un proceso se libera?
def generate_stuff():
for foo in range(100):
yield foo
def process(moo):
print moo
pool = multiprocessing.Pool()
pool.map(func=process, iterable=generate_stuff())
pool.close()
Así que dado este código de sugerencia no probado; si hay 4 procesos en el grupo, a cada proceso se le asignan 25 tareas para hacer, o si las 100 cosas se seleccionan una por una por los procesos que buscan cosas que hacer para que cada proceso pueda hacer un número diferente de cosas, por ejemplo, 30 , 26, 24, 20.
Para estimar el chunksize
utilizado por una implementación de Python sin mirar el código fuente de su módulo de multiprocessing
, ejecute:
#!/usr/bin/env python
import multiprocessing as mp
from itertools import groupby
def work(index):
mp.get_logger().info(index)
return index, mp.current_process().name
if __name__ == "__main__":
import logging
import sys
logger = mp.log_to_stderr()
# process cmdline args
try:
sys.argv.remove(''--verbose'')
except ValueError:
pass # not verbose
else:
logger.setLevel(logging.INFO) # verbose
nprocesses, nitems = int(sys.argv.pop(1)), int(sys.argv.pop(1))
# choices: ''map'', ''imap'', ''imap_unordered''
map_name = sys.argv[1] if len(sys.argv) > 1 else ''map''
kwargs = dict(chunksize=int(sys.argv[2])) if len(sys.argv) > 2 else {}
# estimate chunksize used
max_chunksize = 0
map_func = getattr(mp.Pool(nprocesses), map_name)
for _, group in groupby(sorted(map_func(work, range(nitems), **kwargs),
key=lambda x: x[0]), # sort by index
key=lambda x: x[1]): # group by process name
max_chunksize = max(max_chunksize, len(list(group)))
print("%s: max_chunksize %d" % (map_name, max_chunksize))
Muestra que imap
, imap_unordered
usa chunksize=1
por defecto y max_chunksize
para el map
depende de nprocesses
, nitem
(la cantidad de trozos por proceso no es fija) y max_chunksize
depende de la versión de python. Todas las funciones *map*
tienen en cuenta el parámetro chunksize
si se especifica.
Uso
$ ./estimate_chunksize.py nprocesses nitems [map_name [chunksize]] [--verbose]
Para ver cómo se distribuyen los trabajos individuales; especifique el parámetro --verbose
.
http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.map
map(func, iterable[, chunksize])
Este método divide el iterable en varios fragmentos que envía al grupo de procesos como tareas separadas. El tamaño (aproximado) de estos trozos se puede especificar estableciendo chunksize en un entero positivo.
Supongo que un proceso retira el siguiente fragmento de una cola cuando se realiza con el anterior.
El chunksize
predeterminado de chunksize
depende de la longitud de iterable
y se elige de modo que el número de trozos sea aproximadamente cuatro veces el número de procesos. (source)