tutorial spark parallelize introducción examples español ejemplo curso scala apache-spark

scala - introducción - parallelize spark



Usando reduceByKey en Apache Spark(Scala) (3)

La sintaxis está abajo:

reduceByKey(func: Function2[V, V, V]): JavaPairRDD[K, V],

que dice que para la misma clave en un RDD toma los valores (que definitivamente serán del mismo tipo) realiza la operación provista como parte de la función y devuelve el valor del mismo tipo que el RDD padre.

Tengo una lista de tuplas de tipo: (identificación de usuario, nombre, recuento).

Por ejemplo,

val x = sc.parallelize(List( ("a", "b", 1), ("a", "b", 1), ("c", "b", 1), ("a", "d", 1)) )

Estoy intentando reducir esta colección a un tipo en el que se cuente el nombre de cada elemento.

Entonces, en la parte superior, val x se convierte en:

(a,ArrayBuffer((d,1), (b,2))) (c,ArrayBuffer((b,1)))

Aquí está el código que estoy usando actualmente:

val byKey = x.map({case (id,uri,count) => (id,uri)->count}) val grouped = byKey.groupByKey val count = grouped.map{case ((id,uri),count) => ((id),(uri,count.sum))} val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey grouped2.foreach(println)

Estoy intentando usar reduceByKey ya que funciona más rápido que groupByKey.

¿Cómo se puede implementar reduceByKey en lugar del código anterior para proporcionar el mismo mapeo?


Siguiendo tu código:

val byKey = x.map({case (id,uri,count) => (id,uri)->count})

Podrías hacerlo:

val reducedByKey = byKey.reduceByKey(_ + _) scala> reducedByKey.collect.foreach(println) ((a,d),1) ((a,b),2) ((c,b),1)

PairRDDFunctions[K,V].reduceByKey toma una función asociativa de reducción que se puede aplicar al tipo V del RDD [(K, V)]. En otras palabras, necesita una función f[V](e1:V, e2:V) : V En este caso particular con suma en Ints: (x:Int, y:Int) => x+y o _ + _ en notación corta de subrayado.

Para el registro: reduceByKey funciona mejor que groupByKey porque groupByKey aplicar la función de reducción localmente antes de la fase de mezcla / reducción. groupByKey obligará a mezclar todos los elementos antes de agruparlos.


Su estructura de datos de origen es: RDD [(String, String, Int)], y reduceByKey solo se puede usar si la estructura de datos es RDD [(K, V)].

val kv = x.map(e => e._1 -> e._2 -> e._3) // kv is RDD[((String, String), Int)] val reduced = kv.reduceByKey(_ + _) // reduced is RDD[((String, String), Int)] val kv2 = reduced.map(e => e._1._1 -> (e._1._2 -> e._2)) // kv2 is RDD[(String, (String, Int))] val grouped = kv2.groupByKey() // grouped is RDD[(String, Iterable[(String, Int)])] grouped.foreach(println)