programming parallel python python-2.7 while-loop multiprocessing python-multiprocessing

python - parallel - Multiprocesamiento básico con while loop



parallel programming python (1)

Soy nuevo en el paquete de multiprocessing en python y es probable que mi confusión sea fácil para alguien que sabe más para aclarar. He estado leyendo sobre simultaneidad y he buscado otras preguntas como esta y no he encontrado nada. (FYI NO quiero usar multithreading porque el GIL ralentizará mucho mi aplicación).

Estoy pensando en el marco de los eventos. Quiero tener múltiples procesos en ejecución, esperando que ocurra un evento. Si el evento ocurre, se asigna a un proceso particular, que opera y luego vuelve a su estado inactivo. Puede haber una mejor manera de hacerlo, pero mi razonamiento es que debería engendrar todos los procesos una vez y mantenerlos abiertos indefinidamente, en lugar de crear y luego cerrar un proceso cada vez que ocurre un evento. La velocidad es un problema para mí y mis eventos pueden ocurrir miles de veces por segundo.

Se me ocurrió el siguiente ejemplo de juguete, que está destinado a enviar números pares a un proceso y números impares a otro. Ambos procesos son iguales, solo añaden el número a una lista.

from multiprocessing import Process, Queue, Pipe slist=[''even'',''odd''] Q={} Q[''even''] = Queue() Q[''odd''] = Queue() ev,od = [],[] Q[''even''].put(ev) Q[''odd''].put(od) P={} P[''even''] = Pipe() P[''odd''] = Pipe() def add_num(s): """ The worker function, invoked in a process. The results are placed in a list that''s pushed to a queue.""" # while True : if not P[s][1].recv(): print s,''- do nothing'' else: d = Q[s].get() print d d.append(P[s][1].recv()) Q[s].put(d) print Q[s].get() P[s][0].send(False) print ''ya'' def piper(s,n): P[s][0].send(n) for k in [S for S in slist if S != s]: P[k][0].send(False) add_num(s) procs = [ Process ( target=add_num, args=(i,) ) for i in [''even'',''odd'']] for s in slist: P[s][0].send(False) for p in procs: p.start() p.join() for i in range(10): print i if i%2==0: s = ''even'' else: s = ''odd'' piper(s,i) print ''results:'', Q[''odd''].get(),Q[''even''].get()

Este código produce lo siguiente:

even - do nothing

Cualquier comentario de los sabios sobre este problema, donde mi código o razonamiento no es suficiente, sería muy apreciado.


Este es un enfoque que he usado un par de veces con buen éxito:

  1. Lanzar un grupo de multiprocesamiento .

  2. Utilice un SyncManager de multiprocesamiento para crear múltiples colas (una para cada tipo de datos que debe manejarse de forma diferente).

  3. Utilice apply_async para iniciar las funciones que procesan datos. Al igual que las colas, debe haber una función para cada tipo de datos que debe procesarse de manera diferente. Cada función iniciada obtiene la cola que corresponde a sus datos como un argumento de entrada. Las funciones harán su trabajo en un ciclo infinito que comienza obteniendo datos de la cola.

  4. Comience el procesamiento. Durante el procesamiento, el proceso principal ordena los datos y decide qué función debería manejarlos. Una vez que se toma la decisión, los datos se colocan en la cola que corresponde a esa función.

  5. Después de que se hayan manejado todos los datos, el proceso principal pone un valor llamado "píldora de veneno" en cada cola. La píldora venenosa es un valor que todos los procesos del trabajador reconocen como una señal para salir. Dado que las colas son las primeras en entrar, primero en salir (FIFO), se garantiza que extraerán la píldora venenosa como último elemento de las colas.

  6. Cierre y únase al grupo de multiprocesamiento.

Código

A continuación se muestra un ejemplo de este algoritmo. El objetivo del código de ejemplo es utilizar el algoritmo descrito anteriormente para dividir los números impares por 2, e incluso los números por -2. Todos los resultados se colocan en una lista compartida accesible por el proceso principal.

