apache spark - software - ¿Cómo funciona Spark aggregate-aggregateByKey?
spark examples (2)
aggregateByKey () es bastante diferente de reduceByKey. Lo que sucede es que reduceByKey es una especie de caso particular de aggregateByKey.
aggregateByKey () combinará los valores para una clave particular, y el resultado de dicha combinación puede ser cualquier objeto que especifique. Debe especificar cómo se combinan los valores ("agregados") dentro de una partición (que se ejecuta en el mismo nodo) y cómo se combina el resultado de diferentes particiones (que pueden estar en nodos diferentes). reduceByKey es un caso particular, en el sentido de que el resultado de la combinación (por ejemplo, una suma) es del mismo tipo que los valores, y que la operación cuando se combina desde diferentes particiones también es la misma operación cuando se combinan valores dentro de una dividir.
Un ejemplo: imagina que tienes una lista de pares. Usted lo paraleliza
val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))
Ahora quiere "combinarlos" por clave produciendo una suma. En este caso, reduceByKey y aggregateByKey son iguales:
val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything
resReduce.collect
res3: Array[(String, Int)] = Array((b,7), (a,9))
//0 is initial value, _+_ inside partition, _+_ between partitions
val resAgg = pairs.aggregateByKey(0)(_+_,_+_)
resAgg.collect
res4: Array[(String, Int)] = Array((b,7), (a,9))
Ahora imagine que desea que la agregación sea un conjunto de valores, que es un tipo diferente que los valores, que son enteros (la suma de los enteros también son enteros):
import scala.collection.mutable.HashSet
//the initial value is a void Set. Adding an element to a set is the first
//_+_ Join two sets is the _++_
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_)
sets.collect
res5: Array[(String, scala.collection.mutable.HashSet[Int])] =Array((b,Set(7)), (a,Set(1, 5, 3)))
Digamos que tengo un sistema de distribución en 3 nodos y mis datos se distribuyen entre esos nodos. por ejemplo, tengo un archivo test.csv que existe en los 3 nodos y contiene 2 columnas de:
**row | id, c.**
---------------
row1 | k1 , c1
row2 | k1 , c2
row3 | k1 , c3
row4 | k2 , c4
row5 | k2 , c5
row6 | k2 , c6
row7 | k3 , c7
row8 | k3 , c8
row9 | k3 , c9
row10 | k4 , c10
row11 | k4 , c11
row12 | k4 , c12
Luego uso SparkContext.textFile para leer el archivo como rdd y así sucesivamente. Por lo que yo entiendo, cada nodo de trabajador de chispa leerá la porción del archivo. Así que ahora digamos que cada nodo almacenará:
- nodo 1: fila 1 ~ 4
- nodo 2: fila 5 ~ 8
- nodo 3: fila 9 ~ 12
Mi pregunta es que digamos que quiero hacer cálculos sobre esos datos, y hay un paso que necesito agrupar la clave, por lo que el par de valores clave sería [k1 [{k1 c1} {k1 c2} {k1 c3}]]..
y así sucesivamente.
Hay una función llamada groupByKey()
que es muy costosa de usar, y se recomienda usar aggregateByKey()
. Entonces me pregunto ¿cómo funciona groupByKey()
y aggregateByKey()
bajo el capó? ¿Alguien puede usar el ejemplo que proporcioné arriba para explicar? Después de barajar, ¿dónde residen las filas en cada nodo?
aggregateByKey()
es casi idéntico a reduceByKey()
(ambos llaman a combineByKey()
detrás de las escenas), excepto que le da un valor de inicio para aggregateByKey()
. La mayoría de las personas están familiarizadas con reduceByKey()
, así que lo reduceByKey()
en la explicación.
La razón por la cual reduceByKey()
es mucho mejor es porque usa una característica de MapReduce llamada combinador. Cualquier función como +
o *
se puede usar de esta manera porque el orden de los elementos a los que se llama no importa. Esto permite que Spark comience a "reducir" valores con la misma clave incluso si aún no están todos en la misma partición.
Por otro lado, groupByKey()
te ofrece más versatilidad ya que escribes una función que tiene un Iterable, lo que significa que incluso puedes extraer todos los elementos en una matriz. Sin embargo, es ineficiente porque para que funcione, el conjunto completo de pares (K,V,)
debe estar en una partición.
El paso que mueve los datos en una operación de tipo reducir generalmente se denomina mezcla , en el nivel más simple, los datos se dividen en particiones para cada nodo (a menudo con un particionador hash) y luego se ordenan en cada nodo.