scala apache-spark apache-spark-sql spark-streaming spark-dataframe

scala - Spark DataFrame: ¿groupBy after orderBy mantiene ese orden?



apache-spark apache-spark-sql (6)

Tengo un example trama de datos Spark 2.0 con la siguiente estructura:

id, hour, count id1, 0, 12 id1, 1, 55 .. id1, 23, 44 id2, 0, 12 id2, 1, 89 .. id2, 23, 34 etc.

Contiene 24 entradas para cada id (una para cada hora del día) y se ordena por id, hora usando la función orderBy.

He creado un Aggregator groupConcat :

def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable { override def zero: String = "" override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat) override def merge(b1: String, b2: String) = b1 + b2 override def finish(b: String) = b.substring(1) override def bufferEncoder: Encoder[String] = Encoders.STRING override def outputEncoder: Encoder[String] = Encoders.STRING }.toColumn

Me ayuda a concatenar columnas en cadenas para obtener este marco de datos final:

id, hourly_count id1, 12:55:..:44 id2, 12:89:..:34 etc.

Mi pregunta es, si hago example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count") , ¿eso garantiza que el los recuentos por hora se ordenarán correctamente en sus respectivos cubos?

Leí que este no es necesariamente el caso para los RDD (consulte Spark ordenar por clave y luego agrupar por para obtener un orden iterable ), pero ¿tal vez sea diferente para DataFrames?

Si no, ¿cómo puedo solucionarlo?


La respuesta corta es Sí, los recuentos por hora mantendrán el mismo orden.

Para generalizar, es importante que ordene antes de agrupar. Además, la ordenación debe ser la misma que el grupo + la columna para la que realmente desea la ordenación.

Un ejemplo sería como:

employees .sort("company_id", "department_id", "employee_role") .groupBy("company_id", "department_id") .agg(Aggregators.groupConcat(":", 2) as "count_per_role")


No, la ordenación dentro de groupByKey no necesariamente se mantendrá, pero esto es notoriamente difícil de reproducir en la memoria en un nodo. Como se dijo anteriormente, la forma más típica en que esto sucede es cuando las cosas necesitan ser reparticionadas para que el groupByKey tenga lugar. Logré reproducir esto haciendo una repartition manual después del sort . Luego pasé los resultados al groupByKey .

case class Numbered(num:Int, group:Int, otherData:Int) // configure spark with "spark.sql.shuffle.partitions" = 2 or some other small number val v = (1 to 100000) // Make waaay more groups then partitions. I added an extra integer just to mess with the sort hash computation (i.e. so it won''t be monotonic, not sure if needed) .map(Numbered(_, Random.nextInt(300), Random.nextInt(1000000))).toDS() // Be sure they are stored in a small number of partitions .repartition(2) .sort($"num") // Repartition again with a waaay bigger number then there are groups so that when things need to be merged you can get them out of order. .repartition(200) .groupByKey(_.group) .mapGroups { case (g, nums) => nums // all you need is .sortBy(_.num) here to fix the problem .map(_.num) .mkString("~") } .collect() // Walk through the concatenated strings. If any number ahead // is smaller than the number before it, you know that something // is out of order. v.zipWithIndex.map { case (r, i) => r.split("~").map(_.toInt).foldLeft(0) { case (prev, next) => if (next < prev) { println(s"*** Next: ${next} less then ${prev} for dataset ${i + 1} ***") } next } }


Si desea evitar la implementación en Java (Scala y Python deberían ser similares):

example.orderBy(“hour”) .groupBy(“id”) .agg(functions.sort_array( functions.collect_list( functions.struct(dataRow.col(“hour”), dataRow.col(“count”))),false) .as(“hourly_count”));


Tengo un caso en el que el orden no siempre se mantiene: a veces sí, principalmente no.

Mi marco de datos tiene 200 particiones ejecutándose en Spark 1.6

df_group_sort = data.orderBy(times).groupBy(group_key).agg( F.sort_array(F.collect_list(times)), F.collect_list(times) )

para verificar el orden comparo los valores de retorno de

F.sort_array(F.collect_list(times))

