thread lock dummy async apply_async python multiprocessing

lock - python pool apply_async



Pool.map multiproceso de Python para mĂșltiples argumentos (16)

En la biblioteca de multiprocesamiento de Python, ¿existe una variante de pool.map que admita múltiples argumentos?

text = "test" def harvester(text, case): X = case[0] text+ str(X) if __name__ == ''__main__'': pool = multiprocessing.Pool(processes=6) case = RAW_DATASET pool.map(harvester(text,case),case, 1) pool.close() pool.join()


¿Existe una variante de pool.map que soporte múltiples argumentos?

Python 3.3 incluye el Pool.starmap :

#!/usr/bin/env python3 from functools import partial from itertools import repeat from multiprocessing import Pool, freeze_support def func(a, b): return a + b def main(): a_args = [1,2,3] second_arg = 1 with Pool() as pool: L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)]) M = pool.starmap(func, zip(a_args, repeat(second_arg))) N = pool.map(partial(func, b=second_arg), a_args) assert L == M == N if __name__=="__main__": freeze_support() main()

Para versiones anteriores:

#!/usr/bin/env python2 import itertools from multiprocessing import Pool, freeze_support def func(a, b): print a, b def func_star(a_b): """Convert `f([1,2])` to `f(1,2)` call.""" return func(*a_b) def main(): pool = Pool() a_args = [1,2,3] second_arg = 1 pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg))) if __name__=="__main__": freeze_support() main()

Salida

1 1 2 1 3 1

Observe cómo se itertools.izip() e itertools.repeat() aquí.

Debido al bugs.python.org/issue5228 no puede usar functools.partial() o capacidades similares en Python 2.6, por lo que la simple función de envoltura func_star() debe definirse explícitamente. Vea también la solución sugerida por uptimebox .


# "Cómo tomar múltiples argumentos".

def f1(args): a, b, c = args[0] , args[1] , args[2] return a+b+c if __name__ == "__main__": import multiprocessing pool = multiprocessing.Pool(4) result1 = pool.map(f1, [ [1,2,3] ]) print(result1)


Creo que lo de abajo será mejor.

def multi_run_wrapper(args): return add(*args) def add(x,y): return x+y if __name__ == "__main__": from multiprocessing import Pool pool = Pool(4) results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)]) print results

salida

[3, 5, 7]


Desde Python 3.4.4, puedes usar multiprocessing.get_context () para obtener un objeto de contexto para usar múltiples métodos de inicio:

import multiprocessing as mp def foo(q, h, w): q.put(h + '' '' + w) print(h + '' '' + w) if __name__ == ''__main__'': ctx = mp.get_context(''spawn'') q = ctx.Queue() p = ctx.Process(target=foo, args=(q,''hello'', ''world'')) p.start() print(q.get()) p.join()

O simplemente reemplaza

pool.map(harvester(text,case),case, 1)

por:

pool.apply_async(harvester(text,case),case, 1)


Después de haber aprendido acerca de itertools en la respuesta de JF Sebastian , decidí ir un paso más allá y escribir un paquete parmap que se encargue de la paralelización, ofreciendo funciones de map y starmap en python-2.7 y python-3.2 (y más tarde también) que pueden tomar cualquier número de los argumentos posicionales.

Instalación

pip install parmap

Cómo paralelizar:

import parmap # If you want to do: y = [myfunction(x, argument1, argument2) for x in mylist] # In parallel: y = parmap.map(myfunction, mylist, argument1, argument2) # If you want to do: z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist] # In parallel: z = parmap.starmap(myfunction, mylist, argument1, argument2) # If you want to do: listx = [1, 2, 3, 4, 5, 6] listy = [2, 3, 4, 5, 6, 7] param = 3.14 param2 = 42 listz = [] for (x, y) in zip(listx, listy): listz.append(myfunction(x, y, param1, param2)) # In parallel: listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

He cargado parmap en PyPI y en un repositorio de github .

Como ejemplo, la pregunta se puede responder de la siguiente manera:

import parmap def harvester(case, text): X = case[0] text+ str(X) if __name__ == "__main__": case = RAW_DATASET # assuming this is an iterable parmap.map(harvester, case, "test", chunksize=1)


En la documentación oficial se afirma que apoya solo un argumento iterable. Me gusta usar apply_async en tales casos. En tu caso yo haría:

