usar una propiedad para macro insertar formulalocal español celda scala apache-spark spark-dataframe orc

scala - una - Agregación de múltiples columnas con función personalizada en chispa.



propiedad formula vba (4)

Me preguntaba si hay alguna manera de especificar una función de agregación personalizada para los marcos de datos de chispa en varias columnas.

Tengo una tabla como esta del tipo (nombre, artículo, precio):

john | tomato | 1.99 john | carrot | 0.45 bill | apple | 0.99 john | banana | 1.29 bill | taco | 2.59

a:

Me gustaría agregar el artículo y su costo para cada persona en una lista como esta:

john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29) bill | (apple, 0.99), (taco, 2.59)

¿Es esto posible en marcos de datos? Recientemente aprendí sobre collect_list pero parece que solo funciona para una columna.


Aquí hay una opción al convertir el marco de datos en un RDD de Mapa y luego llamar a un groupByKey en él. El resultado sería una lista de pares clave-valor donde valor es una lista de tuplas.

df.show +----+------+----+ | _1| _2| _3| +----+------+----+ |john|tomato|1.99| |john|carrot|0.45| |bill| apple|0.99| |john|banana|1.29| |bill| taco|2.59| +----+------+----+ val tuples = df.map(row => row(0) -> (row(1), row(2))) tuples: org.apache.spark.rdd.RDD[(Any, (Any, Any))] = MapPartitionsRDD[102] at map at <console>:43 tuples.groupByKey().map{ case(x, y) => (x, y.toList) }.collect res76: Array[(Any, List[(Any, Any)])] = Array((bill,List((apple,0.99), (taco,2.59))), (john,List((tomato,1.99), (carrot,0.45), (banana,1.29))))


Considere utilizar la función struct para agrupar las columnas antes de recopilarlas como una lista:

import org.apache.spark.sql.functions.{collect_list, struct} import sqlContext.implicits._ val df = Seq( ("john", "tomato", 1.99), ("john", "carrot", 0.45), ("bill", "apple", 0.99), ("john", "banana", 1.29), ("bill", "taco", 2.59) ).toDF("name", "food", "price") df.groupBy($"name") .agg(collect_list(struct($"food", $"price")).as("foods")) .show(false)

Salidas:

+----+---------------------------------------------+ |name|foods | +----+---------------------------------------------+ |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]| |bill|[[apple,0.99], [taco,2.59]] | +----+---------------------------------------------+


La forma más fácil de hacer esto como un DataFrame es recolectar primero dos listas y luego usar un UDF para zip las dos listas juntas. Algo como:

import org.apache.spark.sql.functions.{collect_list, udf} import sqlContext.implicits._ val zipper = udf[Seq[(String, Double)], Seq[String], Seq[Double]](_.zip(_)) val df = Seq( ("john", "tomato", 1.99), ("john", "carrot", 0.45), ("bill", "apple", 0.99), ("john", "banana", 1.29), ("bill", "taco", 2.59) ).toDF("name", "food", "price") val df2 = df.groupBy("name").agg( collect_list(col("food")) as "food", collect_list(col("price")) as "price" ).withColumn("food", zipper(col("food"), col("price"))).drop("price") df2.show(false) # +----+---------------------------------------------+ # |name|food | # +----+---------------------------------------------+ # |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]| # |bill|[[apple,0.99], [taco,2.59]] | # +----+---------------------------------------------+


Quizás una forma mejor que la función zip ( ya que UDF y UDAF son muy malas para el rendimiento ) es envolver las dos columnas en Struct .

Esto probablemente funcionaría también:

df.select(''name, struct(''food, ''price).as("tuple")) .groupBy(''name) .agg(collect_list(''tuple).as("tuples"))