una suma recorrer procesamiento pegar matrices imagenes imagen crear convertir con binario numpy pandas pytables h5py blaze

numpy - suma - Simulador de partículas de Python: procesamiento fuera del núcleo



recorrer una imagen opencv python (5)

Solución PyTable

Dado que la funcionalidad proporcionada por Pandas no es necesaria, y el procesamiento es mucho más lento (consulte el cuaderno a continuación), el mejor enfoque es utilizar PyTables o h5py directamente. He intentado sólo el enfoque pytables hasta ahora.

Todas las pruebas se realizaron en este cuaderno:

Introducción a las estructuras de datos de pytables.

Referencia: Documentos oficiales de PyTables

Pytables permite almacenar datos en archivos HDF5 en 2 tipos de formatos: matrices y tablas.

Arrays

Hay 3 tipos de matrices Array , CArray y EArray . Todos permiten almacenar y recuperar segmentos (multidimensionales) con una notación similar a la división de números.

# Write data to store (broadcasting works) array1[:] = 3 # Read data from store in_ram_array = array1[:]

Para la optimización en algunos casos de uso, CArray se guarda en "trozos", cuyo tamaño se puede elegir con chunk_shape en el momento de la creación.

CArray tamaño de la Array y la CArray se fija en el momento de la creación. Sin embargo, puede rellenar / escribir la matriz trozo por trozo después de la creación. A la inversa, EArray se puede ampliar con el método .append() .

Mesas

La table es una bestia bastante diferente. Es básicamente una "mesa". Solo tiene un índice 1D y cada elemento es una fila. Dentro de cada fila están los tipos de datos de "columnas", cada columna puede tener un tipo diferente. Si está familiarizado con las matrices de registros numpy , una tabla es básicamente una matriz de registros 1D, y cada elemento tiene muchos campos como columnas.

Las matrices numpy 1D o 2D se pueden almacenar en tablas, pero es un poco más complicado: necesitamos crear un tipo de datos de fila. Por ejemplo, para almacenar una matriz numpy 1D uint8 tenemos que hacer:

table_uint8 = np.dtype([(''field1'', ''u1'')]) table_1d = data_file.create_table(''/'', ''array_1d'', description=table_uint8)

Entonces, ¿por qué usar tablas? Porque, a diferencia de los arreglos, las tablas pueden ser consultadas eficientemente. Por ejemplo, si queremos buscar elementos> 3 en una enorme tabla basada en disco, podemos hacerlo:

index = table_1d.get_where_list(''field1 > 3'')

No solo es simple (en comparación con las matrices en las que necesitamos escanear todo el archivo en fragmentos y generar el index en un bucle), sino que también es extremadamente rápido.

Cómo almacenar parámetros de simulación

La mejor manera de almacenar los parámetros de simulación es usar un grupo (es decir, /parameters ), convertir cada escala escalar en una matriz numpy y almacenarla como CArray .

Array para " emission "

emission es la matriz más grande que se genera y se lee de forma secuencial. Para este patrón de uso, una buena estructura de datos es EArray . En los datos "simulados" con ~ 50% de ceros elementos, la compresión blosc ( level=5 ) alcanza una relación de compresión de 2.2x. Descubrí que un tamaño de trozo de 2 ^ 18 (256 k) tiene el tiempo de procesamiento mínimo.

Almacenando " counts "

El almacenamiento también de " counts " aumentará el tamaño del archivo en un 10% y tomará un 40% más de tiempo para calcular las marcas de tiempo. Tener counts almacenadas no es una ventaja en sí misma porque solo se necesitan las marcas de tiempo al final.

La ventaja es que recostrucir el índice (marcas de tiempo) es más sencillo porque consultamos el eje de tiempo completo en un solo comando ( .get_where_list(''counts >= 1'') ). A la inversa, con el procesamiento fragmentado, necesitamos realizar algunos análisis de índice que son un poco complicados, y tal vez una carga que mantener.

