python - set_start_method - multiprocessing.Pool() más lento que simplemente usando funciones ordinarias
python pool process (3)
Estos problemas generalmente se reducen a lo siguiente:
¡La función que intenta paralelizar no requiere suficientes recursos de CPU (es decir, tiempo de CPU) para racionalizar la paralelización!
Claro, cuando se paraliza con el multiprocessing.Pool(8)
, teóricamente ( pero no en la práctica) podría obtener una aceleración de 8x .
Sin embargo, tenga en cuenta que esto no es gratis: usted obtiene esta paralelización a expensas de los siguientes gastos generales:
- Creando una
task
para cadachunk
(de tamañochunksize
) en suiter
pasado aPool.map(f, iter)
- Para cada
task
- Serialice la
task
y el valor de retorno de latask''s
( piense enpickle.dumps()
) - Deserialice la
task
y el valor de retorno de latask''s
( piense enpickle.loads()
) - Pierda un tiempo significativo en espera de
Locks
enQueues
memoria compartida, mientras que los procesos de trabajo y los procesos primariosget()
yput()
de / a estasQueues
.
- Serialice la
- Costo por única vez de llamadas a
os.fork()
para cada proceso de trabajo, lo cual es costoso.
En esencia, cuando usas Pool()
quieres:
- Altos requisitos de recursos de CPU
- Baja huella de datos pasada a cada llamada de función
- Itinerario razonablemente largo para justificar el costo único de (3) arriba.
Para una exploración más profunda, esta publicación y charla enlazada explican cómo los grandes datos que se pasan a Pool.map()
( y amigos) lo meten en problemas.
Raymond Hettinger también habla sobre el uso adecuado de la concurrencia de Python aquí.
(Esta pregunta es sobre cómo hacer que el multiprocesamiento.Pool () ejecute el código más rápido. Finalmente lo resolví, y la solución final se puede encontrar en la parte inferior de la publicación).
Pregunta original:
Estoy tratando de usar Python para comparar una palabra con muchas otras palabras en una lista y recuperar una lista de las más similares. Para hacer eso estoy usando la función difflib.get_close_matches. Estoy en una computadora portátil con Windows 7 relativamente nueva y poderosa, con Python 2.6.5.
Lo que quiero es acelerar el proceso de comparación porque mi lista de palabras de comparación es muy larga y tengo que repetir el proceso de comparación varias veces. Cuando me enteré del módulo de multiprocesamiento, parecía lógico que si la comparación pudiera dividirse en tareas de trabajo y ejecutarse simultáneamente (y por lo tanto haciendo uso de la potencia de la máquina a cambio de una velocidad más rápida) mi tarea de comparación terminaría más rápido.
Sin embargo, incluso después de haber probado muchos métodos diferentes, y se usaron métodos que se han mostrado en los documentos y sugerido en las publicaciones del foro, el método Pool parece ser increíblemente lento, mucho más lento que simplemente ejecutar la función get_close_matches original en la lista completa una vez. Me gustaría ayudar a entender por qué Pool () está siendo tan lento y si lo estoy usando correctamente. Solo uso este escenario de comparación de cadenas como ejemplo porque ese es el ejemplo más reciente en el que pude pensar en dónde no podía entender o hacer que el multiprocesamiento funcionara en lugar de hacerlo en mi contra. A continuación se muestra un código de ejemplo del escenario difflib que muestra las diferencias de tiempo entre los métodos ordinarios y los combinados:
from multiprocessing import Pool
import random, time, difflib
# constants
wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(1000000)]
mainword = "hello"
# comparison function
def findclosematch(subwordlist):
matches = difflib.get_close_matches(mainword,subwordlist,len(subwordlist),0.7)
if matches <> []:
return matches
# pool
print "pool method"
if __name__ == ''__main__'':
pool = Pool(processes=3)
t=time.time()
result = pool.map_async(findclosematch, wordlist, chunksize=100)
#do something with result
for r in result.get():
pass
print time.time()-t
# normal
print "normal method"
t=time.time()
# run function
result = findclosematch(wordlist)
# do something with results
for r in result:
pass
print time.time()-t
La palabra que se debe encontrar es "hola", y la lista de palabras para encontrar coincidencias cercanas es una larga lista de 1 millón de 5 caracteres unidos al azar (solo con fines ilustrativos). Utilizo 3 núcleos de procesador y la función de mapa con un tamaño de trozo de 100 (¿qué elementos se procesarán por trabajador, creo?) (También probé los tamaños de 1000 y 10 000, pero no hubo una diferencia real). Tenga en cuenta que, en ambos métodos, comienzo el temporizador justo antes de activar mi función y lo finalizo justo después de haber repasado los resultados. Como puede ver a continuación, los resultados de los tiempos están claramente a favor del método original que no es Pool:
>>>
pool method
37.1690001488 seconds
normal method
10.5329999924 seconds
>>>
El método Pool es casi 4 veces más lento que el método original. ¿Hay algo que me estoy perdiendo aquí, o tal vez un malentendido sobre cómo funciona el Pooling / multiprocesamiento? Sospecho que parte del problema aquí podría ser que la función de mapa devuelve Ninguna y, por lo tanto, agrega miles de elementos innecesarios a la lista de resultados, aunque solo quiero que las coincidencias reales se devuelvan a los resultados y la he escrito como tal en la función. Por lo que entiendo es cómo funciona el mapa. He escuchado acerca de otras funciones como el filtro que solo recopila resultados no falsos, pero no creo que el multiprocesamiento / agrupación admita el método de filtro. ¿Hay otras funciones además del mapa / imap en el módulo de multiprocesamiento que podrían ayudarme a devolver solo lo que devuelve mi función? Aplicar la función es más para dar múltiples argumentos como lo entiendo.
Sé que también existe la función imap, que probé pero sin mejoras de tiempo. La razón es la misma razón por la que he tenido problemas para entender qué tiene de bueno el módulo de itertools, supuestamente "rápido como un rayo", lo que he notado es cierto para llamar a la función, pero en mi experiencia y por lo que he leído, eso es porque llamar a la función en realidad no hace ningún cálculo, por lo que cuando es tiempo de recorrer los resultados para recopilarlos y analizarlos (sin los cuales no tendría sentido llamar a la unidad), se necesita tanto o, a veces, más tiempo que un Simplemente usando la versión normal de la función straightup. Pero supongo que eso es para otro post.
De todos modos, estoy emocionado de ver si alguien puede empujarme en la dirección correcta aquí, y realmente aprecio cualquier ayuda en esto. Estoy más interesado en comprender el multiprocesamiento en general que hacer que este ejemplo funcione, aunque sería útil con algunos ejemplos de sugerencias de códigos de soluciones para ayudarme en mi comprensión.
La respuesta:
Parece que la desaceleración tuvo que ver con el lento tiempo de inicio de procesos adicionales. No pude conseguir que la función .Pool () fuera lo suficientemente rápida. Mi solución final para hacerlo más rápido fue dividir manualmente la lista de cargas de trabajo, usar múltiples .Process () en lugar de .Pool () y devolver las soluciones en una cola. Pero me pregunto si tal vez el cambio más crucial podría haber sido dividir la carga de trabajo en términos de la palabra principal a buscar en lugar de las palabras con las que comparar, quizás porque la función de búsqueda difflib ya es muy rápida. Aquí está el nuevo código que ejecuta 5 procesos al mismo tiempo, y resultó aproximadamente 10 veces más rápido que ejecutar un código simple (6 segundos frente a 55 segundos). Muy útil para búsquedas rápidas y difusas, además de lo rápido que es difflib.
from multiprocessing import Process, Queue
import difflib, random, time
def f2(wordlist, mainwordlist, q):
for mainword in mainwordlist:
matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7)
q.put(matches)
if __name__ == ''__main__'':
# constants (for 50 input words, find closest match in list of 100 000 comparison words)
q = Queue()
wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(100000)]
mainword = "hello"
mainwordlist = [mainword for each in xrange(50)]
# normal approach
t = time.time()
for mainword in mainwordlist:
matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7)
q.put(matches)
print time.time()-t
# split work into 5 or 10 processes
processes = 5
def splitlist(inlist, chunksize):
return [inlist[x:x+chunksize] for x in xrange(0, len(inlist), chunksize)]
print len(mainwordlist)/processes
mainwordlistsplitted = splitlist(mainwordlist, len(mainwordlist)/processes)
print "list ready"
t = time.time()
for submainwordlist in mainwordlistsplitted:
print "sub"
p = Process(target=f2, args=(wordlist,submainwordlist,q,))
p.Daemon = True
p.start()
for submainwordlist in mainwordlistsplitted:
p.join()
print time.time()-t
while True:
print q.get()
Experimenté algo similar con la piscina en un problema diferente. No estoy seguro de la causa real en este punto ...
La edición de respuestas de OP Karim Bahgat es la misma solución que funcionó para mí. Después de cambiar a un sistema Process & Queue, pude ver aceleraciones en línea con la cantidad de núcleos de una máquina.
Aquí hay un ejemplo.
def do_something(data):
return data * 2
def consumer(inQ, outQ):
while True:
try:
# get a new message
val = inQ.get()
# this is the ''TERM'' signal
if val is None:
break;
# unpack the message
pos = val[0] # its helpful to pass in/out the pos in the array
data = val[1]
# process the data
ret = do_something(data)
# send the response / results
outQ.put( (pos, ret) )
except Exception, e:
print "error!", e
break
def process_data(data_list, inQ, outQ):
# send pos/data to workers
for i,dat in enumerate(data_list):
inQ.put( (i,dat) )
# process results
for i in range(len(data_list)):
ret = outQ.get()
pos = ret[0]
dat = ret[1]
data_list[pos] = dat
def main():
# initialize things
n_workers = 4
inQ = mp.Queue()
outQ = mp.Queue()
# instantiate workers
workers = [mp.Process(target=consumer, args=(inQ,outQ))
for i in range(n_workers)]
# start the workers
for w in workers:
w.start()
# gather some data
data_list = [ d for d in range(1000)]
# lets process the data a few times
for i in range(4):
process_data(data_list)
# tell all workers, no more data (one msg for each)
for i in range(n_workers):
inQ.put(None)
# join on the workers
for w in workers:
w.join()
# print out final results (i*16)
for i,dat in enumerate(data_list):
print i, dat
Mi mejor conjetura es la sobrecarga de comunicación entre procesos (IPC). En la instancia de proceso único, el proceso único tiene la lista de palabras. Al delegar en varios otros procesos, el proceso principal debe llevar constantemente secciones de la lista a otros procesos.
Por lo tanto, se deduce que un mejor enfoque podría ser escindir n procesos, cada uno de los cuales es responsable de cargar / generar 1 / n segmento de la lista y verificar si la palabra está en esa parte de la lista.
Sin embargo, no estoy seguro de cómo hacerlo con la biblioteca de multiprocesamiento de Python.