with large error data big python mongodb pandas hdf5 large-data

python - large - Flujos de trabajo de "datos grandes" usando pandas



pandas memory error (13)

Como han señalado otros, después de algunos años ha surgido un equivalente de pandas "fuera de núcleo": dask . Aunque dask no es un reemplazo directo de pandas y toda su funcionalidad, se destaca por varias razones:

Dask es una biblioteca de computación paralela flexible para la computación analítica que está optimizada para la programación dinámica de tareas para cargas de trabajo computacionales interactivas de colecciones de "Big Data" como matrices paralelas, marcos de datos y listas que extienden las interfaces comunes como NumPy, Pandas, o iteradores de Python hasta La memoria o los entornos distribuidos y las escalas de las computadoras portátiles a los clústeres.

Dask enfatiza las siguientes virtudes:

  • Familiar: proporciona una matriz NumPy en paralelo y objetos Pandas DataFrame
  • Flexible: proporciona una interfaz de programación de tareas para más cargas de trabajo personalizadas e integración con otros proyectos.
  • Nativo: habilita la computación distribuida en Pure Python con acceso a la pila PyData.
  • Rápido: opera con baja sobrecarga, baja latencia y mínima serialización necesaria para algoritmos numéricos rápidos
  • Escala hacia arriba: se ejecuta de forma resistente en grupos con miles de núcleos Escalas hacia abajo: trivial para configurar y ejecutar en una computadora portátil en un solo proceso
  • Responsivo: diseñado teniendo en cuenta la computación interactiva, brinda retroalimentación y diagnósticos rápidos para ayudar a los humanos

y para agregar un ejemplo de código simple:

import dask.dataframe as dd df = dd.read_csv(''2015-*-*.csv'') df.groupby(df.user_id).value.mean().compute()

Reemplaza un código de pandas como este:

import pandas as pd df = pd.read_csv(''2015-01-01.csv'') df.groupby(df.user_id).value.mean()

y, especialmente notable, proporciona a través de la interfaz concurrent.futures un general para el envío de tareas personalizadas:

from dask.distributed import Client client = Client(''scheduler:port'') futures = [] for fn in filenames: future = client.submit(load, fn) futures.append(future) summary = client.submit(summarize, futures) summary.result()

He intentado descifrar una respuesta a esta pregunta durante muchos meses mientras aprendía pandas. Utilizo SAS para mi trabajo diario y es excelente por su soporte fuera de núcleo. Sin embargo, SAS es horrible como una pieza de software por muchas otras razones.

Un día espero reemplazar mi uso de SAS con python y pandas, pero actualmente carezco de un flujo de trabajo fuera del núcleo para grandes conjuntos de datos. No estoy hablando de "big data" que requiere una red distribuida, sino de archivos demasiado grandes para que quepan en la memoria pero lo suficientemente pequeños para que quepan en un disco duro.

Mi primer pensamiento es usar HDFStore para mantener grandes conjuntos de datos en el disco y jalar solo las piezas que necesito en los marcos de datos para el análisis. Otros han mencionado a MongoDB como una alternativa más fácil de usar. Mi pregunta es la siguiente:

¿Cuáles son algunos flujos de trabajo de mejores prácticas para lograr lo siguiente:

  1. Cargar archivos planos en una estructura de base de datos permanente en disco
  2. Consultar esa base de datos para recuperar datos para alimentar una estructura de datos de pandas
  3. Actualizando la base de datos luego de manipular piezas en pandas.

Los ejemplos del mundo real serían muy apreciados, especialmente de cualquiera que use pandas en "datos grandes".

Editar - un ejemplo de cómo me gustaría que esto funcione:

  1. Importe de forma iterativa un archivo plano grande y almacénelo en una estructura de base de datos permanente en el disco. Estos archivos suelen ser demasiado grandes para caber en la memoria.
  2. Para utilizar Pandas, me gustaría leer subconjuntos de estos datos (generalmente solo unas pocas columnas a la vez) que pueden caber en la memoria.
  3. Yo crearía nuevas columnas realizando varias operaciones en las columnas seleccionadas.
  4. Luego tendría que agregar estas nuevas columnas a la estructura de la base de datos.