Sin embargo, la complejidad del código puede ser pequeña en comparación con todas las demás operaciones (clasificación y fusión) que se necesitan en ambos casos.

Almacenamiento de " timestamps "

Las marcas de tiempo se pueden acumular en la memoria RAM. Sin embargo, no sabemos el tamaño de las matrices antes de comenzar y se necesita una llamada final a hstack() para "fusionar" los diferentes fragmentos almacenados en una lista. Esto duplica los requisitos de memoria por lo que la RAM puede ser insuficiente

Podemos almacenar las marcas de tiempo a medida que avanzamos en una tabla utilizando .append() . Al final podemos cargar la tabla en la memoria con .read() . Esto es solo un 10% más lento que el cálculo de memoria total pero evita el requisito de "doble RAM". Además, podemos evitar la carga completa final y tener un uso de RAM mínimo.

H5Py

H5py es una biblioteca mucho más simple que pytables . Para este caso de uso de (principalmente) el procesamiento secuencial parece un mejor ajuste que pytables. La única característica que falta es la falta de compresión ''blosc''. Si esto resulta en una gran penalización de rendimiento queda por probar.

Descripción del problema

Al escribir un simulador de partículas de Monte Carlo (movimiento browniano y emisión de fotones) en python / numpy. Necesito guardar la salida de simulación (>> 10GB) en un archivo y procesar los datos en un segundo paso. La compatibilidad con Windows y Linux es importante.

El número de partículas ( n_particles ) es 10-100. El número de pasos de tiempo ( time_size ) es ~ 10 ^ 9.

La simulación tiene 3 pasos (el código a continuación es para una versión de todo en RAM):

  1. Simule (y almacene) una matriz de tasa de emission (contiene muchos elementos casi 0):

    • forma ( n_particles x time_size ), float32, tamaño 80GB
  2. Matriz de counts , (valores aleatorios de un proceso de Poisson con tasas calculadas previamente):

    • forma ( n_particles x time_size ), uint8, tamaño 20GB

      counts = np.random.poisson(lam=emission).astype(np.uint8)

  3. Encuentra marcas de tiempo (o índice) de cuentas. Las cuentas son casi siempre 0, por lo que las matrices de marca de tiempo cabrán en la RAM.

    # Loop across the particles timestamps = [np.nonzero(c) for c in counts]

Hago el paso 1 una vez, luego repito el paso 2-3 muchas veces (~ 100). En el futuro, es posible que tenga que preprocesar la emission (aplicar el cumsum u otras funciones) antes de computar los counts .

Pregunta

Tengo una implementación en memoria de trabajo y estoy tratando de entender cuál es el mejor enfoque para implementar una versión fuera de núcleo que pueda escalar a simulaciones (mucho) más largas.

Lo que me gustaría que existiera

Necesito guardar matrices en un archivo, y me gustaría usar un solo archivo para una simulación. También necesito una forma "simple" de almacenar y recuperar un diccionario de parámetros de simulación (escalares).

Idealmente, me gustaría una matriz numpy con respaldo de archivos que pueda preasignar y rellenar en partes. Luego, me gustaría que los métodos de matriz numpy ( max , cumsum , ...) funcionen de manera transparente, requiriendo solo una palabra clave chunksize para especificar la cantidad de la matriz a cargar en cada iteración.

Aún mejor, me gustaría un Numexpr que funcione no entre el caché y la RAM, sino entre la RAM y el disco duro.

¿Cuáles son las opciones prácticas?

Como una primera opción, comencé a experimentar con pyTables, pero no estoy contento con su complejidad y abstracciones (tan diferentes de numpy). Además, mi solución actual (lea a continuación) es MUY AGRADABLE y no es muy eficiente.

Así que mis opciones para las que busco una respuesta son:

  1. implementar una matriz numpy con la funcionalidad requerida (¿cómo?)

  2. use pytable de una manera más inteligente (diferentes estructuras de datos / métodos)

  3. usa otra biblioteca: h5py, blaze, pandas ... (no he probado ninguno de ellos hasta ahora).

