varios valores seleccionar registros registro promedio primer obtener maximos grupo contar campos cada agrupados sql scala apache-spark spark-dataframe dataframe apache-spark-sql

valores - sql contar registros agrupados



¿Cómo seleccionar la primera fila de cada grupo? (9)

Tengo un DataFrame generado de la siguiente manera:

df.groupBy($"Hour", $"Category") .agg(sum($"value") as "TotalValue") .sort($"Hour".asc, $"TotalValue".desc))

Los resultados se ven así:

+----+--------+----------+ |Hour|Category|TotalValue| +----+--------+----------+ | 0| cat26| 30.9| | 0| cat13| 22.1| | 0| cat95| 19.6| | 0| cat105| 1.3| | 1| cat67| 28.5| | 1| cat4| 26.8| | 1| cat13| 12.6| | 1| cat23| 5.3| | 2| cat56| 39.6| | 2| cat40| 29.7| | 2| cat187| 27.9| | 2| cat68| 9.8| | 3| cat8| 35.6| | ...| ....| ....| +----+--------+----------+

Como puede ver, el DataFrame se ordena por Hour en orden creciente, luego por TotalValue en orden descendente.

Me gustaría seleccionar la fila superior de cada grupo, es decir

  • del grupo de Hora == 0 seleccione (0, cat26,30.9)
  • del grupo de Hora == 1 seleccione (1, cat67,28.5)
  • del grupo de Hora == 2 seleccione (2, cat56,39.6)
  • y así

Entonces el resultado deseado sería:

+----+--------+----------+ |Hour|Category|TotalValue| +----+--------+----------+ | 0| cat26| 30.9| | 1| cat67| 28.5| | 2| cat56| 39.6| | 3| cat8| 35.6| | ...| ...| ...| +----+--------+----------+

Puede ser útil poder seleccionar también las N filas superiores de cada grupo.

Cualquier ayuda es muy apreciada.


Aquí puedes hacer así:

val data = df.groupBy("Hour").agg(first("Hour").as("_1"),first("Category").as("Category"),first("TotalValue").as("TotalValue")).drop("Hour") data.withColumnRenamed("_1","Hour").show


El patrón es agrupar por teclas => hacer algo a cada grupo, por ejemplo, reducir => volver al marco de datos

Pensé que la abstracción de Dataframe es un poco engorrosa en este caso, así que utilicé la funcionalidad RDD

val rdd: RDD[Row] = originalDf .rdd .groupBy(row => row.getAs[String]("grouping_row")) .map(iterableTuple => { iterableTuple._2.reduce(reduceFunction) }) val productDf = sqlContext.createDataFrame(rdd, originalDf.schema)


Esto es exactamente igual a la share de pero en forma de consulta SQL.

Suponiendo que el marco de datos se crea y se registra como

df.createOrReplaceTempView("table") //+----+--------+----------+ //|Hour|Category|TotalValue| //+----+--------+----------+ //|0 |cat26 |30.9 | //|0 |cat13 |22.1 | //|0 |cat95 |19.6 | //|0 |cat105 |1.3 | //|1 |cat67 |28.5 | //|1 |cat4 |26.8 | //|1 |cat13 |12.6 | //|1 |cat23 |5.3 | //|2 |cat56 |39.6 | //|2 |cat40 |29.7 | //|2 |cat187 |27.9 | //|2 |cat68 |9.8 | //|3 |cat8 |35.6 | //+----+--------+----------+

Función de ventana:

sqlContext.sql("select Hour, Category, TotalValue from (select *, row_number() OVER (PARTITION BY Hour ORDER BY TotalValue DESC) as rn FROM table) tmp where rn = 1").show(false) //+----+--------+----------+ //|Hour|Category|TotalValue| //+----+--------+----------+ //|1 |cat67 |28.5 | //|3 |cat8 |35.6 | //|2 |cat56 |39.6 | //|0 |cat26 |30.9 | //+----+--------+----------+

Agregación SQL simple seguida de unión:

sqlContext.sql("select Hour, first(Category) as Category, first(TotalValue) as TotalValue from " + "(select Hour, Category, TotalValue from table tmp1 " + "join " + "(select Hour as max_hour, max(TotalValue) as max_value from table group by Hour) tmp2 " + "on " + "tmp1.Hour = tmp2.max_hour and tmp1.TotalValue = tmp2.max_value) tmp3 " + "group by tmp3.Hour") .show(false) //+----+--------+----------+ //|Hour|Category|TotalValue| //+----+--------+----------+ //|1 |cat67 |28.5 | //|3 |cat8 |35.6 | //|2 |cat56 |39.6 | //|0 |cat26 |30.9 | //+----+--------+----------+

Usando pedidos sobre estructuras:

sqlContext.sql("select Hour, vs.Category, vs.TotalValue from (select Hour, max(struct(TotalValue, Category)) as vs from table group by Hour)").show(false) //+----+--------+----------+ //|Hour|Category|TotalValue| //+----+--------+----------+ //|1 |cat67 |28.5 | //|3 |cat8 |35.6 | //|2 |cat56 |39.6 | //|0 |cat26 |30.9 | //+----+--------+----------+

La forma de DataSets y no hacer s son las mismas que en la respuesta original


La solución a continuación solo hace un groupBy y extrae las filas de su marco de datos que contienen el maxValue en una sola toma. No hay necesidad de más Uniones, o Windows.

import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.DataFrame //df is the dataframe with Day, Category, TotalValue implicit val dfEnc = RowEncoder(df.schema) val res: DataFrame = df.groupByKey{(r) => r.getInt(0)}.mapGroups[Row]{(day: Int, rows: Iterator[Row]) => i.maxBy{(r) => r.getDouble(2)}}


