spark meaning map_partitions pandas dask

pandas - meaning - dask to parquet



¿Cómo paraleliza apply() en Pandas Dataframes utilizando todos los núcleos en una máquina? (2)

La forma más sencilla es usar map_partitions de Dask . Necesitas estas importaciones (necesitarás pip install dask ):

import pandas as pd import dask.dataframe as dd from dask.multiprocessing import get

y la sintaxis es

data = <your_pandas_dataframe> ddata = dd.from_pandas(data, npartitions=30) def myfunc(x,y,z, ...): return <whatever> res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)

(Creo que 30 es un número adecuado de particiones si tiene 16 núcleos). Solo para estar completo, cronometré la diferencia en mi máquina (16 núcleos):

data = pd.DataFrame() data[''col1''] = np.random.normal(size = 1500000) data[''col2''] = np.random.normal(size = 1500000) ddata = dd.from_pandas(data, npartitions=30) def myfunc(x,y): return y*(x**2+1) def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1) def pandas_apply(): return apply_myfunc_to_DF(data) def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get) def vectorized(): return myfunc(data[''col1''], data[''col2''] ) t_pds = timeit.Timer(lambda: pandas_apply()) print(t_pds.timeit(number=1))

28.16970546543598

t_dsk = timeit.Timer(lambda: dask_apply()) print(t_dsk.timeit(number=1))

2.708152851089835

t_vec = timeit.Timer(lambda: vectorized()) print(t_vec.timeit(number=1))

0.010668013244867325

Dar un factor de 10 de aceleración desde pandas se aplica a dask en particiones. Por supuesto, si tiene una función que puede vectorizar, debería - en este caso, la función ( y*(x**2+1) ) está vectorizada trivialmente, pero hay muchas cosas que son imposibles de vectorizar.

A partir de agosto de 2017, Pandas DataFame.apply() todavía está limitado a trabajar con un solo núcleo, lo que significa que una máquina de múltiples núcleos perderá la mayor parte de su tiempo de cálculo cuando ejecute df.apply(myfunc, axis=1) .

¿Cómo puede utilizar todos sus núcleos para ejecutar aplicar en un marco de datos en paralelo?


Puede utilizar el paquete swifter :

pip install swifter

Funciona como un complemento para pandas, lo que le permite reutilizar la función de apply :

import swifter def some_function(data): return data * 10 data[''out''] = data[''in''].swifter.apply(some_function)

Descubrirá automáticamente la forma más eficiente de paralelizar la función, sin importar si está vectorizada (como en el ejemplo anterior) o no.

Más ejemplos y una comparación de rendimiento están disponibles en GitHub. Tenga en cuenta que el paquete está en desarrollo activo, por lo que la API puede cambiar.