scala - Spark DataFrame: ¿groupBy after orderBy mantiene ese orden?
apache-spark apache-spark-sql (6)
Tengo un
example
trama de datos Spark 2.0 con la siguiente estructura:
id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.
Contiene 24 entradas para cada id (una para cada hora del día) y se ordena por id, hora usando la función orderBy.
He creado un Aggregator
groupConcat
:
def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
override def zero: String = ""
override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)
override def merge(b1: String, b2: String) = b1 + b2
override def finish(b: String) = b.substring(1)
override def bufferEncoder: Encoder[String] = Encoders.STRING
override def outputEncoder: Encoder[String] = Encoders.STRING
}.toColumn
Me ayuda a concatenar columnas en cadenas para obtener este marco de datos final:
id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.
Mi pregunta es, si hago
example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count")
, ¿eso garantiza que el los recuentos por hora se ordenarán correctamente en sus respectivos cubos?
Leí que este no es necesariamente el caso para los RDD (consulte Spark ordenar por clave y luego agrupar por para obtener un orden iterable ), pero ¿tal vez sea diferente para DataFrames?
Si no, ¿cómo puedo solucionarlo?
La respuesta corta es Sí, los recuentos por hora mantendrán el mismo orden.
Para generalizar, es importante que ordene antes de agrupar. Además, la ordenación debe ser la misma que el grupo + la columna para la que realmente desea la ordenación.
Un ejemplo sería como:
employees
.sort("company_id", "department_id", "employee_role")
.groupBy("company_id", "department_id")
.agg(Aggregators.groupConcat(":", 2) as "count_per_role")
No, la ordenación dentro de
groupByKey
no necesariamente se mantendrá, pero esto es notoriamente difícil de reproducir en la memoria en un nodo.
Como se dijo anteriormente, la forma más típica en que esto sucede es cuando las cosas necesitan ser reparticionadas para que el
groupByKey
tenga lugar.
Logré reproducir esto haciendo una
repartition
manual después del
sort
.
Luego pasé los resultados al
groupByKey
.
case class Numbered(num:Int, group:Int, otherData:Int)
// configure spark with "spark.sql.shuffle.partitions" = 2 or some other small number
val v =
(1 to 100000)
// Make waaay more groups then partitions. I added an extra integer just to mess with the sort hash computation (i.e. so it won''t be monotonic, not sure if needed)
.map(Numbered(_, Random.nextInt(300), Random.nextInt(1000000))).toDS()
// Be sure they are stored in a small number of partitions
.repartition(2)
.sort($"num")
// Repartition again with a waaay bigger number then there are groups so that when things need to be merged you can get them out of order.
.repartition(200)
.groupByKey(_.group)
.mapGroups {
case (g, nums) =>
nums // all you need is .sortBy(_.num) here to fix the problem
.map(_.num)
.mkString("~")
}
.collect()
// Walk through the concatenated strings. If any number ahead
// is smaller than the number before it, you know that something
// is out of order.
v.zipWithIndex.map { case (r, i) =>
r.split("~").map(_.toInt).foldLeft(0) { case (prev, next) =>
if (next < prev) {
println(s"*** Next: ${next} less then ${prev} for dataset ${i + 1} ***")
}
next
}
}
Si desea evitar la implementación en Java (Scala y Python deberían ser similares):
example.orderBy(“hour”)
.groupBy(“id”)
.agg(functions.sort_array(
functions.collect_list(
functions.struct(dataRow.col(“hour”),
dataRow.col(“count”))),false)
.as(“hourly_count”));
Tengo un caso en el que el orden no siempre se mantiene: a veces sí, principalmente no.
Mi marco de datos tiene 200 particiones ejecutándose en Spark 1.6
df_group_sort = data.orderBy(times).groupBy(group_key).agg(
F.sort_array(F.collect_list(times)),
F.collect_list(times)
)
para verificar el orden comparo los valores de retorno de
F.sort_array(F.collect_list(times))
y
F.collect_list(times)
dando por ejemplo (left: sort_array (collect_list ()); right: collect_list ())
2016-12-19 08:20:27.172000 2016-12-19 09:57:03.764000
2016-12-19 08:20:30.163000 2016-12-19 09:57:06.763000
2016-12-19 08:20:33.158000 2016-12-19 09:57:09.763000
2016-12-19 08:20:36.158000 2016-12-19 09:57:12.763000
2016-12-19 08:22:27.090000 2016-12-19 09:57:18.762000
2016-12-19 08:22:30.089000 2016-12-19 09:57:33.766000
2016-12-19 08:22:57.088000 2016-12-19 09:57:39.811000
2016-12-19 08:23:03.085000 2016-12-19 09:57:45.770000
2016-12-19 08:23:06.086000 2016-12-19 09:57:57.809000
2016-12-19 08:23:12.085000 2016-12-19 09:59:56.333000
2016-12-19 08:23:15.086000 2016-12-19 10:00:11.329000
2016-12-19 08:23:18.087000 2016-12-19 10:00:14.331000
2016-12-19 08:23:21.085000 2016-12-19 10:00:17.329000
2016-12-19 08:23:24.085000 2016-12-19 10:00:20.326000
La columna izquierda siempre está ordenada, mientras que la columna derecha solo consta de bloques ordenados. Para diferentes ejecuciones de take (), el orden de los bloques en la columna derecha es diferente.
el orden puede o no ser el mismo, dependiendo del número de particiones y la distribución de datos. Podemos resolver usando rdd mismo.
Por ejemplo::
Guardé los datos de muestra a continuación en un archivo y los cargué en hdfs.
1,type1,300
2,type1,100
3,type2,400
4,type2,500
5,type1,400
6,type3,560
7,type2,200
8,type3,800
y ejecuté el siguiente comando:
sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3).groupBy(_(1)).mapValues(x=>x.toList.sortBy(_(2)).map(_(0)).mkString("~")).collect()
salida:
Array[(String, String)] = Array((type3,6~8), (type1,2~1~5), (type2,7~3~4))
Es decir, agrupamos los datos por tipo, luego los ordenamos por precio y concatenamos los identificadores con "~" como separador. El comando anterior se puede romper de la siguiente manera:
val validData=sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3)
val groupedData=validData.groupBy(_(1)) //group data rdds
val sortedJoinedData=groupedData.mapValues(x=>{
val list=x.toList
val sortedList=list.sortBy(_(2))
val idOnlyList=sortedList.map(_(0))
idOnlyList.mkString("~")
}
)
sortedJoinedData.collect()
entonces podemos tomar un grupo particular usando el comando
sortedJoinedData.filter(_._1=="type1").collect()
salida:
Array[(String, String)] = Array((type1,2~1~5))
groupBy after orderBy no mantiene el orden, como han señalado otros. Lo que desea hacer es utilizar una función de Windows: particionar en id y ordenar por horas. Puede recopilar_list sobre esto y luego tomar el máximo (más grande) de las listas resultantes, ya que se acumulan (es decir, la primera hora solo se incluirá en la lista, la segunda hora tendrá 2 elementos en la lista, y así sucesivamente).
Código de ejemplo completo:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
val data = Seq(( "id1", 0, 12),
("id1", 1, 55),
("id1", 23, 44),
("id2", 0, 12),
("id2", 1, 89),
("id2", 23, 34)).toDF("id", "hour", "count")
val mergeList = udf{(strings: Seq[String]) => strings.mkString(":")}
data.withColumn("collected", collect_list($"count")
.over(Window.partitionBy("id")
.orderBy("hour")))
.groupBy("id")
.agg(max($"collected").as("collected"))
.withColumn("hourly_count", mergeList($"collected"))
.select("id", "hourly_count").show
Esto nos mantiene dentro del mundo DataFrame. También simplifiqué el código UDF que estaba utilizando el OP.
Salida:
+---+------------+
| id|hourly_count|
+---+------------+
|id1| 12:55:44|
|id2| 12:89:34|
+---+------------+