python apache-spark pyspark

python - collect_list conservando el orden basado en otra variable



apache-spark pyspark (4)

La pregunta era para PySpark, pero podría ser útil tenerla también para Scala Spark.

Preparemos el marco de datos de prueba:

import org.apache.spark.sql.functions._ import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.expressions.{ Window, UserDefinedFunction} import java.sql.Date import java.time.LocalDate val spark: SparkSession = ... // Out test data set val data: Seq[(Int, Date, Int)] = Seq( (1, Date.valueOf(LocalDate.parse("2014-01-03")), 10), (1, Date.valueOf(LocalDate.parse("2014-01-04")), 5), (1, Date.valueOf(LocalDate.parse("2014-01-05")), 15), (1, Date.valueOf(LocalDate.parse("2014-01-06")), 20), (2, Date.valueOf(LocalDate.parse("2014-02-10")), 100), (2, Date.valueOf(LocalDate.parse("2014-02-11")), 500), (2, Date.valueOf(LocalDate.parse("2014-02-15")), 1500) ) // Create dataframe val df: DataFrame = spark.createDataFrame(data) .toDF("id", "date", "value") df.show() //+---+----------+-----+ //| id| date|value| //+---+----------+-----+ //| 1|2014-01-03| 10| //| 1|2014-01-04| 5| //| 1|2014-01-05| 15| //| 1|2014-01-06| 20| //| 2|2014-02-10| 100| //| 2|2014-02-11| 500| //| 2|2014-02-15| 1500| //+---+----------+-----+

Use UDF

// Group by id and aggregate date and value to new column date_value val grouped = df.groupBy(col("id")) .agg(collect_list(struct("date", "value")) as "date_value") grouped.show() grouped.printSchema() // +---+--------------------+ // | id| date_value| // +---+--------------------+ // | 1|[[2014-01-03,10],...| // | 2|[[2014-02-10,100]...| // +---+--------------------+ // udf to extract data from Row, sort by needed column (date) and return value val sortUdf: UserDefinedFunction = udf((rows: Seq[Row]) => { rows.map { case Row(date: Date, value: Int) => (date, value) } .sortBy { case (date, value) => date } .map { case (date, value) => value } }) // Select id and value_list val r1 = grouped.select(col("id"), sortUdf(col("date_value")).alias("value_list")) r1.show() // +---+----------------+ // | id| value_list| // +---+----------------+ // | 1| [10, 5, 15, 20]| // | 2|[100, 500, 1500]| // +---+----------------+

Usar ventana

val window = Window.partitionBy(col("id")).orderBy(col("date")) val sortedDf = df.withColumn("values_sorted_by_date", collect_list("value").over(window)) sortedDf.show() //+---+----------+-----+---------------------+ //| id| date|value|values_sorted_by_date| //+---+----------+-----+---------------------+ //| 1|2014-01-03| 10| [10]| //| 1|2014-01-04| 5| [10, 5]| //| 1|2014-01-05| 15| [10, 5, 15]| //| 1|2014-01-06| 20| [10, 5, 15, 20]| //| 2|2014-02-10| 100| [100]| //| 2|2014-02-11| 500| [100, 500]| //| 2|2014-02-15| 1500| [100, 500, 1500]| //+---+----------+-----+---------------------+ val r2 = sortedDf.groupBy(col("id")) .agg(max("values_sorted_by_date").as("value_list")) r2.show() //+---+----------------+ //| id| value_list| //+---+----------------+ //| 1| [10, 5, 15, 20]| //| 2|[100, 500, 1500]| //+---+----------------+

Estoy tratando de crear una nueva columna de listas en Pyspark usando una agregación groupby en un conjunto de columnas existente. A continuación se proporciona un marco de datos de entrada de ejemplo:

------------------------ id | date | value ------------------------ 1 |2014-01-03 | 10 1 |2014-01-04 | 5 1 |2014-01-05 | 15 1 |2014-01-06 | 20 2 |2014-02-10 | 100 2 |2014-03-11 | 500 2 |2014-04-15 | 1500

El resultado esperado es:

id | value_list ------------------------ 1 | [10, 5, 15, 20] 2 | [100, 500, 1500]

Los valores dentro de una lista se ordenan por fecha.

Intenté usar collect_list de la siguiente manera:

from pyspark.sql import functions as F ordered_df = input_df.orderBy([''id'',''date''],ascending = True) grouped_df = ordered_df.groupby("id").agg(F.collect_list("value"))

Pero collect_list no garantiza el orden, incluso si clasifico el marco de datos de entrada por fecha antes de la agregación.

¿Podría alguien ayudarme a hacer la agregación preservando el orden en función de una segunda variable (fecha)?


Para asegurarnos de que el ordenamiento se realiza para cada id, podemos usar sortWithinPartitions:

from pyspark.sql import functions as F ordered_df = ( input_df .repartition(input_df.id) .sortWithinPartitions([''date'']) ) grouped_df = ordered_df.groupby("id").agg(F.collect_list("value"))


Si recopila fechas y valores como una lista, puede ordenar la columna resultante según la fecha usando y udf , y luego mantener solo los valores en el resultado.

import operator import pyspark.sql.functions as F # create list column grouped_df = input_df.groupby("id") / .agg(F.collect_list(F.struct("date", "value")) / .alias("list_col")) # define udf def sorter(l): res = sorted(l, key=operator.itemgetter(0)) return [item[1] for item in res] sort_udf = F.udf(sorter) # test grouped_df.select("id", sort_udf("list_col") / .alias("sorted_list")) / .show(truncate = False) +---+----------------+ |id |sorted_list | +---+----------------+ |1 |[10, 5, 15, 20] | |2 |[100, 500, 1500]| +---+----------------+


from pyspark.sql import functions as F from pyspark.sql import Window w = Window.partitionBy(''id'').orderBy(''date'') sorted_list_df = input_df.withColumn( ''sorted_list'', F.collect_list(''value'').over(w) )/ .groupBy(''id'')/ .agg(F.max(''sorted_list'').alias(''sorted_list''))

Window ejemplos de Window proporcionados por los usuarios a menudo no explican realmente lo que está sucediendo, así que permítanme analizarlo por usted.

Como sabe, el uso de collect_list junto con groupBy dará como resultado una lista de valores desordenada . Esto se debe a que dependiendo de cómo se particionen sus datos, Spark agregará valores a su lista tan pronto como encuentre una fila en el grupo. El orden entonces depende de cómo Spark planifique su agregación sobre los ejecutores.

Una función de Window permite controlar esa situación, agrupando filas por un cierto valor para que pueda realizar una operación over cada uno de los grupos resultantes:

w = Window.partitionBy(''id'').orderBy(''date'')

  • partitionBy : desea grupos / particiones de filas con la misma id
  • orderBy : desea que cada fila del grupo se ordene por date

Una vez que haya definido el alcance de su ventana - "filas con la misma id , ordenadas por date " -, puede usarla para realizar una operación sobre ella, en este caso, un collect_list :

F.collect_list(''value'').over(w)

En este punto, creó una nueva columna sorted_list con una lista ordenada de valores, ordenada por fecha, pero aún tiene filas duplicadas por id . Para recortar las filas duplicadas que desea groupBy id y mantener el valor max para cada grupo:

.groupBy(''id'')/ .agg(F.max(''sorted_list'').alias(''sorted_list''))