Estoy tratando de encontrar la mejor manera de realizar estos pasos. Al leer los enlaces sobre pandas y pytables, parece que agregar una nueva columna podría ser un problema.

Editar - Respondiendo específicamente a las preguntas de Jeff:

  1. Estoy construyendo modelos de riesgo de crédito al consumo. Los tipos de datos incluyen teléfono, SSN y características de dirección; valores de propiedad; información despectiva como antecedentes penales, quiebras, etc. Los conjuntos de datos que uso todos los días tienen un promedio de casi 1.000 a 2.000 campos de datos mixtos: variables continuas, nominales y ordinales de datos numéricos y de caracteres. Rara vez agrego filas, pero sí realizo muchas operaciones que crean nuevas columnas.
  2. Las operaciones típicas implican combinar varias columnas usando la lógica condicional en una nueva columna compuesta. Por ejemplo, if var1 > 2 then newvar = ''A'' elif var2 = 4 then newvar = ''B'' . El resultado de estas operaciones es una nueva columna para cada registro en mi conjunto de datos.
  3. Finalmente, me gustaría agregar estas nuevas columnas a la estructura de datos en disco. Repetiría el paso 2, explorando los datos con tablas de referencias cruzadas y estadísticas descriptivas, tratando de encontrar relaciones interesantes e intuitivas para modelar.
  4. Un archivo de proyecto típico suele ser de aproximadamente 1 GB. Los archivos se organizan de tal manera donde una fila consiste en un registro de datos del consumidor. Cada fila tiene el mismo número de columnas para cada registro. Este siempre será el caso.
  5. Es bastante raro que yo subordiese por filas al crear una nueva columna. Sin embargo, es bastante común para mí crear subconjuntos en filas al crear informes o generar estadísticas descriptivas. Por ejemplo, es posible que desee crear una frecuencia simple para una línea específica de negocios, por ejemplo, las tarjetas de crédito de Retail. Para hacer esto, seleccionaría solo aquellos registros donde la línea de negocio = minorista, además de las columnas sobre las que quiero informar. Sin embargo, al crear nuevas columnas, obtendría todas las filas de datos y solo las columnas que necesito para las operaciones.
  6. El proceso de modelado requiere que analice cada columna, busque relaciones interesantes con alguna variable de resultado y cree nuevas columnas compuestas que describan esas relaciones. Las columnas que exploro se hacen generalmente en pequeños conjuntos. Por ejemplo, me centraré en un conjunto de, por ejemplo, 20 columnas que solo se ocupan de los valores de propiedad y observarán cómo se relacionan con el incumplimiento de un préstamo. Una vez que se exploran y se crean nuevas columnas, paso a otro grupo de columnas, digamos educación universitaria y repito el proceso. Lo que estoy haciendo es crear variables candidatas que expliquen la relación entre mis datos y algún resultado. Al final de este proceso, aplico algunas técnicas de aprendizaje que crean una ecuación a partir de esas columnas compuestas.

Es raro que alguna vez agregue filas al conjunto de datos. Casi siempre crearé nuevas columnas (variables o características en las estadísticas / lenguaje de aprendizaje automático).


Considere Ruffus si sigue la ruta simple de crear un flujo de datos que se divide en varios archivos más pequeños.


Creo que a las respuestas anteriores les falta un enfoque simple que he encontrado muy útil.

Cuando tengo un archivo que es demasiado grande para cargar en la memoria, divido el archivo en varios archivos más pequeños (ya sea por fila o columnas)

Ejemplo: en el caso de datos comerciales de 30 días de tamaño de ~ 30GB, lo rompo en un archivo por día de tamaño de ~ 1GB. Posteriormente, proceso cada archivo por separado y agrego resultados al final.