Para Spark 2.0.2 con agrupación por múltiples columnas:

import org.apache.spark.sql.functions.row_number import org.apache.spark.sql.expressions.Window val w = Window.partitionBy($"col1", $"col2", $"col3").orderBy($"timestamp".desc) val refined_df = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")


Podemos usar la función de ventana de rango () donde elegiría el rango = 1) el rango solo agrega un número para cada fila de un grupo (en este caso sería la hora)

Aquí hay un ejemplo. (de https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-sql-functions.adoc#rank )

val dataset = spark.range(9).withColumn("bucket", ''id % 3) import org.apache.spark.sql.expressions.Window val byBucket = Window.partitionBy(''bucket).orderBy(''id) scala> dataset.withColumn("rank", rank over byBucket).show +---+------+----+ | id|bucket|rank| +---+------+----+ | 0| 0| 1| | 3| 0| 2| | 6| 0| 3| | 1| 1| 1| | 4| 1| 2| | 7| 1| 3| | 2| 2| 1| | 5| 2| 2| | 8| 2| 3| +---+------+----+


Si el marco de datos debe agruparse por varias columnas, esto puede ayudar

val keys = List("Hour", "Category"); val selectFirstValueOfNoneGroupedColumns = df.columns .filterNot(keys.toSet) .map(_ -> "first").toMap val grouped = df.groupBy(keys.head, keys.tail: _*) .agg(selectFirstValueOfNoneGroupedColumns)

Espero que esto ayude a alguien con un problema similar


Una buena manera de hacer esto con la API de trama de datos es usar la lógica argmax de esta manera

val df = Seq( (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3), (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3), (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8), (3,"cat8",35.6)).toDF("Hour", "Category", "TotalValue") df.groupBy($"Hour") .agg(max(struct($"TotalValue", $"Category")).as("argmax")) .select($"Hour", $"argmax.*").show +----+----------+--------+ |Hour|TotalValue|Category| +----+----------+--------+ | 1| 28.5| cat67| | 3| 35.6| cat8| | 2| 39.6| cat56| | 0| 30.9| cat26| +----+----------+--------+


Funciones de ventana :

Algo como esto debería hacer el truco:

import org.apache.spark.sql.functions.{row_number, max, broadcast} import org.apache.spark.sql.expressions.Window val df = sc.parallelize(Seq( (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3), (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3), (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8), (3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue") val w = Window.partitionBy($"hour").orderBy($"TotalValue".desc) val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn") dfTop.show // +----+--------+----------+ // |Hour|Category|TotalValue| // +----+--------+----------+ // | 0| cat26| 30.9| // | 1| cat67| 28.5| // | 2| cat56| 39.6| // | 3| cat8| 35.6| // +----+--------+----------+

Este método será ineficiente en caso de sesgo significativo de datos.

Agregación SQL simple seguida de join :

Alternativamente, puede unirse con el marco de datos agregado:

val dfMax = df.groupBy($"hour".as("max_hour")).agg(max($"TotalValue").as("max_value")) val dfTopByJoin = df.join(broadcast(dfMax), ($"hour" === $"max_hour") && ($"TotalValue" === $"max_value")) .drop("max_hour") .drop("max_value") dfTopByJoin.show // +----+--------+----------+ // |Hour|Category|TotalValue| // +----+--------+----------+ // | 0| cat26| 30.9| // | 1| cat67| 28.5| // | 2| cat56| 39.6| // | 3| cat8| 35.6| // +----+--------+----------+

Mantendrá valores duplicados (si hay más de una categoría por hora con el mismo valor total). Puede eliminarlos de la siguiente manera:

dfTopByJoin .groupBy($"hour") .agg( first("category").alias("category"), first("TotalValue").alias("TotalValue"))

Usando pedidos sobre structs :

Truco ordenado, aunque no muy bien probado, que no requiere uniones o funciones de ventana:

val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs")) .groupBy($"hour") .agg(max("vs").alias("vs")) .select($"Hour", $"vs.Category", $"vs.TotalValue") dfTop.show // +----+--------+----------+ // |Hour|Category|TotalValue| // +----+--------+----------+ // | 0| cat26| 30.9| // | 1| cat67| 28.5| // | 2| cat56| 39.6| // | 3| cat8| 35.6| // +----+--------+----------+

Con la API DataSet (Spark 1.6+, 2.0+):

Spark 1.6 :

case class Record(Hour: Integer, Category: String, TotalValue: Double) df.as[Record] .groupBy($"hour") .reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y) .show // +---+--------------+ // | _1| _2| // +---+--------------+ // |[0]|[0,cat26,30.9]| // |[1]|[1,cat67,28.5]| // |[2]|[2,cat56,39.6]| // |[3]| [3,cat8,35.6]| // +---+--------------+

Spark 2.0 o posterior :

df.as[Record] .groupByKey(_.Hour) .reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)

Los dos últimos métodos pueden aprovechar la combinación del lado del mapa y no requieren una combinación aleatoria completa, por lo que la mayoría de las veces debería exhibir un mejor rendimiento en comparación con las funciones y combinaciones de ventanas. Estos bastones también se pueden utilizar con Streaming estructurado en modo de salida completado.

No utilizar :

df.orderBy(...).groupBy(...).agg(first(...), ...)

Puede parecer que funciona (especialmente en el modo local ) pero no es confiable ( SPARK-16207 ). Créditos a Tzach Zohar por vincular un tema relevante de JIRA .

La misma nota se aplica a

df.orderBy(...).dropDuplicates(...)

que utiliza internamente un plan de ejecución equivalente.