java - read - spark sql example
Spark sql cómo explotar sin perder valores nulos (3)
Después de la respuesta aceptada, cuando los elementos de la matriz son de un tipo complejo, puede ser difícil definirlo a mano (por ejemplo, con estructuras grandes).
Para hacerlo automáticamente, escribí el siguiente método auxiliar:
def explodeOuter(df: Dataset[Row], columnsToExplode: List[String]) = {
val arrayFields = df.schema.fields
.map(field => field.name -> field.dataType)
.collect { case (name: String, type: ArrayType) => (name, type.asInstanceOf[ArrayType])}
.toMap
columnsToExplode.foldLeft(df) { (dataFrame, arrayCol) =>
dataFrame.withColumn(arrayCol, explode(when(size(col(arrayCol)) =!= 0, col(arrayCol))
.otherwise(array(lit(null).cast(arrayFields(arrayCol).elementType)))))
}
Tengo un Dataframe que estoy tratando de aplanar. Como parte del proceso, quiero explotarlo, así que si tengo una columna de matrices, cada valor de la matriz se usará para crear una fila separada. Por ejemplo,
id | name | likes
_______________________________
1 | Luke | [baseball, soccer]
debe convertirse
id | name | likes
_______________________________
1 | Luke | baseball
1 | Luke | soccer
Este es mi codigo
private DataFrame explodeDataFrame(DataFrame df) {
DataFrame resultDf = df;
for (StructField field : df.schema().fields()) {
if (field.dataType() instanceof ArrayType) {
resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
resultDf.show();
}
}
return resultDf;
}
El problema es que en mis datos, algunas de las columnas de la matriz tienen valores nulos. En ese caso, se elimina toda la fila. Entonces este marco de datos:
id | name | likes
_______________________________
1 | Luke | [baseball, soccer]
2 | Lucy | null
se convierte
id | name | likes
_______________________________
1 | Luke | baseball
1 | Luke | soccer
en vez de
id | name | likes
_______________________________
1 | Luke | baseball
1 | Luke | soccer
2 | Lucy | null
¿Cómo puedo explotar mis matrices para no perder las filas nulas?
Estoy usando Spark 1.5.2 y Java 8
Puede usar la función
explode_outer()
.
Spark 2.2+
Puede usar la función
explode_outer
:
import org.apache.spark.sql.functions.explode_outer
df.withColumn("likes", explode_outer($"likes")).show
// +---+----+--------+
// | id|name| likes|
// +---+----+--------+
// | 1|Luke|baseball|
// | 1|Luke| soccer|
// | 2|Lucy| null|
// +---+----+--------+
Chispa <= 2.1
En Scala, pero el equivalente de Java debe ser casi idéntico (para importar funciones individuales, use
import static
).
import org.apache.spark.sql.functions.{array, col, explode, lit, when}
val df = Seq(
(1, "Luke", Some(Array("baseball", "soccer"))),
(2, "Lucy", None)
).toDF("id", "name", "likes")
df.withColumn("likes", explode(
when(col("likes").isNotNull, col("likes"))
// If null explode an array<string> with a single null
.otherwise(array(lit(null).cast("string")))))
La idea aquí es básicamente reemplazar
NULL
con una
array(NULL)
del tipo deseado.
Para el tipo complejo (también conocido como
structs
), debe proporcionar un esquema completo:
val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")
val st = StructType(Seq(
StructField("_1", IntegerType, false), StructField("_2", StringType, true)
))
dfStruct.withColumn("y", explode(
when(col("y").isNotNull, col("y"))
.otherwise(array(lit(null).cast(st)))))
o
dfStruct.withColumn("y", explode(
when(col("y").isNotNull, col("y"))
.otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))
Nota :
Si la
Column
matriz se ha creado con
containsNull
establecido en
false
, debe cambiar esto primero (probado con Spark 2.1):
df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))