spark que example español sql apache-spark apache-spark-sql hiveql

example - spark sql que es



Explotar(¿transponer?) Varias columnas en la tabla Spark SQL (2)

Estoy usando Spark SQL (menciono que está en Spark en caso de que afecte la sintaxis de SQL; todavía no estoy lo suficientemente familiarizado para estar seguro) y tengo una tabla que estoy tratando de reestructurar, pero estoy quedarse atascado tratando de transponer varias columnas al mismo tiempo.

Básicamente tengo datos que se parecen a:

userId someString varA varB 1 "example1" [0,2,5] [1,2,9] 2 "example2" [1,20,5] [9,null,6]

y me gustaría explotar tanto varA como varB simultáneamente (la longitud siempre será consistente), para que el resultado final se vea así:

userId someString varA varB 1 "example1" 0 1 1 "example1" 2 2 1 "example1" 5 9 2 "example2" 1 9 2 "example2" 20 null 2 "example2" 5 6

pero solo puedo parecer que una sola declaración de explosión (var) funcione en un comando, y si trato de encadenarlos (es decir, crear una tabla temporal después del primer comando de explosión), obviamente obtengo una gran cantidad de duplicados innecesarios filas

¡Muchas gracias!


También podrías intentar

case class Input( userId: Integer, someString: String, varA: Array[Integer], varB: Array[Integer]) case class Result( userId: Integer, someString: String, varA: Integer, varB: Integer) def getResult(row : Input) : Iterable[Result] = { val user_id = row.user_id val someString = row.someString val varA = row.varA val varB = row.varB val seq = for( i <- 0 until varA.size) yield {Result(user_id,someString,varA(i),varB(i))} seq } val obj1 = Input(1, "string1", Array(0, 2, 5), Array(1, 2, 9)) val obj2 = Input(2, "string2", Array(1, 3, 6), Array(2, 3, 10)) val input_df = sc.parallelize(Seq(obj1, obj2)).toDS val res = input_df.flatMap{ row => getResult(row) } res.show // +------+----------+----+-----+ // |userId|someString|varA|varB | // +------+----------+----+-----+ // | 1| string1 | 0| 1 | // | 1| string1 | 2| 2 | // | 1| string1 | 5| 9 | // | 2| string2 | 1| 2 | // | 2| string2 | 3| 3 | // | 2| string2 | 6| 10| // +------+----------+----+-----+


Chispa> = 2.4

Puede omitir zip udf y usar la función arrays_zip :

df.withColumn("vars", explode(arrays_zip($"varA", $"varB"))).select( $"userId", $"someString", $"vars.varA", $"vars.varB").show

Chispa <2.4

Lo que desea no es posible sin un UDF personalizado. En Scala podrías hacer algo como esto:

val data = sc.parallelize(Seq( """{"userId": 1, "someString": "example1", "varA": [0, 2, 5], "varB": [1, 2, 9]}""", """{"userId": 2, "someString": "example2", "varA": [1, 20, 5], "varB": [9, null, 6]}""" )) val df = spark.read.json(data) df.printSchema // root // |-- someString: string (nullable = true) // |-- userId: long (nullable = true) // |-- varA: array (nullable = true) // | |-- element: long (containsNull = true) // |-- varB: array (nullable = true) // | |-- element: long (containsNull = true)

Ahora podemos definir zip udf:

import org.apache.spark.sql.functions.{udf, explode} val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys)) df.withColumn("vars", explode(zip($"varA", $"varB"))).select( $"userId", $"someString", $"vars._1".alias("varA"), $"vars._2".alias("varB")).show // +------+----------+----+----+ // |userId|someString|varA|varB| // +------+----------+----+----+ // | 1| example1| 0| 1| // | 1| example1| 2| 2| // | 1| example1| 5| 9| // | 2| example2| 1| 9| // | 2| example2| 20|null| // | 2| example2| 5| 6| // +------+----------+----+----+

Con SQL sin procesar:

sqlContext.udf.register("zip", (xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys)) df.registerTempTable("df") sqlContext.sql( """SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df""")