parallel how example python pandas multiprocessing threadpool python-multiprocessing

example - how to parallel in python



¿Cómo resolver problemas de problemas de memoria mientras se multiprocesan utilizando Pool.map()? (4)

He escrito el programa (abajo) para:

  • leer un archivo de texto enorme como pandas dataframe
  • luego, groupby utilizando un valor de columna específico para dividir los datos y almacenarlos como una lista de marcos de datos.
  • luego multiprocess Pool.map() los datos a multiprocess Pool.map() para procesar cada marco de datos en paralelo.

Todo está bien, el programa funciona bien en mi pequeño conjunto de datos de prueba. Pero, cuando canalizo mis datos grandes (alrededor de 14 GB), el consumo de memoria aumenta exponencialmente y luego congela la computadora o muere (en el clúster HPC).

He añadido códigos para borrar la memoria tan pronto como los datos / variable no son útiles. También estoy cerrando la piscina tan pronto como esté hecha. Aún con la entrada de 14 GB, solo esperaba 2 * 14 GB de carga de memoria, pero parece que hay mucha actividad. También traté de modificar con chunkSize and maxTaskPerChild, etc pero no veo ninguna diferencia en la optimización tanto en el archivo de prueba como en el archivo grande.

Creo que las mejoras en este código son necesarias en esta posición del código, cuando comienzo el multiprocessing .

p = Pool(3) # number of pool to run at once; default at 1 result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values())) p = Pool(3) # number of pool to run at once; default at 1 result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values())) pero, estoy publicando todo el código.

Ejemplo de prueba: creé un archivo de prueba ("genome_matrix_final-chr1234-1mb.txt") de hasta 250 mb y ejecuté el programa. Cuando reviso el monitor del sistema, puedo ver que el consumo de memoria se incrementó en aproximadamente 6 GB. No estoy tan claro por qué tanto espacio de memoria es ocupado por un archivo de 250 mb más algunas salidas. He compartido ese archivo a través del cuadro desplegable si ayuda a ver el problema real. https://www.dropbox.com/sh/coihujii38t5prd/AABDXv8ACGIYczeMtzKBo0eea?dl=0

Alguien puede sugerir, ¿Cómo puedo deshacerme del problema?

Mi script en python:

#!/home/bin/python3 import pandas as pd import collections from multiprocessing import Pool import io import time import resource print() print(''Checking required modules'') print() '''''' change this input file name and/or path as need be '''''' genome_matrix_file = "genome_matrix_final-chr1n2-2mb.txt" # test file 01 genome_matrix_file = "genome_matrix_final-chr1234-1mb.txt" # test file 02 #genome_matrix_file = "genome_matrix_final.txt" # large file def main(): with open("genome_matrix_header.txt") as header: header = header.read().rstrip(''/n'').split(''/t'') print() time01 = time.time() print(''starting time: '', time01) ''''''load the genome matrix file onto pandas as dataframe. This makes is more easy for multiprocessing'''''' gen_matrix_df = pd.read_csv(genome_matrix_file, sep=''/t'', names=header) # now, group the dataframe by chromosome/contig - so it can be multiprocessed gen_matrix_df = gen_matrix_df.groupby(''CHROM'') # store the splitted dataframes as list of key, values(pandas dataframe) pairs # this list of dataframe will be used while multiprocessing gen_matrix_df_list = collections.OrderedDict() for chr_, data in gen_matrix_df: gen_matrix_df_list[chr_] = data # clear memory del gen_matrix_df ''''''Now, pipe each dataframe from the list using map.Pool() '''''' p = Pool(3) # number of pool to run at once; default at 1 result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values())) del gen_matrix_df_list # clear memory p.close() p.join() # concat the results from pool.map() and write it to a file result_merged = pd.concat(result) del result # clear memory pd.DataFrame.to_csv(result_merged, "matrix_to_haplotype-chr1n2.txt", sep=''/t'', header=True, index=False) print() print(''completed all process in "%s" sec. '' % (time.time() - time01)) print(''Global maximum memory usage: %.2f (mb)'' % current_mem_usage()) print() ''''''function to convert the dataframe from genome matrix to desired output '''''' def matrix_to_vcf(matrix_df): print() time02 = time.time() # index position of the samples in genome matrix file sample_idx = [{''10a'': 33, ''10b'': 18}, {''13a'': 3, ''13b'': 19}, {''14a'': 20, ''14b'': 4}, {''16a'': 5, ''16b'': 21}, {''17a'': 6, ''17b'': 22}, {''23a'': 7, ''23b'': 23}, {''24a'': 8, ''24b'': 24}, {''25a'': 25, ''25b'': 9}, {''26a'': 10, ''26b'': 26}, {''34a'': 11, ''34b'': 27}, {''35a'': 12, ''35b'': 28}, {''37a'': 13, ''37b'': 29}, {''38a'': 14, ''38b'': 30}, {''3a'': 31, ''3b'': 15}, {''8a'': 32, ''8b'': 17}] # sample index stored as ordered dictionary sample_idx_ord_list = [] for ids in sample_idx: ids = collections.OrderedDict(sorted(ids.items())) sample_idx_ord_list.append(ids) # for haplotype file header = [''contig'', ''pos'', ''ref'', ''alt''] # adding some suffixes "PI" to available sample names for item in sample_idx_ord_list: ks_update = '''' for ks in item.keys(): ks_update += ks header.append(ks_update+''_PI'') header.append(ks_update+''_PG_al'') #final variable store the haplotype data # write the header lines first haplotype_output = ''/t''.join(header) + ''/n'' # to store the value of parsed the line and update the "PI", "PG" value for each sample updated_line = '''' # read the piped in data back to text like file matrix_df = pd.DataFrame.to_csv(matrix_df, sep=''/t'', index=False) matrix_df = matrix_df.rstrip(''/n'').split(''/n'') for line in matrix_df: if line.startswith(''CHROM''): continue line_split = line.split(''/t'') chr_ = line_split[0] ref = line_split[2] alt = list(set(line_split[3:])) # remove the alleles "N" missing and "ref" from the alt-alleles alt_up = list(filter(lambda x: x!=''N'' and x!=ref, alt)) # if no alt alleles are found, just continue # - i.e : don''t write that line in output file if len(alt_up) == 0: continue #print(''/nMining data for chromosome/contig "%s" '' %(chr_ )) #so, we have data for CHR, POS, REF, ALT so far # now, we mine phased genotype for each sample pair (as "PG_al", and also add "PI" tag) sample_data_for_vcf = [] for ids in sample_idx_ord_list: sample_data = [] for key, val in ids.items(): sample_value = line_split[val] sample_data.append(sample_value) # now, update the phased state for each sample # also replacing the missing allele i.e "N" and "-" with ref-allele sample_data = (''|''.join(sample_data)).replace(''N'', ref).replace(''-'', ref) sample_data_for_vcf.append(str(chr_)) sample_data_for_vcf.append(sample_data) # add data for all the samples in that line, append it with former columns (chrom, pos ..) .. # and .. write it to final haplotype file sample_data_for_vcf = ''/t''.join(sample_data_for_vcf) updated_line = ''/t''.join(line_split[0:3]) + ''/t'' + '',''.join(alt_up) + / ''/t'' + sample_data_for_vcf + ''/n'' haplotype_output += updated_line del matrix_df # clear memory print(''completed haplotype preparation for chromosome/contig "%s" '' ''in "%s" sec. '' %(chr_, time.time()-time02)) print(''/tWorker maximum memory usage: %.2f (mb)'' %(current_mem_usage())) # return the data back to the pool return pd.read_csv(io.StringIO(haplotype_output), sep=''/t'') '''''' to monitor memory '''''' def current_mem_usage(): return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024. if __name__ == ''__main__'': main()

Actualización para cazarrecompensas:

He logrado el multiprocesamiento con Pool.map() pero el código está causando una gran carga de memoria (archivo de prueba de entrada de ~ 300 mb, pero la carga de la memoria es de aproximadamente 6 GB). Solo esperaba una carga de memoria de 3 * 300 mb al máximo.

  • Alguien puede explicar: ¿Qué está causando un requerimiento de memoria tan grande para un archivo tan pequeño y para una computación tan pequeña?
  • Además, estoy tratando de tomar la respuesta y usarla para mejorar el multiprocesamiento en mi gran programa. Por lo tanto, además de cualquier método, el módulo que no cambia la estructura de la parte de cómputo (proceso vinculado a la CPU) debería estar bien.
  • He incluido dos archivos de prueba para los fines de prueba para jugar con el código.
  • El código adjunto es código completo, por lo que debería funcionar como se esperaba cuando se copió y pegó. Cualquier cambio debe usarse solo para mejorar la optimización en los pasos de multiprocesamiento.

