tutorial sqlcontext spark functions example espaƱol sql scala apache-spark dataframe spark-dataframe apache-spark-sql

sqlcontext - spark sql tutorial



Consultar Spark SQL DataFrame con tipos complejos (3)

¿Cómo puedo consultar un RDD con tipos complejos como mapas / matrices? por ejemplo, cuando estaba escribiendo este código de prueba:

case class Test(name: String, map: Map[String, String]) val map = Map("hello" -> "world", "hey" -> "there") val map2 = Map("hello" -> "people", "hey" -> "you") val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))

Pensé que la sintaxis sería algo así como:

sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world")

o

sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world")

pero consigo

No se puede acceder al campo anidado en el tipo MapType (StringType, StringType, true)

y

org.apache.spark.sql.catalyst.errors.package $ TreeNodeException: atributos sin resolver

respectivamente.


Depende de un tipo de columna. Comencemos con algunos datos ficticios:

import org.apache.spark.sql.functions.{udf, lit} import scala.util.Try case class SubRecord(x: Int) case class ArrayElement(foo: String, bar: Int, vals: Array[Double]) case class Record( an_array: Array[Int], a_map: Map[String, String], a_struct: SubRecord, an_array_of_structs: Array[ArrayElement]) val df = sc.parallelize(Seq( Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1), Array( ArrayElement("foo", 1, Array(1.0, 2.0, 2.0)), ArrayElement("bar", 2, Array(3.0, 4.0, 5.0)))), Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2), Array(ArrayElement("foz", 3, Array(5.0, 6.0)), ArrayElement("baz", 4, Array(7.0, 8.0)))) )).toDF

