python - example - Objetos de memoria compartida en multiprocesamiento
parallel python (2)
Me encuentro con el mismo problema y escribí una pequeña clase de utilidad de memoria compartida para evitarlo.
Estoy usando multiprocesamiento.RawArray (lockfree), y también el acceso a las matrices no está sincronizado (lockfree), tenga cuidado de no disparar a sus propios pies.
Con la solución obtengo aceleraciones por un factor de aproximadamente 3 en un quad-core i7.
Aquí está el código: Siéntase libre de usarlo y mejorarlo, e informe cualquier error.
''''''
Created on 14.05.2013
@author: martin
''''''
import multiprocessing
import ctypes
import numpy as np
class SharedNumpyMemManagerError(Exception):
pass
''''''
Singleton Pattern
''''''
class SharedNumpyMemManager:
_initSize = 1024
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(SharedNumpyMemManager, cls).__new__(
cls, *args, **kwargs)
return cls._instance
def __init__(self):
self.lock = multiprocessing.Lock()
self.cur = 0
self.cnt = 0
self.shared_arrays = [None] * SharedNumpyMemManager._initSize
def __createArray(self, dimensions, ctype=ctypes.c_double):
self.lock.acquire()
# double size if necessary
if (self.cnt >= len(self.shared_arrays)):
self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)
# next handle
self.__getNextFreeHdl()
# create array in shared memory segment
shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))
# convert to numpy array vie ctypeslib
self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)
# do a reshape for correct dimensions
# Returns a masked array containing the same data, but with a new shape.
# The result is a view on the original array
self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)
# update cnt
self.cnt += 1
self.lock.release()
# return handle to the shared memory numpy array
return self.cur
def __getNextFreeHdl(self):
orgCur = self.cur
while self.shared_arrays[self.cur] is not None:
self.cur = (self.cur + 1) % len(self.shared_arrays)
if orgCur == self.cur:
raise SharedNumpyMemManagerError(''Max Number of Shared Numpy Arrays Exceeded!'')
def __freeArray(self, hdl):
self.lock.acquire()
# set reference to None
if self.shared_arrays[hdl] is not None: # consider multiple calls to free
self.shared_arrays[hdl] = None
self.cnt -= 1
self.lock.release()
def __getArray(self, i):
return self.shared_arrays[i]
@staticmethod
def getInstance():
if not SharedNumpyMemManager._instance:
SharedNumpyMemManager._instance = SharedNumpyMemManager()
return SharedNumpyMemManager._instance
@staticmethod
def createArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)
@staticmethod
def getArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)
@staticmethod
def freeArray(*args, **kwargs):
return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)
# Init Singleton on module load
SharedNumpyMemManager.getInstance()
if __name__ == ''__main__'':
import timeit
N_PROC = 8
INNER_LOOP = 10000
N = 1000
def propagate(t):
i, shm_hdl, evidence = t
a = SharedNumpyMemManager.getArray(shm_hdl)
for j in range(INNER_LOOP):
a[i] = i
class Parallel_Dummy_PF:
def __init__(self, N):
self.N = N
self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)
self.pool = multiprocessing.Pool(processes=N_PROC)
def update_par(self, evidence):
self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))
def update_seq(self, evidence):
for i in range(self.N):
propagate((i, self.arrayHdl, evidence))
def getArray(self):
return SharedNumpyMemManager.getArray(self.arrayHdl)
def parallelExec():
pf = Parallel_Dummy_PF(N)
print(pf.getArray())
pf.update_par(5)
print(pf.getArray())
def sequentialExec():
pf = Parallel_Dummy_PF(N)
print(pf.getArray())
pf.update_seq(5)
print(pf.getArray())
t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")
print("Sequential: ", t1.timeit(number=1))
print("Parallel: ", t2.timeit(number=1))
Supongamos que tengo una gran matriz numpy de memoria, tengo una función func
que toma esta matriz gigante como entrada (junto con algunos otros parámetros). func
con diferentes parámetros se puede ejecutar en paralelo. Por ejemplo:
def func(arr, param):
# do stuff to arr, param
# build array arr
pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]
Si utilizo la biblioteca de multiprocesamiento, esa matriz gigante se copiará varias veces en diferentes procesos.
¿Hay alguna manera de permitir que diferentes procesos compartan la misma matriz? Este objeto de matriz es de solo lectura y nunca será modificado.
Lo que es más complicado, si arr no es una matriz, sino un objeto python arbitrario, ¿hay alguna forma de compartirlo?
[EDITADO]
Leí la respuesta, pero todavía estoy un poco confundido. Dado que fork () es copy-on-write, no deberíamos invocar ningún costo adicional al generar nuevos procesos en la biblioteca de multiprocesamiento de python. Pero el siguiente código sugiere que hay una gran sobrecarga:
from multiprocessing import Pool, Manager
import numpy as np;
import time
def f(arr):
return len(arr)
t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;
pool = Pool(processes = 6)
t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;
salida (y, por cierto, el costo aumenta a medida que aumenta el tamaño de la matriz, por lo que sospecho que aún hay gastos generales relacionados con la copia de memoria):
construct array = 0.0178790092468
multiprocessing overhead = 0.252444982529
¿Por qué hay una sobrecarga tan grande si no copiamos la matriz? ¿Y qué parte me salva la memoria compartida?
Si utiliza un sistema operativo que utiliza la semántica fork()
write-on-write fork()
(como cualquier Unix común), siempre que nunca altere su estructura de datos estará disponible para todos los procesos secundarios sin ocupar memoria adicional. No tendrá que hacer nada especial (excepto que esté absolutamente seguro de no alterar el objeto).
Lo más eficiente que puede hacer por su problema sería empacar su matriz en una estructura de matriz eficiente (usando numpy
o array
), colocarla en la memoria compartida, envolverla con multiprocessing.Array
, y pasar eso a sus funciones. Esta respuesta muestra cómo hacer eso .
Si desea un objeto compartido que se pueda escribir , deberá envolverlo con algún tipo de sincronización o bloqueo. multiprocessing
proporciona dos métodos para hacer esto : uno que utiliza memoria compartida (adecuada para valores simples, matrices o ctypes) o un proxy de Manager
, donde un proceso retiene la memoria y un administrador arbitra el acceso a ella desde otros procesos (incluso a través de una red) .
El enfoque de Manager
se puede usar con objetos de Python arbitrarios, pero será más lento que el equivalente que utiliza la memoria compartida porque los objetos deben serializarse / deserializarse y enviarse entre procesos.
Existe una gran cantidad de bibliotecas y enfoques de procesamiento paralelo disponibles en Python . multiprocessing
es una biblioteca excelente y completa, pero si tiene necesidades especiales, tal vez uno de los otros enfoques sea mejor.