Cómo usar Python multiprocesamiento Pool.map para rellenar la matriz numpy en un bucle for
arrays multiprocessing (3)
Quiero llenar una matriz 2D-numpy dentro de un ciclo for y ajustar el cálculo usando multiprocesamiento.
import numpy
from multiprocessing import Pool
array_2D = numpy.zeros((20,10))
pool = Pool(processes = 4)
def fill_array(start_val):
return range(start_val,start_val+10)
list_start_vals = range(40,60)
for line in xrange(20):
array_2D[line,:] = pool.map(fill_array,list_start_vals)
pool.close()
print array_2D
El efecto de ejecutarlo es que Python ejecuta 4 subprocesos y ocupa 4 núcleos de CPU PERO la ejecución no termina y la matriz no se imprime. Si trato de escribir la matriz en el disco, no pasa nada.
puede alguien decirme por que?
El problema se debe a la ejecución de pool.map
en for loop, El resultado del método map () es funcionalmente equivalente al mapa incorporado (), excepto que las tareas individuales se ejecutan en paralelo. entonces en su caso el pool.map (fill_array, list_start_vals) se llamará 20 veces y comenzará a correr en paralelo para cada iteración de for loop, el código siguiente debería funcionar
Código:
#!/usr/bin/python
import numpy
from multiprocessing import Pool
def fill_array(start_val):
return range(start_val,start_val+10)
if __name__ == "__main__":
array_2D = numpy.zeros((20,10))
pool = Pool(processes = 4)
list_start_vals = range(40,60)
# running the pool.map in a for loop is wrong
#for line in xrange(20):
# array_2D[line,:] = pool.map(fill_array,list_start_vals)
# get the result of pool.map (list of values returned by fill_array)
# in a pool_result list
pool_result = pool.map(fill_array,list_start_vals)
# the pool is processing its inputs in parallel, close() and join()
#can be used to synchronize the main process
#with the task processes to ensure proper cleanup.
pool.close()
pool.join()
# Now assign the pool_result to your numpy
for line,result in enumerate(pool_result):
array_2D[line,:] = result
print array_2D
Los siguientes trabajos. Primero, es una buena idea proteger la parte principal de su código dentro de un bloque principal para evitar efectos secundarios extraños. El resultado de poo.map()
es una lista que contiene las evaluaciones para cada valor en el iterador list_start_vals
, de modo que no es necesario crear array_2D
antes.
import numpy as np
from multiprocessing import Pool
def fill_array(start_val):
return list(range(start_val, start_val+10))
if __name__==''__main__'':
pool = Pool(processes=4)
list_start_vals = range(40, 60)
array_2D = np.array(pool.map(fill_array, list_start_vals))
pool.close() # ATTENTION HERE
print array_2D
quizás tengas problemas para usar pool.close()
, de los comentarios de @hpaulj puedes eliminar esta línea en caso de que tengas problemas ...
Si aún desea usar el relleno de matriz, puede usar pool.apply_async
lugar de pool.map
. Trabajando con la respuesta de Saullo:
import numpy as np
from multiprocessing import Pool
def fill_array(start_val):
return range(start_val, start_val+10)
if __name__==''__main__'':
pool = Pool(processes=4)
list_start_vals = range(40, 60)
array_2D = np.zeros((20,10))
for line, val in enumerate(list_start_vals):
result = pool.apply_async(fill_array, [val])
array_2D[line,:] = result.get()
pool.close()
print array_2D
Esto funciona un poco más lento que el map
. Pero no produce un error de tiempo de ejecución como mi prueba de la versión del mapa: Exception RuntimeError: RuntimeError(''cannot join current thread'',) in <Finalize object, dead> ignored