df.registerTempTable("df") df.printSchema // root // |-- an_array: array (nullable = true) // | |-- element: integer (containsNull = false) // |-- a_map: map (nullable = true) // | |-- key: string // | |-- value: string (valueContainsNull = true) // |-- a_struct: struct (nullable = true) // | |-- x: integer (nullable = false) // |-- an_array_of_structs: array (nullable = true) // | |-- element: struct (containsNull = true) // | | |-- foo: string (nullable = true) // | | |-- bar: integer (nullable = false) // | | |-- vals: array (nullable = true) // | | | |-- element: double (containsNull = false)

  • columnas de matriz ( ArrayType ):

    • Método Column.getItem

      df.select($"an_array".getItem(1)).show // +-----------+ // |an_array[1]| // +-----------+ // | 2| // | 5| // +-----------+

    • Sintaxis de corchetes de colmena:

      sqlContext.sql("SELECT an_array[1] FROM df").show // +---+ // |_c0| // +---+ // | 2| // | 5| // +---+

    • un UDF

      val get_ith = udf((xs: Seq[Int], i: Int) => Try(xs(i)).toOption) df.select(get_ith($"an_array", lit(1))).show // +---------------+ // |UDF(an_array,1)| // +---------------+ // | 2| // | 5| // +---------------+

    • Además de los métodos enumerados anteriormente, Spark admite una lista creciente de funciones integradas que operan en tipos complejos. Los ejemplos notables incluyen funciones de orden superior como transform (solo SQL, 2.4+):

      df.selectExpr("transform(an_array, x -> x + 1) an_array_inc").show // +------------+ // |an_array_inc| // +------------+ // | [2, 3, 4]| // | [5, 6, 7]| // +------------+

    • filter (solo SQL, 2.4+)

      df.selectExpr("filter(an_array, x -> x % 2 == 0) an_array_even").show // +-------------+ // |an_array_even| // +-------------+ // | [2]| // | [4, 6]| // +-------------+

    • aggregate (solo SQL, 2.4+):

      df.selectExpr("aggregate(an_array, 0, (acc, x) -> acc + x, acc -> acc) an_array_sum").show // +------------+ // |an_array_sum| // +------------+ // | 6| // | 15| // +------------+

    • funciones de procesamiento de matriz ( array_* ) como array_distinct (2.4+):

      import org.apache.spark.sql.functions.array_distinct df.select(array_distinct($"an_array_of_structs.vals"(0))).show // +-------------------------------------------+ // |array_distinct(an_array_of_structs.vals[0])| // +-------------------------------------------+ // | [1.0, 2.0]| // | [5.0, 6.0]| // +-------------------------------------------+

    • array_max ( array_min , 2.4+):

      import org.apache.spark.sql.functions.array_max df.select(array_max($"an_array")).show // +-------------------+ // |array_max(an_array)| // +-------------------+ // | 3| // | 6| // +-------------------+

    • flatten (2.4+)

      import org.apache.spark.sql.functions.flatten df.select(flatten($"an_array_of_structs.vals")).show // +---------------------------------+ // |flatten(an_array_of_structs.vals)| // +---------------------------------+ // | [1.0, 2.0, 2.0, 3...| // | [5.0, 6.0, 7.0, 8.0]| // +---------------------------------+

    • arrays_zip (2.4+):

      import org.apache.spark.sql.functions.arrays_zip df.select(arrays_zip($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show(false) // +--------------------------------------------------------------------+ // |arrays_zip(an_array_of_structs.vals[0], an_array_of_structs.vals[1])| // +--------------------------------------------------------------------+ // |[[1.0, 3.0], [2.0, 4.0], [2.0, 5.0]] | // |[[5.0, 7.0], [6.0, 8.0]] | // +--------------------------------------------------------------------+

    • array_union (2.4+):

      import org.apache.spark.sql.functions.array_union df.select(array_union($"an_array_of_structs.vals"(0), $"an_array_of_structs.vals"(1))).show // +---------------------------------------------------------------------+ // |array_union(an_array_of_structs.vals[0], an_array_of_structs.vals[1])| // +---------------------------------------------------------------------+ // | [1.0, 2.0, 3.0, 4...| // | [5.0, 6.0, 7.0, 8.0]| // +---------------------------------------------------------------------+

    • slice (2.4+):

      import org.apache.spark.sql.functions.slice df.select(slice($"an_array", 2, 2)).show // +---------------------+ // |slice(an_array, 2, 2)| // +---------------------+ // | [2, 3]| // | [5, 6]| // +---------------------+

  • columnas del mapa ( MapType )

    • utilizando el método Column.getField :

      df.select($"a_map".getField("foo")).show // +----------+ // |a_map[foo]| // +----------+ // | bar| // | null| // +----------+

    • usando la sintaxis de corchetes de Hive:

      sqlContext.sql("SELECT a_map[''foz''] FROM df").show // +----+ // | _c0| // +----+ // |null| // | baz| // +----+

    • usando una ruta completa con sintaxis de puntos:

      df.select($"a_map.foo").show // +----+ // | foo| // +----+ // | bar| // |null| // +----+

    • usando un UDF

      val get_field = udf((kvs: Map[String, String], k: String) => kvs.get(k)) df.select(get_field($"a_map", lit("foo"))).show // +--------------+ // |UDF(a_map,foo)| // +--------------+ // | bar| // | null| // +--------------+

    • Número creciente de funciones map_* como map_keys ( map_keys )

      import org.apache.spark.sql.functions.map_keys df.select(map_keys($"a_map")).show // +---------------+ // |map_keys(a_map)| // +---------------+ // | [foo]| // | [foz]| // +---------------+

    • o map_values ( map_values )

      import org.apache.spark.sql.functions.map_values df.select(map_values($"a_map")).show // +-----------------+ // |map_values(a_map)| // +-----------------+ // | [bar]| // | [baz]| // +-----------------+

    Consulte SPARK-23899 para obtener una lista detallada.

  • columnas de struct ( StructType ) que utilizan la ruta completa con sintaxis de puntos:

    • con API DataFrame

      df.select($"a_struct.x").show // +---+ // | x| // +---+ // | 1| // | 2| // +---+

    • con SQL sin formato

      sqlContext.sql("SELECT a_struct.x FROM df").show // +---+ // | x| // +---+ // | 1| // | 2| // +---+

  • Se puede acceder a los campos dentro de la matriz de structs utilizando sintaxis de puntos, nombres y métodos de Column estándar:

    df.select($"an_array_of_structs.foo").show // +----------+ // | foo| // +----------+ // |[foo, bar]| // |[foz, baz]| // +----------+ sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show // +---+ // |_c0| // +---+ // |foo| // |foz| // +---+ df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show // +------------------------------+ // |an_array_of_structs.vals[1][1]| // +------------------------------+ // | 4.0| // | 8.0| // +------------------------------+

  • Se puede acceder a los campos de tipos definidos por el usuario (UDT) mediante UDF. Vea los atributos de referencia SparkSQL de UDT para más detalles.

Notas :

  • dependiendo de una versión de Spark, algunos de estos métodos pueden estar disponibles solo con HiveContext . Los UDF deberían funcionar independientemente de la versión con SQLContext estándar y HiveContext .
  • En términos generales, los valores anidados son ciudadanos de segunda clase. No todas las operaciones típicas son compatibles con campos anidados. Dependiendo de un contexto, podría ser mejor aplanar el esquema y / o explotar colecciones

    df.select(explode($"an_array_of_structs")).show // +--------------------+ // | col| // +--------------------+ // |[foo,1,WrappedArr...| // |[bar,2,WrappedArr...| // |[foz,3,WrappedArr...| // |[baz,4,WrappedArr...| // +--------------------+

  • La sintaxis de puntos se puede combinar con caracteres comodín ( * ) para seleccionar (posiblemente múltiples) campos sin especificar nombres explícitamente:

    df.select($"a_struct.*").show // +---+ // | x| // +---+ // | 1| // | 2| // +---+

  • Las columnas JSON pueden consultarse usando las funciones get_json_object y from_json . Consulte ¿Cómo consultar la columna de datos JSON usando Spark DataFrames? para detalles.


Una vez que lo convierta a DF, simplemente puede buscar datos como

val rddRow= rdd.map(kv=>{ val k = kv._1 val v = kv._2 Row(k, v) }) val myFld1 = StructField("name", org.apache.spark.sql.types.StringType, true) val myFld2 = StructField("map", org.apache.spark.sql.types.MapType(StringType, StringType), true) val arr = Array( myFld1, myFld2) val schema = StructType( arr ) val rowrddDF = sqc.createDataFrame(rddRow, schema) rowrddDF.registerTempTable("rowtbl") val rowrddDFFinal = rowrddDF.select(rowrddDF("map.one")) or val rowrddDFFinal = rowrddDF.select("map.one")


aquí fue lo que hice y funcionó

case class Test(name: String, m: Map[String, String]) val map = Map("hello" -> "world", "hey" -> "there") val map2 = Map("hello" -> "people", "hey" -> "you") val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2))) val rdddf = rdd.toDF rdddf.registerTempTable("mytable") sqlContext.sql("select m.hello from mytable").show

Resultados

+------+ | hello| +------+ | world| |people| +------+