scala - Spark DataFrames cuando las funciones udf no aceptan variables de entrada lo suficientemente grandes
apache-spark apache-spark-sql (2)
Estoy preparando un DataFrame con una identificación y un vector de mis características para usar más adelante para hacer predicciones. Hago un groupBy en mi dataframe, y en mi groupBy estoy fusionando un par de columnas como listas en una nueva columna:
def mergeFunction(...) // with 14 input variables
val myudffunction( mergeFunction ) // Spark doesn''t support this
df.groupBy("id").agg(
collect_list(df(...)) as ...
... // too many of these (something like 14 of them)
).withColumn("features_labels",
myudffunction(
col(...)
, col(...) )
.select("id", "feature_labels")
Así es como estoy creando mis vectores de características y sus etiquetas. Ha funcionado para mí hasta ahora, pero esta es la primera vez que mi vector de características con este método se está volviendo más grande que el número 10, que es lo que acepta al máximo una función udf en Spark.
No estoy seguro de qué otra manera puedo solucionar esto. ¿Va a aumentar el tamaño de las entradas de udf en Spark? ¿Las he entendido incorrectamente o hay una mejor manera?
Las funciones definidas por el usuario se definen para hasta 22 parámetros.
Solo
udf
helper se define para un máximo de 10 argumentos.
Para manejar funciones con un mayor número de parámetros, puede usar
org.apache.spark.sql.UDFRegistration
.
Por ejemplo
val dummy = ((
x0: Int, x1: Int, x2: Int, x3: Int, x4: Int, x5: Int, x6: Int, x7: Int,
x8: Int, x9: Int, x10: Int, x11: Int, x12: Int, x13: Int, x14: Int,
x15: Int, x16: Int, x17: Int, x18: Int, x19: Int, x20: Int, x21: Int) => 1)
van a registrarse:
import org.apache.spark.sql.expressions.UserDefinedFunction
val dummyUdf: UserDefinedFunction = spark.udf.register("dummy", dummy)
y usar directamente
val df = spark.range(1)
val exprs = (0 to 21).map(_ => lit(1))
df.select(dummyUdf(exprs: _*))
o por nombre a través de
callUdf
import org.apache.spark.sql.functions.callUDF
df.select(
callUDF("dummy", exprs: _*).alias("dummy")
)
o expresión SQL:
df.selectExpr(s"""dummy(${Seq.fill(22)(1).mkString(",")})""")
También puede crear un objeto
UserDefinedFunction
:
import org.apache.spark.sql.expressions.UserDefinedFunction
Seq(1).toDF.select(UserDefinedFunction(dummy, IntegerType, None)(exprs: _*))
En la práctica, tener una función con 22 argumentos no es muy útil y, a menos que desee utilizar la reflexión de Scala para generarlos, existen pesadillas de mantenimiento.
Consideraría usar colecciones (
array
,
map
) o
struct
como entrada o dividir esto en múltiples módulos.
Por ejemplo:
val aLongArray = array((0 to 256).map(_ => lit(1)): _*)
val udfWitharray = udf((xs: Seq[Int]) => 1)
Seq(1).toDF.select(udfWitharray(aLongArray).alias("dummy"))
Solo para expandir la respuesta de cero, es posible obtener la función
.withColumn()
para trabajar con un UDF que tiene más de 10 parámetros.
Solo necesito
spark.udf.register()
la función y luego usar un
expr
para el argumento para agregar la columna (en lugar de un
udf
).
Por ejemplo, algo como esto debería funcionar:
def mergeFunction(...) // with 14 input variables
spark.udf.register("mergeFunction", mergeFunction) // make available in expressions
df.groupBy("id").agg(
collect_list(df(...)) as ...
... // too many of these (something like 14 of them)
).withColumn("features_labels",
expr("mergeFunction(col1, col2, col3, col4, ...)") ) //pass in the 14 column names
.select("id", "feature_labels")
El analizador de expresiones subyacente parece manejar más de 10 parámetros, así que no creo que tenga que recurrir a pasar matrices para llamar a la función. Además, si los parámetros son tipos de datos diferentes, las matrices no funcionarían muy bien.