python - Paralelismo aplicar después de pandas groupby
parallel-processing rosetta (4)
Esto parece funcionar, aunque realmente debería estar incorporado a los pandas
import pandas as pd
from joblib import Parallel, delayed
import multiprocessing
def tmpFunc(df):
df[''c''] = df.a + df.b
return df
def applyParallel(dfGrouped, func):
retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
return pd.concat(retLst)
if __name__ == ''__main__'':
df = pd.DataFrame({''a'': [6, 2, 2], ''b'': [4, 5, 6]},index= [''g1'', ''g1'', ''g2''])
print ''parallel version: ''
print applyParallel(df.groupby(df.index), tmpFunc)
print ''regular version: ''
print df.groupby(df.index).apply(tmpFunc)
print ''ideal version (does not work): ''
print df.groupby(df.index).applyParallel(tmpFunc)
He usado rosetta.parallel.pandas_easy para paralelizar la aplicación después del grupo, por ejemplo:
from rosetta.parallel.pandas_easy import groupby_to_series_to_frame
df = pd.DataFrame({''a'': [6, 2, 2], ''b'': [4, 5, 6]},index= [''g1'', ''g1'', ''g2''])
groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index)
Sin embargo, ¿alguien ha descubierto cómo paralelizar una función que devuelve un marco de datos? Este código falla para rosetta, como se esperaba.
def tmpFunc(df):
df[''c''] = df.a + df.b
return df
df.groupby(df.index).apply(tmpFunc)
groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index)
La respuesta de Ivan es genial, pero parece que puede simplificarse un poco, eliminando también la necesidad de depender de joblib:
from multiprocessing import Pool, cpu_count
def applyParallel(dfGrouped, func):
with Pool(cpu_count()) as p:
ret_list = p.map(func, [group for name, group in dfGrouped])
return pandas.concat(ret_list)
Por cierto: esto no puede reemplazar a ningún grupo por aplicación (), pero cubrirá los casos típicos: por ejemplo, debería cubrir los casos 2 y 3 en la documentación , mientras que debería obtenerse el comportamiento del caso 1 dando el argumento axis=1
a la pandas.concat()
final pandas.concat()
.
Tengo un truco que utilizo para obtener la paralelización en pandas. Rompo mi marco de datos en fragmentos, pongo cada fragmento en el elemento de una lista y luego uso los bits paralelos de ipython para hacer una aplicación paralela en la lista de marcos de datos. Luego volví a armar la lista usando la función concat
.
Esto no es generalmente aplicable, sin embargo. Funciona para mí porque la función que deseo aplicar a cada fragmento del marco de datos tarda aproximadamente un minuto. Y separar y armar mis datos no toma tanto tiempo. Así que esto es claramente un obstáculo. Dicho esto, aquí hay un ejemplo. Estoy usando el cuaderno Ipython para que veas %%time
magic en mi código:
## make some example data
import pandas as pd
np.random.seed(1)
n=10000
df = pd.DataFrame({''mygroup'' : np.random.randint(1000, size=n),
''data'' : np.random.rand(n)})
grouped = df.groupby(''mygroup'')
Para este ejemplo, voy a hacer ''trozos'' basados en el grupo anterior, pero no tiene que ser así como se fragmentan los datos. Aunque es un patrón bastante común.
dflist = []
for name, group in grouped:
dflist.append(group)
configurar los bits paralelos
from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()
lview.block = True
escribir una función tonta para aplicar a nuestros datos
def myFunc(inDf):
inDf[''newCol''] = inDf.data ** 10
return inDf
ahora ejecutemos el código en serie y luego en paralelo. primero en serie:
%%time
serial_list = map(myFunc, dflist)
CPU times: user 14 s, sys: 19.9 ms, total: 14 s
Wall time: 14 s
ahora paralelo
%%time
parallel_list = lview.map(myFunc, dflist)
CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
Wall time: 1.56 s
luego solo se necesitan unos pocos ms para fusionarlos de nuevo en un marco de datos
%%time
combinedDf = pd.concat(parallel_list)
CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
Wall time: 300 ms
Estoy ejecutando 6 motores IPython en mi MacBook, pero se puede ver que reduce el tiempo de ejecución a 2 segundos desde 14 segundos.
Para simulaciones estocásticas de ejecución muy larga, puedo utilizar AWS backend StarCluster un clúster con StarCluster . La mayor parte del tiempo, sin embargo, paralelizo solo en 8 CPUs en mi MBP.
Un breve comentario para acompañar la respuesta de JD Long. Descubrí que si la cantidad de grupos es muy grande (digamos cientos de miles), y su función de aplicar está haciendo algo bastante simple y rápido, entonces dividir su marco de datos en pedazos y asignarle cada pedazo a un trabajador para llevar a cabo una groupby-apply (en serie) puede ser mucho más rápido que hacer un groupby-apply paralelo y hacer que los trabajadores lean una cola que contenga una multitud de grupos. Ejemplo:
import pandas as pd
import numpy as np
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
nrows = 15000
np.random.seed(1980)
df = pd.DataFrame({''a'': np.random.permutation(np.arange(nrows))})
Entonces nuestro marco de datos se ve así:
a
0 3425
1 1016
2 8141
3 9263
4 8018
Tenga en cuenta que la columna ''a'' tiene muchos grupos (piense en los ids de los clientes):
len(df.a.unique())
15000
Una función para operar en nuestros grupos:
def f1(group):
time.sleep(0.0001)
return group
Comience un grupo:
ppe = ProcessPoolExecutor(12)
futures = []
results = []
Haz un grupo paralelo por aplicación:
%%time
for name, group in df.groupby(''a''):
p = ppe.submit(f1, group)
futures.append(p)
for future in as_completed(futures):
r = future.result()
results.append(r)
df_output = pd.concat(results)
del ppe
CPU times: user 18.8 s, sys: 2.15 s, total: 21 s
Wall time: 17.9 s
Agreguemos ahora una columna que particione el df en muchos menos grupos:
df[''b''] = np.random.randint(0, 12, nrows)
Ahora, en lugar de 15000 grupos, solo hay 12:
len(df.b.unique())
12
Particionaremos nuestro df y haremos un groupby-apply en cada fragmento.
ppe = ProcessPoolExecutor(12)
Diversión de envoltura:
def f2(df):
df.groupby(''a'').apply(f1)
return df
Envíe cada fragmento para ser operado en serie:
%%time
for i in df.b.unique():
p = ppe.submit(f2, df[df.b==i])
futures.append(p)
for future in as_completed(futures):
r = future.result()
results.append(r)
df_output = pd.concat(results)
CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s
Wall time: 12.4 s
Tenga en cuenta que la cantidad de tiempo gastado por grupo no ha cambiado. Más bien, lo que ha cambiado es la longitud de la cola desde la que los trabajadores leen. Sospecho que lo que está sucediendo es que los trabajadores no pueden acceder a la memoria compartida de forma simultánea, y están volviendo constantemente para leer la cola, y por lo tanto están pisándose los pies. Con trozos más grandes para operar, los trabajadores regresan con menos frecuencia y por lo tanto este problema se mejora y la ejecución general es más rápida.