python - Función len lenta en dataframe distribuido por dask
dask dataframe api (1)
Buena pregunta, esto nos lleva a algunos puntos sobre cuándo los datos se están moviendo hacia el clúster y regresando al cliente (su sesión de python). Veamos algunas etapas de tu compuación
Cargar datos con pandas
Este es un marco de datos de Pandas en su sesión de Python, por lo que obviamente todavía está en su proceso local.
log = pd.read_csv(''800000test'', sep=''/t'') # on client
Convierta a un Dask.dataframe perezoso
Esto descompone su marco de datos de Pandas en veinte marcos de datos de Pandas, sin embargo, estos todavía están en el cliente. Los dataframes Dask no envían datos con entusiasmo al clúster.
logd = dd.from_pandas(log,npartitions=20) # still on client
Compute len
Llamar a len
realmente causa cómputo aquí (normalmente usaría df.some_aggregation().compute()
Entonces ahora Dask entra en acción. Primero mueve sus datos al clúster (lento) y luego llama a len en todas las 20 particiones ( rápido), agrega esos (rápido) y luego mueve el resultado a su cliente para que pueda imprimir.
print(len(logd)) # costly roundtrip client -> cluster -> client
Análisis
Entonces, el problema aquí es que nuestro dask.dataframe aún tenía todos sus datos en la sesión python local.
Habría sido mucho más rápido usar, por ejemplo, el programador local de hilos en lugar del programador distribuido. Esto debe calcularse en milisegundos
with dask.set_options(get=dask.threaded.get): # no cluster, just local threads
print(len(logd)) # stays on client
Pero, presumiblemente, desea saber cómo escalar a conjuntos de datos más grandes, así que hagamos esto de la manera correcta.
Carga tus datos en los trabajadores
En lugar de cargar con Pandas en su sesión cliente / local, permita que los trabajadores de Dask carguen bits del archivo csv. De esta forma, no es necesaria la comunicación cliente-trabajador.
# log = pd.read_csv(''800000test'', sep=''/t'') # on client
log = dd.read_csv(''800000test'', sep=''/t'') # on cluster workers
Sin embargo, a diferencia de pd.read_csv
, dd.read_csv
es flojo, por lo que debería volver casi de inmediato. Podemos forzar a Dask a hacer el cálculo con el método persist
log = client.persist(log) # triggers computation asynchronously
Ahora el clúster entra en acción y carga sus datos directamente en los trabajadores. Esto es relativamente rápido. Tenga en cuenta que este método regresa inmediatamente mientras el trabajo ocurre en segundo plano. Si quiere esperar hasta que termine, llame a wait
.
from dask.distributed import wait
wait(log) # blocks until read is done
Si está probando con un pequeño conjunto de datos y desea obtener más particiones, intente cambiar el tamaño del bloque.
log = dd.read_csv(..., blocksize=1000000) # 1 MB blocks
De todos modos, las operaciones en el log
ahora deberían ser rápidas
len(log) # fast
Editar
En respuesta a una pregunta en este blog aquí, están las suposiciones que estamos haciendo sobre dónde vive el archivo.
En general, cuando proporciona un nombre de archivo a dd.read_csv
, asume que ese archivo es visible de todos los trabajadores. Esto es cierto si está utilizando un sistema de archivos de red o una tienda global como S3 o HDFS. Si está utilizando un sistema de archivos de red, querrá usar rutas absolutas (como /path/to/myfile.*.csv
) o bien asegurarse de que sus trabajadores y clientes tengan el mismo directorio de trabajo.
Si este no es el caso, y sus datos solo están en su máquina cliente, entonces tendrá que cargarlos y dispersarlos.
Simple pero subóptimo
La manera simple es hacer lo que hiciste originalmente, pero persiste tu dask.dataframe
log = pd.read_csv(''800000test'', sep=''/t'') # on client
logd = dd.from_pandas(log,npartitions=20) # still on client
logd = client.persist(logd) # moves to workers
Esto está bien, pero da como resultado una comunicación un poco menos que ideal.
Complejo pero óptimo
En cambio, es posible que disemine sus datos al clúster explícitamente
[future] = client.scatter([log])
Esto se mete en una API más compleja, así que solo te guiaré a los documentos
http://distributed.readthedocs.io/en/latest/manage-computation.html http://distributed.readthedocs.io/en/latest/memory.html http://dask.pydata.org/en/latest/ retrayed-collections.html
He estado probando cómo usar dask (clúster con 20 núcleos) y me sorprende la velocidad con la que llamo llamar a una función len frente a cortar a través de loc.
import dask.dataframe as dd
from dask.distributed import Client
client = Client(''192.168.1.220:8786'')
log = pd.read_csv(''800000test'', sep=''/t'')
logd = dd.from_pandas(log,npartitions=20)
#This is the code than runs slowly
#(2.9 seconds whilst I would expect no more than a few hundred millisencods)
print(len(logd))
#Instead this code is actually running almost 20 times faster than pandas
logd.loc[:''Host''].count().compute()
¿Alguna idea de por qué esto podría estar pasando? No es importante para mí que Len funcione rápido, pero siento que al no entender este comportamiento, hay algo que no entiendo de la biblioteca.
Todos los cuadros verdes corresponden a "from_pandas" mientras que en este artículo de Matthew Rocklin http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes el gráfico de llamadas se ve mejor (se llama len_chunk que es significativamente más rápido y las llamadas no parecen estar bloqueadas y esperan a que un trabajador termine su tarea antes de comenzar el otro)