name keywords etiquetas ejemplos body scala aggregate

keywords - Explicación de la función de escala agregada



meta tags seo (4)

Añadiendo a la respuesta de Rashmit.

  • CombOp se llama solo si la colección se procesa en modo paralelo.

Vea el siguiente ejemplo:

val listP: ParSeq [Int] = List (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) .par

val aggregateOp1 = listP.aggregate[String]("Aggregate-")((a, b) => a + b, (s1, s2) => { println("Combiner called , if collections is processed parallel mode") s1 + "," + s2 }) println(aggregateOp1)

OP: Agregado-1, Agregado-2, Agregado-3, Agregado-45, Agregado-6, Agregado-7, Agregado-8, Agregado-910

val list: Seq[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val aggregateOp2 = list.aggregate[String]("Aggregate-")((a, b) => a + b, (s1, s2) => { println("Combiner called , if collections is processed parallel mode") s1 + "," + s2 }) println(aggregateOp2)

}

OP: Agregado-12345678910

En el ejemplo anterior, la operación del combinador se llama solo si la recolección se opera en paralelo

Todavía no entiendo la función agregada:

Por ejemplo, tener:

val x = List(1,2,3,4,5,6) val y = x.par.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x,y) => (x._1 + y._1, x._2 + y._2))

El resultado será: (21,6)

Bueno, creo que (x,y) => (x._1 + y._1, x._2 + y._2) es obtener el resultado en paralelo, por ejemplo, será (1 + 2, 1 + 1 ) y así.

Pero exactamente esta parte que me deja confundido:

(x, y) => (x._1 + y, x._2 + 1)

¿ x._1 + y qué x._1 + y ? y aquí x._2 es 0 ?

Gracias por adelantado.


De la documentation :

def aggregate[B](z: ⇒ B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B

Agrega los resultados de aplicar un operador a elementos posteriores.

Esta es una forma más general de plegar y reducir. Tiene una semántica similar, pero no requiere que el resultado sea un supertipo del tipo de elemento. Atraviesa los elementos en diferentes particiones de forma secuencial, utilizando seqop para actualizar el resultado, y luego aplica combop a los resultados de diferentes particiones. La implementación de esta operación puede operar en un número arbitrario de particiones de colección, por lo que se puede invocar a combop un número arbitrario de veces.

Por ejemplo, uno podría querer procesar algunos elementos y luego producir un Conjunto. En este caso, seqop procesaría un elemento y lo agregaría a la lista, mientras que combop concatenaría dos listas de diferentes particiones juntas. El valor inicial z sería un conjunto vacío.

pc.aggregate(Set[Int]())(_ += process(_), _ ++ _)

Otro ejemplo es el cálculo de la media geométrica a partir de una colección de dobles (por lo general, se requieren grandes dobles para esto). B es el tipo de resultados acumulados z el valor inicial para el resultado acumulado de la partición; este será típicamente el elemento neutral para el operador de seqop (por ejemplo, Nil para la concatenación de lista o 0 para la suma) y se puede evaluar más de una vez por parte de un operador usado para acumular resultados dentro de una partición combinada un operador asociativo usado para combinar resultados de diferentes particiones

En tu ejemplo, B es un Tuple2[Int, Int] . El método seqop luego toma un solo elemento de la lista, con el alcance como y , y actualiza el agregado B a (x._1 + y, x._2 + 1) . Así se incrementa el segundo elemento en la tupla. Esto efectivamente coloca la suma de elementos en el primer elemento de la tupla y el número de elementos en el segundo elemento de la tupla.

El método combop toma los resultados de cada hilo de ejecución paralelo y los combina. La combinación por adición proporciona los mismos resultados como si se ejecutara en la lista de forma secuencial.

Usar B como una tupla es probablemente la parte confusa de esto. Puede dividir el problema en dos problemas secundarios para tener una mejor idea de lo que está haciendo. res0 es el primer elemento de la tupla de resultados, y res1 es el segundo elemento de la tupla de resultados.

// Sums all elements in parallel. scala> x.par.aggregate(0)((x, y) => x + y, (x, y) => x + y) res0: Int = 21 // Counts all elements in parallel. scala> x.par.aggregate(0)((x, y) => x + 1, (x, y) => x + y) res1: Int = 6


En primer lugar, gracias a la respuesta de Diego que me ayudó a conectar los puntos para comprender la función agregada ().

Déjame confesar que anoche no pude dormir bien porque no pude ver cómo funciona de manera interna el agregado (), definitivamente dormiré esta noche definitivamente :-)

Vamos a empezar a entenderlo

val result = List(1,2,3,4,5,6,7,8,9,10).par.aggregate((0, 0)) ( (x, y) => (x._1 + y, x._2 + 1), (x,y) =>(x._1 + y._1, x._2 + y._2) )

resultado: (Int, Int) = (55,10)

