multiple ejemplos async python multiprocessing pool map-function

ejemplos - Pasar múltiples parámetros a la función pool.map() en Python



python pool map multiple arguments (3)

Esta pregunta ya tiene una respuesta aquí:

Necesito alguna forma de usar una función dentro de pool.map () que acepte más de un parámetro. Según mi comprensión, la función objetivo de pool.map () solo puede tener un iterable como parámetro, pero ¿hay alguna forma de que también pueda pasar otros parámetros? En este caso, necesito pasar algunas variables de configuración, como mi Lock () y la información de inicio de sesión a la función de destino.

He intentado investigar y creo que puedo usar funciones parciales para que funcione. Sin embargo, no entiendo completamente cómo funcionan. ¡Cualquier ayuda sería muy apreciada! Aquí hay un ejemplo simple de lo que quiero hacer:

def target(items, lock): for item in items: # Do cool stuff if (... some condition here ...): lock.acquire() # Write to stdout or logfile, etc. lock.release() def main(): iterable = [1, 2, 3, 4, 5] pool = multiprocessing.Pool() pool.map(target(PASS PARAMS HERE), iterable) pool.close() pool.join()


En caso de que no tenga acceso a functools.partial , también podría usar una función de contenedor para esto.

def target(lock): def wrapped_func(items): for item in items: # Do cool stuff if (... some condition here ...): lock.acquire() # Write to stdout or logfile, etc. lock.release() return wrapped_func def main(): iterable = [1, 2, 3, 4, 5] pool = multiprocessing.Pool() lck = multiprocessing.Lock() pool.map(target(lck), iterable) pool.close() pool.join()

Esto hace que target() convierta en una función que acepte un bloqueo (o los parámetros que quiera dar), y devolverá una función que solo acepta un iterable como entrada, pero aún puede usar todos sus otros parámetros. Eso es lo que finalmente se pasa a pool.map() , que luego debería ejecutarse sin problemas.


Podría usar una función de mapa que permita múltiples argumentos, al igual que la bifurcación de multiprocessing encuentra en pathos .

>>> from pathos.multiprocessing import ProcessingPool as Pool >>> >>> def add_and_subtract(x,y): ... return x+y, x-y ... >>> res = Pool().map(add_and_subtract, range(0,20,2), range(-5,5,1)) >>> res [(-5, 5), (-2, 6), (1, 7), (4, 8), (7, 9), (10, 10), (13, 11), (16, 12), (19, 13), (22, 14)] >>> Pool().map(add_and_subtract, *zip(*res)) [(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]

pathos permite jerarquizar fácilmente mapas paralelos jerárquicos con múltiples entradas, por lo que podemos extender nuestro ejemplo para demostrar eso.

>>> from pathos.multiprocessing import ThreadingPool as TPool >>> >>> res = TPool().amap(add_and_subtract, *zip(*Pool().map(add_and_subtract, range(0,20,2), range(-5,5,1)))) >>> res.get() [(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]

Aún más divertido, es construir una función anidada que podamos pasar al Pool. Esto es posible porque pathos usa dill , que puede serializar casi cualquier cosa en python.

>>> def build_fun_things(f, g): ... def do_fun_things(x, y): ... return f(x,y), g(x,y) ... return do_fun_things ... >>> def add(x,y): ... return x+y ... >>> def sub(x,y): ... return x-y ... >>> neato = build_fun_things(add, sub) >>> >>> res = TPool().imap(neato, *zip(*Pool().map(neato, range(0,20,2), range(-5,5,1)))) >>> list(res) [(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]

Sin embargo, si no puede salir de la biblioteca estándar, deberá hacerlo de otra manera. Su mejor apuesta en ese caso es usar multiprocessing.starmap como se ve aquí: Python multiprocessing pool.map para múltiples argumentos (anotado por @Roberto en los comentarios en la publicación del OP)

Obtenga pathos aquí: https://github.com/uqfoundation


Puede usar functools.partial para esto (como sospechaba):

from functools import partial def target(lock, iterable_item): for item in iterable_item: # Do cool stuff if (... some condition here ...): lock.acquire() # Write to stdout or logfile, etc. lock.release() def main(): iterable = [1, 2, 3, 4, 5] pool = multiprocessing.Pool() l = multiprocessing.Lock() func = partial(target, l) pool.map(func, iterable) pool.close() pool.join()

Ejemplo:

def f(a, b, c): print("{} {} {}".format(a, b, c)) def main(): iterable = [1, 2, 3, 4, 5] pool = multiprocessing.Pool() a = "hi" b = "there" func = partial(f, a, b) pool.map(func, iterable) pool.close() pool.join() if __name__ == "__main__": main()

Salida:

hi there 1 hi there 2 hi there 3 hi there 4 hi there 5