Requisito previo

  1. En Python (a continuación uso la compilación de 64 bits de Python 3.6.5) todo es un objeto. Esto tiene su sobrecarga y con getsizeof podemos ver exactamente el tamaño de un objeto en bytes:

    >>> import sys >>> sys.getsizeof(42) 28 >>> sys.getsizeof(''T'') 50

  2. Cuando se usa la llamada al sistema de bifurcación (predeterminado en * nix, vea multiprocessing.get_start_method() ) para crear un proceso hijo, la memoria física de los padres no se copia y se usa la técnica de copy-on-write .
  3. El proceso hijo de bifurcación seguirá informando el RSS completo (tamaño del conjunto residente) del proceso padre. Debido a este hecho, PSS (tamaño de conjunto proporcional) es la métrica más adecuada para estimar el uso de memoria de la aplicación de forking. Aquí hay un ejemplo de la página:
  • El proceso A tiene 50 KiB de memoria no compartida
  • El proceso B tiene 300 KiB de memoria no compartida
  • Tanto el proceso A como el proceso B tienen 100 KiB de la misma región de memoria compartida

Dado que el PSS se define como la suma de la memoria no compartida de un proceso y la proporción de memoria compartida con otros procesos, el PSS para estos dos procesos es el siguiente:

  • PSS del proceso A = 50 KiB + (100 KiB / 2) = 100 KiB
  • PSS del proceso B = 300 KiB + (100 KiB / 2) = 350 KiB

El marco de datos

No echemos un vistazo a su DataFrame solo. memory_profiler nos ayudará.

justpd.py

#!/usr/bin/env python3 import pandas as pd from memory_profiler import profile @profile def main(): with open(''genome_matrix_header.txt'') as header: header = header.read().rstrip(''/n'').split(''/t'') gen_matrix_df = pd.read_csv( ''genome_matrix_final-chr1234-1mb.txt'', sep=''/t'', names=header) gen_matrix_df.info() gen_matrix_df.info(memory_usage=''deep'') if __name__ == ''__main__'': main()

Ahora vamos a usar el perfilador:

mprof run justpd.py mprof plot

Podemos ver la trama:

y trazo línea por línea:

Line # Mem usage Increment Line Contents ================================================ 6 54.3 MiB 54.3 MiB @profile 7 def main(): 8 54.3 MiB 0.0 MiB with open(''genome_matrix_header.txt'') as header: 9 54.3 MiB 0.0 MiB header = header.read().rstrip(''/n'').split(''/t'') 10 11 2072.0 MiB 2017.7 MiB gen_matrix_df = pd.read_csv(''genome_matrix_final-chr1234-1mb.txt'', sep=''/t'', names=header) 12 13 2072.0 MiB 0.0 MiB gen_matrix_df.info() 14 2072.0 MiB 0.0 MiB gen_matrix_df.info(memory_usage=''deep'')

Podemos ver que el marco de datos toma ~ 2 GiB con un pico en ~ 3 GiB mientras se está construyendo. Lo que es más interesante es la salida de info .

<class ''pandas.core.frame.DataFrame''> RangeIndex: 4000000 entries, 0 to 3999999 Data columns (total 34 columns): ... dtypes: int64(2), object(32) memory usage: 1.0+ GB

Pero info(memory_usage=''deep'') ("deep" significa la introspección de los datos en profundidad al interrogar los object dtype , ver más abajo) da:

memory usage: 7.9 GB

Eh Si nos fijamos fuera del proceso, podemos asegurarnos de que las cifras de memory_profiler sean correctas. sys.getsizeof también muestra el mismo valor para el marco (probablemente debido a la costumbre __sizeof__ ) y también lo harán otras herramientas que lo utilizan para estimar gc.get_objects() asignado, por ejemplo, pympler .

# added after read_csv from pympler import tracker tr = tracker.SummaryTracker() tr.print_diff()

Da:

types | # objects | total size ================================================== | =========== | ============ <class ''pandas.core.series.Series | 34 | 7.93 GB <class ''list | 7839 | 732.38 KB <class ''str | 7741 | 550.10 KB <class ''int | 1810 | 49.66 KB <class ''dict | 38 | 7.43 KB <class ''pandas.core.internals.SingleBlockManager | 34 | 3.98 KB <class ''numpy.ndarray | 34 | 3.19 KB