import multiprocessing POISON_PILL = "STOP" def process_odds(in_queue, shared_list): while True: # block until something is placed on the queue new_value = in_queue.get() # check to see if we just got the poison pill if new_value == POISON_PILL: break # we didn''t, so do the processing and put the result in the # shared data structure shared_list.append(new_value/2) return def process_evens(in_queue, shared_list): while True: new_value = in_queue.get() if new_value == POISON_PILL: break shared_list.append(new_value/-2) return def main(): # create a manager - it lets us share native Python object types like # lists and dictionaries without worrying about synchronization - # the manager will take care of it manager = multiprocessing.Manager() # now using the manager, create our shared data structures odd_queue = manager.Queue() even_queue = manager.Queue() shared_list = manager.list() # lastly, create our pool of workers - this spawns the processes, # but they don''t start actually doing anything yet pool = multiprocessing.Pool() # now we''ll assign two functions to the pool for them to run - # one to handle even numbers, one to handle odd numbers odd_result = pool.apply_async(process_odds, (odd_queue, shared_list)) even_result = pool.apply_async(process_evens, (even_queue, shared_list)) # this code doesn''t do anything with the odd_result and even_result # variables, but you have the flexibility to check exit codes # and other such things if you want - see docs for AsyncResult objects # now that the processes are running and waiting for their queues # to have something, lets give them some work to do by iterating # over our data, deciding who should process it, and putting it in # their queue for i in range(6): if (i % 2) == 0: # use mod operator to see if "i" is even even_queue.put(i) else: odd_queue.put(i) # now we''ve finished giving the processes their work, so send the # poison pill to tell them to exit even_queue.put(POISON_PILL) odd_queue.put(POISON_PILL) # wait for them to exit pool.close() pool.join() # now we can check the results print(shared_list) # ...and exit! return if __name__ == "__main__": main()

Salida

Este código produce esta salida:

[0.5, -0.0, 1.5, -1.0, 2.5, -2.0]

Tenga en cuenta que el orden de los resultados es impredecible, porque no podemos garantizar en qué orden las funciones podrán obtener elementos de sus colas y poner los resultados en la lista. Pero sin duda puede hacer cualquier procesamiento posterior que necesite, que podría incluir la clasificación.

Razón fundamental

Creo que esta sería una buena solución para su problema porque:

  1. Estás en lo cierto de que hay una gran sobrecarga para los procesos de desove. Este enfoque de productor único / consumidor múltiple elimina eso cuando usa un grupo para mantener a los trabajadores con vida durante todo el programa.

  2. Responde a sus inquietudes acerca de poder manejar datos de manera diferente según los atributos de los datos. En sus comentarios, expresó su preocupación acerca de poder enviar datos a procesos específicos. En este enfoque, puede elegir a qué procesos dar datos, porque debe elegir qué cola colocar. (Por cierto, creo que estás pensando en la función pool.map , que, como crees correctamente, no te permite realizar diferentes operaciones en el mismo trabajo. apply_async sí).

  3. He encontrado que es muy expandible y flexible. ¿Necesita agregar más tipos de manejo de datos? Simplemente escriba su función de controlador, agregue una cola más y agregue lógica a main para enrutar los datos a su nueva función. ¿Está encontrando que una cola está siendo respaldada y se está convirtiendo en un cuello de botella? Puede llamar a apply_async con la misma función de destino y hacer cola varias veces para que varios trabajadores trabajen en la misma cola. Solo asegúrate de dar a la cola suficientes píldoras venenosas para que todos los trabajadores obtengan una.

Limitaciones

Cualquier información que quiera pasar en una cola debe ser seleccionable (serializable) por el módulo pickle. Mire aquí para ver qué se puede y no se puede escabechar.

Probablemente haya otras limitaciones también, pero no puedo pensar en ninguna otra fuera de mi cabeza.