example - spark sql que es
Explotar(¿transponer?) Varias columnas en la tabla Spark SQL (2)
Estoy usando Spark SQL (menciono que está en Spark en caso de que afecte la sintaxis de SQL; todavía no estoy lo suficientemente familiarizado para estar seguro) y tengo una tabla que estoy tratando de reestructurar, pero estoy quedarse atascado tratando de transponer varias columnas al mismo tiempo.
Básicamente tengo datos que se parecen a:
userId someString varA varB
1 "example1" [0,2,5] [1,2,9]
2 "example2" [1,20,5] [9,null,6]
y me gustaría explotar tanto varA como varB simultáneamente (la longitud siempre será consistente), para que el resultado final se vea así:
userId someString varA varB
1 "example1" 0 1
1 "example1" 2 2
1 "example1" 5 9
2 "example2" 1 9
2 "example2" 20 null
2 "example2" 5 6
pero solo puedo parecer que una sola declaración de explosión (var) funcione en un comando, y si trato de encadenarlos (es decir, crear una tabla temporal después del primer comando de explosión), obviamente obtengo una gran cantidad de duplicados innecesarios filas
¡Muchas gracias!
También podrías intentar
case class Input(
userId: Integer,
someString: String,
varA: Array[Integer],
varB: Array[Integer])
case class Result(
userId: Integer,
someString: String,
varA: Integer,
varB: Integer)
def getResult(row : Input) : Iterable[Result] = {
val user_id = row.user_id
val someString = row.someString
val varA = row.varA
val varB = row.varB
val seq = for( i <- 0 until varA.size) yield {Result(user_id,someString,varA(i),varB(i))}
seq
}
val obj1 = Input(1, "string1", Array(0, 2, 5), Array(1, 2, 9))
val obj2 = Input(2, "string2", Array(1, 3, 6), Array(2, 3, 10))
val input_df = sc.parallelize(Seq(obj1, obj2)).toDS
val res = input_df.flatMap{ row => getResult(row) }
res.show
// +------+----------+----+-----+
// |userId|someString|varA|varB |
// +------+----------+----+-----+
// | 1| string1 | 0| 1 |
// | 1| string1 | 2| 2 |
// | 1| string1 | 5| 9 |
// | 2| string2 | 1| 2 |
// | 2| string2 | 3| 3 |
// | 2| string2 | 6| 10|
// +------+----------+----+-----+
Chispa> = 2.4
Puede omitir
zip
udf
y usar la función
arrays_zip
:
df.withColumn("vars", explode(arrays_zip($"varA", $"varB"))).select(
$"userId", $"someString",
$"vars.varA", $"vars.varB").show
Chispa <2.4
Lo que desea no es posible sin un UDF personalizado. En Scala podrías hacer algo como esto:
val data = sc.parallelize(Seq(
"""{"userId": 1, "someString": "example1",
"varA": [0, 2, 5], "varB": [1, 2, 9]}""",
"""{"userId": 2, "someString": "example2",
"varA": [1, 20, 5], "varB": [9, null, 6]}"""
))
val df = spark.read.json(data)
df.printSchema
// root
// |-- someString: string (nullable = true)
// |-- userId: long (nullable = true)
// |-- varA: array (nullable = true)
// | |-- element: long (containsNull = true)
// |-- varB: array (nullable = true)
// | |-- element: long (containsNull = true)
Ahora podemos definir
zip
udf:
import org.apache.spark.sql.functions.{udf, explode}
val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
$"userId", $"someString",
$"vars._1".alias("varA"), $"vars._2".alias("varB")).show
// +------+----------+----+----+
// |userId|someString|varA|varB|
// +------+----------+----+----+
// | 1| example1| 0| 1|
// | 1| example1| 2| 2|
// | 1| example1| 5| 9|
// | 2| example2| 1| 9|
// | 2| example2| 20|null|
// | 2| example2| 5| 6|
// +------+----------+----+----+
Con SQL sin procesar:
sqlContext.udf.register("zip", (xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
df.registerTempTable("df")
sqlContext.sql(
"""SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df""")