scala apache-spark dataframe apache-spark-sql apache-spark-mllib

scala - Spark DataFrames cuando las funciones udf no aceptan variables de entrada lo suficientemente grandes



apache-spark apache-spark-sql (2)

Estoy preparando un DataFrame con una identificación y un vector de mis características para usar más adelante para hacer predicciones. Hago un groupBy en mi dataframe, y en mi groupBy estoy fusionando un par de columnas como listas en una nueva columna:

def mergeFunction(...) // with 14 input variables val myudffunction( mergeFunction ) // Spark doesn''t support this df.groupBy("id").agg( collect_list(df(...)) as ... ... // too many of these (something like 14 of them) ).withColumn("features_labels", myudffunction( col(...) , col(...) ) .select("id", "feature_labels")

Así es como estoy creando mis vectores de características y sus etiquetas. Ha funcionado para mí hasta ahora, pero esta es la primera vez que mi vector de características con este método se está volviendo más grande que el número 10, que es lo que acepta al máximo una función udf en Spark.

No estoy seguro de qué otra manera puedo solucionar esto. ¿Va a aumentar el tamaño de las entradas de udf en Spark? ¿Las he entendido incorrectamente o hay una mejor manera?


Las funciones definidas por el usuario se definen para hasta 22 parámetros. Solo udf helper se define para un máximo de 10 argumentos. Para manejar funciones con un mayor número de parámetros, puede usar org.apache.spark.sql.UDFRegistration .

Por ejemplo

val dummy = (( x0: Int, x1: Int, x2: Int, x3: Int, x4: Int, x5: Int, x6: Int, x7: Int, x8: Int, x9: Int, x10: Int, x11: Int, x12: Int, x13: Int, x14: Int, x15: Int, x16: Int, x17: Int, x18: Int, x19: Int, x20: Int, x21: Int) => 1)

van a registrarse:

import org.apache.spark.sql.expressions.UserDefinedFunction val dummyUdf: UserDefinedFunction = spark.udf.register("dummy", dummy)

y usar directamente

val df = spark.range(1) val exprs = (0 to 21).map(_ => lit(1)) df.select(dummyUdf(exprs: _*))

o por nombre a través de callUdf

import org.apache.spark.sql.functions.callUDF df.select( callUDF("dummy", exprs: _*).alias("dummy") )

o expresión SQL:

df.selectExpr(s"""dummy(${Seq.fill(22)(1).mkString(",")})""")

También puede crear un objeto UserDefinedFunction :

import org.apache.spark.sql.expressions.UserDefinedFunction Seq(1).toDF.select(UserDefinedFunction(dummy, IntegerType, None)(exprs: _*))

En la práctica, tener una función con 22 argumentos no es muy útil y, a menos que desee utilizar la reflexión de Scala para generarlos, existen pesadillas de mantenimiento.

Consideraría usar colecciones ( array , map ) o struct como entrada o dividir esto en múltiples módulos. Por ejemplo:

val aLongArray = array((0 to 256).map(_ => lit(1)): _*) val udfWitharray = udf((xs: Seq[Int]) => 1) Seq(1).toDF.select(udfWitharray(aLongArray).alias("dummy"))


Solo para expandir la respuesta de cero, es posible obtener la función .withColumn() para trabajar con un UDF que tiene más de 10 parámetros. Solo necesito spark.udf.register() la función y luego usar un expr para el argumento para agregar la columna (en lugar de un udf ).

Por ejemplo, algo como esto debería funcionar:

def mergeFunction(...) // with 14 input variables spark.udf.register("mergeFunction", mergeFunction) // make available in expressions df.groupBy("id").agg( collect_list(df(...)) as ... ... // too many of these (something like 14 of them) ).withColumn("features_labels", expr("mergeFunction(col1, col2, col3, col4, ...)") ) //pass in the 14 column names .select("id", "feature_labels")

El analizador de expresiones subyacente parece manejar más de 10 parámetros, así que no creo que tenga que recurrir a pasar matrices para llamar a la función. Además, si los parámetros son tipos de datos diferentes, las matrices no funcionarían muy bien.