apache spark - spark - Múltiples operaciones agregadas en la misma columna de un marco de datos de chispa
spark sql example (3)
Para aquellos que se preguntan, cómo se puede escribir la respuesta @ zero323 sin una comprensión de la lista en python:
from pyspark.sql.functions import min, max, col
# init your spark dataframe
expr = [min(col("valueName")),max(col("valueName"))]
df.groupBy("keyName").agg(*expr)
Tengo tres matrices de tipo de cadena que contienen la siguiente información:
- matriz groupBy: contiene los nombres de las columnas por las que quiero agrupar mis datos.
- matriz agregada: contiene nombres de columnas que quiero agregar.
- matriz de operaciones: que contiene las operaciones agregadas que quiero realizar
Estoy tratando de usar marcos de datos de chispa para lograr esto. Los marcos de datos de Spark proporcionan un agg () donde puede pasar un Map [String, String] (del nombre de columna y la operación de agregado respectiva) como entrada, sin embargo, quiero realizar diferentes operaciones de agregación en la misma columna de datos. ¿Alguna sugerencia sobre cómo lograr esto?
Scala :
Por ejemplo, puede asignar una lista de funciones con una
mapping
definida de nombre a función:
import org.apache.spark.sql.functions.{col, min, max, mean}
import org.apache.spark.sql.Column
val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v")
val mapping: Map[String, Column => Column] = Map(
"min" -> min, "max" -> max, "mean" -> avg)
val groupBy = Seq("k")
val aggregate = Seq("v")
val operations = Seq("min", "max", "mean")
val exprs = aggregate.flatMap(c => operations .map(f => mapping(f)(col(c))))
df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show
// +---+------+------+------+
// | k|min(v)|max(v)|avg(v)|
// +---+------+------+------+
// | 1| 3.0| 3.0| 3.0|
// | 2| -5.0| -5.0| -5.0|
// +---+------+------+------+
o
df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show
Desafortunadamente, el analizador que se usa internamente
SQLContext
no se expone públicamente, pero siempre puede intentar crear consultas SQL simples:
df.registerTempTable("df")
val groupExprs = groupBy.mkString(",")
val aggExprs = aggregate.flatMap(c => operations.map(
f => s"$f($c) AS ${c}_${f}")
).mkString(",")
sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs")
Python :
from pyspark.sql.functions import mean, sum, max, col
df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"])
groupBy = ["k"]
aggregate = ["v"]
funs = [mean, sum, max]
exprs = [f(col(c)) for f in funs for c in aggregate]
# or equivalent df.groupby(groupBy).agg(*exprs)
df.groupby(*groupBy).agg(*exprs)
Ver también:
- Spark SQL: aplica funciones agregadas a una lista de columnas
case class soExample(firstName: String, lastName: String, Amount: Int)
val df = Seq(soExample("me", "zack", 100)).toDF
import org.apache.spark.sql.functions._
val groupped = df.groupBy("firstName", "lastName").agg(
sum("Amount"),
mean("Amount"),
stddev("Amount"),
count(lit(1)).alias("numOfRecords")
).toDF()
display(groupped)
// Cortesía de Zach ..
Respuesta simplificada de Zach para un marco de datos de Spala Scala duplicado marcado para tener una agregación múltiple de un solo grupo por