python - read_parquet - dask vs spark
Aplicar la función al marco de datos agrupados en Dask: ¿Cómo se especifica el marco de datos agrupado como argumento en la función? (2)
Con un poco de conjeturas, creo que lo siguiente es lo que estás buscando.
def mapper(d):
def contraster(x, DF=d):
matches = DF.apply(lambda row: fuzz.partial_ratio(row[''last_name''], x) >= 50, axis = 1)
return [d.ID.iloc[i] for i, x in enumerate(matches) if x]
d[''out''] = d.apply(lambda row:
contraster(row[''last_name'']), axis =1)
return d
df.groupby(''first_name'').apply(mapper).compute()
Aplicado a tus datos, obtienes:
ID first_name last_name out
2 X Danae Smith [X]
4 12 Jacke Toro [12]
0 X Jake Del Toro [X]
1 U John Foster [U]
5 13 Jon Froster [13]
3 Y Beatriz Patterson [Y]
es decir, debido a que se agrupa por el primer nombre, cada grupo solo contiene un elemento, que coincide solo con sí mismo.
Sin embargo, si tiene algunos valores de primer nombre que estaban en varias filas, obtendría coincidencias:
entities = pd.DataFrame(
{''first_name'':[''Jake'',''Jake'', ''Jake'', ''John''],
''last_name'': [''Del Toro'', ''Toro'', ''Smith''
''Froster''],
''ID'':[''Z'',''U'',''X'',''Y'']})
Salida:
ID first_name last_name out
0 Z Jake Del Toro [Z, U]
1 U Jake Toro [Z, U]
2 X Jake Smith [X]
3 Y John Froster [Y]
Si no necesita coincidencias exactas en el primer nombre, entonces tal vez necesite ordenar / establecer el índice por el primer nombre y usar map_partitions
de una manera similar. En ese caso, necesitarás reformar tu pregunta.
Tengo un dask dataframe
agrupado por el índice ( first_name
).
import pandas as pd
import numpy as np
from multiprocessing import cpu_count
from dask import dataframe as dd
from dask.multiprocessing import get
from dask.distributed import Client
NCORES = cpu_count()
client = Client()
entities = pd.DataFrame({''first_name'':[''Jake'',''John'',''Danae'',''Beatriz'', ''Jacke'', ''Jon''],''last_name'': [''Del Toro'', ''Foster'', ''Smith'', ''Patterson'', ''Toro'', ''Froster''], ''ID'':[''X'',''U'',''X'',''Y'', ''12'',''13'']})
df = dd.from_pandas(entities, npartitions=NCORES)
df = client.persist(df.set_index(''first_name''))
(Obviamente las entities
en la vida real son varios miles de filas)
Quiero aplicar una función definida por el usuario a cada marco de datos agrupado. Quiero comparar cada fila con todas las demás filas del grupo (algo similar a Pandas compara cada fila con todas las filas en el marco de datos y guardar los resultados en la lista para cada fila ).
La siguiente es la función que trato de aplicar:
def contraster(x, DF):
matches = DF.apply(lambda row: fuzz.partial_ratio(row[''last_name''], x) >= 50, axis = 1)
return [i for i, x in enumerate(matches) if x]
Para el marco de datos de las entities
prueba, podría aplicar la función como de costumbre:
entities.apply(lambda row: contraster(row[''last_name''], entities), axis =1)
Y el resultado esperado es:
Out[35]:
0 [0, 4]
1 [1, 5]
2 [2]
3 [3]
4 [0, 4]
5 [1, 5]
dtype: object
Cuando las entities
son enormes, la solución es usar dask
. Tenga en cuenta que el DF
en la función de control debe ser el marco de datos agrupado.
Estoy tratando de usar lo siguiente:
df.groupby(''first_name'').apply(func=contraster, args=????)
Pero, ¿cómo debo especificar el marco de datos agrupado (es decir, DF
en contraster
?)
La función que proporciona a groupby-apply debe tomar un marco de datos o una serie de Pandas como entrada y, idealmente, devolver uno (o un valor escalar) como salida. Los parámetros adicionales están bien, pero deberían ser secundarios, no el primer argumento. Esto es lo mismo en los marcos de datos de Pandas y Dask.
def func(df, x=None):
# do whatever you want here
# the input to this function will have all the same first name
return pd.DataFrame({''x'': [x] * len(df),
''count'': len(df),
''first_name'': df.first_name})
A continuación, puede llamar a df.groupby normalmente
import pandas as pd
import dask.dataframe as dd
df = pd.DataFrame({''first_name'':[''Alice'', ''Alice'', ''Bob''],
''last_name'': [''Adams'', ''Jones'', ''Smith'']})
ddf = dd.from_pandas(df, npartitions=2)
ddf.groupby(''first_name'').apply(func, x=3).compute()
Esto producirá la misma salida en pandas o dask.dataframe
count first_name x
0 2 Alice 3
1 2 Alice 3
2 1 Bob 3