Una de las mayores ventajas es que permite el procesamiento paralelo de los archivos (ya sea múltiples procesos o subprocesos)

La otra ventaja es que la manipulación de archivos (como agregar / eliminar fechas en el ejemplo) se puede lograr con comandos de shell regulares, lo que no es posible en formatos de archivos más avanzados / complicados.

Este enfoque no cubre todos los escenarios, pero es muy útil en muchos de ellos.


Descubrí esto un poco tarde, pero trabajo con un problema similar (modelos de pago anticipado de hipotecas). Mi solución ha sido omitir la capa HDFStore de pandas y usar pytables rectas. Guardo cada columna como una matriz HDF5 individual en mi archivo final.

Mi flujo de trabajo básico es obtener primero un archivo CSV de la base de datos. Lo gzip, por lo que no es tan grande. Luego lo convierto en un archivo HDF5 orientado a filas, iterándolo sobre Python, convirtiendo cada fila en un tipo de datos real y escribiéndolo en un archivo HDF5. Esto toma algunas decenas de minutos, pero no usa ninguna memoria, ya que solo funciona fila por fila. Luego "transpongo" el archivo HDF5 orientado a filas a un archivo HDF5 orientado a columnas.

La tabla de transposición se ve como:

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"): # Get a reference to the input data. tb = h_in.getNode(table_path) # Create the output group to hold the columns. grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1)) for col_name in tb.colnames: logger.debug("Processing %s", col_name) # Get the data. col_data = tb.col(col_name) # Create the output array. arr = h_out.createCArray(grp, col_name, tables.Atom.from_dtype(col_data.dtype), col_data.shape) # Store the data. arr[:] = col_data h_out.flush()

Leyéndolo de nuevo entonces parece:

def read_hdf5(hdf5_path, group_path="/data", columns=None): """Read a transposed data set from a HDF5 file.""" if isinstance(hdf5_path, tables.file.File): hf = hdf5_path else: hf = tables.openFile(hdf5_path) grp = hf.getNode(group_path) if columns is None: data = [(child.name, child[:]) for child in grp] else: data = [(child.name, child[:]) for child in grp if child.name in columns] # Convert any float32 columns to float64 for processing. for i in range(len(data)): name, vec = data[i] if vec.dtype == np.float32: data[i] = (name, vec.astype(np.float64)) if not isinstance(hdf5_path, tables.file.File): hf.close() return pd.DataFrame.from_items(data)

Ahora, generalmente ejecuto esto en una máquina con una tonelada de memoria, así que puede que no sea lo suficientemente cuidadoso con mi uso de memoria. Por ejemplo, de forma predeterminada, la operación de carga lee todo el conjunto de datos.

Esto generalmente funciona para mí, pero es un poco torpe, y no puedo usar la magia de pytables de lujo.

Edición: La ventaja real de este enfoque, sobre la configuración predeterminada de pytables de la matriz de registros, es que luego puedo cargar los datos en R usando h5r, que no puede manejar tablas. O, al menos, no he podido hacer que cargue tablas heterogéneas.


Este es el caso de pymongo. También he hecho un prototipo utilizando un servidor SQL, SQL, HDF, ORM (SQLAlchemy) en Python. En primer lugar, pymongo es un DB basado en documentos, por lo que cada persona sería un documento ( dict de atributos). Muchas personas forman una colección y usted puede tener muchas colecciones (personas, bolsa, ingresos).

pd.dateframe -> pymongo Nota: uso chunksize en read_csv para mantenerlo en registros de 5 a 10k (pymongo deja caer el socket si es más grande)

aCollection.insert((a[1].to_dict() for a in df.iterrows()))

consultando: gt = mayor que ...

pd.DataFrame(list(mongoCollection.find({''anAttribute'':{''$gt'':2887000, ''$lt'':2889000}})))

