spark functions define scala apache-spark dataframe apache-spark-sql user-defined-functions

scala - functions - ¿Definir un UDF que acepte una matriz de objetos en un Spark DataFrame?



user defined functions spark scala (1)

Al trabajar con los marcos de datos de Spark, se requieren funciones definidas por el usuario (UDF) para asignar datos en columnas. Las UDF requieren que los tipos de argumento se especifiquen explícitamente. En mi caso, necesito manipular una columna que se compone de matrices de objetos, y no sé qué tipo usar. Aquí hay un ejemplo:

import sqlContext.implicits._ // Start with some data. Each row (here, there''s only one row) // is a topic and a bunch of subjects val data = sqlContext.read.json(sc.parallelize(Seq( """ |{ | "topic" : "pets", | "subjects" : [ | {"type" : "cat", "score" : 10}, | {"type" : "dog", "score" : 1} | ] |} """)))

Es relativamente sencillo utilizar las funciones org.apache.spark.sql.functions incorporadas para realizar operaciones básicas en los datos de las columnas.

import org.apache.spark.sql.functions.size data.select($"topic", size($"subjects")).show +-----+--------------+ |topic|size(subjects)| +-----+--------------+ | pets| 2| +-----+--------------+

y generalmente es fácil escribir UDF personalizados para realizar operaciones arbitrarias

import org.apache.spark.sql.functions.udf val enhance = udf { topic : String => topic.toUpperCase() } data.select(enhance($"topic"), size($"subjects")).show +----------+--------------+ |UDF(topic)|size(subjects)| +----------+--------------+ | PETS| 2| +----------+--------------+

Pero, ¿qué sucede si quiero usar un UDF para manipular la matriz de objetos en la columna "sujetos"? ¿Qué tipo utilizo para el argumento en el UDF? Por ejemplo, si quiero volver a implementar la función de tamaño, en lugar de usar la proporcionada por spark:

val my_size = udf { subjects: Array[Something] => subjects.size } data.select($"topic", my_size($"subjects")).show

Claramente, la Array[Something] no funciona ... ¿qué tipo debo usar? ¿Debo abandonar Array[] completo? scala.collection.mutable.WrappedArray me dice que scala.collection.mutable.WrappedArray puede tener algo que ver con eso, pero aún hay otro tipo que debo proporcionar.


Lo que estás buscando es Seq[oassql.Row] :

import org.apache.spark.sql.Row val my_size = udf { subjects: Seq[Row] => subjects.size }

Explicacion :

  • La representación actual de ArrayType es, como ya sabe, WrappedArray , WrappedArray lo que Array no funcionará y es mejor mantenerse seguro.
  • Según la especificación oficial , el tipo local (externo) para StructType es Row . Desafortunadamente, significa que el acceso a los campos individuales no es de tipo seguro.

Notas :

  • Para crear una struct en Spark <2.3, la función pasada a udf debe devolver el tipo de Product ( Tuple* o case class ), no Row . Esto se debe a que las variantes udf correspondientes dependen de la reflexión de Scala :

    Define un cierre de Scala de n argumentos como función definida por el usuario (UDF). Los tipos de datos se infieren automáticamente en función de la firma del cierre de Scala.

  • En Spark> = 2.3 es posible devolver Row directamente, siempre que se proporcione el esquema .

    def udf(f: AnyRef, dataType: DataType): UserDefinedFunction Define una función determinista definida por el usuario (UDF) utilizando un cierre Scala. Para esta variante, la persona que llama debe especificar el tipo de datos de salida, y no hay coerción de tipo de entrada automática.

    Consulte, por ejemplo, ¿Cómo crear un Spark UDF en Java / Kotlin que devuelve un tipo complejo? .