set_start_method - python multithreading
Multiprocesamiento y memoria de Python. (2)
Estoy usando multiprocessing.imap_unordered
para realizar un cálculo en una lista de valores:
def process_parallel(fnc, some_list):
pool = multiprocessing.Pool()
for result in pool.imap_unordered(fnc, some_list):
for x in result:
yield x
pool.terminate()
Cada llamada a fnc
devuelve un objeto ENORME como resultado, por diseño. Puedo almacenar N instancias de tal objeto en la RAM, donde N ~ cpu_count, pero no mucho más (no cientos).
Ahora, usar esta función requiere demasiada memoria. La memoria se gasta por completo en el proceso principal, no en los trabajadores.
¿Cómo almacena imap_unordered
los resultados finales? Me refiero a los resultados que ya fueron devueltos por los trabajadores pero que aún no se han transmitido al usuario. Pensé que era inteligente y solo los computé "perezosamente" según fuera necesario, pero aparentemente no.
Parece que, dado que no puedo consumir los resultados de process_parallel
lo suficientemente rápido, la agrupación mantiene en cola estos enormes objetos de fnc
algún lugar, internamente, y luego explota. Hay alguna manera de evitar esto? ¿Limitar su cola interna de alguna manera?
Estoy usando Python2.7. Aclamaciones.
Como puede ver al buscar en el archivo fuente correspondiente ( python2.7/multiprocessing/pool.py
), el IMapUnorderedIterator utiliza una instancia de collections.deque
para almacenar los resultados. Si entra un nuevo elemento, se agrega y elimina en la iteración.
Como sugirió, si entra otro objeto enorme mientras el hilo principal aún está procesando el objeto, también se almacenarán en la memoria.
Lo que podrías intentar es algo como esto:
it = pool.imap_unordered(fnc, some_list)
for result in it:
it._cond.acquire()
for x in result:
yield x
it._cond.release()
Esto debería hacer que la tarea-resultado-receptor-hilo se bloquee mientras procesa un ítem si está tratando de colocar el siguiente objeto en el deque. Por lo tanto, no debe haber más de dos de los grandes objetos en la memoria. Si eso funciona para tu caso, no lo sé;)
La solución más simple que se me ocurre sería agregar un cierre para envolver su función fnc
que usaría un semáforo para controlar el número total de ejecuciones simultáneas de trabajos que pueden ejecutarse al mismo tiempo (supongo que el proceso / subproceso principal incrementará el semáforo). El valor del semáforo podría calcularse según el tamaño del trabajo y la memoria disponible.