set_start_method - python pool process
¿Cómo combinar Pool.map con Array(memoria compartida) en el multiproceso de Python? (4)
Tengo una gran cantidad de datos (solo lectura) de datos que quiero que sean procesados por múltiples procesos en paralelo.
Me gusta la función Pool.map y me gustaría usarla para calcular funciones en esos datos en paralelo.
Vi que se puede usar la clase Value o Array para usar datos de memoria compartida entre procesos. Pero cuando trato de usar esto obtengo un RuntimeError: ''Los objetos SynchronizedString solo deberían compartirse entre procesos a través de la herencia cuando se utiliza la función Pool.map:
Aquí hay un ejemplo simplificado de lo que estoy tratando de hacer:
from sys import stdin
from multiprocessing import Pool, Array
def count_it( arr, key ):
count = 0
for c in arr:
if c == key:
count += 1
return count
if __name__ == ''__main__'':
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
# want to share it using shared memory
toShare = Array(''c'', testData)
# this works
print count_it( toShare, "a" )
pool = Pool()
# RuntimeError here
print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )
¿Alguien puede decirme qué estoy haciendo mal aquí?
Entonces, lo que me gustaría hacer es pasar información sobre una matriz asignada recientemente compartida de memoria compartida a los procesos después de que se hayan creado en el grupo de procesos.
Entonces su uso de sharedctypes
es incorrecto. ¿Desea heredar esta matriz del proceso principal o prefiere pasarla explícitamente? En el primer caso, debe crear una variable global como sugieren otras respuestas. Pero no necesita usar sharedctypes
para pasarlo explícitamente, simplemente pase testData
original.
Por cierto, tu uso de Pool.map()
es incorrecto. Tiene la misma interfaz que la función incorporada de map()
(¿la starmap()
con starmap()
?). A continuación se muestra un ejemplo de trabajo con, pasando matriz explícitamente:
from multiprocessing import Pool
def count_it( (arr, key) ):
count = 0
for c in arr:
if c == key:
count += 1
return count
if __name__ == ''__main__'':
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
pool = Pool()
print pool.map(count_it, [(testData, key) for key in ["a", "b", "s", "d"]])
El problema que veo es que Pool no admite el decapado de datos compartidos a través de su lista de argumentos. Eso es lo que significa el mensaje de error por "los objetos solo deben compartirse entre procesos a través de la herencia". Los datos compartidos deben heredarse, es decir, ser globales si desea compartirlos utilizando la clase Pool.
Si necesita pasarlos explícitamente, puede que tenga que usar multiprocesamiento. Procesar. Aquí está su ejemplo revisado:
from multiprocessing import Process, Array, Queue
def count_it( q, arr, key ):
count = 0
for c in arr:
if c == key:
count += 1
q.put((key, count))
if __name__ == ''__main__'':
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
# want to share it using shared memory
toShare = Array(''c'', testData)
q = Queue()
keys = [''a'', ''b'', ''s'', ''d'']
workers = [Process(target=count_it, args = (q, toShare, key))
for key in keys]
for p in workers:
p.start()
for p in workers:
p.join()
while not q.empty():
print q.get(),
Salida: (''s'', 9) (''a'', 2) (''b'', 3) (''d'', 12)
El orden de los elementos de la cola puede variar.
Para hacer esto más genérico y similar a Pool, podría crear un número fijo de N Processes, dividir la lista de claves en N partes y luego usar una función de contenedor como el objetivo de proceso, que invocará count_it para cada clave de la lista. se pasa, como:
def wrapper( q, arr, keys ):
for k in keys:
count_it(q, arr, k)
Intentando de nuevo ya que acabo de ver la recompensa;)
Básicamente, creo que el mensaje de error significa lo que dice: multiprocesamiento de memoria compartida Las matrices no se pueden pasar como argumentos (encurtidos). No tiene sentido serializar los datos; el punto es que los datos son memoria compartida. Entonces tienes que hacer que la matriz compartida sea global. Creo que es mejor ponerlo como el atributo de un módulo, como en mi primera respuesta, pero simplemente dejarlo como una variable global en tu ejemplo también funciona bien. Tomando en cuenta su punto de no querer establecer los datos antes del tenedor, aquí hay un ejemplo modificado. Si quisiera tener más de una matriz compartida posible (y es por eso que quería pasar aShare como argumento) podría hacer una lista global de matrices compartidas, y simplemente pasar el índice a count_it (que se convertiría for c in toShare[i]:
.
from sys import stdin
from multiprocessing import Pool, Array, Process
def count_it( key ):
count = 0
for c in toShare:
if c == key:
count += 1
return count
if __name__ == ''__main__'':
# allocate shared array - want lock=False in this case since we
# aren''t writing to it and want to allow multiple processes to access
# at the same time - I think with lock=True there would be little or
# no speedup
maxLength = 50
toShare = Array(''c'', maxLength, lock=False)
# fork
pool = Pool()
# can set data after fork
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
if len(testData) > maxLength:
raise ValueError, "Shared array too small to hold data"
toShare[:len(testData)] = testData
print pool.map( count_it, ["a", "b", "s", "d"] )
[EDITAR: Lo anterior no funciona en Windows porque no se usa fork. Sin embargo, a continuación funciona en Windows, sigue usando Pool, por lo que creo que este es el más cercano a lo que desea:
from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule
def count_it( key ):
count = 0
for c in mymodule.toShare:
if c == key:
count += 1
return count
def initProcess(share):
mymodule.toShare = share
if __name__ == ''__main__'':
# allocate shared array - want lock=False in this case since we
# aren''t writing to it and want to allow multiple processes to access
# at the same time - I think with lock=True there would be little or
# no speedup
maxLength = 50
toShare = Array(''c'', maxLength, lock=False)
# fork
pool = Pool(initializer=initProcess,initargs=(toShare,))
# can set data after fork
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
if len(testData) > maxLength:
raise ValueError, "Shared array too small to hold data"
toShare[:len(testData)] = testData
print pool.map( count_it, ["a", "b", "s", "d"] )
No estoy seguro de por qué el mapa no recortará la matriz, pero Process and Pool lo hará, creo que tal vez se haya transferido en el punto de la inicialización del subproceso en Windows. Tenga en cuenta que los datos aún están configurados después de la horquilla.
Si los datos solo se leen, conviértalo en una variable en un módulo antes del tenedor de Pool. Entonces, todos los procesos secundarios deberían poder acceder a él, y no se copiará siempre que no se escriba en él.
import myglobals # anything (empty .py file)
myglobals.data = []
def count_it( key ):
count = 0
for c in myglobals.data:
if c == key:
count += 1
return count
if __name__ == ''__main__'':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )
Si quiere intentar utilizar Array, podría intentarlo con el argumento de palabra clave lock=False
(es verdadero de manera predeterminada).