La función agregada tiene 3 partes:

  1. Valor inicial de los acumuladores: tuple (0,0) aquí.
  2. seqop: Funciona como foldLeft con valor inicial de 0
  3. combop: combina el resultado generado a través de la paralelización (esta parte me resultó difícil de entender)

Entendamos las 3 partes independientemente:

parte 1: Tupla inicial (0,0)

El agregado () comienza con el valor inicial de los acumuladores x, que es (0,0) aquí. La primera tupla x._1 que inicialmente es 0 se usa para calcular la suma, la segunda tupla x._2 se usa para calcular el número total de elementos en la lista.

parte 2: (x, y) => (x._1 + y, x._2 + 1)

Si sabe cómo funciona foldLeft en Scala, debería ser fácil entender esta parte. La función anterior funciona igual que el plegado hacia la izquierda en nuestra lista (1,2,3,4 ... 10).

Iteration# (x._1 + y, x._2 + 1) 1 (0+1, 0+1) 2 (1+2, 1+1) 3 (3+3, 2+1) 4 (6+4, 3+1) . .... . .... 10 (45+10, 9+1)

así, después de todas las 10 iteraciones obtendrás el resultado (55,10). Si entiendes esta parte, el resto es muy fácil, pero para mí fue la parte más difícil de entender si se completaron todos los cálculos requeridos, entonces, ¿cuál es el uso de la segunda parte, es decir, compop? Estad atentos :-)

parte 3: (x, y) => (x._1 + y._1, x._2 + y._2)

Bueno, esta tercera parte es combOp, que combina el resultado generado por diferentes subprocesos durante la paralelización, recuerde que usamos ''par'' en nuestro código para permitir el cálculo paralelo de la lista:

Lista (1,2,3,4,5,6,7,8,9,10) .par.aggregate (....)

Apache spark está utilizando efectivamente la función agregada para realizar cálculos paralelos de RDD.

Supongamos que nuestra lista (1,2,3,4,5,6,7,8,9,9,10) está siendo calculada por 3 hilos en paralelo. Aquí, cada subproceso está trabajando en una lista parcial y luego nuestro agregado () combOp combinará el resultado del cálculo de cada subproceso utilizando el siguiente código:

(x,y) =>(x._1 + y._1, x._2 + y._2)

Lista original: Lista (1,2,3,4,5,6,7,8,9,9,10)

Thread1 comienza a computar en la lista parcial, digamos (1,2,3,4), Thread2 calcula (5,6,7,8) y Thread3 computa la lista parcial, dice (9,10)

Al final del cálculo, el resultado de Thread-1 será (10,4), el resultado de Thread-2 será (26,4) y el resultado de Thread-3 será (19,2).

Al final de la computación paralela, tendremos ((10,4), (26,4), (19,2))

Iteration# (x._1 + y._1, x._2 + y._2) 1 (0+10, 0+4) 2 (10+26, 4+4) 3 (36+19, 8+2)

que es (55,10).

Finalmente, permítanme repetir que el trabajo seqOp es calcular la suma de todos los elementos de la lista y el número total de listas, mientras que el trabajo de la función de combinación es combinar diferentes resultados parciales generados durante la paralelización.

Espero que la explicación anterior te ayude a entender el agregado ().


el agregado toma 3 parámetros: un valor semilla, una función de cálculo y una función de combinación.

Básicamente, lo que hace es dividir la colección en varios subprocesos, calcular resultados parciales utilizando la función de cálculo y luego combinar todos estos resultados parciales utilizando la función de combinación.

Por lo que puedo decir, su función de ejemplo devolverá un par (a, b) donde a es la suma de los valores en la lista, b es el número de valores en la lista. De hecho, (21, 6).

¿Como funciona esto? El valor semilla es el par (0,0). Para una lista vacía, tenemos una suma de 0 y un número de elementos 0, por lo que es correcto.

Su función de cálculo toma un par (Int, Int) x, que es su resultado parcial, y un Int y, que es el siguiente valor en la lista. Esta es tu:

(x, y) => (x._1 + y, x._2 + 1)

De hecho, el resultado que queremos es aumentar el elemento izquierdo de x (el acumulador) en y, y el elemento derecho de x (el contador) en 1 para cada y.

Su función de combinación toma un par (Int, Int) x y un par (Int, Int) y, que son sus dos resultados parciales de diferentes cálculos paralelos, y los combina como:

(x,y) => (x._1 + y._1, x._2 + y._2)

De hecho, sumamos independientemente las partes izquierdas de los pares y las partes derechas de los pares.

Su confusión proviene del hecho de que x e y en la primera función NO SON los mismos xey de la segunda función. En la primera función, tiene x del tipo del valor semilla, e y del tipo de los elementos de la colección, y devuelve un resultado del tipo de x. En la segunda función, sus dos parámetros son del mismo tipo de valor semilla.

Espero que sea más claro ahora!