scala apache-spark udf

scala - Spark UDF con varargs



apache-spark (1)

Los UDF no admiten varargs *, pero puede pasar un número arbitrario de columnas ajustadas mediante una función de array :

import org.apache.spark.sql.functions.{udf, array, lit} val myConcatFunc = (xs: Seq[Any], sep: String) => xs.filter(_ != null).mkString(sep) val myConcat = udf(myConcatFunc)

Un ejemplo de uso:

val df = sc.parallelize(Seq( (null, "a", "b", "c"), ("d", null, null, "e") )).toDF("x1", "x2", "x3", "x4") val cols = array($"x1", $"x2", $"x3", $"x4") val sep = lit("-") df.select(myConcat(cols, sep).alias("concatenated")).show // +------------+ // |concatenated| // +------------+ // | a-b-c| // | d-e| // +------------+

Con SQL sin procesar:

df.registerTempTable("df") sqlContext.udf.register("myConcat", myConcatFunc) sqlContext.sql( "SELECT myConcat(array(x1, x2, x4), ''.'') AS concatenated FROM df" ).show // +------------+ // |concatenated| // +------------+ // | a.c| // | d.e| // +------------+

Un enfoque un poco más complicado es no usar UDF en absoluto y componer expresiones SQL con algo parecido a esto:

import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column def myConcatExpr(sep: String, cols: Column*) = regexp_replace(concat( cols.foldLeft(lit(""))( (acc, c) => when(c.isNotNull, concat(acc, c, lit(sep))).otherwise(acc) ) ), s"($sep)?$$", "") df.select( myConcatExpr("-", $"x1", $"x2", $"x3", $"x4").alias("concatenated") ).show // +------------+ // |concatenated| // +------------+ // | a-b-c| // | d-e| // +------------+

pero dudo que valga la pena a menos que trabajes con PySpark.

* Si pasa una función usando varargs, se eliminará de todo el azúcar sintáctico y el UDF resultante esperará un ArrayType . Por ejemplo:

def f(s: String*) = s.mkString udf(f _)

será de tipo:

UserDefinedFunction(<function1>,StringType,List(ArrayType(StringType,true)))

¿Es una única opción enumerar todos los argumentos hasta 22 como se muestra en la documentación?

https://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.UDFRegistration

¿Alguien descubrió cómo hacer algo similar a esto?

sc.udf.register("func", (s: String*) => s......

(escribiendo la función concat personalizada que omite los valores nulos, tenía que 2 argumentos en el momento)

Gracias