tutorial spark org examples example ejemplo scala apache-spark mapreduce

scala - org - Rolling your own reduceByKey en Spark Dataset



spark apache org download (2)

Supongo que su objetivo es traducir este idioma a los conjuntos de datos:

rdd.map(x => (x.someKey, x.someField)) .reduceByKey(_ + _) // => returning an RDD of (KeyType, FieldType)

Actualmente, la solución más cercana que he encontrado con la API de conjunto de datos se ve así:

ds.map(x => (x.someKey, x.someField)) // [1] .groupByKey(_._1) .reduceGroups((a, b) => (a._1, a._2 + b._2)) .map(_._2) // [2] // => returning a Dataset of (KeyType, FieldType) // Comments: // [1] As far as I can see, having a map before groupByKey is required // to end up with the proper type in reduceGroups. After all, we do // not want to reduce over the original type, but the FieldType. // [2] required since reduceGroups converts back to Dataset[(K, V)] // not knowing that our V''s are already key-value pairs.

No parece muy elegante y, según un punto de referencia rápido, también tiene un rendimiento mucho menor, por lo que tal vez nos falte algo ...

Nota: una alternativa podría ser usar groupByKey(_.someKey) como primer paso. El problema es que el uso de groupByKey cambia el tipo de un Dataset de Dataset normal a un KeyValueGroupedDataset . Este último no tiene una función de map regular. En su lugar, ofrece un mapGroups , que no parece muy conveniente porque envuelve los valores en un Iterator y realiza una orden aleatoria de acuerdo con la cadena de documentación.

Estoy tratando de aprender a usar DataFrames y DataSets más además de los RDD. Para un RDD, sé que puedo hacer someRDD.reduceByKey((x,y) => x + y) , pero no veo esa función para Dataset. Así que decidí escribir uno.

someRdd.map(x => ((x.fromId,x.toId),1)).map(x => collection.mutable.Map(x)).reduce((x,y) => { val result = mutable.HashMap.empty[(Long,Long),Int] val keys = mutable.HashSet.empty[(Long,Long)] y.keys.foreach(z => keys += z) x.keys.foreach(z => keys += z) for (elem <- keys) { val s1 = if(x.contains(elem)) x(elem) else 0 val s2 = if(y.contains(elem)) y(elem) else 0 result(elem) = s1 + s2 } result })

Sin embargo, esto lo devuelve todo al conductor. ¿Cómo escribirías esto para devolver un Dataset ? Tal vez mapPartition y hacerlo allí?

Tenga en cuenta que esto se compila pero no se ejecuta porque todavía no tiene codificadores para Map


Una solución más eficiente usa mapPartitions antes de groupByKey para reducir la cantidad de barajadas (tenga en cuenta que no es exactamente la misma firma que reduceByKey pero creo que es más flexible pasar una función que requerir que el conjunto de datos conste de una tupla).

def reduceByKey[V: ClassTag, K](ds: Dataset[V], f: V => K, g: (V, V) => V) (implicit encK: Encoder[K], encV: Encoder[V]): Dataset[(K, V)] = { def h[V: ClassTag, K](f: V => K, g: (V, V) => V, iter: Iterator[V]): Iterator[V] = { iter.toArray.groupBy(f).mapValues(_.reduce(g)).map(_._2).toIterator } ds.mapPartitions(h(f, g, _)) .groupByKey(f)(encK) .reduceGroups(g) }

Dependiendo de la forma / tamaño de sus datos, esto está dentro de 1 segundo del rendimiento de reduceByKey , y aproximadamente 2x veces más rápido que un groupByKey(_._1).reduceGroups . Todavía hay espacio para mejoras, por lo que las sugerencias serían bienvenidas.