Entonces, ¿de dónde vienen estos 7.93 GiB? Vamos a tratar de explicar esto. Tenemos 4M filas y 34 columnas, lo que nos da 134M valores. Son int64 u object (que es un puntero de 64 bits; consulte el uso de pandas con datos extensos para obtener una explicación detallada). Por lo tanto, tenemos 134 * 10 ** 6 * 8 / 2 ** 20 ~ 1022 MiB solo para los valores en el marco de datos. ¿Qué pasa con el restante ~ 6.93 GiB?

Interning de cuerdas

Para comprender el comportamiento, es necesario saber que Python realiza el internado de cadenas. Hay dos buenos artículos ( one , two ) sobre el internado de cadenas en Python 2. Además del cambio de Unicode en Python 3 y PEP 393 en Python 3.3, las estructuras C han cambiado, pero la idea es la misma. Básicamente, cada cadena corta que parece un identificador será almacenada en caché por Python en un diccionario interno y las referencias apuntarán a los mismos objetos de Python. En otras palabras podemos decir que se comporta como un singleton. Los artículos que mencioné anteriormente explican las mejoras significativas en el perfil de memoria y el rendimiento que ofrece. Podemos verificar si una cadena está internada usando el campo interned de PyASCIIObject :

import ctypes class PyASCIIObject(ctypes.Structure): _fields_ = [ (''ob_refcnt'', ctypes.c_size_t), (''ob_type'', ctypes.py_object), (''length'', ctypes.c_ssize_t), (''hash'', ctypes.c_int64), (''state'', ctypes.c_int32), (''wstr'', ctypes.c_wchar_p) ]

Entonces:

>>> a = ''name'' >>> b = ''!@#$'' >>> a_struct = PyASCIIObject.from_address(id(a)) >>> a_struct.state & 0b11 1 >>> b_struct = PyASCIIObject.from_address(id(b)) >>> b_struct.state & 0b11 0

Con dos cadenas también podemos hacer comparación de identidad (abordado en comparación de memoria en caso de CPython).

>>> a = ''foo'' >>> b = ''foo'' >>> a is b True >> gen_matrix_df.REF[0] is gen_matrix_df.REF[6] True

Debido a este hecho, con respecto al tipo de object , el marco de datos asigna a lo sumo 20 cadenas (una por aminoácido). Sin embargo, vale la pena señalar que Pandas recomienda tipos categóricos para las enumeraciones.

Memoria de pandas

Así podemos explicar la estimación ingenua de 7.93 GiB como:

>>> rows = 4 * 10 ** 6 >>> int_cols = 2 >>> str_cols = 32 >>> int_size = 8 >>> str_size = 58 >>> ptr_size = 8 >>> (int_cols * int_size + str_cols * (str_size + ptr_size)) * rows / 2 ** 30 7.927417755126953

Tenga en cuenta que str_size tiene 58 bytes, no 50 como hemos visto anteriormente para el literal de 1 carácter. Es porque PEP 393 define cadenas compactas y no compactas. Puede verificarlo con sys.getsizeof(gen_matrix_df.REF[0]) .

El consumo real de memoria debe ser ~ 1 GiB, como lo informa gen_matrix_df.info() , es el doble. Podemos asumir que tiene algo que ver con la asignación (pre) de memoria realizada por Pandas o NumPy. El siguiente experimento muestra que no es sin razón (varias ejecuciones muestran la imagen guardada):

Line # Mem usage Increment Line Contents ================================================ 8 53.1 MiB 53.1 MiB @profile 9 def main(): 10 53.1 MiB 0.0 MiB with open("genome_matrix_header.txt") as header: 11 53.1 MiB 0.0 MiB header = header.read().rstrip(''/n'').split(''/t'') 12 13 2070.9 MiB 2017.8 MiB gen_matrix_df = pd.read_csv(''genome_matrix_final-chr1234-1mb.txt'', sep=''/t'', names=header) 14 2071.2 MiB 0.4 MiB gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]]) 15 2071.2 MiB 0.0 MiB gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]]) 16 2040.7 MiB -30.5 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())]) ... 23 1827.1 MiB -30.5 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())]) 24 1094.7 MiB -732.4 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())]) 25 1765.9 MiB 671.3 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())]) 26 1094.7 MiB -671.3 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())]) 27 1704.8 MiB 610.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())]) 28 1094.7 MiB -610.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())]) 29 1643.9 MiB 549.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())]) 30 1094.7 MiB -549.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())]) 31 1582.8 MiB 488.1 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())]) 32 1094.7 MiB -488.1 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())]) 33 1521.9 MiB 427.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())]) 34 1094.7 MiB -427.2 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())]) 35 1460.8 MiB 366.1 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())]) 36 1094.7 MiB -366.1 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())]) 37 1094.7 MiB 0.0 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())]) ... 47 1094.7 MiB 0.0 MiB gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])