Solución tentativa (pyTables)

Guardo los parámetros de simulación en el grupo ''/parameters'' : cada parámetro se convierte en un escalar de matriz numpy. Solución detallada pero funciona.

EArray emission como una matriz Extensible ( EArray ), porque genero los datos en trozos y necesito agregar cada trozo nuevo (aunque conozco el tamaño final). Guardar counts es más problemático. Si lo guarda como una matriz pytable, es difícil realizar consultas como "count> = 2". Por lo tanto, guardé las cuentas como tablas múltiples (una por partícula) [UGLY] y .get_where_list(''counts >= 2'') con .get_where_list(''counts >= 2'') . No estoy seguro de que esto sea eficiente en el espacio, y al generar todas estas tablas en lugar de usar una sola matriz, se reduce significativamente el archivo HDF5. Además, por extraño que parezca, crear esas tablas requiere crear un dtype personalizado (incluso para los dtypes numpy estándar):

dt = np.dtype([(''counts'', ''u1'')]) for ip in xrange(n_particles): name = "particle_%d" % ip data_file.create_table( group, name, description=dt, chunkshape=chunksize, expectedrows=time_size, title=''Binned timetrace of emitted ph (bin = t_step)'' '' - particle_%d'' % particle)

Cada "tabla" de conteos de partículas tiene un nombre diferente ( name = "particle_%d" % ip ) y es necesario que las coloque en una lista de python para facilitar la iteración.

EDITAR : El resultado de esta pregunta es un simulador de Brownian Motion llamado PyBroMo .


Consulte here cómo guardar sus parámetros en el archivo HDF5 (los encurtidos, por lo que puede almacenarlos como los tiene; su límite es de 64 kb en el tamaño del encurtido).

import pandas as pd import numpy as np n_particles = 10 chunk_size = 1000 # 1) create a new emission file, compressing as we go emission = pd.HDFStore(''emission.hdf'',mode=''w'',complib=''blosc'') # generate simulated data for i in range(10): df = pd.DataFrame(np.abs(np.random.randn(chunk_size,n_particles)),dtype=''float32'') # create a globally unique index (time) # http://.com/questions/16997048/how-does-one-append-large-amounts-of- data-to-a-pandas-hdfstore-and-get-a-natural/16999397#16999397 try: nrows = emission.get_storer(''df'').nrows except: nrows = 0 df.index = pd.Series(df.index) + nrows emission.append(''df'',df) emission.close() # 2) create counts cs = pd.HDFStore(''counts.hdf'',mode=''w'',complib=''blosc'') # this is an iterator, can be any size for df in pd.read_hdf(''emission.hdf'',''df'',chunksize=200): counts = pd.DataFrame(np.random.poisson(lam=df).astype(np.uint8)) # set the index as the same counts.index = df.index # store the sum across all particles (as most are zero this will be a # nice sub-selector # better maybe to have multiple of these sums that divide the particle space # you don''t have to do this but prob more efficient # you can do this in another file if you want/need counts[''particles_0_4''] = counts.iloc[:,0:4].sum(1) counts[''particles_5_9''] = counts.iloc[:,5:9].sum(1) # make the non_zero column indexable cs.append(''df'',counts,data_columns=[''particles_0_4'',''particles_5_9'']) cs.close() # 3) find interesting counts print pd.read_hdf(''counts.hdf'',''df'',where=''particles_0_4>0'') print pd.read_hdf(''counts.hdf'',''df'',where=''particles_5_9>0'')

Alternativamente, puede hacer que cada partícula sea un data_column y seleccionarlas individualmente.

y algo de salida (emisión bastante activa en este caso :)