from multiprocessing import Process, Pool, Manager text = "test" def harvester(text, case, q = None): X = case[0] res = text+ str(X) if q: q.put(res) return res def block_until(q, results_queue, until_counter=0): i = 0 while i < until_counter: results_queue.put(q.get()) i+=1 if __name__ == ''__main__'': pool = multiprocessing.Pool(processes=6) case = RAW_DATASET m = Manager() q = m.Queue() results_queue = m.Queue() # when it completes results will reside in this queue blocking_process = Process(block_until, (q, results_queue, len(case))) blocking_process.start() for c in case: try: res = pool.apply_async(harvester, (text, case, q = None)) res.get(timeout=0.1) except: pass blocking_process.join()


Hay una bifurcación de multiprocessing llamada pathos ( nota: use la versión en github ) que no necesita starmap mapa de starmap : las funciones del mapa reflejan la API del mapa de python, por lo que el mapa puede tomar múltiples argumentos. Con pathos , generalmente también puede hacer multiprocesamiento en el intérprete, en lugar de estar atrapado en el bloque __main__ . Pathos se debe a un lanzamiento, después de algunas actualizaciones leves, principalmente la conversión a Python 3.x.

Python 2.7.5 (default, Sep 30 2013, 20:15:49) [GCC 4.2.1 (Apple Inc. build 5566)] on darwin Type "help", "copyright", "credits" or "license" for more information. >>> def func(a,b): ... print a,b ... >>> >>> from pathos.multiprocessing import ProcessingPool >>> pool = ProcessingPool(nodes=4) >>> pool.map(func, [1,2,3], [1,1,1]) 1 1 2 1 3 1 [None, None, None] >>> >>> # also can pickle stuff like lambdas >>> result = pool.map(lambda x: x**2, range(10)) >>> result [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] >>> >>> # also does asynchronous map >>> result = pool.amap(pow, [1,2,3], [4,5,6]) >>> result.get() [1, 32, 729] >>> >>> # or can return a map iterator >>> result = pool.imap(pow, [1,2,3], [4,5,6]) >>> result <processing.pool.IMapIterator object at 0x110c2ffd0> >>> list(result) [1, 32, 729]


La respuesta a esto depende de la versión y de la situación. La respuesta más general para las versiones recientes de Python (desde la versión 3.3) fue descrita a continuación por JF Sebastian . 1 Utiliza el método Pool.starmap , que acepta una secuencia de tuplas de argumentos. Luego, automáticamente desempaqueta los argumentos de cada tupla y los pasa a la función dada:

import multiprocessing from itertools import product def merge_names(a, b): return ''{} & {}''.format(a, b) if __name__ == ''__main__'': names = [''Brown'', ''Wilson'', ''Bartlett'', ''Rivera'', ''Molloy'', ''Opie''] with multiprocessing.Pool(processes=3) as pool: results = pool.starmap(merge_names, product(names, repeat=2)) print(results) # Output: [''Brown & Brown'', ''Brown & Wilson'', ''Brown & Bartlett'', ...

Para versiones anteriores de Python, deberá escribir una función auxiliar para desempaquetar los argumentos explícitamente. Si desea utilizar with , también deberá escribir un contenedor para convertir Pool en un administrador de contexto. (Gracias a muon por señalar esto).

import multiprocessing from itertools import product from contextlib import contextmanager def merge_names(a, b): return ''{} & {}''.format(a, b) def merge_names_unpack(args): return merge_names(*args) @contextmanager def poolcontext(*args, **kwargs): pool = multiprocessing.Pool(*args, **kwargs) yield pool pool.terminate() if __name__ == ''__main__'': names = [''Brown'', ''Wilson'', ''Bartlett'', ''Rivera'', ''Molloy'', ''Opie''] with poolcontext(processes=3) as pool: results = pool.map(merge_names_unpack, product(names, repeat=2)) print(results) # Output: [''Brown & Brown'', ''Brown & Wilson'', ''Brown & Bartlett'', ...

En casos más simples, con un segundo argumento fijo, también puede usar partial , pero solo en Python 2.7+.

import multiprocessing from functools import partial from contextlib import contextmanager @contextmanager def poolcontext(*args, **kwargs): pool = multiprocessing.Pool(*args, **kwargs) yield pool pool.terminate() def merge_names(a, b): return ''{} & {}''.format(a, b) if __name__ == ''__main__'': names = [''Brown'', ''Wilson'', ''Bartlett'', ''Rivera'', ''Molloy'', ''Opie''] with poolcontext(processes=3) as pool: results = pool.map(partial(merge_names, b=''Sons''), names) print(results) # Output: [''Brown & Sons'', ''Wilson & Sons'', ''Bartlett & Sons'', ...