y

F.collect_list(times)

dando por ejemplo (left: sort_array (collect_list ()); right: collect_list ())

2016-12-19 08:20:27.172000 2016-12-19 09:57:03.764000 2016-12-19 08:20:30.163000 2016-12-19 09:57:06.763000 2016-12-19 08:20:33.158000 2016-12-19 09:57:09.763000 2016-12-19 08:20:36.158000 2016-12-19 09:57:12.763000 2016-12-19 08:22:27.090000 2016-12-19 09:57:18.762000 2016-12-19 08:22:30.089000 2016-12-19 09:57:33.766000 2016-12-19 08:22:57.088000 2016-12-19 09:57:39.811000 2016-12-19 08:23:03.085000 2016-12-19 09:57:45.770000 2016-12-19 08:23:06.086000 2016-12-19 09:57:57.809000 2016-12-19 08:23:12.085000 2016-12-19 09:59:56.333000 2016-12-19 08:23:15.086000 2016-12-19 10:00:11.329000 2016-12-19 08:23:18.087000 2016-12-19 10:00:14.331000 2016-12-19 08:23:21.085000 2016-12-19 10:00:17.329000 2016-12-19 08:23:24.085000 2016-12-19 10:00:20.326000

La columna izquierda siempre está ordenada, mientras que la columna derecha solo consta de bloques ordenados. Para diferentes ejecuciones de take (), el orden de los bloques en la columna derecha es diferente.


el orden puede o no ser el mismo, dependiendo del número de particiones y la distribución de datos. Podemos resolver usando rdd mismo.

Por ejemplo::

Guardé los datos de muestra a continuación en un archivo y los cargué en hdfs.

1,type1,300 2,type1,100 3,type2,400 4,type2,500 5,type1,400 6,type3,560 7,type2,200 8,type3,800

y ejecuté el siguiente comando:

sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3).groupBy(_(1)).mapValues(x=>x.toList.sortBy(_(2)).map(_(0)).mkString("~")).collect()

salida:

Array[(String, String)] = Array((type3,6~8), (type1,2~1~5), (type2,7~3~4))

Es decir, agrupamos los datos por tipo, luego los ordenamos por precio y concatenamos los identificadores con "~" como separador. El comando anterior se puede romper de la siguiente manera:

val validData=sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3) val groupedData=validData.groupBy(_(1)) //group data rdds val sortedJoinedData=groupedData.mapValues(x=>{ val list=x.toList val sortedList=list.sortBy(_(2)) val idOnlyList=sortedList.map(_(0)) idOnlyList.mkString("~") } ) sortedJoinedData.collect()

entonces podemos tomar un grupo particular usando el comando

sortedJoinedData.filter(_._1=="type1").collect()

salida:

Array[(String, String)] = Array((type1,2~1~5))


groupBy after orderBy no mantiene el orden, como han señalado otros. Lo que desea hacer es utilizar una función de Windows: particionar en id y ordenar por horas. Puede recopilar_list sobre esto y luego tomar el máximo (más grande) de las listas resultantes, ya que se acumulan (es decir, la primera hora solo se incluirá en la lista, la segunda hora tendrá 2 elementos en la lista, y así sucesivamente).

Código de ejemplo completo:

import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.Window import spark.implicits._ val data = Seq(( "id1", 0, 12), ("id1", 1, 55), ("id1", 23, 44), ("id2", 0, 12), ("id2", 1, 89), ("id2", 23, 34)).toDF("id", "hour", "count") val mergeList = udf{(strings: Seq[String]) => strings.mkString(":")} data.withColumn("collected", collect_list($"count") .over(Window.partitionBy("id") .orderBy("hour"))) .groupBy("id") .agg(max($"collected").as("collected")) .withColumn("hourly_count", mergeList($"collected")) .select("id", "hourly_count").show

Esto nos mantiene dentro del mundo DataFrame. También simplifiqué el código UDF que estaba utilizando el OP.

Salida:

+---+------------+ | id|hourly_count| +---+------------+ |id1| 12:55:44| |id2| 12:89:34| +---+------------+