.find() devuelve un iterador por lo que comúnmente uso ichunked para cortar iteradores más pequeños.

¿Qué tal una unión ya que normalmente obtengo 10 fuentes de datos para pegar juntas?

aJoinDF = pandas.DataFrame(list(mongoCollection.find({''anAttribute'':{''$in'':Att_Keys}})))

luego (en mi caso, a veces tengo que aJoinDF en aJoinDF primero antes de que sea "fusionable").

df = pandas.merge(df, aJoinDF, on=aKey, how=''left'')

Y luego puede escribir la nueva información en su colección principal a través del método de actualización a continuación. (colección lógica vs fuentes de datos físicas).

collection.update({primarykey:foo},{key:change})

En búsquedas más pequeñas, simplemente denormalize. Por ejemplo, tiene un código en el documento y simplemente agrega el texto del código de campo y realiza una búsqueda de dict mientras crea documentos.

Ahora que tiene un buen conjunto de datos basado en una persona, puede desatar su lógica en cada caso y crear más atributos. Finalmente, puede leer en pandas sus 3 a los indicadores clave de memoria máxima y hacer pivots / agg / exploración de datos. Esto me funciona para 3 millones de registros con números / texto grande / categorías / códigos / flotadores / ...

También puede utilizar los dos métodos integrados en MongoDB (MapReduce y el marco agregado). Consulte aquí para obtener más información sobre el marco agregado , ya que parece ser más fácil que MapReduce y parece útil para un rápido trabajo agregado. Tenga en cuenta que no necesitaba definir mis campos o relaciones, y puedo agregar elementos a un documento. En el estado actual del rápido cambio de números, pandas, conjunto de herramientas de Python, MongoDB me ayuda a trabajar :)


Hay ahora, dos años después de la pregunta, un equivalente de pandas ''fuera de núcleo'': dask . ¡Es excelente! Aunque no es compatible con toda la funcionalidad de los pandas, puedes llegar muy lejos con ella.


Recientemente me encontré con un problema similar. Encontré simplemente leer los datos en trozos y adjuntarlos a medida que los escribo en trozos para que el mismo csv funcione bien. Mi problema fue agregar una columna de fecha basada en la información en otra tabla, usando el valor de ciertas columnas de la siguiente manera. Esto puede ayudar a aquellos confundidos por dask y hdf5 pero más familiarizados con pandas como yo.

def addDateColumn(): """Adds time to the daily rainfall data. Reads the csv as chunks of 100k rows at a time and outputs them, appending as needed, to a single csv. Uses the column of the raster names to get the date. """ df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True, chunksize=100000) #read csv file as 100k chunks ''''''Do some stuff'''''' count = 1 #for indexing item in time list for chunk in df: #for each 100k rows newtime = [] #empty list to append repeating times for different rows toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time while count <= toiterate.max(): for i in toiterate: if i ==count: newtime.append(newyears[count]) count+=1 print "Finished", str(chunknum), "chunks" chunk["time"] = newtime #create new column in dataframe based on time outname = "CHIRPS_tanz_time2.csv" #append each output to same csv, using no header chunk.to_csv(pathlist[2]+outname, mode=''a'', header=None, index=None)


Rutinariamente utilizo decenas de gigabytes de datos de esta manera, por ejemplo, tengo tablas en el disco que leo a través de consultas, creo datos y agrego de nuevo.

Vale la pena leer los documentos y más adelante en este hilo para varias sugerencias sobre cómo almacenar sus datos.

