thread lock async python memory multiprocessing pool

lock - python pool thread



El uso de la memoria sigue creciendo con el multiproceso de Python. (5)

Creo que esto es similar a la pregunta que publiqué , pero no estoy seguro de que tenga el mismo retraso. Mi problema era que estaba produciendo resultados del grupo de multiprocesamiento más rápido de lo que los consumía, por lo que se acumularon en la memoria. Para evitar eso, utilicé un semaphore para estrangular las entradas en el grupo para que no se adelantaran demasiado a las salidas que estaba consumiendo.

Aquí está el programa:

#!/usr/bin/python import multiprocessing def dummy_func(r): pass def worker(): pass if __name__ == ''__main__'': pool = multiprocessing.Pool(processes=16) for index in range(0,100000): pool.apply_async(worker, callback=dummy_func) # clean up pool.close() pool.join()

Descubrí que el uso de la memoria (tanto VIRT como RES) siguió creciendo hasta el cierre () / join (), ¿hay alguna solución para deshacerme de esto? Intenté maxtasksperchild con 2.7 pero tampoco ayudó.

Tengo un programa más complicado que calles apply_async () ~ 6M veces, y en ~ 1.5M punto ya tengo 6G + RES, para evitar todos los demás factores, simplifiqué el programa a la versión anterior.

EDITAR:

Resultó que esta versión funciona mejor, gracias por el aporte de todos:

#!/usr/bin/python import multiprocessing ready_list = [] def dummy_func(index): global ready_list ready_list.append(index) def worker(index): return index if __name__ == ''__main__'': pool = multiprocessing.Pool(processes=16) result = {} for index in range(0,1000000): result[index] = (pool.apply_async(worker, (index,), callback=dummy_func)) for ready in ready_list: result[ready].wait() del result[ready] ready_list = [] # clean up pool.close() pool.join()

No puse ningún bloqueo allí porque creo que el proceso principal es de un solo hilo (la devolución de llamada es más o menos como un evento controlado por documentos que leí).

Cambié el rango del índice de v1 a 1,000,000, igual que v2 e hice algunas pruebas; es extraño para mí v2 es incluso un 10% más rápido que v1 (33s vs 37s), tal vez v1 estaba haciendo demasiados trabajos de mantenimiento de listas internas. v2 es definitivamente un ganador en el uso de memoria, nunca superó los 300M (VIRT) y los 50M (RES), mientras que v1 solía ser 370M / 120M, el mejor fue 330M / 85M. Todos los números fueron solo 3 ~ 4 veces de prueba, solo de referencia.


Recientemente tuve problemas de memoria, ya que estaba usando varias veces la función de multiprocesamiento, por lo que mantiene los procesos de desove y los deja en la memoria.

Aquí está la solución que estoy usando ahora:

def myParallelProcess(ahugearray) from multiprocessing import Pool from contextlib import closing with closing( Pool(15) ) as p: res = p.imap_unordered(simple_matching, ahugearray, 100) return res

Yo ❤ con


Simplemente cree el grupo dentro de su bucle y ciérrelo al final del ciclo con pool.close() .


Tengo un conjunto de datos de nube de puntos 3d muy grande que estoy procesando. Intenté usar el módulo de multiprocesamiento para acelerar el procesamiento, pero empecé a salir de los errores de memoria. Después de algunas investigaciones y pruebas, determiné que estaba llenando la cola de tareas para ser procesadas mucho más rápido de lo que los subprocesos podrían vaciarlo. Estoy seguro de que, al fragmentar o usar map_async o algo, podría haber ajustado la carga, pero no quería hacer cambios importantes en la lógica circundante.

La solución tonta a la que llego es comprobar la longitud del pool._cache forma intermitente, y si el caché es demasiado grande, espere a que la cola se vacíe.

En mi mainloop ya tenía un contador y un indicador de estado:

# Update status count += 1 if count%10000 == 0: sys.stdout.write(''.'') if len(pool._cache) > 1e6: print "waiting for cache to clear..." last.wait() # Where last is assigned the latest ApplyResult

Así que cada 10k de inserción en el grupo compruebo si hay más de 1 millón de operaciones en cola (aproximadamente 1 G de memoria utilizada en el proceso principal). Cuando la cola está llena, solo espero a que finalice el último trabajo insertado.

Ahora mi programa puede ejecutarse durante horas sin quedarse sin memoria. El proceso principal solo se detiene ocasionalmente mientras los trabajadores continúan procesando los datos.

Por cierto, el miembro _cache está documentado en el ejemplo del grupo de módulos de multiprocesamiento:

# # Check there are no outstanding tasks # assert not pool._cache, ''cache = %r'' % pool._cache


Use map_async lugar de apply_async para evitar el uso excesivo de la memoria.

Para su primer ejemplo, cambie las siguientes dos líneas:

for index in range(0,100000): pool.apply_async(worker, callback=dummy_func)

a

pool.map_async(worker, range(100000), callback=dummy_func)

Terminará en un parpadeo antes de que puedas ver su uso de memoria en la top . Cambia la lista a una más grande para ver la diferencia. Pero tenga en cuenta que map_async primero convertirá el iterable que le pase a una lista para calcular su longitud si no tiene el método __len__ . Si tiene un iterador de una gran cantidad de elementos, puede usar itertools.islice para procesarlos en trozos más pequeños.

Tuve un problema de memoria en un programa de la vida real con muchos más datos y finalmente encontré que el culpable era apply_async .

PD: con respecto al uso de la memoria, sus dos ejemplos no tienen una diferencia obvia.