hadoop - read - Cómo leer una colección anidada en Spark
read parquet pyspark (4)
Daré una respuesta basada en Python ya que eso es lo que estoy usando. Creo que Scala tiene algo similar.
La función de explode
se agregó en Spark 1.4.0 para manejar matrices anidadas en DataFrames, de acuerdo con los documentos API de Python .
Crear un marco de datos de prueba:
from pyspark.sql import Row
df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])])
df.show()
## +-+--------------------+
## |a| intlist|
## +-+--------------------+
## |1|ArrayBuffer(1, 2, 3)|
## |2|ArrayBuffer(4, 5, 6)|
## +-+--------------------+
Utilice explode
para aplanar la columna de la lista:
from pyspark.sql.functions import explode
df.select(df.a, explode(df.intlist)).show()
## +-+---+
## |a|_c0|
## +-+---+
## |1| 1|
## |1| 2|
## |1| 3|
## |2| 4|
## |2| 5|
## |2| 6|
## +-+---+
Tengo una mesa de parquet con una de las columnas siendo
, array <struct <col1, col2, .. colN >>
Puede ejecutar consultas en esta tabla en Hive usando la sintaxis de VISTA LATERAL.
¿Cómo leer esta tabla en un RDD y, lo que es más importante, cómo filtrar, mapear, etc. esta colección anidada en Spark?
No se pudo encontrar ninguna referencia a esto en la documentación de Spark. ¡Gracias de antemano por cualquier información!
PD. El fieltro podría ser útil para dar algunas estadísticas sobre la mesa. Número de columnas en la tabla principal ~ 600. Número de filas ~ 200m. Número de "columnas" en la colección anidada ~ 10. Número medio de registros en la colección anidada ~ 35.
Las respuestas anteriores son todas respuestas geniales y abordan esta pregunta desde diferentes lados; Spark SQL también es una forma bastante útil de acceder a datos anidados.
Este es un ejemplo de cómo utilizar explode () en SQL directamente para consultar una colección anidada.
SELECT hholdid, tsp.person_seq_no
FROM ( SELECT hholdid, explode(tsp_ids) as tsp
FROM disc_mrt.unified_fact uf
)
tsp_ids es una estructura anidada de estructuras, que tiene muchos atributos, incluyendo person_seq_no, que selecciono en la consulta externa anterior.
Lo anterior fue probado en Spark 2.0. Hice una pequeña prueba y no funciona en Spark 1.6. Esta pregunta se hizo cuando Spark 2 no estaba presente, por lo que esta respuesta se agrega muy bien a la lista de opciones disponibles para tratar con estructuras anidadas.
Notable no resuelto JIRAs en explode () para el acceso de SQL:
Otro enfoque sería utilizar la coincidencia de patrones como este:
val rdd: RDD[(String, List[(String, String)]] = dataFrame.map(_.toSeq.toList match {
case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match {
case List(a:String, b: String) => (a, b)
}).toList
})
Puede hacer un patrón de coincidencia directamente en la fila, pero es probable que falle por algunas razones.
No hay magia en el caso de la colección anidada. Spark manejará de la misma manera un RDD[(String, String)]
y un RDD[(String, Seq[String])]
.
Sin embargo, leer dicha colección anidada de archivos de Parquet puede ser complicado.
Tomemos un ejemplo de la spark-shell
(1.3.1):
scala> import sqlContext.implicits._
import sqlContext.implicits._
scala> case class Inner(a: String, b: String)
defined class Inner
scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer
Escribe el archivo de parquet:
scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25
scala> outers.toDF.saveAsParquetFile("outers.parquet")
Lea el archivo de parquet:
scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row
scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]
scala> val outers = dataFrame.map { row =>
| val key = row.getString(0)
| val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
| Outer(key, inners)
| }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848
La parte importante es row.getAs[Seq[Row]](1)
. La representación interna de una secuencia de struct
anidada es ArrayBuffer[Row]
, puede usar cualquier ArrayBuffer[Row]
en lugar de Seq[Row]
. El 1
es el índice de la columna en la fila exterior. Utilicé el método getAs
aquí, pero hay alternativas en las últimas versiones de Spark. Ver el código fuente del rasgo Row .
Ahora que tiene un RDD[Outer]
, puede aplicar cualquier transformación o acción deseada.
// Filter the outers
outers.filter(_.inners.nonEmpty)
// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
Tenga en cuenta que usamos la biblioteca de chispa-SQL solo para leer el archivo de parquet. Por ejemplo, puede seleccionar solo las columnas deseadas directamente en el DataFrame, antes de asignarlo a un RDD.
dataFrame.select(''col1, ''col2).map { row => ... }