lock - python pool apply_async
Pool.map multiproceso de Python para mĂșltiples argumentos (16)
En la biblioteca de multiprocesamiento de Python, ¿existe una variante de pool.map que admita múltiples argumentos?
text = "test"
def harvester(text, case):
X = case[0]
text+ str(X)
if __name__ == ''__main__'':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
pool.map(harvester(text,case),case, 1)
pool.close()
pool.join()
¿Existe una variante de pool.map que soporte múltiples argumentos?
Python 3.3 incluye el Pool.starmap :
#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support
def func(a, b):
return a + b
def main():
a_args = [1,2,3]
second_arg = 1
with Pool() as pool:
L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
M = pool.starmap(func, zip(a_args, repeat(second_arg)))
N = pool.map(partial(func, b=second_arg), a_args)
assert L == M == N
if __name__=="__main__":
freeze_support()
main()
Para versiones anteriores:
#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support
def func(a, b):
print a, b
def func_star(a_b):
"""Convert `f([1,2])` to `f(1,2)` call."""
return func(*a_b)
def main():
pool = Pool()
a_args = [1,2,3]
second_arg = 1
pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))
if __name__=="__main__":
freeze_support()
main()
Salida
1 1
2 1
3 1
Observe cómo se itertools.izip()
e itertools.repeat()
aquí.
Debido al bugs.python.org/issue5228 no puede usar functools.partial()
o capacidades similares en Python 2.6, por lo que la simple función de envoltura func_star()
debe definirse explícitamente. Vea también la solución sugerida por uptimebox
.
# "Cómo tomar múltiples argumentos".
def f1(args):
a, b, c = args[0] , args[1] , args[2]
return a+b+c
if __name__ == "__main__":
import multiprocessing
pool = multiprocessing.Pool(4)
result1 = pool.map(f1, [ [1,2,3] ])
print(result1)
Creo que lo de abajo será mejor.
def multi_run_wrapper(args):
return add(*args)
def add(x,y):
return x+y
if __name__ == "__main__":
from multiprocessing import Pool
pool = Pool(4)
results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
print results
salida
[3, 5, 7]
Desde Python 3.4.4, puedes usar multiprocessing.get_context () para obtener un objeto de contexto para usar múltiples métodos de inicio:
import multiprocessing as mp
def foo(q, h, w):
q.put(h + '' '' + w)
print(h + '' '' + w)
if __name__ == ''__main__'':
ctx = mp.get_context(''spawn'')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,''hello'', ''world''))
p.start()
print(q.get())
p.join()
O simplemente reemplaza
pool.map(harvester(text,case),case, 1)
por:
pool.apply_async(harvester(text,case),case, 1)
Después de haber aprendido acerca de itertools en la respuesta de JF Sebastian , decidí ir un paso más allá y escribir un paquete parmap
que se encargue de la paralelización, ofreciendo funciones de map
y starmap
en python-2.7 y python-3.2 (y más tarde también) que pueden tomar cualquier número de los argumentos posicionales.
Instalación
pip install parmap
Cómo paralelizar:
import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)
# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)
# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)
He cargado parmap en PyPI y en un repositorio de github .
Como ejemplo, la pregunta se puede responder de la siguiente manera:
import parmap
def harvester(case, text):
X = case[0]
text+ str(X)
if __name__ == "__main__":
case = RAW_DATASET # assuming this is an iterable
parmap.map(harvester, case, "test", chunksize=1)
En la documentación oficial se afirma que apoya solo un argumento iterable. Me gusta usar apply_async en tales casos. En tu caso yo haría:
from multiprocessing import Process, Pool, Manager
text = "test"
def harvester(text, case, q = None):
X = case[0]
res = text+ str(X)
if q:
q.put(res)
return res
def block_until(q, results_queue, until_counter=0):
i = 0
while i < until_counter:
results_queue.put(q.get())
i+=1
if __name__ == ''__main__'':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
m = Manager()
q = m.Queue()
results_queue = m.Queue() # when it completes results will reside in this queue
blocking_process = Process(block_until, (q, results_queue, len(case)))
blocking_process.start()
for c in case:
try:
res = pool.apply_async(harvester, (text, case, q = None))
res.get(timeout=0.1)
except:
pass
blocking_process.join()
Hay una bifurcación de multiprocessing
llamada pathos ( nota: use la versión en github ) que no necesita starmap
mapa de starmap
: las funciones del mapa reflejan la API del mapa de python, por lo que el mapa puede tomar múltiples argumentos. Con pathos
, generalmente también puede hacer multiprocesamiento en el intérprete, en lugar de estar atrapado en el bloque __main__
. Pathos se debe a un lanzamiento, después de algunas actualizaciones leves, principalmente la conversión a Python 3.x.
Python 2.7.5 (default, Sep 30 2013, 20:15:49)
[GCC 4.2.1 (Apple Inc. build 5566)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> def func(a,b):
... print a,b
...
>>>
>>> from pathos.multiprocessing import ProcessingPool
>>> pool = ProcessingPool(nodes=4)
>>> pool.map(func, [1,2,3], [1,1,1])
1 1
2 1
3 1
[None, None, None]
>>>
>>> # also can pickle stuff like lambdas
>>> result = pool.map(lambda x: x**2, range(10))
>>> result
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
>>> # also does asynchronous map
>>> result = pool.amap(pow, [1,2,3], [4,5,6])
>>> result.get()
[1, 32, 729]
>>>
>>> # or can return a map iterator
>>> result = pool.imap(pow, [1,2,3], [4,5,6])
>>> result
<processing.pool.IMapIterator object at 0x110c2ffd0>
>>> list(result)
[1, 32, 729]
La respuesta a esto depende de la versión y de la situación. La respuesta más general para las versiones recientes de Python (desde la versión 3.3) fue descrita a continuación por JF Sebastian . 1 Utiliza el método Pool.starmap
, que acepta una secuencia de tuplas de argumentos. Luego, automáticamente desempaqueta los argumentos de cada tupla y los pasa a la función dada:
import multiprocessing
from itertools import product
def merge_names(a, b):
return ''{} & {}''.format(a, b)
if __name__ == ''__main__'':
names = [''Brown'', ''Wilson'', ''Bartlett'', ''Rivera'', ''Molloy'', ''Opie'']
with multiprocessing.Pool(processes=3) as pool:
results = pool.starmap(merge_names, product(names, repeat=2))
print(results)
# Output: [''Brown & Brown'', ''Brown & Wilson'', ''Brown & Bartlett'', ...
Para versiones anteriores de Python, deberá escribir una función auxiliar para desempaquetar los argumentos explícitamente. Si desea utilizar with
, también deberá escribir un contenedor para convertir Pool
en un administrador de contexto. (Gracias a muon por señalar esto).
import multiprocessing
from itertools import product
from contextlib import contextmanager
def merge_names(a, b):
return ''{} & {}''.format(a, b)
def merge_names_unpack(args):
return merge_names(*args)
@contextmanager
def poolcontext(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
if __name__ == ''__main__'':
names = [''Brown'', ''Wilson'', ''Bartlett'', ''Rivera'', ''Molloy'', ''Opie'']
with poolcontext(processes=3) as pool:
results = pool.map(merge_names_unpack, product(names, repeat=2))
print(results)
# Output: [''Brown & Brown'', ''Brown & Wilson'', ''Brown & Bartlett'', ...
En casos más simples, con un segundo argumento fijo, también puede usar partial
, pero solo en Python 2.7+.
import multiprocessing
from functools import partial
from contextlib import contextmanager
@contextmanager
def poolcontext(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
def merge_names(a, b):
return ''{} & {}''.format(a, b)
if __name__ == ''__main__'':
names = [''Brown'', ''Wilson'', ''Bartlett'', ''Rivera'', ''Molloy'', ''Opie'']
with poolcontext(processes=3) as pool:
results = pool.map(partial(merge_names, b=''Sons''), names)
print(results)
# Output: [''Brown & Sons'', ''Wilson & Sons'', ''Bartlett & Sons'', ...
1. Gran parte de esto se inspiró en su respuesta, que probablemente debería haber sido aceptada en su lugar. Pero como esta está atascada en la parte superior, parecía mejor mejorarla para futuros lectores.
Otra alternativa simple es envolver los parámetros de su función en una tupla y luego envolver los parámetros que también deben pasarse en tuplas. Quizás esto no sea ideal cuando se trata de grandes datos. Creo que haría copias para cada tupla.
0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
Da la salida en algún orden aleatorio:
from multiprocessing import Pool
def func((i, (a, b))):
print i, a, b
return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])
Otra forma es pasar una lista de listas a una rutina de un argumento:
import os
from multiprocessing import Pool
def task(args):
print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]
pool = Pool()
pool.map(task, [
[1,2],
[3,4],
[5,6],
[7,8]
])
Uno puede construir una lista de argumentos con su método favorito.
Puede usar las siguientes dos funciones para evitar escribir un contenedor para cada nueva función:
import itertools
from multiprocessing import Pool
def universal_worker(input_pair):
function, args = input_pair
return function(*args)
def pool_args(function, *args):
return zip(itertools.repeat(function), zip(*args))
Utilice la función de function
con las listas de argumentos arg_0
, arg_1
y arg_2
siguiente manera:
pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()
Una mejor manera es usar el decorador en lugar de escribir la función de envoltura a mano. Especialmente cuando tiene muchas funciones para asignar, el decorador le ahorrará tiempo al evitar escribir envoltorio para cada función. Por lo general, una función decorada no es seleccionable, sin embargo, podemos usar functools
para sortearla. Más disscusiones se pueden encontrar here .
Aqui el ejemplo
def unpack_args(func):
from functools import wraps
@wraps(func)
def wrapper(args):
if isinstance(args, dict):
return func(**args)
else:
return func(*args)
return wrapper
@unpack_args
def func(x, y):
return x + y
Entonces puedes mapearlo con argumentos comprimidos
np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()
Por supuesto, siempre puede usar Pool.starmap
en Python 3 (> = 3.3) como se menciona en otras respuestas.
Una mejor solución para python2:
from multiprocessing import Pool
def f((a,b,c,d)):
print a,b,c,d
return a + b + c +d
if __name__ == ''__main__'':
p = Pool(10)
data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
print(p.map(f, data))
p.close()
p.join()
2 3 4
1 2 3
0 1 2
afuera[]:
[3, 5, 7]
Usando Python 3.3 o superior con pool.starmap():
from multiprocessing.dummy import Pool as ThreadPool
def write(i, x):
print(i, "---", x)
a = ["1","2","3"]
b = ["4","5","6"]
pool = ThreadPool(2)
pool.starmap(write, zip(a,b))
pool.close()
pool.join()
Resultado:
1 --- 4
2 --- 5
3 --- 6
También puede zip () más argumentos si lo desea: zip(a,b,c,d,e)
En caso de que quiera que se le pase un valor constante como argumento, debe usar import itertools
y luego zip(itertools.repeat(constant), a)
por ejemplo.
para python2, puedes usar este truco
def fun(a,b):
return a+b
pool = multiprocessing.Pool(processes=6)
b=233
pool.map(lambda x:fun(x,b),range(1000))
text = "test"
def unpack(args):
return args[0](*args[1:])
def harvester(text, case):
X = case[0]
text+ str(X)
if __name__ == ''__main__'':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
# args is a list of tuples
# with the function to execute as the first item in each tuple
args = [(harvester, text, c) for c in case]
# doing it this way, we can pass any function
# and we don''t need to define a wrapper for each different function
# if we need to use more than one
pool.map(unpack, args)
pool.close()
pool.join()