map_partitions - python dask DataFrame, ¿es compatible con la fila(trivialmente paralelizable)?
dask map_partitions (2)
Recientemente encontré el módulo dask que pretende ser un módulo de procesamiento paralelo de Python fácil de usar. El gran punto de venta para mí es que funciona con pandas.
Después de leer un poco en su página de manual, no puedo encontrar una manera de hacer esta tarea trivialmente paralelizable:
ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply
Por el momento, para lograr esto en dask, AFAIK,
ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame
que es una sintaxis fea y en realidad es más lenta que directa
df.apply(func, axis = 1) # for pandas DF row apply
¿Cualquier sugerencia?
Edición: Gracias @MRocklin por la función de mapa. Parece ser más lento de lo que se aplican los pandas simples. ¿Está esto relacionado con el tema de la liberación de pandas GIL o lo estoy haciendo mal?
import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)
def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s -= 1
return s
s.apply(slow_func) # 0.43 sec
ds.map(slow_func).compute() # 2.04 sec
map_partitions
Puede aplicar su función a todas las particiones de su marco de datos con la función map_partitions
.
df.map_partitions(func, columns=...)
Tenga en cuenta que a la función se le dará solo una parte del conjunto de datos a la vez, no se pandas apply
todo el conjunto de datos como con pandas apply
(lo que presumiblemente no querría si quisiera hacer paralelismo).
map
/ apply
Puede asignar una función por filas en una serie con map
df.mycolumn.map(func)
Puede asignar una función por filas en un marco de datos con apply
df.apply(func, axis=1)
Hilos vs Procesos
A partir de la versión 0.6.0, dask.dataframes
paraleliza con subprocesos. Las funciones Python personalizadas no recibirán mucho beneficio del paralelismo basado en subprocesos. Podrías probar procesos en su lugar
df = dd.read_csv(...)
from dask.multiprocessing import get
df.map_partitions(func, columns=...).compute(get=get)
Pero evita apply
Sin embargo, realmente debes evitar apply
con funciones de Python personalizadas, tanto en Pandas como en Dask. Esto es a menudo una fuente de mal desempeño. Podría ser que si encuentra una manera de realizar su operación de forma vectorializada, podría ser que su código de Pandas sea 100 veces más rápido y no necesite dask.dataframe en absoluto.
Considerar numba
Para su problema particular puede considerar numba
. Esto mejora significativamente su rendimiento.
In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)
In [4]: %paste
def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s -= 1
return s
## -- End pasted text --
In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms
In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)
In [8]: %time _ = s.apply(fast_func) # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms
In [9]: %time _ = s.apply(fast_func) # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms
Descargo de responsabilidad, trabajo para la compañía que produce tanto numba
como dask
y emplea a muchos de los desarrolladores de pandas
.
A partir de v dask.dataframe
.apply delega responsabilidades en map_partitions
:
@insert_meta_param_description(pad=12)
def apply(self, func, convert_dtype=True, meta=no_default, args=(), **kwds):
""" Parallel version of pandas.Series.apply
...
"""
if meta is no_default:
msg = ("`meta` is not specified, inferred from partial data. "
"Please provide `meta` if the result is unexpected./n"
" Before: .apply(func)/n"
" After: .apply(func, meta={''x'': ''f8'', ''y'': ''f8''}) for dataframe result/n"
" or: .apply(func, meta=(''x'', ''f8'')) for series result")
warnings.warn(msg)
meta = _emulate(M.apply, self._meta_nonempty, func,
convert_dtype=convert_dtype,
args=args, **kwds)
return map_partitions(M.apply, self, func,
convert_dtype, args, meta=meta, **kwds)