scala - ¿Es posible una implementación más rápida para reduceByKey en Seq de pares?
mapreduce benchmarking (1)
El siguiente código contiene varias implementaciones de reduceByKeyXXX
métodos reduceByKeyXXX
y algunos métodos auxiliares para crear conjuntos de entrada y medir tiempos de ejecución. (Siéntase libre de ejecutar el método main
)
El objetivo principal de reduceByKey
(como en Spark) es reducir los pares clave-valor con la misma clave. Ejemplo:
scala> val xs = Seq( "a" -> 2, "b" -> 3, "a" -> 5)
xs: Seq[(String, Int)] = List((a,2), (b,3), (a,5))
scala> ReduceByKeyComparison.reduceByKey(xs, (x:Int, y:Int) ⇒ x+y )
res8: Seq[(String, Int)] = ArrayBuffer((b,3), (a,7))
Código
import java.util.HashMap
object Util {
def measure( body : => Unit ) : Long = {
val now = System.currentTimeMillis
body
val nowAfter = System.currentTimeMillis
nowAfter - now
}
def measureMultiple( body: => Unit, n: Int) : String = {
val executionTimes = (1 to n).toList.map( x => {
print(".")
measure(body)
} )
val avg = executionTimes.sum / executionTimes.size
executionTimes.mkString("", "ms, ", "ms") + s" Average: ${avg}ms."
}
}
object RandomUtil {
val AB = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
val r = new java.util.Random();
def randomString( len: Int ) : String = {
val sb = new StringBuilder( len );
for( i <- 0 to len-1 ) {
sb.append(AB.charAt(r.nextInt(AB.length())));
}
sb.toString();
}
def generateSeq(n: Int) : Seq[(String, Int)] = {
Seq.fill(n)( (randomString(2), r.nextInt(100)) )
}
}
object ReduceByKeyComparison {
def main(args: Array[String]) : Unit = {
implicit def iterableToPairedIterable[K, V](x: Iterable[(K, V)]) = { new PairedIterable(x) }
val runs = 10
val problemSize = 2000000
val ss = RandomUtil.generateSeq(problemSize)
println("ReduceByKey : " + Util.measureMultiple( reduceByKey(ss, (x:Int, y:Int) ⇒ x+y ), runs ))
println("ReduceByKey2: " + Util.measureMultiple( reduceByKey2(ss, (x:Int, y:Int) ⇒ x+y ), runs ))
println("ReduceByKey3: " + Util.measureMultiple( reduceByKey3(ss, (x:Int, y:Int) ⇒ x+y ), runs ))
println("ReduceByKeyPaired: " + Util.measureMultiple( ss.reduceByKey( (x:Int, y:Int) ⇒ x+y ), runs ))
println("ReduceByKeyA: " + Util.measureMultiple( reduceByKeyA( ss, (x:Int, y:Int) ⇒ x+y ), runs ))
}
// =============================================================================
// Different implementations
// =============================================================================
def reduceByKey[A,B]( s: Seq[(A,B)], fnc: (B, B) ⇒ B) : Seq[(A,B)] = {
val t = s.groupBy(x => x._1)
val u = t.map { case (k,v) => (k, v.map(_._2).reduce(fnc))}
u.toSeq
}
def reduceByKey2[A,B]( s: Seq[(A,B)], fnc: (B, B) ⇒ B) : Seq[(A,B)] = {
val r = s.foldLeft( Map[A,B]() ){ (m,a) ⇒
val k = a._1
val v = a._2
m.get(k) match {
case Some(pv) ⇒ m + ((k, fnc(pv, v)))
case None ⇒ m + ((k, v))
}
}
r.toSeq
}
def reduceByKey3[A,B]( s: Seq[(A,B)], fnc: (B, B) ⇒ B) : Seq[(A,B)] = {
var m = scala.collection.mutable.Map[A,B]()
s.foreach{ e ⇒
val k = e._1
val v = e._2
m.get(k) match {
case Some(pv) ⇒ m(k) = fnc(pv, v)
case None ⇒ m(k) = v
}
}
m.toSeq
}
/**
* Method code from [[http://ideone.com/dyrkYM]]
* All rights to Muhammad-Ali A''rabi according to [[https://issues.scala-lang.org/browse/SI-9064]]
*/
def reduceByKeyA[A,B]( s: Seq[(A,B)], fnc: (B, B) ⇒ B): Map[A, B] = {
s.groupBy(_._1).map(l => (l._1, l._2.map(_._2).reduce( fnc )))
}
/**
* Method code from [[http://ideone.com/dyrkYM]]
* All rights to Muhammad-Ali A''rabi according to [[https://issues.scala-lang.org/browse/SI-9064]]
*/
class PairedIterable[K, V](x: Iterable[(K, V)]) {
def reduceByKey(func: (V,V) => V) = {
val map = new HashMap[K, V]
x.foreach { pair =>
val old = map.get(pair._1)
map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
}
map
}
}
}
produciendo los siguientes resultados en mi máquina
..........ReduceByKey : 723ms, 782ms, 761ms, 617ms, 640ms, 707ms, 634ms, 611ms, 380ms, 458ms Average: 631ms.
..........ReduceByKey2: 580ms, 458ms, 452ms, 463ms, 462ms, 470ms, 463ms, 465ms, 458ms, 462ms Average: 473ms.
..........ReduceByKey3: 489ms, 466ms, 461ms, 468ms, 555ms, 474ms, 469ms, 457ms, 461ms, 468ms Average: 476ms.
..........ReduceByKeyPaired: 140ms, 124ms, 124ms, 120ms, 122ms, 124ms, 118ms, 126ms, 121ms, 119ms Average: 123ms.
..........ReduceByKeyA: 628ms, 694ms, 666ms, 656ms, 616ms, 660ms, 594ms, 659ms, 445ms, 399ms Average: 601ms.
y ReduceByKeyPaired actualmente es el más rápido.
Pregunta / Tarea
¿Hay una implementación más rápida de un solo subproceso (Scala)?
Reescribiendo el método reduceByKey
de PairedIterable
a la recursión da alrededor de 5-10% de mejora en el rendimiento. Eso todo lo que pude conseguir. También intenté aumentar la asignación de capacidad inicial para HashMap, pero no muestra ningún cambio significativo.
class PairedIterable[K, V](x: Iterable[(K, V)]) {
def reduceByKey(func: (V,V) => V) = {
val map = new HashMap[K, V]()
@tailrec
def reduce(it: Iterable[(K, V)]): HashMap[K, V] = {
it match {
case Nil => map
case (k, v) :: tail =>
val old = map.get(k)
map.put(k, if (old == null) v else func(old, v))
reduce(tail)
}
}
val r = reduce(x)
r
}
}
En general, haciendo algunos análisis de comparación de los métodos proporcionados, se pueden dividir en dos categorías.
El primer conjunto de reducciones está relacionado con la clasificación (agrupación), ya que podemos ver que esos métodos agregan complejidad extra
O(n*log[n])
y no son efectivos para este escenario.Los segundos son con bucle lineal a través de todos los enries de
Iterable
. Esos conjuntos de métodos tienen operaciones extra get / put para el mapa temporal. Pero esos puts / puts no consumen tanto tiempo -O(n)*O(c)
. Además, la necesidad de trabajar conOptions
en las colecciones de scala lo hace menos efectivo.