python numpy multiprocessing mmap

python - NumPy vs. multiprocesamiento y mmap



multiprocessing (1)

Mi enfoque habitual (si puede vivir con copias de memoria adicionales) es hacer todo IO en un proceso y luego enviar cosas a un grupo de subprocesos de trabajo. Para cargar una porción de una matriz memmapped en la memoria simplemente haz x = np.array(data[yourslice]) ( data[yourslice].copy() realidad no hace esto, lo que puede llevar a confusión.).

Primero, generemos algunos datos de prueba:

import numpy as np np.random.random(10000).tofile(''data.dat'')

Puede reproducir sus errores con algo como esto:

import numpy as np import multiprocessing def main(): data = np.memmap(''data.dat'', dtype=np.float, mode=''r'') pool = multiprocessing.Pool() results = pool.imap(calculation, chunks(data)) results = np.fromiter(results, dtype=np.float) def chunks(data, chunksize=100): """Overly-simple chunker...""" intervals = range(0, data.size, chunksize) + [None] for start, stop in zip(intervals[:-1], intervals[1:]): yield data[start:stop] def calculation(chunk): """Dummy calculation.""" return chunk.mean() - chunk.std() if __name__ == ''__main__'': main()

Y si simplemente cambia a rendimiento np.array(data[start:stop]) lugar, solucionará el problema:

import numpy as np import multiprocessing def main(): data = np.memmap(''data.dat'', dtype=np.float, mode=''r'') pool = multiprocessing.Pool() results = pool.imap(calculation, chunks(data)) results = np.fromiter(results, dtype=np.float) def chunks(data, chunksize=100): """Overly-simple chunker...""" intervals = range(0, data.size, chunksize) + [None] for start, stop in zip(intervals[:-1], intervals[1:]): yield np.array(data[start:stop]) def calculation(chunk): """Dummy calculation.""" return chunk.mean() - chunk.std() if __name__ == ''__main__'': main()

Por supuesto, esto hace una copia extra en memoria de cada fragmento.

A largo plazo, probablemente descubras que es más fácil alejarse de los archivos memmapped y pasar a algo como HDF. Esto es especialmente cierto si sus datos son multidimensionales. ( h5py , pero pyTables es bueno si sus datos son "similares a una mesa").

¡Buena suerte, en cualquier caso!

Estoy usando el módulo de multiprocessing de Python para procesar grandes matrices numpy en paralelo. Las matrices se mapean en memoria usando numpy.load(mmap_mode=''r'') en el proceso maestro. Después de eso, multiprocessing.Pool() divide el proceso (supongo).

Todo parece funcionar bien, excepto que estoy recibiendo líneas como:

AttributeError (el objeto "NoneType" no tiene ningún atributo "tell" ",) en el <bound method memmap.__del__ of memmap([ 0.57735026, 0.57735026, 0.57735026, 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ], dtype=float32)> ignorado

en los registros de prueba de unidad Las pruebas pasan bien, sin embargo.

¿Alguna idea de lo que está pasando allí?

Usando Python 2.7.2, OS X, NumPy 1.6.1.

ACTUALIZAR:

Después de algunas depuraciones, busqué la causa en una ruta de código que estaba usando una (porción pequeña) de esta matriz numpy asignada en memoria como entrada a una llamada Pool.imap .

Aparentemente, el "problema" es con la forma de multiprocessing.Pool.imap pasa su entrada a los nuevos procesos: utiliza pickle. Esto no funciona con las matrices nummap ed numpy, y algo dentro de las interrupciones que conduce al error.

Encontré esta respuesta de Robert Kern que parece abordar el mismo problema. Sugiere crear una ruta de código especial para cuando la entrada de imap proviene de una matriz mapeada en memoria: mapeo de memoria la misma matriz manualmente en el proceso engendrado.

Esto sería tan complicado y feo que preferiría vivir con el error y las copias de memoria adicionales. ¿Hay alguna otra manera que sería más ligero en la modificación del código existente?