tutorial spark read example español create apache-spark dataframe apache-spark-sql

apache spark - spark - Múltiples operaciones agregadas en la misma columna de un marco de datos de chispa



spark sql example (3)

Para aquellos que se preguntan, cómo se puede escribir la respuesta @ zero323 sin una comprensión de la lista en python:

from pyspark.sql.functions import min, max, col # init your spark dataframe expr = [min(col("valueName")),max(col("valueName"))] df.groupBy("keyName").agg(*expr)

Tengo tres matrices de tipo de cadena que contienen la siguiente información:

  • matriz groupBy: contiene los nombres de las columnas por las que quiero agrupar mis datos.
  • matriz agregada: contiene nombres de columnas que quiero agregar.
  • matriz de operaciones: que contiene las operaciones agregadas que quiero realizar

Estoy tratando de usar marcos de datos de chispa para lograr esto. Los marcos de datos de Spark proporcionan un agg () donde puede pasar un Map [String, String] (del nombre de columna y la operación de agregado respectiva) como entrada, sin embargo, quiero realizar diferentes operaciones de agregación en la misma columna de datos. ¿Alguna sugerencia sobre cómo lograr esto?


Scala :

Por ejemplo, puede asignar una lista de funciones con una mapping definida de nombre a función:

import org.apache.spark.sql.functions.{col, min, max, mean} import org.apache.spark.sql.Column val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v") val mapping: Map[String, Column => Column] = Map( "min" -> min, "max" -> max, "mean" -> avg) val groupBy = Seq("k") val aggregate = Seq("v") val operations = Seq("min", "max", "mean") val exprs = aggregate.flatMap(c => operations .map(f => mapping(f)(col(c)))) df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show // +---+------+------+------+ // | k|min(v)|max(v)|avg(v)| // +---+------+------+------+ // | 1| 3.0| 3.0| 3.0| // | 2| -5.0| -5.0| -5.0| // +---+------+------+------+

o

df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show

Desafortunadamente, el analizador que se usa internamente SQLContext no se expone públicamente, pero siempre puede intentar crear consultas SQL simples:

df.registerTempTable("df") val groupExprs = groupBy.mkString(",") val aggExprs = aggregate.flatMap(c => operations.map( f => s"$f($c) AS ${c}_${f}") ).mkString(",") sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs")

Python :

from pyspark.sql.functions import mean, sum, max, col df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"]) groupBy = ["k"] aggregate = ["v"] funs = [mean, sum, max] exprs = [f(col(c)) for f in funs for c in aggregate] # or equivalent df.groupby(groupBy).agg(*exprs) df.groupby(*groupBy).agg(*exprs)

Ver también:

  • Spark SQL: aplica funciones agregadas a una lista de columnas

case class soExample(firstName: String, lastName: String, Amount: Int) val df = Seq(soExample("me", "zack", 100)).toDF import org.apache.spark.sql.functions._ val groupped = df.groupBy("firstName", "lastName").agg( sum("Amount"), mean("Amount"), stddev("Amount"), count(lit(1)).alias("numOfRecords") ).toDF() display(groupped)

// Cortesía de Zach ..

Respuesta simplificada de Zach para un marco de datos de Spala Scala duplicado marcado para tener una agregación múltiple de un solo grupo por