sqlcontext - spark sql tutorial
Consultar Spark SQL DataFrame con tipos complejos (3)
¿Cómo puedo consultar un RDD con tipos complejos como mapas / matrices? por ejemplo, cuando estaba escribiendo este código de prueba:
case class Test(name: String, map: Map[String, String])
val map = Map("hello" -> "world", "hey" -> "there")
val map2 = Map("hello" -> "people", "hey" -> "you")
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))
Pensé que la sintaxis sería algo así como:
sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world")
o
sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world")
pero consigo
No se puede acceder al campo anidado en el tipo MapType (StringType, StringType, true)
y
org.apache.spark.sql.catalyst.errors.package $ TreeNodeException: atributos sin resolver
respectivamente.
Depende de un tipo de columna. Comencemos con algunos datos ficticios:
import org.apache.spark.sql.functions.{udf, lit}
import scala.util.Try
case class SubRecord(x: Int)
case class ArrayElement(foo: String, bar: Int, vals: Array[Double])
case class Record(
an_array: Array[Int], a_map: Map[String, String],
a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])
val df = sc.parallelize(Seq(
Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1),
Array(
ArrayElement("foo", 1, Array(1.0, 2.0, 2.0)),
ArrayElement("bar", 2, Array(3.0, 4.0, 5.0)))),
Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2),
Array(ArrayElement("foz", 3, Array(5.0, 6.0)),
ArrayElement("baz", 4, Array(7.0, 8.0))))
)).toDF
df.registerTempTable("df")
df.printSchema
// root
// |-- an_array: array (nullable = true)
// | |-- element: integer (containsNull = false)
// |-- a_map: map (nullable = true)
// | |-- key: string
// | |-- value: string (valueContainsNull = true)
// |-- a_struct: struct (nullable = true)
// | |-- x: integer (nullable = false)
// |-- an_array_of_structs: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- foo: string (nullable = true)
// | | |-- bar: integer (nullable = false)
// | | |-- vals: array (nullable = true)
// | | | |-- element: double (containsNull = false)
-
columnas de matriz (
ArrayType
):-
Método
Column.getItem
df.select($"an_array".getItem(1)).show // +-----------+ // |an_array[1]| // +-----------+ // | 2| // | 5| // +-----------+
-
Sintaxis de corchetes de colmena:
sqlContext.sql("SELECT an_array[1] FROM df").show // +---+ // |_c0| // +---+ // | 2| // | 5| // +---+
-
un UDF
val get_ith = udf((xs: Seq[Int], i: Int) => Try(xs(i)).toOption) df.select(get_ith($"an_array", lit(1))).show // +---------------+ // |UDF(an_array,1)| // +---------------+ // | 2| // | 5| // +---------------+
-
Además de los métodos enumerados anteriormente, Spark admite una lista creciente de funciones integradas que operan en tipos complejos. Los ejemplos notables incluyen funciones de orden superior como
transform
(solo SQL, 2.4+):df.selectExpr("transform(an_array, x -> x + 1) an_array_inc").show // +------------+ // |an_array_inc| // +------------+ // | [2, 3, 4]| // | [5, 6, 7]| // +------------+
-
filter
(solo SQL, 2.4+)df.selectExpr("filter(an_array, x -> x % 2 == 0) an_array_even").show // +-------------+ // |an_array_even| // +-------------+ // | [2]| // | [4, 6]| // +-------------+
-
aggregate
(solo SQL, 2.4+):df.selectExpr("aggregate(an_array, 0, (acc, x) -> acc + x, acc -> acc) an_array_sum").show // +------------+ // |an_array_sum| // +------------+ // | 6| // | 15| // +------------+
-
funciones de procesamiento de matriz (
array_*
) comoarray_distinct
(2.4+):import org.apache.spark.sql.functions.array_distinct df.select(array_distinct($"an_array_of_structs.vals"(0))).show // +-------------------------------------------+ // |array_distinct(an_array_of_structs.vals[0])| // +-------------------------------------------+ // | [1.0, 2.0]| // | [5.0, 6.0]| // +-------------------------------------------+
-
array_max
(array_min
, 2.4+):import org.apache.spark.sql.functions.array_max df.select(array_max($"an_array")).show // +-------------------+ // |array_max(an_array)| // +-------------------+ // | 3| // | 6| // +-------------------+
-
flatten
(2.4+)import org.apache.spark.sql.functions.flatten df.select(flatten($"an_array_of_structs.vals")).show // +---------------------------------+ // |flatten(an_array_of_structs.vals)| // +---------------------------------+ // | [1.0, 2.0, 2.0, 3...| // | [5.0, 6.0, 7.0, 8.0]| // +---------------------------------+
-
arrays_zip
(2.4+):import org.apache.spark.sql.functions.arrays_zip df.select(arrays_zip($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show(false) // +--------------------------------------------------------------------+ // |arrays_zip(an_array_of_structs.vals[0], an_array_of_structs.vals[1])| // +--------------------------------------------------------------------+ // |[[1.0, 3.0], [2.0, 4.0], [2.0, 5.0]] | // |[[5.0, 7.0], [6.0, 8.0]] | // +--------------------------------------------------------------------+
-
array_union
(2.4+):import org.apache.spark.sql.functions.array_union df.select(array_union($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show // +---------------------------------------------------------------------+ // |array_union(an_array_of_structs.vals[0], an_array_of_structs.vals[1])| // +---------------------------------------------------------------------+ // | [1.0, 2.0, 3.0, 4...| // | [5.0, 6.0, 7.0, 8.0]| // +---------------------------------------------------------------------+
-
slice
(2.4+):import org.apache.spark.sql.functions.slice df.select(slice($"an_array", 2, 2)).show // +---------------------+ // |slice(an_array, 2, 2)| // +---------------------+ // | [2, 3]| // | [5, 6]| // +---------------------+
-
-
columnas del mapa (
MapType
)-
utilizando el método
Column.getField
:df.select($"a_map".getField("foo")).show // +----------+ // |a_map[foo]| // +----------+ // | bar| // | null| // +----------+
-
usando la sintaxis de corchetes de Hive:
sqlContext.sql("SELECT a_map[''foz''] FROM df").show // +----+ // | _c0| // +----+ // |null| // | baz| // +----+
-
usando una ruta completa con sintaxis de puntos:
df.select($"a_map.foo").show // +----+ // | foo| // +----+ // | bar| // |null| // +----+
-
usando un UDF
val get_field = udf((kvs: Map[String, String], k: String) => kvs.get(k)) df.select(get_field($"a_map", lit("foo"))).show // +--------------+ // |UDF(a_map,foo)| // +--------------+ // | bar| // | null| // +--------------+
-
Número creciente de funciones
map_*
comomap_keys
(map_keys
)import org.apache.spark.sql.functions.map_keys df.select(map_keys($"a_map")).show // +---------------+ // |map_keys(a_map)| // +---------------+ // | [foo]| // | [foz]| // +---------------+
-
o
map_values
(map_values
)import org.apache.spark.sql.functions.map_values df.select(map_values($"a_map")).show // +-----------------+ // |map_values(a_map)| // +-----------------+ // | [bar]| // | [baz]| // +-----------------+
Consulte SPARK-23899 para obtener una lista detallada.
-
-
columnas de struct (
StructType
) que utilizan la ruta completa con sintaxis de puntos:-
con API DataFrame
df.select($"a_struct.x").show // +---+ // | x| // +---+ // | 1| // | 2| // +---+
-
con SQL sin formato
sqlContext.sql("SELECT a_struct.x FROM df").show // +---+ // | x| // +---+ // | 1| // | 2| // +---+
-
-
Se puede acceder a los campos dentro de la matriz de
structs
utilizando sintaxis de puntos, nombres y métodos deColumn
estándar:df.select($"an_array_of_structs.foo").show // +----------+ // | foo| // +----------+ // |[foo, bar]| // |[foz, baz]| // +----------+ sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show // +---+ // |_c0| // +---+ // |foo| // |foz| // +---+ df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show // +------------------------------+ // |an_array_of_structs.vals[1][1]| // +------------------------------+ // | 4.0| // | 8.0| // +------------------------------+
-
Se puede acceder a los campos de tipos definidos por el usuario (UDT) mediante UDF. Vea los atributos de referencia SparkSQL de UDT para más detalles.
Notas :
-
dependiendo de una versión de Spark, algunos de estos métodos pueden estar disponibles solo con
HiveContext
. Los UDF deberían funcionar independientemente de la versión conSQLContext
estándar yHiveContext
. -
En términos generales, los valores anidados son ciudadanos de segunda clase. No todas las operaciones típicas son compatibles con campos anidados. Dependiendo de un contexto, podría ser mejor aplanar el esquema y / o explotar colecciones
df.select(explode($"an_array_of_structs")).show // +--------------------+ // | col| // +--------------------+ // |[foo,1,WrappedArr...| // |[bar,2,WrappedArr...| // |[foz,3,WrappedArr...| // |[baz,4,WrappedArr...| // +--------------------+
-
La sintaxis de puntos se puede combinar con caracteres comodín (
*
) para seleccionar (posiblemente múltiples) campos sin especificar nombres explícitamente:df.select($"a_struct.*").show // +---+ // | x| // +---+ // | 1| // | 2| // +---+
-
Las columnas JSON pueden consultarse usando las funciones
get_json_object
yfrom_json
. Consulte ¿Cómo consultar la columna de datos JSON usando Spark DataFrames? para detalles.
Una vez que lo convierta a DF, simplemente puede buscar datos como
val rddRow= rdd.map(kv=>{
val k = kv._1
val v = kv._2
Row(k, v)
})
val myFld1 = StructField("name", org.apache.spark.sql.types.StringType, true)
val myFld2 = StructField("map", org.apache.spark.sql.types.MapType(StringType, StringType), true)
val arr = Array( myFld1, myFld2)
val schema = StructType( arr )
val rowrddDF = sqc.createDataFrame(rddRow, schema)
rowrddDF.registerTempTable("rowtbl")
val rowrddDFFinal = rowrddDF.select(rowrddDF("map.one"))
or
val rowrddDFFinal = rowrddDF.select("map.one")
aquí fue lo que hice y funcionó
case class Test(name: String, m: Map[String, String])
val map = Map("hello" -> "world", "hey" -> "there")
val map2 = Map("hello" -> "people", "hey" -> "you")
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))
val rdddf = rdd.toDF
rdddf.registerTempTable("mytable")
sqlContext.sql("select m.hello from mytable").show
Resultados
+------+
| hello|
+------+
| world|
|people|
+------+