Detalles que afectarán la forma en que almacena sus datos, como:
Dé tantos detalles como pueda; Y puedo ayudarte a desarrollar una estructura.

  1. Tamaño de los datos, # de filas, columnas, tipos de columnas; ¿estás agregando filas, o simplemente columnas?
  2. ¿Cómo serán las operaciones típicas? Por ejemplo, haga una consulta en las columnas para seleccionar un grupo de filas y columnas específicas, luego realice una operación (en memoria), cree nuevas columnas, guárdelas.
    (Dar un ejemplo de juguete podría permitirnos ofrecer recomendaciones más específicas).
  3. Después de ese procesamiento, ¿entonces qué haces? ¿El paso 2 es ad hoc o repetible?
  4. Ingrese archivos planos: cuántos, tamaño total aproximado en Gb. ¿Cómo se organizan, por ejemplo, por registros? ¿Cada uno contiene campos diferentes, o tienen algunos registros por archivo con todos los campos en cada archivo?
  5. ¿Alguna vez seleccionó subconjuntos de filas (registros) según criterios (por ejemplo, seleccione las filas con el campo A> 5)? y luego hacer algo, o simplemente seleccionas los campos A, B, C con todos los registros (y luego haces algo)?
  6. ¿Trabaja en todas sus columnas (en grupos) o existe una buena proporción que solo puede usar para los informes (por ejemplo, desea mantener los datos, pero no necesita extraer esa columna de forma explícita hasta resultados finales tiempo)?

Solución

Asegúrate de tener pandas al menos 0.10.1 instalado.

Lea los archivos iterados por partes y las consultas de varias tablas .

Dado que pytables está optimizado para operar en filas (que es lo que usted consulta), crearemos una tabla para cada grupo de campos. De esta manera, es fácil seleccionar un pequeño grupo de campos (que funcionará con una mesa grande, pero es más eficiente hacerlo de esta manera ... Creo que puedo solucionar esta limitación en el futuro ... esto es más intuitivo de todos modos):
(El siguiente es pseudocódigo.)

import numpy as np import pandas as pd # create a store store = pd.HDFStore(''mystore.h5'') # this is the key to your storage: # this maps your fields to a specific group, and defines # what you want to have as data_columns. # you might want to create a nice class wrapping this # (as you will want to have this map and its inversion) group_map = dict( A = dict(fields = [''field_1'',''field_2'',.....], dc = [''field_1'',....,''field_5'']), B = dict(fields = [''field_10'',...... ], dc = [''field_10'']), ..... REPORTING_ONLY = dict(fields = [''field_1000'',''field_1001'',...], dc = []), ) group_map_inverted = dict() for g, v in group_map.items(): group_map_inverted.update(dict([ (f,g) for f in v[''fields''] ]))

Leyendo los archivos y creando el almacenamiento (básicamente haciendo lo que hace append_to_multiple ):

for f in files: # read in the file, additional options hmay be necessary here # the chunksize is not strictly necessary, you may be able to slurp each # file into memory in which case just eliminate this part of the loop # (you can also change chunksize if necessary) for chunk in pd.read_table(f, chunksize=50000): # we are going to append to each table by group # we are not going to create indexes at this time # but we *ARE* going to create (some) data_columns # figure out the field groupings for g, v in group_map.items(): # create the frame for this group frame = chunk.reindex(columns = v[''fields''], copy = False) # append it store.append(g, frame, index=False, data_columns = v[''dc''])

Ahora tiene todas las tablas en el archivo (en realidad, podría almacenarlas en archivos separados si lo desea, probablemente tendría que agregar el nombre de archivo al mapa_grupo, pero probablemente no sea necesario).

Así es como obtienes columnas y creas nuevas:

frame = store.select(group_that_I_want) # you can optionally specify: # columns = a list of the columns IN THAT GROUP (if you wanted to # select only say 3 out of the 20 columns in this sub-table) # and a where clause if you want a subset of the rows # do calculations on this frame new_frame = cool_function_on_frame(frame) # to ''add columns'', create a new group (you probably want to # limit the columns in this new_group to be only NEW ones # (e.g. so you don''t overlap from the other tables) # add this info to the group_map store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

Cuando esté listo para postproceso:

# This may be a bit tricky; and depends what you are actually doing. # I may need to modify this function to be a bit more general: report_data = store.select_as_multiple([groups_1,groups_2,.....], where =[''field_1>0'', ''field_1000=foo''], selector = group_1)

