python - Use numpy array en la memoria compartida para multiprocesamiento
numpy python 3 (5)
El objeto Array
tiene get_obj()
un método get_obj()
, que devuelve la matriz ctypes que presenta una interfaz de buffer. Creo que lo siguiente debería funcionar ...
from multiprocessing import Process, Array
import scipy
import numpy
def f(a):
a[0] = -a[0]
if __name__ == ''__main__'':
# Create the array
N = int(10)
unshared_arr = scipy.rand(N)
a = Array(''d'', unshared_arr)
print "Originally, the first two elements of arr = %s"%(a[:2])
# Create, start, and finish the child process
p = Process(target=f, args=(a,))
p.start()
p.join()
# Print out the changed values
print "Now, the first two elements of arr = %s"%a[:2]
b = numpy.frombuffer(a.get_obj())
b[0] = 10.0
print a[0]
Cuando se ejecuta, se imprime el primer elemento de a
ahora que es 10.0, que muestra a
y b
son solo dos vistas en la misma memoria.
Para asegurarme de que siga siendo multiprocesador seguro, creo que tendrá que utilizar los métodos de acquire
y release
que existen en el objeto Array
, y su bloqueo incorporado para asegurarse de que todo esté bien accesible (aunque yo no soy un experto en el módulo multiprocesador).
Me gustaría utilizar una matriz numpy en la memoria compartida para usar con el módulo de multiprocesamiento. La dificultad es usarlo como una matriz numpy, y no solo como una matriz ctypes.
from multiprocessing import Process, Array
import scipy
def f(a):
a[0] = -a[0]
if __name__ == ''__main__'':
# Create the array
N = int(10)
unshared_arr = scipy.rand(N)
arr = Array(''d'', unshared_arr)
print "Originally, the first two elements of arr = %s"%(arr[:2])
# Create, start, and finish the child processes
p = Process(target=f, args=(arr,))
p.start()
p.join()
# Printing out the changed values
print "Now, the first two elements of arr = %s"%arr[:2]
Esto produce resultados como:
Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]
Se puede acceder a la matriz de forma ctypes, por ejemplo, arr[i]
tiene sentido. Sin embargo, no es una matriz numpy, y no puedo realizar operaciones tales como -1*arr
, o arr.sum()
. Supongo que una solución sería convertir la matriz ctypes en una matriz numpy. Sin embargo (además de no poder hacer este trabajo), no creo que se vuelva a compartir.
Parece que habría una solución estándar para lo que tiene que ser un problema común.
Escribí un pequeño módulo de python que usa memoria compartida POSIX para compartir matrices numpy entre intérpretes de Python. Quizás lo encuentres a mano.
https://pypi.python.org/pypi/SharedArray
Así es como funciona:
import numpy as np
import SharedArray as sa
# Create an array in shared memory
a = sa.create("test1", 10)
# Attach it as a different array. This can be done from another
# python interpreter as long as it runs on the same computer.
b = sa.attach("test1")
# See how they are actually sharing the same memory block
a[0] = 42
print(b[0])
# Destroying a does not affect b.
del a
print(b[0])
# See how "test1" is still present in shared memory even though we
# destroyed the array a.
sa.list()
# Now destroy the array "test1" from memory.
sa.delete("test1")
# The array b is not affected, but once you destroy it then the
# data are lost.
print(b[0])
Para agregar a las respuestas de @ unutbu (no disponible más) y @Henry Gomersall. Puede usar shared_arr.get_lock()
para sincronizar el acceso cuando sea necesario:
shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # could be anything numpy accepts as an index such another numpy array
with shared_arr.get_lock(): # synchronize access
arr = np.frombuffer(shared_arr.get_obj()) # no data copying
arr[i] = -arr[i]
Ejemplo
import ctypes
import logging
import multiprocessing as mp
from contextlib import closing
import numpy as np
info = mp.get_logger().info
def main():
logger = mp.log_to_stderr()
logger.setLevel(logging.INFO)
# create shared array
N, M = 100, 11
shared_arr = mp.Array(ctypes.c_double, N)
arr = tonumpyarray(shared_arr)
# fill with random values
arr[:] = np.random.uniform(size=N)
arr_orig = arr.copy()
# write to arr from different processes
with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
# many processes access the same slice
stop_f = N // 10
p.map_async(f, [slice(stop_f)]*M)
# many processes access different slices of the same array
assert M % 2 # odd
step = N // 10
p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
p.join()
assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)
def init(shared_arr_):
global shared_arr
shared_arr = shared_arr_ # must be inherited, not passed as an argument
def tonumpyarray(mp_arr):
return np.frombuffer(mp_arr.get_obj())
def f(i):
"""synchronized."""
with shared_arr.get_lock(): # synchronize access
g(i)
def g(i):
"""no synchronization."""
info("start %s" % (i,))
arr = tonumpyarray(shared_arr)
arr[i] = -1 * arr[i]
info("end %s" % (i,))
if __name__ == ''__main__'':
mp.freeze_support()
main()
Si no necesita acceso sincronizado o crea sus propios bloqueos, entonces mp.Array()
es innecesario. Puede usar mp.sharedctypes.RawArray
en este caso.
Puede usar el módulo sharedmem
: https://bitbucket.org/cleemesser/numpy-sharedmem
Aquí está su código original, esta vez usando memoria compartida que se comporta como una matriz NumPy (observe la última instrucción adicional que llama a una función NumPy sum()
):
from multiprocessing import Process
import sharedmem
import scipy
def f(a):
a[0] = -a[0]
if __name__ == ''__main__'':
# Create the array
N = int(10)
unshared_arr = scipy.rand(N)
arr = sharedmem.empty(N)
arr[:] = unshared_arr.copy()
print "Originally, the first two elements of arr = %s"%(arr[:2])
# Create, start, and finish the child process
p = Process(target=f, args=(arr,))
p.start()
p.join()
# Print out the changed values
print "Now, the first two elements of arr = %s"%arr[:2]
# Perform some NumPy operation
print arr.sum()
Si bien las respuestas ya dadas son buenas, hay una solución mucho más fácil para este problema siempre que se cumplan dos condiciones:
- Está en un sistema operativo compatible con POSIX (por ejemplo, Linux, Mac OSX); y
- Los procesos secundarios necesitan acceso de solo lectura a la matriz compartida.
En este caso, no es necesario que manipule explícitamente las variables compartidas, ya que los procesos secundarios se crearán utilizando un tenedor. Un niño bifurcado comparte automáticamente el espacio de memoria del padre. En el contexto del multiprocesamiento de Python, esto significa que comparte todas las variables de nivel de módulo ; tenga en cuenta que esto no se aplica a los argumentos que pasa explícitamente a los procesos secundarios o a las funciones que llama en un multiprocessing.Pool
. multiprocessing.Pool
o menos.
Un simple ejemplo:
import multiprocessing
import numpy as np
# will hold the (implicitly mem-shared) data
data_array = None
# child worker function
def job_handler(num):
# built-in id() returns unique memory ID of a variable
return id(data_array), np.sum(data_array)
def launch_jobs(data, num_jobs=5, num_worker=4):
global data_array
data_array = data
pool = multiprocessing.Pool(num_worker)
return pool.map(job_handler, range(num_jobs))
# create some random data and execute the child jobs
mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))
# this will print ''True'' on POSIX OS, since the data was shared
print(np.all(np.asarray(mem_ids) == id(data_array)))