scala - ¿Cómo puedo pasar parámetros adicionales a UDF en Spark SQL?
apache-spark apache-spark-sql (2)
Puede crear una
Column
literal para pasar a un udf utilizando la función
lit(...)
definida en
org.apache.spark.sql.functions
Por ejemplo:
val takeRight = udf((s: String, i: Int) => s.takeRight(i))
df.select(takeRight($"stringCol", lit(1)))
Quiero analizar las columnas de fecha en un
DataFrame
, y para cada columna de fecha, la resolución de la fecha puede cambiar (es decir, 2011/01/10 => 2011/01 si la resolución se establece en "Mes").
Escribí el siguiente código:
def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame =
{
import org.apache.spark.sql.functions._
val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)}
val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)}
val allColNames = dataframe.columns
val allCols = allColNames.map(name => dataframe.col(name))
val mappedCols =
{
for(i <- allCols.indices) yield
{
schema(i) match
{
case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i)))
case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
case _ => allCols(i)
}
}
}
dataframe.select(mappedCols:_*)
}}
Sin embargo no funciona.
Parece que solo puedo pasar las
Column
a UDF.
Y me pregunto si será muy lento si convierto el
DataFrame
a
RDD
y aplico la función en cada fila.
¿Alguien sabe la solución correcta? ¡Gracias!
Solo usa un poco de curry:
def convertDateFunc(resolution: DateResolutionType) = udf((x:String) =>
SparkDateTimeConverter.convertDate(x, resolution))
y úsalo de la siguiente manera:
case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i))
En una nota al margen, debería echar un vistazo a
sql.functions.trunc
y
sql.functions.date_format
.
Estos deberían al menos parte del trabajo sin utilizar UDF en absoluto.
Nota :
En Spark 2.2 o posterior puede usar la función
typedLit
:
import org.apache.spark.sql.functions.typedLit
que admiten una gama más amplia de literales como
Seq
o
Map
.