0 1 2 3 4 5 6 7 8 9 particles_0_4 particles_5_9 0 2 2 2 3 2 1 0 2 1 0 9 4 1 1 0 0 0 1 0 1 0 3 0 1 4 2 0 2 0 0 2 0 0 1 2 0 2 3 3 0 0 0 1 1 0 0 2 0 3 1 2 4 3 1 0 2 1 0 0 1 0 0 6 1 5 1 0 0 1 0 0 0 3 0 0 2 3 6 0 0 0 1 1 0 1 0 0 0 1 1 7 0 2 0 2 0 0 0 0 2 0 4 2 8 0 0 0 1 3 0 0 0 0 1 1 0 10 1 0 0 0 0 0 0 0 0 1 1 0 11 0 0 1 1 0 2 0 1 2 1 2 5 12 0 2 2 4 0 0 1 1 0 1 8 2 13 0 2 1 0 0 0 0 1 1 0 3 2 14 1 0 0 0 0 3 0 0 0 0 1 3 15 0 0 0 1 1 0 0 0 0 0 1 0 16 0 0 0 4 3 0 3 0 1 0 4 4 17 0 2 2 3 0 0 2 2 0 2 7 4 18 0 1 2 1 0 0 3 2 1 2 4 6 19 1 1 0 0 0 0 1 2 1 1 2 4 20 0 0 2 1 2 2 1 0 0 1 3 3 22 0 1 2 2 0 0 0 0 1 0 5 1 23 0 2 4 1 0 1 2 0 0 2 7 3 24 1 1 1 0 1 0 0 1 2 0 3 3 26 1 3 0 4 1 0 0 0 2 1 8 2 27 0 1 1 4 0 1 2 0 0 0 6 3 28 0 1 0 0 0 0 0 0 0 0 1 0 29 0 2 0 0 1 0 1 0 0 0 2 1 30 0 1 0 2 1 2 0 2 1 1 3 5 31 0 0 1 1 1 1 1 0 1 1 2 3 32 3 0 2 1 0 0 1 0 1 0 6 2 33 1 3 1 0 4 1 1 0 1 4 5 3 34 1 1 0 0 0 0 0 3 0 1 2 3 35 0 1 0 0 1 1 2 0 1 0 1 4 36 1 0 1 0 1 2 1 2 0 1 2 5 37 0 0 0 1 0 0 0 0 3 0 1 3 38 2 5 0 0 0 3 0 1 0 0 7 4 39 1 0 0 2 1 1 3 0 0 1 3 4 40 0 1 0 0 1 0 0 4 2 2 1 6 41 0 3 3 1 1 2 0 0 2 0 7 4 42 0 1 0 2 0 0 0 0 0 1 3 0 43 0 0 2 0 5 0 3 2 1 1 2 6 44 0 2 0 1 0 0 1 0 0 0 3 1 45 1 0 0 2 0 0 0 1 4 0 3 5 46 0 2 0 0 0 0 0 1 1 0 2 2 48 3 0 0 0 0 1 1 0 0 0 3 2 50 0 1 0 1 0 1 0 0 2 1 2 3 51 0 0 2 0 0 0 2 3 1 1 2 6 52 0 0 2 3 2 3 1 0 1 5 5 5 53 0 0 0 2 1 1 0 0 1 1 2 2 54 0 1 2 2 2 0 1 0 2 0 5 3 55 0 2 1 0 0 0 0 0 3 2 3 3 56 0 1 0 0 0 2 2 0 1 1 1 5 57 0 0 0 1 1 0 0 1 0 0 1 1 58 6 1 2 0 2 2 0 0 0 0 9 2 59 0 1 1 0 0 0 0 0 2 0 2 2 60 2 0 0 0 1 0 0 1 0 1 2 1 61 0 0 3 1 1 2 0 0 1 0 4 3 62 2 0 1 0 0 0 0 1 2 1 3 3 63 2 0 1 0 1 0 1 0 0 0 3 1 65 0 0 1 0 0 0 1 5 0 1 1 6 .. .. .. .. .. .. .. .. .. .. ... ... [9269 rows x 12 columns]


Dask.array puede realizar operaciones fragmentadas como max , cumsum , etc. en una matriz en disco como PyTables o h5py.

