python performance dataframe dask

python - Función len lenta en dataframe distribuido por dask



dask dataframe api (1)

Buena pregunta, esto nos lleva a algunos puntos sobre cuándo los datos se están moviendo hacia el clúster y regresando al cliente (su sesión de python). Veamos algunas etapas de tu compuación

Cargar datos con pandas

Este es un marco de datos de Pandas en su sesión de Python, por lo que obviamente todavía está en su proceso local.

log = pd.read_csv(''800000test'', sep=''/t'') # on client

Convierta a un Dask.dataframe perezoso

Esto descompone su marco de datos de Pandas en veinte marcos de datos de Pandas, sin embargo, estos todavía están en el cliente. Los dataframes Dask no envían datos con entusiasmo al clúster.

logd = dd.from_pandas(log,npartitions=20) # still on client

Compute len

Llamar a len realmente causa cómputo aquí (normalmente usaría df.some_aggregation().compute() Entonces ahora Dask entra en acción. Primero mueve sus datos al clúster (lento) y luego llama a len en todas las 20 particiones ( rápido), agrega esos (rápido) y luego mueve el resultado a su cliente para que pueda imprimir.

print(len(logd)) # costly roundtrip client -> cluster -> client

Análisis

Entonces, el problema aquí es que nuestro dask.dataframe aún tenía todos sus datos en la sesión python local.

Habría sido mucho más rápido usar, por ejemplo, el programador local de hilos en lugar del programador distribuido. Esto debe calcularse en milisegundos

with dask.set_options(get=dask.threaded.get): # no cluster, just local threads print(len(logd)) # stays on client

Pero, presumiblemente, desea saber cómo escalar a conjuntos de datos más grandes, así que hagamos esto de la manera correcta.

Carga tus datos en los trabajadores

En lugar de cargar con Pandas en su sesión cliente / local, permita que los trabajadores de Dask carguen bits del archivo csv. De esta forma, no es necesaria la comunicación cliente-trabajador.

# log = pd.read_csv(''800000test'', sep=''/t'') # on client log = dd.read_csv(''800000test'', sep=''/t'') # on cluster workers

Sin embargo, a diferencia de pd.read_csv , dd.read_csv es flojo, por lo que debería volver casi de inmediato. Podemos forzar a Dask a hacer el cálculo con el método persist

log = client.persist(log) # triggers computation asynchronously

Ahora el clúster entra en acción y carga sus datos directamente en los trabajadores. Esto es relativamente rápido. Tenga en cuenta que este método regresa inmediatamente mientras el trabajo ocurre en segundo plano. Si quiere esperar hasta que termine, llame a wait .

from dask.distributed import wait wait(log) # blocks until read is done

Si está probando con un pequeño conjunto de datos y desea obtener más particiones, intente cambiar el tamaño del bloque.

log = dd.read_csv(..., blocksize=1000000) # 1 MB blocks

De todos modos, las operaciones en el log ahora deberían ser rápidas

len(log) # fast

Editar

En respuesta a una pregunta en este blog aquí, están las suposiciones que estamos haciendo sobre dónde vive el archivo.

En general, cuando proporciona un nombre de archivo a dd.read_csv , asume que ese archivo es visible de todos los trabajadores. Esto es cierto si está utilizando un sistema de archivos de red o una tienda global como S3 o HDFS. Si está utilizando un sistema de archivos de red, querrá usar rutas absolutas (como /path/to/myfile.*.csv ) o bien asegurarse de que sus trabajadores y clientes tengan el mismo directorio de trabajo.

Si este no es el caso, y sus datos solo están en su máquina cliente, entonces tendrá que cargarlos y dispersarlos.

Simple pero subóptimo

La manera simple es hacer lo que hiciste originalmente, pero persiste tu dask.dataframe

log = pd.read_csv(''800000test'', sep=''/t'') # on client logd = dd.from_pandas(log,npartitions=20) # still on client logd = client.persist(logd) # moves to workers

Esto está bien, pero da como resultado una comunicación un poco menos que ideal.

Complejo pero óptimo

En cambio, es posible que disemine sus datos al clúster explícitamente

[future] = client.scatter([log])

Esto se mete en una API más compleja, así que solo te guiaré a los documentos

http://distributed.readthedocs.io/en/latest/manage-computation.html http://distributed.readthedocs.io/en/latest/memory.html http://dask.pydata.org/en/latest/ retrayed-collections.html

He estado probando cómo usar dask (clúster con 20 núcleos) y me sorprende la velocidad con la que llamo llamar a una función len frente a cortar a través de loc.

import dask.dataframe as dd from dask.distributed import Client client = Client(''192.168.1.220:8786'') log = pd.read_csv(''800000test'', sep=''/t'') logd = dd.from_pandas(log,npartitions=20) #This is the code than runs slowly #(2.9 seconds whilst I would expect no more than a few hundred millisencods) print(len(logd)) #Instead this code is actually running almost 20 times faster than pandas logd.loc[:''Host''].count().compute()

¿Alguna idea de por qué esto podría estar pasando? No es importante para mí que Len funcione rápido, pero siento que al no entender este comportamiento, hay algo que no entiendo de la biblioteca.

Todos los cuadros verdes corresponden a "from_pandas" mientras que en este artículo de Matthew Rocklin http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes el gráfico de llamadas se ve mejor (se llama len_chunk que es significativamente más rápido y las llamadas no parecen estar bloqueadas y esperan a que un trabajador termine su tarea antes de comenzar el otro)