scala - org - Rolling your own reduceByKey en Spark Dataset
spark apache org download (2)
Supongo que su objetivo es traducir este idioma a los conjuntos de datos:
rdd.map(x => (x.someKey, x.someField))
.reduceByKey(_ + _)
// => returning an RDD of (KeyType, FieldType)
Actualmente, la solución más cercana que he encontrado con la API de conjunto de datos se ve así:
ds.map(x => (x.someKey, x.someField)) // [1]
.groupByKey(_._1)
.reduceGroups((a, b) => (a._1, a._2 + b._2))
.map(_._2) // [2]
// => returning a Dataset of (KeyType, FieldType)
// Comments:
// [1] As far as I can see, having a map before groupByKey is required
// to end up with the proper type in reduceGroups. After all, we do
// not want to reduce over the original type, but the FieldType.
// [2] required since reduceGroups converts back to Dataset[(K, V)]
// not knowing that our V''s are already key-value pairs.
No parece muy elegante y, según un punto de referencia rápido, también tiene un rendimiento mucho menor, por lo que tal vez nos falte algo ...
Nota: una alternativa podría ser usar groupByKey(_.someKey)
como primer paso. El problema es que el uso de groupByKey
cambia el tipo de un Dataset
de Dataset
normal a un KeyValueGroupedDataset
. Este último no tiene una función de map
regular. En su lugar, ofrece un mapGroups
, que no parece muy conveniente porque envuelve los valores en un Iterator
y realiza una orden aleatoria de acuerdo con la cadena de documentación.
Estoy tratando de aprender a usar DataFrames y DataSets más además de los RDD. Para un RDD, sé que puedo hacer someRDD.reduceByKey((x,y) => x + y)
, pero no veo esa función para Dataset. Así que decidí escribir uno.
someRdd.map(x => ((x.fromId,x.toId),1)).map(x => collection.mutable.Map(x)).reduce((x,y) => {
val result = mutable.HashMap.empty[(Long,Long),Int]
val keys = mutable.HashSet.empty[(Long,Long)]
y.keys.foreach(z => keys += z)
x.keys.foreach(z => keys += z)
for (elem <- keys) {
val s1 = if(x.contains(elem)) x(elem) else 0
val s2 = if(y.contains(elem)) y(elem) else 0
result(elem) = s1 + s2
}
result
})
Sin embargo, esto lo devuelve todo al conductor. ¿Cómo escribirías esto para devolver un Dataset
? Tal vez mapPartition y hacerlo allí?
Tenga en cuenta que esto se compila pero no se ejecuta porque todavía no tiene codificadores para Map
Una solución más eficiente usa mapPartitions
antes de groupByKey
para reducir la cantidad de barajadas (tenga en cuenta que no es exactamente la misma firma que reduceByKey
pero creo que es más flexible pasar una función que requerir que el conjunto de datos conste de una tupla).
def reduceByKey[V: ClassTag, K](ds: Dataset[V], f: V => K, g: (V, V) => V)
(implicit encK: Encoder[K], encV: Encoder[V]): Dataset[(K, V)] = {
def h[V: ClassTag, K](f: V => K, g: (V, V) => V, iter: Iterator[V]): Iterator[V] = {
iter.toArray.groupBy(f).mapValues(_.reduce(g)).map(_._2).toIterator
}
ds.mapPartitions(h(f, g, _))
.groupByKey(f)(encK)
.reduceGroups(g)
}
Dependiendo de la forma / tamaño de sus datos, esto está dentro de 1 segundo del rendimiento de reduceByKey
, y aproximadamente 2x
veces más rápido que un groupByKey(_._1).reduceGroups
. Todavía hay espacio para mejoras, por lo que las sugerencias serían bienvenidas.