threading set_start_method multiple ejemplos python multiprocessing shared-memory pool

set_start_method - python pool process



¿Cómo combinar Pool.map con Array(memoria compartida) en el multiproceso de Python? (4)

Tengo una gran cantidad de datos (solo lectura) de datos que quiero que sean procesados ​​por múltiples procesos en paralelo.

Me gusta la función Pool.map y me gustaría usarla para calcular funciones en esos datos en paralelo.

Vi que se puede usar la clase Value o Array para usar datos de memoria compartida entre procesos. Pero cuando trato de usar esto obtengo un RuntimeError: ''Los objetos SynchronizedString solo deberían compartirse entre procesos a través de la herencia cuando se utiliza la función Pool.map:

Aquí hay un ejemplo simplificado de lo que estoy tratando de hacer:

from sys import stdin from multiprocessing import Pool, Array def count_it( arr, key ): count = 0 for c in arr: if c == key: count += 1 return count if __name__ == ''__main__'': testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" # want to share it using shared memory toShare = Array(''c'', testData) # this works print count_it( toShare, "a" ) pool = Pool() # RuntimeError here print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

¿Alguien puede decirme qué estoy haciendo mal aquí?

Entonces, lo que me gustaría hacer es pasar información sobre una matriz asignada recientemente compartida de memoria compartida a los procesos después de que se hayan creado en el grupo de procesos.


El módulo multiprocessing.sharedctypes proporciona funciones para asignar objetos ctypes desde la memoria compartida que pueden ser heredados por procesos secundarios.

Entonces su uso de sharedctypes es incorrecto. ¿Desea heredar esta matriz del proceso principal o prefiere pasarla explícitamente? En el primer caso, debe crear una variable global como sugieren otras respuestas. Pero no necesita usar sharedctypes para pasarlo explícitamente, simplemente pase testData original.

Por cierto, tu uso de Pool.map() es incorrecto. Tiene la misma interfaz que la función incorporada de map() (¿la starmap() con starmap() ?). A continuación se muestra un ejemplo de trabajo con, pasando matriz explícitamente:

from multiprocessing import Pool def count_it( (arr, key) ): count = 0 for c in arr: if c == key: count += 1 return count if __name__ == ''__main__'': testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" pool = Pool() print pool.map(count_it, [(testData, key) for key in ["a", "b", "s", "d"]])


El problema que veo es que Pool no admite el decapado de datos compartidos a través de su lista de argumentos. Eso es lo que significa el mensaje de error por "los objetos solo deben compartirse entre procesos a través de la herencia". Los datos compartidos deben heredarse, es decir, ser globales si desea compartirlos utilizando la clase Pool.

Si necesita pasarlos explícitamente, puede que tenga que usar multiprocesamiento. Procesar. Aquí está su ejemplo revisado:

from multiprocessing import Process, Array, Queue def count_it( q, arr, key ): count = 0 for c in arr: if c == key: count += 1 q.put((key, count)) if __name__ == ''__main__'': testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" # want to share it using shared memory toShare = Array(''c'', testData) q = Queue() keys = [''a'', ''b'', ''s'', ''d''] workers = [Process(target=count_it, args = (q, toShare, key)) for key in keys] for p in workers: p.start() for p in workers: p.join() while not q.empty(): print q.get(),

Salida: (''s'', 9) (''a'', 2) (''b'', 3) (''d'', 12)

El orden de los elementos de la cola puede variar.

Para hacer esto más genérico y similar a Pool, podría crear un número fijo de N Processes, dividir la lista de claves en N partes y luego usar una función de contenedor como el objetivo de proceso, que invocará count_it para cada clave de la lista. se pasa, como:

def wrapper( q, arr, keys ): for k in keys: count_it(q, arr, k)


Intentando de nuevo ya que acabo de ver la recompensa;)

Básicamente, creo que el mensaje de error significa lo que dice: multiprocesamiento de memoria compartida Las matrices no se pueden pasar como argumentos (encurtidos). No tiene sentido serializar los datos; el punto es que los datos son memoria compartida. Entonces tienes que hacer que la matriz compartida sea global. Creo que es mejor ponerlo como el atributo de un módulo, como en mi primera respuesta, pero simplemente dejarlo como una variable global en tu ejemplo también funciona bien. Tomando en cuenta su punto de no querer establecer los datos antes del tenedor, aquí hay un ejemplo modificado. Si quisiera tener más de una matriz compartida posible (y es por eso que quería pasar aShare como argumento) podría hacer una lista global de matrices compartidas, y simplemente pasar el índice a count_it (que se convertiría for c in toShare[i]: .

from sys import stdin from multiprocessing import Pool, Array, Process def count_it( key ): count = 0 for c in toShare: if c == key: count += 1 return count if __name__ == ''__main__'': # allocate shared array - want lock=False in this case since we # aren''t writing to it and want to allow multiple processes to access # at the same time - I think with lock=True there would be little or # no speedup maxLength = 50 toShare = Array(''c'', maxLength, lock=False) # fork pool = Pool() # can set data after fork testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" if len(testData) > maxLength: raise ValueError, "Shared array too small to hold data" toShare[:len(testData)] = testData print pool.map( count_it, ["a", "b", "s", "d"] )

[EDITAR: Lo anterior no funciona en Windows porque no se usa fork. Sin embargo, a continuación funciona en Windows, sigue usando Pool, por lo que creo que este es el más cercano a lo que desea:

from sys import stdin from multiprocessing import Pool, Array, Process import mymodule def count_it( key ): count = 0 for c in mymodule.toShare: if c == key: count += 1 return count def initProcess(share): mymodule.toShare = share if __name__ == ''__main__'': # allocate shared array - want lock=False in this case since we # aren''t writing to it and want to allow multiple processes to access # at the same time - I think with lock=True there would be little or # no speedup maxLength = 50 toShare = Array(''c'', maxLength, lock=False) # fork pool = Pool(initializer=initProcess,initargs=(toShare,)) # can set data after fork testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" if len(testData) > maxLength: raise ValueError, "Shared array too small to hold data" toShare[:len(testData)] = testData print pool.map( count_it, ["a", "b", "s", "d"] )

No estoy seguro de por qué el mapa no recortará la matriz, pero Process and Pool lo hará, creo que tal vez se haya transferido en el punto de la inicialización del subproceso en Windows. Tenga en cuenta que los datos aún están configurados después de la horquilla.


Si los datos solo se leen, conviértalo en una variable en un módulo antes del tenedor de Pool. Entonces, todos los procesos secundarios deberían poder acceder a él, y no se copiará siempre que no se escriba en él.

import myglobals # anything (empty .py file) myglobals.data = [] def count_it( key ): count = 0 for c in myglobals.data: if c == key: count += 1 return count if __name__ == ''__main__'': myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" pool = Pool() print pool.map( count_it, ["a", "b", "s", "d"] )

Si quiere intentar utilizar Array, podría intentarlo con el argumento de palabra clave lock=False (es verdadero de manera predeterminada).