Quiero terminar esta sección con una cita del nuevo artículo sobre problemas de diseño y futuras Pandas2 del autor original de Pandas.

regla de oro de los pandas: tenga de 5 a 10 veces más RAM que el tamaño de su conjunto de datos

Árbol de proceso

Vamos a la piscina, finalmente, y veamos si podemos hacer uso de la copia en escritura. Usaremos smemstat (disponible desde un repositorio de Ubuntu) para estimar el uso compartido de la memoria del grupo de procesos y glances para anotar la memoria libre en todo el sistema. Ambos pueden escribir JSON.

Ejecutaremos el script original con Pool(2) . Necesitaremos 3 ventanas de terminal.

  1. smemstat -l -m -p "python3.6 script.py" -o smemstat.json 1
  2. glances -t 1 --export-json glances.json
  3. mprof run -M script.py

Entonces mprof plot produce:

El gráfico de suma ( mprof run --nopython --include-children ./script.py ) se ve así:

Tenga en cuenta que los dos gráficos anteriores muestran RSS. La hipótesis es que, debido a la copia en escritura, no refleja el uso real de la memoria. Ahora tenemos dos archivos JSON de smemstat y glances . Voy a la siguiente secuencia de comandos para convertir los archivos JSON a CSV.

#!/usr/bin/env python3 import csv import sys import json def smemstat(): with open(''smemstat.json'') as f: smem = json.load(f) rows = [] fieldnames = set() for s in smem[''smemstat''][''periodic-samples'']: row = {} for ps in s[''smem-per-process'']: if ''script.py'' in ps[''command'']: for k in (''uss'', ''pss'', ''rss''): row[''{}-{}''.format(ps[''pid''], k)] = ps[k] // 2 ** 20 # smemstat produces empty samples, backfill from previous if rows: for k, v in rows[-1].items(): row.setdefault(k, v) rows.append(row) fieldnames.update(row.keys()) with open(''smemstat.csv'', ''w'') as out: dw = csv.DictWriter(out, fieldnames=sorted(fieldnames)) dw.writeheader() list(map(dw.writerow, rows)) def glances(): rows = [] fieldnames = [''available'', ''used'', ''cached'', ''mem_careful'', ''percent'', ''free'', ''mem_critical'', ''inactive'', ''shared'', ''history_size'', ''mem_warning'', ''total'', ''active'', ''buffers''] with open(''glances.csv'', ''w'') as out: dw = csv.DictWriter(out, fieldnames=fieldnames) dw.writeheader() with open(''glances.json'') as f: for l in f: d = json.loads(l) dw.writerow(d[''mem'']) if __name__ == ''__main__'': globals()[sys.argv[1]]()

Primero echemos un vistazo a free memoria free .

La diferencia entre el primero y el mínimo es ~ 4.15 GiB. Y aquí es cómo se ven las cifras de PSS:

Y la suma:

Por lo tanto, podemos ver que debido al consumo de memoria real de copia en escritura es ~ 4.15 GiB. Pero todavía estamos serializando los datos para enviarlos a los procesos de trabajo a través de Pool.map . ¿Podemos aprovechar la copia en escritura aquí también?

Datos compartidos

Para usar copy-on-write necesitamos que la list(gen_matrix_df_list.values()) sea ​​accesible globalmente para que el trabajador después de la bifurcación aún pueda leerla.

  1. Modifiquemos el código después de del gen_matrix_df en main como el siguiente:

    ... global global_gen_matrix_df_values global_gen_matrix_df_values = list(gen_matrix_df_list.values()) del gen_matrix_df_list p = Pool(2) result = p.map(matrix_to_vcf, range(len(global_gen_matrix_df_values))) ...

  2. Quitar del gen_matrix_df_list que va más tarde.
  3. Y modifica las primeras líneas de matrix_to_vcf como:

    def matrix_to_vcf(i): matrix_df = global_gen_matrix_df_values[i]