import h5py d = h5py.File(''myfile.hdf5'')[''/data''] import dask.array as da x = da.from_array(d, chunks=(1000, 1000))

X se ve y se siente como una matriz numpy y copia gran parte de la API. Las operaciones en x crearán un DAG de operaciones en memoria que se ejecutarán de manera eficiente utilizando la transmisión de múltiples núcleos desde el disco según sea necesario

da.exp(x).mean(axis=0).compute()

http://dask.pydata.org/en/latest/

conda install dask or pip install dask


Las pruebas de pytables vs pandas.HDFStore en la respuesta aceptada son completamente engañosas:

  • El primer error crítico es que el tiempo no aplicó os.fsync después del lavado, lo que hace que la prueba de velocidad sea inestable. Así que en algún momento, la función de pytables es accidentalmente mucho más rápida.

  • El segundo problema es que las versiones de pytables y pandas tienen formas completamente diferentes debido a una mala interpretación de la forma de pytables.EArray ''arg. El autor intenta agregar la columna a la versión de pytables, pero agregar la fila a la versión de pandas.

  • El 3er problema es que el autor usó diferentes chunkshape durante la comparación.

  • El autor también se olvidó de deshabilitar la generación del índice de la tabla durante store.append() que es un proceso lento.

La siguiente tabla muestra los resultados de rendimiento de su versión y mis arreglos. tbold es su versión de pytables, pdold es su versión de pandas. tbsync y pdsync son su versión con fsync() después de flush() y también deshabilitan la generación del índice de la tabla durante la adición. tbopt y pdopt son mi versión optimizada, con blosc:lz4 y complevel 9.

| name | dt | data size [MB] | comp ratio % | chunkshape | shape | clib | indexed | |:-------|-----:|-----------------:|---------------:|:-------------|:--------------|:----------------|:----------| | tbold | 5.11 | 300.00 | 84.63 | (15, 262144) | (15, 5242880) | blosc[5][1] | False | | pdold | 8.39 | 340.00 | 39.26 | (1927,) | (5242880,) | blosc[5][1] | True | | tbsync | 7.47 | 300.00 | 84.63 | (15, 262144) | (15, 5242880) | blosc[5][1] | False | | pdsync | 6.97 | 340.00 | 39.27 | (1927,) | (5242880,) | blosc[5][1] | False | | tbopt | 4.78 | 300.00 | 43.07 | (4369, 15) | (5242880, 15) | blosc:lz4[9][1] | False | | pdopt | 5.73 | 340.00 | 38.53 | (3855,) | (5242880,) | blosc:lz4[9][1] | False |

El pandas.HDFStore utiliza pytables bajo el capó. Por lo tanto, si los usamos correctamente, no deberían tener ninguna diferencia.

Podemos ver que la versión pandas tiene mayor tamaño de datos. Esto se debe a que los pandas utilizan pytables.Table en lugar de EArray. Y los pandas.DataFrame siempre tienen una columna de índice. La primera columna del objeto Tabla es este índice de marco de datos que requiere un espacio extra para guardar. Esto solo afecta un poco el rendimiento de IO pero proporciona más funciones, como consultas fuera de núcleo. Así que todavía recomiendo pandas aquí. @MRocklin también mencionó un paquete más agradable de núcleo fuera de núcleo, si la mayoría de las funciones que usó son solo operaciones de matriz en lugar de una consulta de tipo tabla. Pero el rendimiento de IO no tendrá diferencia distinguible.