1. Gran parte de esto se inspiró en su respuesta, que probablemente debería haber sido aceptada en su lugar. Pero como esta está atascada en la parte superior, parecía mejor mejorarla para futuros lectores.


Otra alternativa simple es envolver los parámetros de su función en una tupla y luego envolver los parámetros que también deben pasarse en tuplas. Quizás esto no sea ideal cuando se trata de grandes datos. Creo que haría copias para cada tupla.

0 1 2 3 1 2 3 4 2 3 4 5 3 4 5 6 4 5 6 7 5 6 7 8 7 8 9 10 6 7 8 9 8 9 10 11 9 10 11 12 [6, 10, 14, 18, 22, 26, 30, 34, 38, 42]

Da la salida en algún orden aleatorio:

from multiprocessing import Pool def func((i, (a, b))): print i, a, b return a + b pool = Pool(3) pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])


Otra forma es pasar una lista de listas a una rutina de un argumento:

import os from multiprocessing import Pool def task(args): print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1] pool = Pool() pool.map(task, [ [1,2], [3,4], [5,6], [7,8] ])

Uno puede construir una lista de argumentos con su método favorito.


Puede usar las siguientes dos funciones para evitar escribir un contenedor para cada nueva función:

import itertools from multiprocessing import Pool def universal_worker(input_pair): function, args = input_pair return function(*args) def pool_args(function, *args): return zip(itertools.repeat(function), zip(*args))

Utilice la función de function con las listas de argumentos arg_0 , arg_1 y arg_2 siguiente manera:

pool = Pool(n_core) list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2) pool.close() pool.join()


Una mejor manera es usar el decorador en lugar de escribir la función de envoltura a mano. Especialmente cuando tiene muchas funciones para asignar, el decorador le ahorrará tiempo al evitar escribir envoltorio para cada función. Por lo general, una función decorada no es seleccionable, sin embargo, podemos usar functools para sortearla. Más disscusiones se pueden encontrar here .

Aqui el ejemplo

def unpack_args(func): from functools import wraps @wraps(func) def wrapper(args): if isinstance(args, dict): return func(**args) else: return func(*args) return wrapper @unpack_args def func(x, y): return x + y

Entonces puedes mapearlo con argumentos comprimidos

np, xlist, ylist = 2, range(10), range(10) pool = Pool(np) res = pool.map(func, zip(xlist, ylist)) pool.close() pool.join()

Por supuesto, siempre puede usar Pool.starmap en Python 3 (> = 3.3) como se menciona en otras respuestas.


Una mejor solución para python2:

from multiprocessing import Pool def f((a,b,c,d)): print a,b,c,d return a + b + c +d if __name__ == ''__main__'': p = Pool(10) data = [(i+0,i+1,i+2,i+3) for i in xrange(10)] print(p.map(f, data)) p.close() p.join()

2 3 4

1 2 3

0 1 2

afuera[]:

[3, 5, 7]


Usando Python 3.3 o superior con pool.starmap():

from multiprocessing.dummy import Pool as ThreadPool def write(i, x): print(i, "---", x) a = ["1","2","3"] b = ["4","5","6"] pool = ThreadPool(2) pool.starmap(write, zip(a,b)) pool.close() pool.join()

Resultado:

1 --- 4 2 --- 5 3 --- 6

También puede zip () más argumentos si lo desea: zip(a,b,c,d,e)

En caso de que quiera que se le pase un valor constante como argumento, debe usar import itertools y luego zip(itertools.repeat(constant), a) por ejemplo.


para python2, puedes usar este truco

def fun(a,b): return a+b pool = multiprocessing.Pool(processes=6) b=233 pool.map(lambda x:fun(x,b),range(1000))


text = "test" def unpack(args): return args[0](*args[1:]) def harvester(text, case): X = case[0] text+ str(X) if __name__ == ''__main__'': pool = multiprocessing.Pool(processes=6) case = RAW_DATASET # args is a list of tuples # with the function to execute as the first item in each tuple args = [(harvester, text, c) for c in case] # doing it this way, we can pass any function # and we don''t need to define a wrapper for each different function # if we need to use more than one pool.map(unpack, args) pool.close() pool.join()