read map_partitions python pandas parallel-processing dask

map_partitions - python dask DataFrame, ¿es compatible con la fila(trivialmente paralelizable)?



dask map_partitions (2)

Recientemente encontré el módulo dask que pretende ser un módulo de procesamiento paralelo de Python fácil de usar. El gran punto de venta para mí es que funciona con pandas.

Después de leer un poco en su página de manual, no puedo encontrar una manera de hacer esta tarea trivialmente paralelizable:

ts.apply(func) # for pandas series df.apply(func, axis = 1) # for pandas DF row apply

Por el momento, para lograr esto en dask, AFAIK,

ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame

que es una sintaxis fea y en realidad es más lenta que directa

df.apply(func, axis = 1) # for pandas DF row apply

¿Cualquier sugerencia?

Edición: Gracias @MRocklin por la función de mapa. Parece ser más lento de lo que se aplican los pandas simples. ¿Está esto relacionado con el tema de la liberación de pandas GIL o lo estoy haciendo mal?

import dask.dataframe as dd s = pd.Series([10000]*120) ds = dd.from_pandas(s, npartitions = 3) def slow_func(k): A = np.random.normal(size = k) # k = 10000 s = 0 for a in A: if a > 0: s += 1 else: s -= 1 return s s.apply(slow_func) # 0.43 sec ds.map(slow_func).compute() # 2.04 sec


map_partitions

Puede aplicar su función a todas las particiones de su marco de datos con la función map_partitions .

df.map_partitions(func, columns=...)

Tenga en cuenta que a la función se le dará solo una parte del conjunto de datos a la vez, no se pandas apply todo el conjunto de datos como con pandas apply (lo que presumiblemente no querría si quisiera hacer paralelismo).

map / apply

Puede asignar una función por filas en una serie con map

df.mycolumn.map(func)

Puede asignar una función por filas en un marco de datos con apply

df.apply(func, axis=1)

Hilos vs Procesos

A partir de la versión 0.6.0, dask.dataframes paraleliza con subprocesos. Las funciones Python personalizadas no recibirán mucho beneficio del paralelismo basado en subprocesos. Podrías probar procesos en su lugar

df = dd.read_csv(...) from dask.multiprocessing import get df.map_partitions(func, columns=...).compute(get=get)

Pero evita apply

Sin embargo, realmente debes evitar apply con funciones de Python personalizadas, tanto en Pandas como en Dask. Esto es a menudo una fuente de mal desempeño. Podría ser que si encuentra una manera de realizar su operación de forma vectorializada, podría ser que su código de Pandas sea 100 veces más rápido y no necesite dask.dataframe en absoluto.

Considerar numba

Para su problema particular puede considerar numba . Esto mejora significativamente su rendimiento.

In [1]: import numpy as np In [2]: import pandas as pd In [3]: s = pd.Series([10000]*120) In [4]: %paste def slow_func(k): A = np.random.normal(size = k) # k = 10000 s = 0 for a in A: if a > 0: s += 1 else: s -= 1 return s ## -- End pasted text -- In [5]: %time _ = s.apply(slow_func) CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms Wall time: 347 ms In [6]: import numba In [7]: fast_func = numba.jit(slow_func) In [8]: %time _ = s.apply(fast_func) # First time incurs compilation overhead CPU times: user 179 ms, sys: 0 ns, total: 179 ms Wall time: 175 ms In [9]: %time _ = s.apply(fast_func) # Subsequent times are all gain CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms Wall time: 68.7 ms

Descargo de responsabilidad, trabajo para la compañía que produce tanto numba como dask y emplea a muchos de los desarrolladores de pandas .


A partir de v dask.dataframe .apply delega responsabilidades en map_partitions :

@insert_meta_param_description(pad=12) def apply(self, func, convert_dtype=True, meta=no_default, args=(), **kwds): """ Parallel version of pandas.Series.apply ... """ if meta is no_default: msg = ("`meta` is not specified, inferred from partial data. " "Please provide `meta` if the result is unexpected./n" " Before: .apply(func)/n" " After: .apply(func, meta={''x'': ''f8'', ''y'': ''f8''}) for dataframe result/n" " or: .apply(func, meta=(''x'', ''f8'')) for series result") warnings.warn(msg) meta = _emulate(M.apply, self._meta_nonempty, func, convert_dtype=convert_dtype, args=args, **kwds) return map_partitions(M.apply, self, func, convert_dtype, args, meta=meta, **kwds)