procesos paralelizar multiprocesamiento concurrentes python parallel-processing

python - paralelizar - ¿Desglosando datos de un archivo grande para multiprocesamiento?



paralelizar en python (2)

Estoy intentando paralelizar una aplicación que utiliza multiprocesamiento que toma un archivo csv muy grande (de 64 MB a 500 MB), trabaja un poco línea por línea y luego genera un archivo pequeño de tamaño fijo.

Actualmente hago una list(file_obj) , que desafortunadamente se carga completamente en la memoria (creo) y luego la list(file_obj) en n partes, siendo el número de procesos que quiero ejecutar. Luego hago un pool.map() en las listas divididas.

Esto parece tener un tiempo de ejecución muy, muy malo en comparación con una metodología de subprocesamiento simple, solo abrir el archivo e iterar sobre ella. ¿Alguien puede sugerir una mejor solución?

Además, necesito procesar las filas del archivo en grupos que conservan el valor de una columna determinada. Estos grupos de filas se pueden dividir, pero ningún grupo debe contener más de un valor para esta columna.


Lo mantendría simple. Haga que un solo programa abra el archivo y léalo línea por línea. Puede elegir la cantidad de archivos para dividirlo, abrir esa cantidad de archivos de salida y cada línea escribir en el siguiente archivo. Esto dividirá el archivo en n partes iguales. Luego, puede ejecutar un programa Python contra cada uno de los archivos en paralelo.


list(file_obj) puede requerir mucha memoria cuando fileobj es grande. Podemos reducir ese requisito de memoria al usar itertools para extraer trozos de líneas a medida que los necesitemos.

En particular, podemos utilizar

reader = csv.reader(f) chunks = itertools.groupby(reader, keyfunc)

para dividir el archivo en trozos procesables, y

groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)] result = pool.map(worker, groups)

para que la agrupación de multiprocesamiento funcione en trozos num_chunks a la vez.

Al hacerlo, necesitamos aproximadamente solo la memoria suficiente para almacenar algunos num_chunks ( num_chunks ) en la memoria, en lugar de todo el archivo.

import multiprocessing as mp import itertools import time import csv def worker(chunk): # `chunk` will be a list of CSV rows all with the same name column # replace this with your real computation # print(chunk) return len(chunk) def keyfunc(row): # `row` is one row of the CSV file. # replace this with the name column. return row[0] def main(): pool = mp.Pool() largefile = ''test.dat'' num_chunks = 10 results = [] with open(largefile) as f: reader = csv.reader(f) chunks = itertools.groupby(reader, keyfunc) while True: # make a list of num_chunks chunks groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)] if groups: result = pool.map(worker, groups) results.extend(result) else: break pool.close() pool.join() print(results) if __name__ == ''__main__'': main()