Acerca de data_columns, no es necesario definir CUALQUIER data_columns; le permiten sub-seleccionar filas basadas en la columna. Por ejemplo, algo como:

store.select(group, where = [''field_1000=foo'', ''field_1001>0''])

Es posible que sean más interesantes para usted en la etapa final de generación de informes (esencialmente, una columna de datos se separa de otras columnas, lo que podría afectar un poco a la eficiencia si define mucho).

También es posible que desee:

  • cree una función que tome una lista de campos, busque los grupos en groups_map, luego los seleccione y concatene los resultados para que obtenga el marco resultante (esto es esencialmente lo que hace select_as_multiple). De esta manera la estructura sería bastante transparente para ti.
  • índices en ciertas columnas de datos (hace que el subconjunto de filas sea mucho más rápido).
  • habilitar la compresión.

¡Avísame cuando tengas preguntas!


Sé que este es un tema antiguo, pero creo que vale la pena echar un vistazo a la biblioteca de Blaze . Está construido para este tipo de situaciones.

De la documentación:

Blaze extiende la usabilidad de NumPy y Pandas a la computación distribuida y fuera de núcleo. Blaze proporciona una interfaz similar a la de NumPy ND-Array o Pandas DataFrame, pero asigna estas interfaces familiares a una variedad de otros motores computacionales como Postgres o Spark.

Edición: Por cierto, es compatible con ContinuumIO y Travis Oliphant, autor de NumPy.


Si sus conjuntos de datos tienen entre 1 y 20 GB, debe obtener una estación de trabajo con 48 GB de RAM. Entonces Pandas puede mantener todo el conjunto de datos en la memoria RAM. Sé que no es la respuesta que está buscando aquí, pero hacer computación científica en una notebook con 4GB de RAM no es razonable.


Un truco que encontré útil para los casos de uso de datos grandes es reducir el volumen de los datos al reducir la precisión de flotación a 32 bits. No es aplicable en todos los casos, pero en muchas aplicaciones la precisión de 64 bits es excesiva y los ahorros de memoria 2x valen la pena. Para hacer un punto obvio aún más obvio:

>>> df = pd.DataFrame(np.random.randn(int(1e8), 5)) >>> df.info() <class ''pandas.core.frame.DataFrame''> RangeIndex: 100000000 entries, 0 to 99999999 Data columns (total 5 columns): ... dtypes: float64(5) memory usage: 3.7 GB >>> df.astype(np.float32).info() <class ''pandas.core.frame.DataFrame''> RangeIndex: 100000000 entries, 0 to 99999999 Data columns (total 5 columns): ... dtypes: float32(5) memory usage: 1.9 GB


Una variación más

Muchas de las operaciones realizadas en pandas también se pueden realizar como una consulta db (sql, mongo)

El uso de un RDBMS o mongodb le permite realizar algunas de las agregaciones en la consulta de base de datos (que está optimizada para datos grandes, y utiliza la caché y los índices de manera eficiente)

Más tarde, puede realizar el procesamiento posterior utilizando pandas.

La ventaja de este método es que obtiene las optimizaciones de la base de datos para trabajar con datos grandes, mientras sigue definiendo la lógica en una sintaxis declarativa de alto nivel, y sin tener que lidiar con los detalles de decidir qué hacer en la memoria y qué hacer. de núcleo.

Y aunque el lenguaje de consulta y los pandas son diferentes, generalmente no es complicado traducir parte de la lógica de uno a otro.


Vale la pena mencionar aquí Ray también,
Es un marco de cálculo distribuido, que tiene su propia implementación para pandas de manera distribuida.

Simplemente reemplace la importación de pandas, y el código debería funcionar como está:

# import pandas as pd import ray.dataframe as pd #use pd as usual

Puede leer más detalles aquí:

https://rise.cs.berkeley.edu/blog/pandas-on-ray/