Ahora volvamos a ejecutarlo. Memoria libre:

Árbol de proceso:

Y su suma:

Por lo tanto, estamos en un máximo de ~ 2.9 GiB de uso de la memoria real (el proceso principal de máxima actividad al crear el marco de datos) y la copia en escritura ha ayudado.

Como nota al margen, existe el llamado "copia en lectura", el comportamiento del recolector de basura del ciclo de referencia de Python, descrito en la Ingeniería de Instagram (que llevó a gc.freeze en la issue31558 ). Pero gc.disable() no tiene un impacto en este caso particular.

Actualizar

Una alternativa al intercambio de datos sin copia con copia en escritura puede delegarlo en el kernel desde el principio utilizando numpy.memmap . Aquí hay una implementación de ejemplo de High Performance Data Processing en Python Talk. La parte difícil es, entonces, hacer que Pandas use la matriz Numpy en formato mm.


Cuando use multiprocessing.Pool Se creará una serie de procesos secundarios utilizando la llamada al sistema fork() . Cada uno de esos procesos comienza con una copia exacta de la memoria del proceso principal en ese momento. Debido a que está cargando el csv antes de crear el Pool de tamaño 3, cada uno de esos 3 procesos en el grupo tendrá innecesariamente una copia del marco de datos. (tanto gen_matrix_df como gen_matrix_df_list existirán en el proceso actual así como en cada uno de los 3 procesos secundarios, por lo que 4 copias de cada una de estas estructuras estarán en la memoria)

Intente crear el Pool antes de cargar el archivo (en realidad al principio) Eso debería reducir el uso de la memoria.

Si todavía es demasiado alto, puedes:

  1. Volcar gen_matrix_df_list en un archivo, 1 elemento por línea, por ejemplo:

    import os import cPickle with open(''tempfile.txt'', ''w'') as f: for item in gen_matrix_df_list.items(): cPickle.dump(item, f) f.write(os.linesep)

  2. Use Pool.imap() en un iterador sobre las líneas que volcó en este archivo, por ejemplo:

    with open(''tempfile.txt'', ''r'') as f: p.imap(matrix_to_vcf, (cPickle.loads(line) for line in f))

    (Tenga en cuenta que matrix_to_vcf toma una tupla (key, value) en el ejemplo anterior, no solo un valor)

Espero que eso ayude.

NB: No he probado el código anterior. Sólo tiene la intención de demostrar la idea.


Tuve el mismo problema. Necesitaba procesar un gran corpus de texto mientras mantenía una base de conocimientos de algunos DataFrames de millones de filas cargadas en la memoria. Creo que este problema es común, así que mantendré mi respuesta orientada para fines generales.

Una combinación de configuraciones resolvió el problema por mí (1 y 3 y 5 solo pueden hacerlo por usted):

  1. Use Pool.imap (o imap_unordered ) en lugar de Pool.map . Esto iterará sobre los datos perezosamente que cargarlos todos en la memoria antes de comenzar el procesamiento.

  2. Establecer un valor para chunksize parámetro. Esto hará que imap más rápido también.

  3. Establecer un valor para el parámetro maxtasksperchild .

  4. Agregue la salida al disco que en la memoria. Al instante o en cualquier momento cuando alcanza un cierto tamaño.

  5. Ejecutar el código en diferentes lotes. Puede usar itertools.islice si tiene un iterador. La idea es dividir su list(gen_matrix_df_list.values()) en tres o más listas, luego pasa el primer tercio solo para map o imap , luego el segundo tercio en otra carrera, etc. Ya que tiene una lista, simplemente puede Cortarlo en la misma línea de código.


RESPUESTA GENERAL SOBRE LA MEMORIA CON MULTIPROCESAMIENTO

Usted preguntó: "¿Qué está causando que se asigne tanta memoria?" La respuesta se basa en dos partes.

Primero , como ya notó, cada trabajador de multiprocessing obtiene su propia copia de los datos (citados aquí ), por lo que debe dividir los argumentos grandes. O para archivos grandes, léalos de a poco a la vez, si es posible.

