to_date spark functions scala apache-spark apache-spark-sql user-defined-functions nullable

scala - functions - spark udf



SparkSQL: ¿Cómo lidiar con valores nulos en la función definida por el usuario? (3)

Dada la Tabla 1 con una columna "x" de tipo String. Quiero crear la Tabla 2 con una columna "y" que es una representación entera de las cadenas de fecha dadas en "x".

Esencial es mantener valores null en la columna "y".

Tabla 1 (Dataframe df1):

+----------+ | x| +----------+ |2015-09-12| |2015-09-13| | null| | null| +----------+ root |-- x: string (nullable = true)

Tabla 2 (Dataframe df2):

+----------+--------+ | x| y| +----------+--------+ | null| null| | null| null| |2015-09-12|20150912| |2015-09-13|20150913| +----------+--------+ root |-- x: string (nullable = true) |-- y: integer (nullable = true)

Mientras que la función definida por el usuario (udf) para convertir valores de la columna "x" en los de la columna "y" es:

val extractDateAsInt = udf[Int, String] ( (d:String) => d.substring(0, 10) .filterNot( "-".toSet) .toInt )

y funciona, tratar con valores nulos no es posible.

Aunque puedo hacer algo como

val extractDateAsIntWithNull = udf[Int, String] ( (d:String) => if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt else 1 )

No he encontrado la manera de "producir" valores null través de udfs (por supuesto, ya que Int s no puede ser null ).

Mi solución actual para la creación de df2 (Tabla 2) es la siguiente:

// holds data of table 1 val df1 = ... // filter entries from df1, that are not null val dfNotNulls = df1.filter(df1("x") .isNotNull) .withColumn("y", extractDateAsInt(df1("x"))) .withColumnRenamed("x", "right_x") // create df2 via a left join on df1 and dfNotNull having val df2 = df1.join( dfNotNulls, df1("x") === dfNotNulls("right_x"), "leftouter" ).drop("right_x")

Preguntas :

  • La solución actual parece engorrosa (y probablemente no eficiente en términos de rendimiento). ¿Hay una mejor manera?
  • @ Spark-developers: ¿Hay un tipo NullableInt planificado / disponible, de modo que sea posible el siguiente udf (ver extracto del Código)?

Extracto de código

val extractDateAsNullableInt = udf[NullableInt, String] ( (d:String) => if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt else null )


Código suplementario

Con la buena respuesta de @ zero323, creé el siguiente código, para tener disponibles funciones definidas por el usuario que manejan valores nulos como se describe. ¡Espero que sea útil para otros!

/** * Set of methods to construct [[org.apache.spark.sql.UserDefinedFunction]]s that * handle `null` values. */ object NullableFunctions { import org.apache.spark.sql.functions._ import scala.reflect.runtime.universe.{TypeTag} import org.apache.spark.sql.UserDefinedFunction /** * Given a function A1 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that * * if fnc input is null, None is returned. This will create a null value in the output Spark column. * * if A1 is non null, Some( f(input) will be returned, thus creating f(input) as value in the output column. * @param f function from A1 => RT * @tparam RT return type * @tparam A1 input parameter type * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above */ def nullableUdf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = { udf[Option[RT],A1]( (i: A1) => i match { case null => None case s => Some(f(i)) }) } /** * Given a function A1, A2 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that * * if on of the function input parameters is null, None is returned. * This will create a null value in the output Spark column. * * if both input parameters are non null, Some( f(input) will be returned, thus creating f(input1, input2) * as value in the output column. * @param f function from A1 => RT * @tparam RT return type * @tparam A1 input parameter type * @tparam A2 input parameter type * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above */ def nullableUdf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = { udf[Option[RT], A1, A2]( (i1: A1, i2: A2) => (i1, i2) match { case (null, _) => None case (_, null) => None case (s1, s2) => Some((f(s1,s2))) } ) } }


Aquí es donde la Option es útil:

val extractDateAsOptionInt = udf((d: String) => d match { case null => None case s => Some(s.substring(0, 10).filterNot("-".toSet).toInt) })

o para hacerlo un poco más seguro en el caso general:

import scala.util.Try val extractDateAsOptionInt = udf((d: String) => Try( d.substring(0, 10).filterNot("-".toSet).toInt ).toOption)

Todo el crédito va para Dmitriy Selivanov, quien señaló esta solución como una edición (¿falta?) here .

La alternativa es manejar null fuera del UDF:

import org.apache.spark.sql.functions.{lit, when} import org.apache.spark.sql.types.IntegerType val extractDateAsInt = udf( (d: String) => d.substring(0, 10).filterNot("-".toSet).toInt ) df.withColumn("y", when($"x".isNull, lit(null)) .otherwise(extractDateAsInt($"x")) .cast(IntegerType) )


Scala en realidad tiene una buena función de fábrica, Option (), que puede hacer que esto sea aún más conciso:

val extractDateAsOptionInt = udf((d: String) => Option(d).map(_.substring(0, 10).filterNot("-".toSet).toInt))

Internamente, el método de aplicación del objeto Opción solo está haciendo la comprobación nula por usted:

def apply[A](x: A): Option[A] = if (x == null) None else Some(x)