scala apache-spark apache-spark-sql user-defined-functions

scala - ¿Cómo puedo pasar parámetros adicionales a UDF en Spark SQL?



apache-spark apache-spark-sql (2)

Puede crear una Column literal para pasar a un udf utilizando la función lit(...) definida en org.apache.spark.sql.functions

Por ejemplo:

val takeRight = udf((s: String, i: Int) => s.takeRight(i)) df.select(takeRight($"stringCol", lit(1)))

Quiero analizar las columnas de fecha en un DataFrame , y para cada columna de fecha, la resolución de la fecha puede cambiar (es decir, 2011/01/10 => 2011/01 si la resolución se establece en "Mes").

Escribí el siguiente código:

def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame = { import org.apache.spark.sql.functions._ val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)} val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)} val allColNames = dataframe.columns val allCols = allColNames.map(name => dataframe.col(name)) val mappedCols = { for(i <- allCols.indices) yield { schema(i) match { case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i))) case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i)) case _ => allCols(i) } } } dataframe.select(mappedCols:_*) }}

Sin embargo no funciona. Parece que solo puedo pasar las Column a UDF. Y me pregunto si será muy lento si convierto el DataFrame a RDD y aplico la función en cada fila.

¿Alguien sabe la solución correcta? ¡Gracias!


Solo usa un poco de curry:

def convertDateFunc(resolution: DateResolutionType) = udf((x:String) => SparkDateTimeConverter.convertDate(x, resolution))

y úsalo de la siguiente manera:

case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i))

En una nota al margen, debería echar un vistazo a sql.functions.trunc y sql.functions.date_format . Estos deberían al menos parte del trabajo sin utilizar UDF en absoluto.

Nota :

En Spark 2.2 o posterior puede usar la función typedLit :

import org.apache.spark.sql.functions.typedLit

que admiten una gama más amplia de literales como Seq o Map .