python pandas parallel-processing rosetta

python - Paralelismo aplicar después de pandas groupby



parallel-processing rosetta (4)

Esto parece funcionar, aunque realmente debería estar incorporado a los pandas

import pandas as pd from joblib import Parallel, delayed import multiprocessing def tmpFunc(df): df[''c''] = df.a + df.b return df def applyParallel(dfGrouped, func): retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped) return pd.concat(retLst) if __name__ == ''__main__'': df = pd.DataFrame({''a'': [6, 2, 2], ''b'': [4, 5, 6]},index= [''g1'', ''g1'', ''g2'']) print ''parallel version: '' print applyParallel(df.groupby(df.index), tmpFunc) print ''regular version: '' print df.groupby(df.index).apply(tmpFunc) print ''ideal version (does not work): '' print df.groupby(df.index).applyParallel(tmpFunc)

He usado rosetta.parallel.pandas_easy para paralelizar la aplicación después del grupo, por ejemplo:

from rosetta.parallel.pandas_easy import groupby_to_series_to_frame df = pd.DataFrame({''a'': [6, 2, 2], ''b'': [4, 5, 6]},index= [''g1'', ''g1'', ''g2'']) groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index)

Sin embargo, ¿alguien ha descubierto cómo paralelizar una función que devuelve un marco de datos? Este código falla para rosetta, como se esperaba.

def tmpFunc(df): df[''c''] = df.a + df.b return df df.groupby(df.index).apply(tmpFunc) groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index)


La respuesta de Ivan es genial, pero parece que puede simplificarse un poco, eliminando también la necesidad de depender de joblib:

from multiprocessing import Pool, cpu_count def applyParallel(dfGrouped, func): with Pool(cpu_count()) as p: ret_list = p.map(func, [group for name, group in dfGrouped]) return pandas.concat(ret_list)

Por cierto: esto no puede reemplazar a ningún grupo por aplicación (), pero cubrirá los casos típicos: por ejemplo, debería cubrir los casos 2 y 3 en la documentación , mientras que debería obtenerse el comportamiento del caso 1 dando el argumento axis=1 a la pandas.concat() final pandas.concat() .


Tengo un truco que utilizo para obtener la paralelización en pandas. Rompo mi marco de datos en fragmentos, pongo cada fragmento en el elemento de una lista y luego uso los bits paralelos de ipython para hacer una aplicación paralela en la lista de marcos de datos. Luego volví a armar la lista usando la función concat .

Esto no es generalmente aplicable, sin embargo. Funciona para mí porque la función que deseo aplicar a cada fragmento del marco de datos tarda aproximadamente un minuto. Y separar y armar mis datos no toma tanto tiempo. Así que esto es claramente un obstáculo. Dicho esto, aquí hay un ejemplo. Estoy usando el cuaderno Ipython para que veas %%time magic en mi código:

## make some example data import pandas as pd np.random.seed(1) n=10000 df = pd.DataFrame({''mygroup'' : np.random.randint(1000, size=n), ''data'' : np.random.rand(n)}) grouped = df.groupby(''mygroup'')

Para este ejemplo, voy a hacer ''trozos'' basados ​​en el grupo anterior, pero no tiene que ser así como se fragmentan los datos. Aunque es un patrón bastante común.

dflist = [] for name, group in grouped: dflist.append(group)

configurar los bits paralelos

from IPython.parallel import Client rc = Client() lview = rc.load_balanced_view() lview.block = True

escribir una función tonta para aplicar a nuestros datos

def myFunc(inDf): inDf[''newCol''] = inDf.data ** 10 return inDf

ahora ejecutemos el código en serie y luego en paralelo. primero en serie:

%%time serial_list = map(myFunc, dflist) CPU times: user 14 s, sys: 19.9 ms, total: 14 s Wall time: 14 s

ahora paralelo

%%time parallel_list = lview.map(myFunc, dflist) CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s Wall time: 1.56 s

luego solo se necesitan unos pocos ms para fusionarlos de nuevo en un marco de datos

%%time combinedDf = pd.concat(parallel_list) CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms Wall time: 300 ms

Estoy ejecutando 6 motores IPython en mi MacBook, pero se puede ver que reduce el tiempo de ejecución a 2 segundos desde 14 segundos.

Para simulaciones estocásticas de ejecución muy larga, puedo utilizar AWS backend StarCluster un clúster con StarCluster . La mayor parte del tiempo, sin embargo, paralelizo solo en 8 CPUs en mi MBP.


Un breve comentario para acompañar la respuesta de JD Long. Descubrí que si la cantidad de grupos es muy grande (digamos cientos de miles), y su función de aplicar está haciendo algo bastante simple y rápido, entonces dividir su marco de datos en pedazos y asignarle cada pedazo a un trabajador para llevar a cabo una groupby-apply (en serie) puede ser mucho más rápido que hacer un groupby-apply paralelo y hacer que los trabajadores lean una cola que contenga una multitud de grupos. Ejemplo:

import pandas as pd import numpy as np import time from concurrent.futures import ProcessPoolExecutor, as_completed nrows = 15000 np.random.seed(1980) df = pd.DataFrame({''a'': np.random.permutation(np.arange(nrows))})

Entonces nuestro marco de datos se ve así:

a 0 3425 1 1016 2 8141 3 9263 4 8018

Tenga en cuenta que la columna ''a'' tiene muchos grupos (piense en los ids de los clientes):

len(df.a.unique()) 15000

Una función para operar en nuestros grupos:

def f1(group): time.sleep(0.0001) return group

Comience un grupo:

ppe = ProcessPoolExecutor(12) futures = [] results = []

Haz un grupo paralelo por aplicación:

%%time for name, group in df.groupby(''a''): p = ppe.submit(f1, group) futures.append(p) for future in as_completed(futures): r = future.result() results.append(r) df_output = pd.concat(results) del ppe CPU times: user 18.8 s, sys: 2.15 s, total: 21 s Wall time: 17.9 s

Agreguemos ahora una columna que particione el df en muchos menos grupos:

df[''b''] = np.random.randint(0, 12, nrows)

Ahora, en lugar de 15000 grupos, solo hay 12:

len(df.b.unique()) 12

Particionaremos nuestro df y haremos un groupby-apply en cada fragmento.

ppe = ProcessPoolExecutor(12)

Diversión de envoltura:

def f2(df): df.groupby(''a'').apply(f1) return df

Envíe cada fragmento para ser operado en serie:

%%time for i in df.b.unique(): p = ppe.submit(f2, df[df.b==i]) futures.append(p) for future in as_completed(futures): r = future.result() results.append(r) df_output = pd.concat(results) CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s Wall time: 12.4 s

Tenga en cuenta que la cantidad de tiempo gastado por grupo no ha cambiado. Más bien, lo que ha cambiado es la longitud de la cola desde la que los trabajadores leen. Sospecho que lo que está sucediendo es que los trabajadores no pueden acceder a la memoria compartida de forma simultánea, y están volviendo constantemente para leer la cola, y por lo tanto están pisándose los pies. Con trozos más grandes para operar, los trabajadores regresan con menos frecuencia y por lo tanto este problema se mejora y la ejecución general es más rápida.