De forma predeterminada, los trabajadores del grupo son procesos reales de Python bifurcados utilizando el módulo de multiprocesamiento de la biblioteca estándar de Python cuando n_jobs! = 1. Los argumentos pasados ​​como entrada a la llamada paralela se serializan y reasignan en la memoria de cada proceso de trabajador.

Esto puede ser problemático para grandes argumentos, ya que los trabajadores los reasignarán n_jobs veces.

En segundo lugar , si está tratando de recuperar la memoria, debe comprender que Python funciona de manera diferente a otros idiomas, y confía en que para liberar la memoria cuando no lo haga . No sé si es lo mejor, pero en mi propio código, he superado esto al reasignar la variable a un objeto Ninguno o vacío.

PARA SU EJEMPLO ESPECÍFICO - EDICIÓN DE CÓDIGO MÍNIMO

Siempre que pueda guardar sus datos grandes en la memoria dos veces , creo que puede hacer lo que está tratando de hacer simplemente cambiando una sola línea. Escribí un código muy similar y me funcionó cuando reasigné la variable (vice call del o cualquier tipo de recolección de basura). Si esto no funciona, es posible que deba seguir las sugerencias anteriores y usar la E / S del disco:

#### earlier code all the same # clear memory by reassignment (not del or gc) gen_matrix_df = {} ''''''Now, pipe each dataframe from the list using map.Pool() '''''' p = Pool(3) # number of pool to run at once; default at 1 result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values())) #del gen_matrix_df_list # I suspect you don''t even need this, memory will free when the pool is closed p.close() p.join() #### later code all the same

PARA SU EJEMPLO ESPECÍFICO - USO ÓPTIMO DE MEMORIA

Siempre y cuando puedas guardar tus datos grandes en la memoria una vez , y tengas alguna idea de qué tan grande es tu archivo, puedes usar la read_csv parcial de archivos de Pandas read_csv , para leer solo nows a la vez si realmente quieres micro-administrar la cantidad de datos que se leen o una [cantidad fija de memoria a la vez usando chunksize], que devuelve un iterador 5 . Con eso quiero decir, el parámetro nrows es solo una lectura: puede usar eso para obtener un vistazo a un archivo, o si por alguna razón quisiera que cada parte tuviera exactamente el mismo número de filas (porque, por ejemplo, Si alguno de sus datos son cadenas de longitud variable, cada fila no ocupará la misma cantidad de memoria). Pero creo que a los efectos de preparar un archivo para multiprocesamiento, será mucho más fácil usar fragmentos, porque eso se relaciona directamente con la memoria, que es su preocupación. Será más fácil usar el método de prueba y error para que quepa en la memoria en función de porciones de tamaño específico que el número de filas, lo que cambiará la cantidad de uso de la memoria según la cantidad de datos que haya en las filas. La única otra parte difícil es que por alguna razón específica de la aplicación, estás agrupando algunas filas, por lo que solo hace que sea un poco más complicado. Usando su código como ejemplo:

''''''load the genome matrix file onto pandas as dataframe. This makes is more easy for multiprocessing'''''' # store the splitted dataframes as list of key, values(pandas dataframe) pairs # this list of dataframe will be used while multiprocessing #not sure why you need the ordered dict here, might add memory overhead #gen_matrix_df_list = collections.OrderedDict() #a defaultdict won''t throw an exception when we try to append to it the first time. if you don''t want a default dict for some reason, you have to initialize each entry you care about. gen_matrix_df_list = collections.defaultdict(list) chunksize = 10 ** 6 for chunk in pd.read_csv(genome_matrix_file, sep=''/t'', names=header, chunksize=chunksize) # now, group the dataframe by chromosome/contig - so it can be multiprocessed gen_matrix_df = chunk.groupby(''CHROM'') for chr_, data in gen_matrix_df: gen_matrix_df_list[chr_].append(data) ''''''Having sorted chunks on read to a list of df, now create single data frames for each chr_'''''' #The dict contains a list of small df objects, so now concatenate them #by reassigning to the same dict, the memory footprint is not increasing for chr_ in gen_matrix_df_list.keys(): gen_matrix_df_list[chr_]=pd.concat(gen_matrix_df_list[chr_]) ''''''Now, pipe each dataframe from the list using map.Pool() '''''' p = Pool(3) # number of pool to run at once; default at 1 result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values())) p.close() p.join()