python apache-spark pyspark apache-spark-sql user-defined-functions

Aplicación de UDF en GroupedData en PySpark(con ejemplo de Python en funcionamiento)



apache-spark apache-spark-sql (3)

Tengo este código de Python que se ejecuta localmente en un marco de datos de pandas:

df_result = pd.DataFrame(df .groupby(''A'') .apply(lambda x: myFunction(zip(x.B, x.C), x.name))

Me gustaría ejecutar esto en PySpark, pero tengo problemas para tratar con el objeto pyspark.sql.group.GroupedData.

He intentado lo siguiente:

sparkDF .groupby(''A'') .agg(myFunction(zip(''B'', ''C''), ''A''))

que vuelve

KeyError: ''A''

Supongo que ''A'' ya no es una columna y no puedo encontrar el equivalente para x.name.

Y entonces

sparkDF .groupby(''A'') .map(lambda row: Row(myFunction(zip(''B'', ''C''), ''A''))) .toDF()

pero recibe el siguiente error:

AttributeError: ''GroupedData'' object has no attribute ''map''

Cualquier sugerencia sería muy apreciada!


Desde Spark 2.3 puedes usar pandas_udf . GROUPED_MAP toma Callable[[pandas.DataFrame], pandas.DataFrame] o, en otras palabras, una función que se asigna desde Pandas DataFrame de la misma forma que la entrada, al DataFrame salida.

Por ejemplo, si los datos se ven así:

df = spark.createDataFrame( [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)], ("key", "value1", "value2") )

y desea calcular el valor promedio de min en pares entre value1 value2 , debe definir el esquema de salida:

from pyspark.sql.types import * schema = StructType([ StructField("key", StringType()), StructField("avg_min", DoubleType()) ])

pandas_udf :

import pandas as pd from pyspark.sql.functions import pandas_udf from pyspark.sql.functions import PandasUDFType @pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP) def g(df): result = pd.DataFrame(df.groupby(df.key).apply( lambda x: x.loc[:, ["value1", "value2"]].min(axis=1).mean() )) result.reset_index(inplace=True, drop=False) return result

y aplicarlo:

df.groupby("key").apply(g).show()

+---+-------+ |key|avg_min| +---+-------+ | b| -1.5| | a| -0.5| +---+-------+

Excluyendo la definición de esquema y el decorador, su código actual de Pandas puede aplicarse tal cual.

Desde Spark 2.4.0 también GROUPED_AGG variante GROUPED_AGG , que toma Callable[[pandas.Series, ...], T] , donde T es un escalar primitivo:

import numpy as np @pandas_udf(DoubleType(), functionType=PandasUDFType.GROUPED_AGG) def f(x, y): return np.minimum(x, y).mean()

que se puede usar con la group_by estándar group_by / agg :

df.groupBy("key").agg(f("value1", "value2").alias("avg_min")).show()

+---+-------+ |key|avg_min| +---+-------+ | b| -1.5| | a| -0.5| +---+-------+

Tenga en cuenta que ni GROUPED_MAP ni GROUPPED_AGG pandas_udf comportan de la misma manera que UserDefinedAggregateFunction o Aggregator , y está más cerca de groupByKey o funciones de ventana con marco ilimitado. Los datos se barajan primero, y solo después de eso, se aplica UDF.

Para una ejecución optimizada, debe implementar Scala UserDefinedAggregateFunction y agregar Python wrapper .

Consulte también ¿ Función definida por el usuario que se aplicará a Window en PySpark?


Lo que está intentando es escribir una UDAF (función agregada definida por el usuario) en lugar de una UDF (función definida por el usuario). Las UDAF son funciones que funcionan en datos agrupados por una clave. Específicamente, necesitan definir cómo fusionar múltiples valores en el grupo en una sola partición, y luego cómo combinar los resultados en las particiones para obtener la clave. Actualmente no hay forma en Python de implementar un UDAF, solo se pueden implementar en Scala.

Pero, puedes solucionarlo en Python. Puede usar el conjunto de recopilación para recopilar sus valores agrupados y luego usar un UDF regular para hacer lo que quiera con ellos. La única advertencia es collect_set solo funciona en valores primitivos, por lo que deberá codificarlos en una cadena.

from pyspark.sql.types import StringType from pyspark.sql.functions import col, collect_list, concat_ws, udf def myFunc(data_list): for val in data_list: b, c = data.split('','') # do something return <whatever> myUdf = udf(myFunc, StringType()) df.withColumn(''data'', concat_ws('','', col(''B''), col(''C''))) / .groupBy(''A'').agg(collect_list(''data'').alias(''data'')) .withColumn(''data'', myUdf(''data''))

Use collect_set si desea deducir. Además, si tiene muchos valores para algunas de sus claves, esto será lento porque todos los valores para una clave deberán recopilarse en una única partición en algún lugar de su clúster. Si su resultado final es un valor que construye combinando los valores por clave de alguna manera (por ejemplo, sumándolos), podría ser más rápido implementarlo utilizando el método RDD agregateByKey que le permite construir un valor intermedio para cada clave en una partición antes barajando datos.

EDITAR: 21/11/2018

Como esta respuesta fue escrita, pyspark agregó soporte para UDAF''S usando Pandas. Hay algunas mejoras de rendimiento agradables cuando se usan los UDF y UDAF de Panda sobre las funciones de Python directo con RDD. Debajo del capó, vectoriza las columnas (agrupa los valores de varias filas para optimizar el procesamiento y la compresión). Eche un vistazo here para obtener una mejor explicación o vea la respuesta del continuación para ver un ejemplo.


Voy a extender la respuesta anterior.

Entonces puede implementar la misma lógica como pandas.groupby (). Aplicar en pyspark usando @pandas_udf y que es un método de vectorización y más rápido que udf simple.

from pyspark.sql.functions import pandas_udf,PandasUDFType df3 = spark.createDataFrame( [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)], ("key", "value1", "value2") ) from pyspark.sql.types import * schema = StructType([ StructField("key", StringType()), StructField("avg_value1", DoubleType()), StructField("avg_value2", DoubleType()), StructField("sum_avg", DoubleType()), StructField("sub_avg", DoubleType()) ]) @pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP) def g(df): gr = df[''key''].iloc[0] x = df.value1.mean() y = df.value2.mean() w = df.value1.mean() + df.value2.mean() z = df.value1.mean() - df.value2.mean() return pd.DataFrame([[gr]+[x]+[y]+[w]+[z]]) df3.groupby("key").apply(g).show()

Obtendrá el siguiente resultado:

+---+----------+----------+-------+-------+ |key|avg_value1|avg_value2|sum_avg|sub_avg| +---+----------+----------+-------+-------+ | b| 6.5| -1.5| 5.0| 8.0| | a| 0.0| 21.0| 21.0| -21.0| +---+----------+----------+-------+-------+

Por lo tanto, puede hacer más cálculos entre otros campos en datos agrupados y agregarlos al marco de datos en formato de lista.