scala - functions - ¿Definir un UDF que acepte una matriz de objetos en un Spark DataFrame?
user defined functions spark scala (1)
Al trabajar con los marcos de datos de Spark, se requieren funciones definidas por el usuario (UDF) para asignar datos en columnas. Las UDF requieren que los tipos de argumento se especifiquen explícitamente. En mi caso, necesito manipular una columna que se compone de matrices de objetos, y no sé qué tipo usar. Aquí hay un ejemplo:
import sqlContext.implicits._
// Start with some data. Each row (here, there''s only one row)
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
"""
|{
| "topic" : "pets",
| "subjects" : [
| {"type" : "cat", "score" : 10},
| {"type" : "dog", "score" : 1}
| ]
|}
""")))
Es relativamente sencillo utilizar las funciones
org.apache.spark.sql.functions
incorporadas para realizar operaciones básicas en los datos de las columnas.
import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show
+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets| 2|
+-----+--------------+
y generalmente es fácil escribir UDF personalizados para realizar operaciones arbitrarias
import org.apache.spark.sql.functions.udf
val enhance = udf { topic : String => topic.toUpperCase() }
data.select(enhance($"topic"), size($"subjects")).show
+----------+--------------+
|UDF(topic)|size(subjects)|
+----------+--------------+
| PETS| 2|
+----------+--------------+
Pero, ¿qué sucede si quiero usar un UDF para manipular la matriz de objetos en la columna "sujetos"? ¿Qué tipo utilizo para el argumento en el UDF? Por ejemplo, si quiero volver a implementar la función de tamaño, en lugar de usar la proporcionada por spark:
val my_size = udf { subjects: Array[Something] => subjects.size }
data.select($"topic", my_size($"subjects")).show
Claramente, la
Array[Something]
no funciona ... ¿qué tipo debo usar?
¿Debo abandonar
Array[]
completo?
scala.collection.mutable.WrappedArray
me dice que
scala.collection.mutable.WrappedArray
puede tener algo que ver con eso, pero aún hay otro tipo que debo proporcionar.
Lo que estás buscando es
Seq[oassql.Row]
:
import org.apache.spark.sql.Row
val my_size = udf { subjects: Seq[Row] => subjects.size }
Explicacion :
-
La representación actual de
ArrayType
es, como ya sabe,WrappedArray
,WrappedArray
lo queArray
no funcionará y es mejor mantenerse seguro. -
Según la especificación oficial
, el tipo local (externo) para
StructType
esRow
. Desafortunadamente, significa que el acceso a los campos individuales no es de tipo seguro.
Notas :
-
Para crear una
struct
en Spark <2.3, la función pasada audf
debe devolver el tipo deProduct
(Tuple*
ocase class
), noRow
. Esto se debe a que las variantesudf
correspondientes dependen de la reflexión de Scala :Define un cierre de Scala de n argumentos como función definida por el usuario (UDF). Los tipos de datos se infieren automáticamente en función de la firma del cierre de Scala.
-
En Spark> = 2.3 es posible devolver
Row
directamente, siempre que se proporcione el esquema .def udf(f: AnyRef, dataType: DataType): UserDefinedFunction
Define una función determinista definida por el usuario (UDF) utilizando un cierre Scala. Para esta variante, la persona que llama debe especificar el tipo de datos de salida, y no hay coerción de tipo de entrada automática.Consulte, por ejemplo, ¿Cómo crear un Spark UDF en Java / Kotlin que devuelve un tipo complejo? .