ylab color categoryorder python set multiprocessing

python - color - plotly layout



¿Se puede compartir un set() entre los procesos de Python? (1)

Un cambio obvio es utilizar un mp.Manager.dict() lugar del conjunto, y usar valores arbitrarios (por ejemplo, establecer the_dict[result_int] = 1 para indicar la membresía en el conjunto). Por cierto, así es como "todos" implementaron los conjuntos antes de que Python agregara el tipo de set , e incluso ahora los dictados y conjuntos se implementan básicamente por el mismo código bajo las cubiertas.

Agregado más tarde: confieso que no entiendo por qué usó tanto un conjunto como una lista en el código original, ya que las claves del conjunto son idénticas a los contenidos de la lista. Si el orden de entrada no es importante, ¿por qué no olvida la lista por completo? Luego, también podría soltar la capa de bloqueo necesaria en el original para mantener el conjunto y la lista sincronizados.

Al hacerlo, con la sugerencia dictada, toda la función se convertiría en lo siguiente:

def worker(shared_dict): # Do some processing and get an integer result_int = some_other_code() shared_dict[result_int] = 1

Otros procesos podrían hacer shared_dict.pop() luego para obtener un valor a la vez (aunque, no, no podían esperar en .pop() como lo hacen para el .get() una cola.

Y uno más: ¿considerar el uso de conjuntos locales (proceso local)? Correrán mucho más rápido. Entonces, cada trabajador no agregará ningún duplicado que conozca, pero puede haber duplicados en todos los procesos. Su código no dio ninguna out_q sobre lo que hace el consumidor out_q , pero si solo hay uno, entonces un conjunto local también podría eliminar los duplicados entre procesos. ¿O tal vez la carga de la memoria es demasiado alta entonces? No puedo adivinar desde aquí ;-)

BIG EDIT

Voy a sugerir un enfoque diferente: no use mp.Manager en absoluto. La mayoría de las veces que veo que la gente lo usa, lo lamentan, porque no está haciendo lo que creen que está haciendo. Lo que piensan: es el suministro de objetos físicamente compartidos. Qué está haciendo: está suministrando objetos compartidos semánticamente . Físicamente, viven en Otro, el proceso debajo de las cubiertas y las operaciones en los objetos se envían a ese último proceso, donde el proceso las realiza en su propio espacio de direcciones. No es físicamente compartido en absoluto. Por lo tanto, si bien puede ser muy conveniente, existen importantes gastos indirectos entre procesos incluso para las operaciones más simples.

Por lo tanto, sugiero utilizar un conjunto único y ordinario en un solo proceso, que será el único código relacionado con la eliminación de duplicados. Los procesos de trabajo producen impresiones sin preocuparse por los duplicados, simplemente pasan los apuntes. Un mp.Queue está bien para eso (una vez más, no hay una necesidad real de un mp.Manager.Queue ).

Como tal, que es un programa ejecutable completo:

N = 20 def worker(outq): from random import randrange from time import sleep while True: i = randrange(N) outq.put(i) sleep(0.1) def uniqueifier(inq, outq): seen = set() while True: i = inq.get() if i not in seen: seen.add(i) outq.put(i) def consumer(inq): n = 0 for _ in range(N): i = inq.get() print(i) if __name__ == "__main__": import multiprocessing as mp q1 = mp.Queue() q2 = mp.Queue() consume = mp.Process(target=consumer, args=(q2,)) consume.start() procs = [mp.Process(target=uniqueifier, args=(q1, q2))] for _ in range(4): procs.append(mp.Process(target=worker, args=(q1,))) for p in procs: p.start() consume.join() for p in procs: p.terminate()

La segunda cola transferida a uniqueifier desempeña el papel de su cola original: solo entrega números enteros únicos. No se intenta "compartir memoria", por lo que no se pagan costos por eso. La única comunicación entre procesos es a través de operaciones mp.Queue fáciles y explícitas. Solo hay un conjunto, y como no se comparte de ninguna manera, se ejecuta lo más rápido posible.

En efecto, esto solo configura una tubería simple, aunque con múltiples entradas.

Estoy usando multiprocesamiento en Python 2.7 para procesar un gran conjunto de datos. A medida que se ejecuta cada proceso, agrega enteros a un mp.Manager.Queue () compartido, pero solo si algún otro proceso no ha agregado el mismo entero. Como no se puede hacer una prueba de membresía al estilo "en" para Colas, la forma en que lo hago es verificar cada int para membresía en un mp.Manager.list compartido (). La lista eventualmente tendrá ~ 30 millones de entradas, por lo que las pruebas de membresía serán extremadamente lentas, anulando la ventaja del multiprocesamiento.

Aquí hay una versión simplificada de lo que estoy haciendo:

import multiprocessing as mp def worker(shared_list, out_q, lock): # Do some processing and get an integer result_int = some_other_code() # Use a lock to ensure nothing is added to the list in the meantime lock.acquire() # This lookup can take forever when the list is large if result_int not in shared_list: out_q.put(result_int) shared_list.append(result_int) lock.release() manager = mp.Manager() shared_list = manager.list() lock = manager.lock() out_q = manager.Queue() for i in range(8): p = mp.Process(target=worker, args=(shared_list, out_q, lock)) p.start()

Anteriormente intenté usar un conjunto () en lugar de un mp.Manager.list (), pero parece que cada proceso tiene su propio espacio de memoria, por lo que cuando actualicé el conjunto, no se sincronizó entre los procesos. Por lo tanto, cambié al enfoque actual.

Aquí está más o menos cómo intenté previamente usar un conjunto (): importar multiprocesamiento como mp

def worker(shared_set, out_q, lock): # Do some processing and get an integer result_int = some_other_code() # Use a lock to ensure nothing is added to the set in the meantime lock.acquire() # This lookup is fast, but the set doesn''t reflect additions made by other processes. if result_int not in shared_set: out_q.put(result_int) shared_set.add(result_int) lock.release() manager = mp.Manager() lock = manager.lock() out_q = manager.Queue() # This set will NOT synchronize between processes shared_set = set() for i in range(8): p = mp.Process(target=worker, args=(shared_set, out_q, lock)) p.start()

Nota: estos ejemplos no han sido probados y simplemente representan las partes relevantes de mi código.

¿Hay alguna manera de compartir conjuntos en los procesos, o realizar búsquedas de membresía más rápidas?

EDITAR: Un poco más de información: el out_q es consumido por otro proceso que escribe los datos en un único archivo de salida. No puede haber duplicados. Si genero un número entero y se encuentra que es un duplicado, el proceso debe retroceder y generar el siguiente mejor entero.