multiplicar - sumar elementos de una lista python
Cómo paralelizar un cálculo de suma en python numpy? (2)
Tengo una suma que intento calcular y tengo dificultades para paralelizar el código. El cálculo que intento paralelizar es bastante complejo (usa matrices numpy y matrices dispersas dispersas). Escupe una matriz numpy, y quiero sumar las matrices de salida de alrededor de 1000 cálculos. Idealmente, mantendría una suma continua sobre todas las iteraciones. Sin embargo, no he podido averiguar cómo hacer esto.
Hasta ahora, he intentado usar la función Parallel de joblib y la función pool.map con el paquete de multiprocesamiento de python. Para ambos, utilizo una función interna que devuelve una matriz numpy. Estas funciones devuelven una lista, que convierto en una matriz numpy y luego se suman.
Sin embargo, después de que la función paralela de joblib complete todas las iteraciones, el programa principal nunca continúa ejecutándose (parece que el trabajo original está en estado suspendido, usando 0% de CPU). Cuando uso pool.map, recibo errores de memoria después de que se completan todas las iteraciones.
¿Hay alguna manera de simplemente paralelizar una suma continua de matrices?
Editar : El objetivo es hacer algo como lo siguiente, excepto en paralelo.
def summers(num_iters):
sumArr = np.zeros((1,512*512)) #initialize sum
for index in range(num_iters):
sumArr = sumArr + computation(index) #computation returns a 1 x 512^2 numpy array
return sumArr
Descubrí cómo hacer la paralelización de una suma de matrices con multiprocesamiento, apply_async y callbacks, por lo que estoy publicando esto aquí para otras personas. Usé la página de ejemplo para Parallel Python para la clase de devolución de Sum, aunque en realidad no utilicé ese paquete para la implementación. Sin embargo, me dio la idea de usar callbacks. Aquí está el código simplificado de lo que terminé usando, y hace lo que yo quería que hiciera.
import multiprocessing
import numpy as np
import thread
class Sum: #again, this class is from ParallelPython''s example code (I modified for an array and added comments)
def __init__(self):
self.value = np.zeros((1,512*512)) #this is the initialization of the sum
self.lock = thread.allocate_lock()
self.count = 0
def add(self,value):
self.count += 1
self.lock.acquire() #lock so sum is correct if two processes return at same time
self.value += value #the actual summation
self.lock.release()
def computation(index):
array1 = np.ones((1,512*512))*index #this is where the array-returning computation goes
return array1
def summers(num_iters):
pool = multiprocessing.Pool(processes=8)
sumArr = Sum() #create an instance of callback class and zero the sum
for index in range(num_iters):
singlepoolresult = pool.apply_async(computation,(index,),callback=sumArr.add)
pool.close()
pool.join() #waits for all the processes to finish
return sumArr.value
También pude hacer que funcionara usando un mapa paralelizado, que se sugirió en otra respuesta. Lo había intentado antes, pero no lo estaba implementando correctamente. Ambas formas funcionan, y creo que esta respuesta explica bastante bien el problema de qué método usar (asignar o aplicar.asínc). Para la versión de mapa, no es necesario definir la clase Sum y la función de veranos se convierte
def summers(num_iters):
pool = multiprocessing.Pool(processes=8)
outputArr = np.zeros((num_iters,1,512*512)) #you wouldn''t have to initialize these
sumArr = np.zeros((1,512*512)) #but I do to make sure I have the memory
outputArr = np.array(pool.map(computation, range(num_iters)))
sumArr = outputArr.sum(0)
pool.close() #not sure if this is still needed since map waits for all iterations
return sumArr
No estoy seguro de entender el problema. ¿Estás tratando de dividir una lista en un grupo de trabajadores, hacer que mantengan una suma continua de sus cálculos y sumar el resultado?
#!/bin/env python
import sys
import random
import time
import multiprocessing
import numpy as np
numpows = 5
numitems = 25
nprocs = 4
def expensiveComputation( i ):
time.sleep( random.random() * 10 )
return np.array([i**j for j in range(numpows)])
def listsum( l ):
sum = np.zeros_like(l[0])
for item in l:
sum = sum + item
return sum
def partition(lst, n):
division = len(lst) / float(n)
return [ lst[int(round(division * i)): int(round(division * (i + 1)))] for i in xrange(n) ]
def myRunningSum( l ):
sum = np.zeros(numpows)
for item in l:
sum = sum + expensiveComputation(item)
return sum
if __name__ == ''__main__'':
random.seed(1)
data = range(numitems)
pool = multiprocessing.Pool(processes=4,)
calculations = pool.map(myRunningSum, partition(data,nprocs))
print ''Answer is:'', listsum(calculations)
print ''Expected answer: '', np.array([25.,300.,4900.,90000.,1763020.])
(la función de partición proviene de Python: dividir una lista en n particiones de igual longitud )