scala - una - Agregación de múltiples columnas con función personalizada en chispa.
propiedad formula vba (4)
Me preguntaba si hay alguna manera de especificar una función de agregación personalizada para los marcos de datos de chispa en varias columnas.
Tengo una tabla como esta del tipo (nombre, artículo, precio):
john | tomato | 1.99
john | carrot | 0.45
bill | apple | 0.99
john | banana | 1.29
bill | taco | 2.59
a:
Me gustaría agregar el artículo y su costo para cada persona en una lista como esta:
john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29)
bill | (apple, 0.99), (taco, 2.59)
¿Es esto posible en marcos de datos? Recientemente aprendí sobre collect_list
pero parece que solo funciona para una columna.
Aquí hay una opción al convertir el marco de datos en un RDD de Mapa y luego llamar a un groupByKey
en él. El resultado sería una lista de pares clave-valor donde valor es una lista de tuplas.
df.show
+----+------+----+
| _1| _2| _3|
+----+------+----+
|john|tomato|1.99|
|john|carrot|0.45|
|bill| apple|0.99|
|john|banana|1.29|
|bill| taco|2.59|
+----+------+----+
val tuples = df.map(row => row(0) -> (row(1), row(2)))
tuples: org.apache.spark.rdd.RDD[(Any, (Any, Any))] = MapPartitionsRDD[102] at map at <console>:43
tuples.groupByKey().map{ case(x, y) => (x, y.toList) }.collect
res76: Array[(Any, List[(Any, Any)])] = Array((bill,List((apple,0.99), (taco,2.59))), (john,List((tomato,1.99), (carrot,0.45), (banana,1.29))))
Considere utilizar la función struct
para agrupar las columnas antes de recopilarlas como una lista:
import org.apache.spark.sql.functions.{collect_list, struct}
import sqlContext.implicits._
val df = Seq(
("john", "tomato", 1.99),
("john", "carrot", 0.45),
("bill", "apple", 0.99),
("john", "banana", 1.29),
("bill", "taco", 2.59)
).toDF("name", "food", "price")
df.groupBy($"name")
.agg(collect_list(struct($"food", $"price")).as("foods"))
.show(false)
Salidas:
+----+---------------------------------------------+
|name|foods |
+----+---------------------------------------------+
|john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
|bill|[[apple,0.99], [taco,2.59]] |
+----+---------------------------------------------+
La forma más fácil de hacer esto como un DataFrame
es recolectar primero dos listas y luego usar un UDF
para zip
las dos listas juntas. Algo como:
import org.apache.spark.sql.functions.{collect_list, udf}
import sqlContext.implicits._
val zipper = udf[Seq[(String, Double)], Seq[String], Seq[Double]](_.zip(_))
val df = Seq(
("john", "tomato", 1.99),
("john", "carrot", 0.45),
("bill", "apple", 0.99),
("john", "banana", 1.29),
("bill", "taco", 2.59)
).toDF("name", "food", "price")
val df2 = df.groupBy("name").agg(
collect_list(col("food")) as "food",
collect_list(col("price")) as "price"
).withColumn("food", zipper(col("food"), col("price"))).drop("price")
df2.show(false)
# +----+---------------------------------------------+
# |name|food |
# +----+---------------------------------------------+
# |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
# |bill|[[apple,0.99], [taco,2.59]] |
# +----+---------------------------------------------+
Quizás una forma mejor que la función zip
( ya que UDF y UDAF son muy malas para el rendimiento ) es envolver las dos columnas en Struct
.
Esto probablemente funcionaría también:
df.select(''name, struct(''food, ''price).as("tuple"))
.groupBy(''name)
.agg(collect_list(''tuple).as("tuples"))