h5f.root.emission._v_attrs Out[82]: /emission._v_attrs (AttributeSet), 15 attributes: [CLASS := ''GROUP'', TITLE := '''', VERSION := ''1.0'', data_columns := [], encoding := ''UTF-8'', index_cols := [(0, ''index'')], info := {1: {''names'': [None], ''type'': ''RangeIndex''}, ''index'': {}}, levels := 1, metadata := [], nan_rep := ''nan'', non_index_axes := [(1, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14])], pandas_type := ''frame_table'', pandas_version := ''0.15.2'', table_type := ''appendable_frame'', values_cols := [''values_block_0'']]

Aquí están las funciones:

def generate_emission(shape): """Generate fake emission.""" emission = np.random.randn(*shape).astype(''float32'') - 1 emission.clip(0, 1e6, out=emission) assert (emission >=0).all() return emission def test_puretb_earray(outpath, n_particles = 15, time_chunk_size = 2**18, n_iter = 20, sync = True, clib = ''blosc'', clevel = 5, ): time_size = n_iter * time_chunk_size data_file = pytb.open_file(outpath, mode="w") comp_filter = pytb.Filters(complib=clib, complevel=clevel) emission = data_file.create_earray(''/'', ''emission'', atom=pytb.Float32Atom(), shape=(n_particles, 0), chunkshape=(n_particles, time_chunk_size), expectedrows=n_iter * time_chunk_size, filters=comp_filter) # generate simulated emission data t0 =time() for i in range(n_iter): emission_chunk = generate_emission((n_particles, time_chunk_size)) emission.append(emission_chunk) emission.flush() if sync: os.fsync(data_file.fileno()) data_file.close() t1 = time() return t1-t0 def test_puretb_earray2(outpath, n_particles = 15, time_chunk_size = 2**18, n_iter = 20, sync = True, clib = ''blosc'', clevel = 5, ): time_size = n_iter * time_chunk_size data_file = pytb.open_file(outpath, mode="w") comp_filter = pytb.Filters(complib=clib, complevel=clevel) emission = data_file.create_earray(''/'', ''emission'', atom=pytb.Float32Atom(), shape=(0, n_particles), expectedrows=time_size, filters=comp_filter) # generate simulated emission data t0 =time() for i in range(n_iter): emission_chunk = generate_emission((time_chunk_size, n_particles)) emission.append(emission_chunk) emission.flush() if sync: os.fsync(data_file.fileno()) data_file.close() t1 = time() return t1-t0 def test_purepd_df(outpath, n_particles = 15, time_chunk_size = 2**18, n_iter = 20, sync = True, clib=''blosc'', clevel=5, autocshape=False, oldversion=False, ): time_size = n_iter * time_chunk_size emission = pd.HDFStore(outpath, mode=''w'', complib=clib, complevel=clevel) # generate simulated data t0 =time() for i in range(n_iter): # Generate fake emission emission_chunk = generate_emission((time_chunk_size, n_particles)) df = pd.DataFrame(emission_chunk, dtype=''float32'') # create a globally unique index (time) # http://.com/questions/16997048/how-does-one-append-large- # amounts-of-data-to-a-pandas-hdfstore-and-get-a-natural/16999397#16999397 try: nrows = emission.get_storer(''emission'').nrows except: nrows = 0 df.index = pd.Series(df.index) + nrows if autocshape: emission.append(''emission'', df, index=False, expectedrows=time_size ) else: if oldversion: emission.append(''emission'', df) else: emission.append(''emission'', df, index=False) emission.flush(fsync=sync) emission.close() t1 = time() return t1-t0 def _test_puretb_earray_nosync(outpath): return test_puretb_earray(outpath, sync=False) def _test_purepd_df_nosync(outpath): return test_purepd_df(outpath, sync=False, oldversion=True ) def _test_puretb_earray_opt(outpath): return test_puretb_earray2(outpath, sync=False, clib=''blosc:lz4'', clevel=9 ) def _test_purepd_df_opt(outpath): return test_purepd_df(outpath, sync=False, clib=''blosc:lz4'', clevel=9, autocshape=True ) testfns = { ''tbold'':_test_puretb_earray_nosync, ''pdold'':_test_purepd_df_nosync, ''tbsync'':test_puretb_earray, ''pdsync'':test_purepd_df, ''tbopt'': _test_puretb_earray_opt, ''pdopt'': _test_purepd_df_opt, }