scala mapreduce benchmarking key-value reduce

scala - ¿Es posible una implementación más rápida para reduceByKey en Seq de pares?



mapreduce benchmarking (1)

El siguiente código contiene varias implementaciones de reduceByKeyXXX métodos reduceByKeyXXX y algunos métodos auxiliares para crear conjuntos de entrada y medir tiempos de ejecución. (Siéntase libre de ejecutar el método main )

El objetivo principal de reduceByKey (como en Spark) es reducir los pares clave-valor con la misma clave. Ejemplo:

scala> val xs = Seq( "a" -> 2, "b" -> 3, "a" -> 5) xs: Seq[(String, Int)] = List((a,2), (b,3), (a,5)) scala> ReduceByKeyComparison.reduceByKey(xs, (x:Int, y:Int) ⇒ x+y ) res8: Seq[(String, Int)] = ArrayBuffer((b,3), (a,7))

Código

import java.util.HashMap object Util { def measure( body : => Unit ) : Long = { val now = System.currentTimeMillis body val nowAfter = System.currentTimeMillis nowAfter - now } def measureMultiple( body: => Unit, n: Int) : String = { val executionTimes = (1 to n).toList.map( x => { print(".") measure(body) } ) val avg = executionTimes.sum / executionTimes.size executionTimes.mkString("", "ms, ", "ms") + s" Average: ${avg}ms." } } object RandomUtil { val AB = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; val r = new java.util.Random(); def randomString( len: Int ) : String = { val sb = new StringBuilder( len ); for( i <- 0 to len-1 ) { sb.append(AB.charAt(r.nextInt(AB.length()))); } sb.toString(); } def generateSeq(n: Int) : Seq[(String, Int)] = { Seq.fill(n)( (randomString(2), r.nextInt(100)) ) } } object ReduceByKeyComparison { def main(args: Array[String]) : Unit = { implicit def iterableToPairedIterable[K, V](x: Iterable[(K, V)]) = { new PairedIterable(x) } val runs = 10 val problemSize = 2000000 val ss = RandomUtil.generateSeq(problemSize) println("ReduceByKey : " + Util.measureMultiple( reduceByKey(ss, (x:Int, y:Int) ⇒ x+y ), runs )) println("ReduceByKey2: " + Util.measureMultiple( reduceByKey2(ss, (x:Int, y:Int) ⇒ x+y ), runs )) println("ReduceByKey3: " + Util.measureMultiple( reduceByKey3(ss, (x:Int, y:Int) ⇒ x+y ), runs )) println("ReduceByKeyPaired: " + Util.measureMultiple( ss.reduceByKey( (x:Int, y:Int) ⇒ x+y ), runs )) println("ReduceByKeyA: " + Util.measureMultiple( reduceByKeyA( ss, (x:Int, y:Int) ⇒ x+y ), runs )) } // ============================================================================= // Different implementations // ============================================================================= def reduceByKey[A,B]( s: Seq[(A,B)], fnc: (B, B) ⇒ B) : Seq[(A,B)] = { val t = s.groupBy(x => x._1) val u = t.map { case (k,v) => (k, v.map(_._2).reduce(fnc))} u.toSeq } def reduceByKey2[A,B]( s: Seq[(A,B)], fnc: (B, B) ⇒ B) : Seq[(A,B)] = { val r = s.foldLeft( Map[A,B]() ){ (m,a) ⇒ val k = a._1 val v = a._2 m.get(k) match { case Some(pv) ⇒ m + ((k, fnc(pv, v))) case None ⇒ m + ((k, v)) } } r.toSeq } def reduceByKey3[A,B]( s: Seq[(A,B)], fnc: (B, B) ⇒ B) : Seq[(A,B)] = { var m = scala.collection.mutable.Map[A,B]() s.foreach{ e ⇒ val k = e._1 val v = e._2 m.get(k) match { case Some(pv) ⇒ m(k) = fnc(pv, v) case None ⇒ m(k) = v } } m.toSeq } /** * Method code from [[http://ideone.com/dyrkYM]] * All rights to Muhammad-Ali A''rabi according to [[https://issues.scala-lang.org/browse/SI-9064]] */ def reduceByKeyA[A,B]( s: Seq[(A,B)], fnc: (B, B) ⇒ B): Map[A, B] = { s.groupBy(_._1).map(l => (l._1, l._2.map(_._2).reduce( fnc ))) } /** * Method code from [[http://ideone.com/dyrkYM]] * All rights to Muhammad-Ali A''rabi according to [[https://issues.scala-lang.org/browse/SI-9064]] */ class PairedIterable[K, V](x: Iterable[(K, V)]) { def reduceByKey(func: (V,V) => V) = { val map = new HashMap[K, V] x.foreach { pair => val old = map.get(pair._1) map.put(pair._1, if (old == null) pair._2 else func(old, pair._2)) } map } } }

produciendo los siguientes resultados en mi máquina

..........ReduceByKey : 723ms, 782ms, 761ms, 617ms, 640ms, 707ms, 634ms, 611ms, 380ms, 458ms Average: 631ms. ..........ReduceByKey2: 580ms, 458ms, 452ms, 463ms, 462ms, 470ms, 463ms, 465ms, 458ms, 462ms Average: 473ms. ..........ReduceByKey3: 489ms, 466ms, 461ms, 468ms, 555ms, 474ms, 469ms, 457ms, 461ms, 468ms Average: 476ms. ..........ReduceByKeyPaired: 140ms, 124ms, 124ms, 120ms, 122ms, 124ms, 118ms, 126ms, 121ms, 119ms Average: 123ms. ..........ReduceByKeyA: 628ms, 694ms, 666ms, 656ms, 616ms, 660ms, 594ms, 659ms, 445ms, 399ms Average: 601ms.

y ReduceByKeyPaired actualmente es el más rápido.

Pregunta / Tarea

¿Hay una implementación más rápida de un solo subproceso (Scala)?


Reescribiendo el método reduceByKey de PairedIterable a la recursión da alrededor de 5-10% de mejora en el rendimiento. Eso todo lo que pude conseguir. También intenté aumentar la asignación de capacidad inicial para HashMap, pero no muestra ningún cambio significativo.

class PairedIterable[K, V](x: Iterable[(K, V)]) { def reduceByKey(func: (V,V) => V) = { val map = new HashMap[K, V]() @tailrec def reduce(it: Iterable[(K, V)]): HashMap[K, V] = { it match { case Nil => map case (k, v) :: tail => val old = map.get(k) map.put(k, if (old == null) v else func(old, v)) reduce(tail) } } val r = reduce(x) r } }

En general, haciendo algunos análisis de comparación de los métodos proporcionados, se pueden dividir en dos categorías.

  • El primer conjunto de reducciones está relacionado con la clasificación (agrupación), ya que podemos ver que esos métodos agregan complejidad extra O(n*log[n]) y no son efectivos para este escenario.

  • Los segundos son con bucle lineal a través de todos los enries de Iterable . Esos conjuntos de métodos tienen operaciones extra get / put para el mapa temporal. Pero esos puts / puts no consumen tanto tiempo - O(n)*O(c) . Además, la necesidad de trabajar con Options en las colecciones de scala lo hace menos efectivo.