spark read_parquet example create python pandas dask

python - read_parquet - dask vs spark



Aplicar la función al marco de datos agrupados en Dask: ¿Cómo se especifica el marco de datos agrupado como argumento en la función? (2)

Con un poco de conjeturas, creo que lo siguiente es lo que estás buscando.

def mapper(d): def contraster(x, DF=d): matches = DF.apply(lambda row: fuzz.partial_ratio(row[''last_name''], x) >= 50, axis = 1) return [d.ID.iloc[i] for i, x in enumerate(matches) if x] d[''out''] = d.apply(lambda row: contraster(row[''last_name'']), axis =1) return d df.groupby(''first_name'').apply(mapper).compute()

Aplicado a tus datos, obtienes:

ID first_name last_name out 2 X Danae Smith [X] 4 12 Jacke Toro [12] 0 X Jake Del Toro [X] 1 U John Foster [U] 5 13 Jon Froster [13] 3 Y Beatriz Patterson [Y]

es decir, debido a que se agrupa por el primer nombre, cada grupo solo contiene un elemento, que coincide solo con sí mismo.

Sin embargo, si tiene algunos valores de primer nombre que estaban en varias filas, obtendría coincidencias:

entities = pd.DataFrame( {''first_name'':[''Jake'',''Jake'', ''Jake'', ''John''], ''last_name'': [''Del Toro'', ''Toro'', ''Smith'' ''Froster''], ''ID'':[''Z'',''U'',''X'',''Y'']})

Salida:

ID first_name last_name out 0 Z Jake Del Toro [Z, U] 1 U Jake Toro [Z, U] 2 X Jake Smith [X] 3 Y John Froster [Y]

Si no necesita coincidencias exactas en el primer nombre, entonces tal vez necesite ordenar / establecer el índice por el primer nombre y usar map_partitions de una manera similar. En ese caso, necesitarás reformar tu pregunta.

Tengo un dask dataframe agrupado por el índice ( first_name ).

import pandas as pd import numpy as np from multiprocessing import cpu_count from dask import dataframe as dd from dask.multiprocessing import get from dask.distributed import Client NCORES = cpu_count() client = Client() entities = pd.DataFrame({''first_name'':[''Jake'',''John'',''Danae'',''Beatriz'', ''Jacke'', ''Jon''],''last_name'': [''Del Toro'', ''Foster'', ''Smith'', ''Patterson'', ''Toro'', ''Froster''], ''ID'':[''X'',''U'',''X'',''Y'', ''12'',''13'']}) df = dd.from_pandas(entities, npartitions=NCORES) df = client.persist(df.set_index(''first_name''))

(Obviamente las entities en la vida real son varios miles de filas)

Quiero aplicar una función definida por el usuario a cada marco de datos agrupado. Quiero comparar cada fila con todas las demás filas del grupo (algo similar a Pandas compara cada fila con todas las filas en el marco de datos y guardar los resultados en la lista para cada fila ).

La siguiente es la función que trato de aplicar:

def contraster(x, DF): matches = DF.apply(lambda row: fuzz.partial_ratio(row[''last_name''], x) >= 50, axis = 1) return [i for i, x in enumerate(matches) if x]

Para el marco de datos de las entities prueba, podría aplicar la función como de costumbre:

entities.apply(lambda row: contraster(row[''last_name''], entities), axis =1)

Y el resultado esperado es:

Out[35]: 0 [0, 4] 1 [1, 5] 2 [2] 3 [3] 4 [0, 4] 5 [1, 5] dtype: object

Cuando las entities son enormes, la solución es usar dask . Tenga en cuenta que el DF en la función de control debe ser el marco de datos agrupado.

Estoy tratando de usar lo siguiente:

df.groupby(''first_name'').apply(func=contraster, args=????)

Pero, ¿cómo debo especificar el marco de datos agrupado (es decir, DF en contraster ?)


La función que proporciona a groupby-apply debe tomar un marco de datos o una serie de Pandas como entrada y, idealmente, devolver uno (o un valor escalar) como salida. Los parámetros adicionales están bien, pero deberían ser secundarios, no el primer argumento. Esto es lo mismo en los marcos de datos de Pandas y Dask.

def func(df, x=None): # do whatever you want here # the input to this function will have all the same first name return pd.DataFrame({''x'': [x] * len(df), ''count'': len(df), ''first_name'': df.first_name})

A continuación, puede llamar a df.groupby normalmente

import pandas as pd import dask.dataframe as dd df = pd.DataFrame({''first_name'':[''Alice'', ''Alice'', ''Bob''], ''last_name'': [''Adams'', ''Jones'', ''Smith'']}) ddf = dd.from_pandas(df, npartitions=2) ddf.groupby(''first_name'').apply(func, x=3).compute()

Esto producirá la misma salida en pandas o dask.dataframe

count first_name x 0 2 Alice 3 1 2 Alice 3 2 1 Bob 3