scala - tutorial - apache spark vs hadoop
Explosión anidada Struct en el marco de datos de Spark (3)
Estoy trabajando a través del example Databricks. El esquema para el marco de datos se ve así:
> parquetDF.printSchema
root
|-- department: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- name: string (nullable = true)
|-- employees: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- firstName: string (nullable = true)
| | |-- lastName: string (nullable = true)
| | |-- email: string (nullable = true)
| | |-- salary: integer (nullable = true)
En el ejemplo, muestran cómo explotar la columna de empleados en 4 columnas adicionales:
val explodeDF = parquetDF.explode($"employees") {
case Row(employee: Seq[Row]) => employee.map{ employee =>
val firstName = employee(0).asInstanceOf[String]
val lastName = employee(1).asInstanceOf[String]
val email = employee(2).asInstanceOf[String]
val salary = employee(3).asInstanceOf[Int]
Employee(firstName, lastName, email, salary)
}
}.cache()
display(explodeDF)
¿Cómo haría algo similar con la columna del departamento (es decir, agregar dos columnas adicionales al marco de datos llamado "id" y "nombre")? Los métodos no son exactamente iguales, y solo puedo descubrir cómo crear un nuevo marco de datos utilizando:
val explodeDF = parquetDF.select("department.id","department.name")
display(explodeDF)
Si lo intento
val explodeDF = parquetDF.explode($"department") {
case Row(dept: Seq[String]) => dept.map{dept =>
val id = dept(0)
val name = dept(1)
}
}.cache()
display(explodeDF)
Me sale la advertencia y el error:
<console>:38: warning: non-variable type argument String in type pattern Seq[String] is unchecked since it is eliminated by erasure
case Row(dept: Seq[String]) => dept.map{dept =>
^
<console>:37: error: inferred type arguments [Unit] do not conform to method explode''s type parameter bounds [A <: Product]
val explodeDF = parquetDF.explode($"department") {
^
En mi opinión, la solución más elegante es comenzar a expandir un Struct utilizando un operador selecto como se muestra a continuación:
var explodedDf2 = explodedDf.select("department.*","*")
https://docs.databricks.com/spark/latest/spark-sql/complex-types.html
Esto parece funcionar (aunque tal vez no sea la solución más elegante).
var explodeDF2 = explodeDF.withColumn("id", explodeDF("department.id"))
explodeDF2 = explodeDF2.withColumn("name", explodeDF2("department.name"))
Podrías usar algo así:
var explodeDF = explodeDF.withColumn("id", explodeDF("department.id"))
explodeDeptDF = explodeDeptDF.withColumn("name", explodeDeptDF("department.name"))
en lo que me ayudaste y estas preguntas: