todf spark read python apache-spark lambda aggregate rdd

read - spark shell python



Explicar la funcionalidad agregada en Spark (7)

Estoy buscando una explicación mejor de la funcionalidad agregada que está disponible a través de spark en python.

El ejemplo que tengo es el siguiente (usando pyspark de la versión Spark 1.2.0)

sc.parallelize([1,2,3,4]).aggregate( (0, 0), (lambda acc, value: (acc[0] + value, acc[1] + 1)), (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

Salida:

(10, 4)

Obtengo el resultado esperado (10,4) que es la suma de 1+2+3+4 y 4 elementos. Si cambio el valor inicial pasado a la función de agregado a (1,0) desde (0,0) obtengo el siguiente resultado

sc.parallelize([1,2,3,4]).aggregate( (1, 0), (lambda acc, value: (acc[0] + value, acc[1] + 1)), (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

Salida:

(19, 4)

El valor aumenta en 9. Si lo cambio a (2,0) , el valor pasa a (28,4) y así sucesivamente.

¿Puede alguien explicarme cómo se calcula este valor? Esperaba que el valor subiera en 1, no en 9, se esperaba ver (11,4) vez de eso, estoy viendo (19,4) .


Agregado le permite transformar y combinar los valores del RDD a voluntad.

Utiliza dos funciones:

El primero transforma y agrega los elementos de la colección original [T] en un agregado local [U] y toma la forma: (U, T) => U. Puede verlo como un pliegue y, por lo tanto, también requiere un cero para esa operación. Esta operación se aplica localmente a cada partición en paralelo.

Aquí es donde se encuentra la clave de la pregunta: el único valor que se debe usar aquí es el valor CERO para la operación de reducción. Esta operación se ejecuta localmente en cada partición, por lo tanto, agregar algo a ese valor cero se agregará al resultado multiplicado por el número de particiones del RDD.

La segunda operación toma 2 valores del tipo de resultado de la operación anterior [U] y la combina en un valor. Esta operación reducirá los resultados parciales de cada partición y producirá el total real.

Por ejemplo: dado un RDD de cadenas:

val rdd:RDD[String] = ???

Digamos que quieres el agregado de la longitud de las cadenas en ese RDD, por lo que harías:

1) La primera operación transformará las cadenas en tamaño (int) y acumulará los valores para el tamaño.

val stringSizeCummulator: (Int, String) => Int = (total, string) => total + string.lenght`

2) proporcione el CERO para la operación de adición (0)

val ZERO = 0

3) una operación para agregar dos enteros juntos:

val add: (Int, Int) => Int = _ + _

Poniendolo todo junto:

rdd.aggregate(ZERO, stringSizeCummulator, add)

Entonces, ¿por qué se necesita el ZERO? Cuando la función cummulator se aplica al primer elemento de una partición, no hay un total acumulado. ZERO se usa aquí.

P.ej. Mi RDD es: - Partición 1: ["Jump", "over"] - Partition 2: ["the", "wall"]

Esto dará como resultado:

P1:

  1. stringSizeCummulator (ZERO, "Jump") = 4
  2. stringSizeCummulator (4, "over") = 8

P2:

  1. stringSizeCummulator (ZERO, "the") = 3
  2. stringSizeCummulator (3, "muro") = 7

Reducir: agregar (P1, P2) = 15


Grandes explicaciones, realmente me ayudó a entender el funcionamiento subyacente de la función de agregado. He jugado con él por un tiempo y descubrí lo siguiente.

  • si usa acc como (0,0), no cambiará el resultado de la salida de la función.

  • si se cambia el acumulador inicial, procesará el resultado como se muestra a continuación

[suma de los elementos de RDD + valor inicial de acc * Nº de particiones de RDD + valor inicial de acc]

para la pregunta aquí, sugeriría verificar las particiones ya que el número de particiones debería ser 8, según mi entendimiento, ya que cada vez que procesamos el seq op en una partición de RDD, comenzará con la suma inicial de acc result y también cuando va a hacer el peine Op. volverá a usar el valor inicial de acc una vez.

por ejemplo, Lista (1,2,3,4) y acc (1,0)

Obtener particiones en scala por RDD.partitions.size

si las particiones son 2 y el número de elementos es 4, entonces => [10 + 1 * 2 + 1] => (13,4)

si la partición es 4 y el número de elementos es 4, entonces => [10 + 1 * 4 + 1] => (15,4)

Espero que esto ayude, puedes consultar here para obtener una explicación. Gracias.


Intento muchos experimentos sobre esta pregunta. Es mejor establecer num de partición para agregar. el seqOp procesará cada partición y aplicará el valor inicial, qué más, combOp también aplicará el valor inicial cuando combine todas las particiones. Entonces, presento el formato para esta pregunta:

final result = sum(list) + num_Of_Partitions * initial_Value + 1


No estaba completamente convencido de la respuesta aceptada, y la respuesta de JohnKnight me ayudó, así que este es mi punto de vista:

Primero, explique aggregate() en mis palabras:

Prototipo :

agregado (zeroValue, seqOp, combOp)

Descripción :

aggregate() permite tomar un RDD y generar un único valor que es de un tipo diferente al almacenado en el RDD original.

Parámetros :

  1. zeroValue : El valor de inicialización, para su resultado, en el formato deseado.
  2. seqOp : la operación que desea aplicar a los registros RDD. Se ejecuta una vez por cada registro en una partición.
  3. combOp : define cómo se combOp los objetos resultantes (uno para cada partición).

Ejemplo :

Calcule la suma de una lista y la longitud de esa lista. Devuelve el resultado en un par de (sum, length) .

En un shell Spark, primero creé una lista con 4 elementos, con 2 particiones :

listRDD = sc.parallelize([1,2,3,4], 2)

luego definí mi seqOp :

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

y mi combOp :

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

y luego agregué:

listRDD.aggregate( (0, 0), seqOp, combOp) Out[8]: (10, 4)

Como puede ver, di nombres descriptivos a mis variables, pero permítanme explicarlo más a fondo:

La primera partición tiene la sublista [1, 2]. Aplicaremos el seqOp a cada elemento de esa lista y esto producirá un resultado local, un par de (sum, length) , que reflejará el resultado localmente, solo en esa primera partición.

Entonces, comencemos: local_result se inicializa al parámetro zeroValue con el que zeroValue aggregate() , es decir (0, 0) y list_element es el primer elemento de la lista, es decir 1. Como resultado, esto es lo que sucede:

0 + 1 = 1 0 + 1 = 1

Ahora, el resultado local es (1, 1), eso significa que, hasta el momento, para la primera partición, después de procesar solo el primer elemento, la suma es 1 y la longitud 1. Observe que local_result se actualiza desde (0, 0), a (1, 1).

1 + 2 = 3 1 + 1 = 2

y ahora el resultado local es (3, 2), que será el resultado final de la primera partición, ya que no hay otros elementos en la sublista de la primera partición.

Haciendo lo mismo para la segunda partición, obtenemos (7, 2).

Ahora aplicamos el combOp a cada resultado local, para que podamos formar el resultado final global, como este: (3,2) + (7,2) = (10, 4)

Ejemplo descrito en ''figura'':

(0, 0) <-- zeroValue [1, 2] [3, 4] 0 + 1 = 1 0 + 3 = 3 0 + 1 = 1 0 + 1 = 1 1 + 2 = 3 3 + 4 = 7 1 + 1 = 2 1 + 1 = 2 | | v v (3, 2) (7, 2) / / / / / / / / / / / / ------------ | combOp | ------------ | v (10, 4)

Inspirado por este gran example .

Así que ahora si zeroValue no es (0, 0), sino (1, 0), uno esperaría obtener (8 + 4, 2 + 2) = (12, 4), lo que no explica lo que experimentas. Incluso si modificamos el número de particiones de mi ejemplo, no podré volver a obtenerlo.

La clave aquí es la respuesta de JohnKnight, que establece que zeroValue no solo es análogo al número de particiones, sino que puede aplicarse más veces de lo esperado.


No tengo suficientes puntos de reputación para comentar sobre la respuesta anterior de Maasg. En realidad, el valor cero debería ser ''neural'' hacia el seqop, lo que significa que no interferiría con el resultado de seqop, como 0 para agregar, o 1 hacia *;

NUNCA debes intentar con valores no neuronales ya que podría aplicarse tiempos arbitrarios. Este comportamiento no solo está relacionado con el número de particiones.

Probé el mismo experimento que se indica en la pregunta. con 1 partición, el valor cero se aplicó 3 veces. con 2 particiones, 6 veces. con 3 particiones, 9 veces y esto continuará.


Para las personas que buscan el código equivalente de Scala para el ejemplo anterior, aquí está. La misma lógica, la misma entrada / resultado.

scala> val listRDD = sc.parallelize(List(1,2,3,4), 2) listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21 scala> listRDD.collect() res7: Array[Int] = Array(1, 2, 3, 4) scala> listRDD.aggregate((0,0))((acc, value) => (acc._1+value,acc._2+1),(acc1,acc2) => (acc1._1+acc2._1,acc1._2+acc2._2)) res10: (Int, Int) = (10,4)


Puede usar el siguiente código (en scala) para ver con precisión qué está haciendo el aggregate . Construye un árbol de todas las operaciones de suma y fusión:

sealed trait Tree[+A] case class Leaf[A](value: A) extends Tree[A] case class Branch[A](left: Tree[A], right: Tree[A]) extends Tree[A] val zero : Tree[Int] = Leaf(0) val rdd = sc.parallelize(1 to 4).repartition(3)

Y luego, en el caparazón:

scala> rdd.glom().collect() res5: Array[Array[Int]] = Array(Array(4), Array(1, 2), Array(3))

Entonces, tenemos estas 3 particiones: [4], [1,2] y [3].

scala> rdd.aggregate(zero)((l,r)=>Branch(l, Leaf(r)), (l,r)=>Branch(l,r)) res11: Tree[Int] = Branch(Branch(Branch(Leaf(0),Branch(Leaf(0),Leaf(4))),Branch(Leaf(0),Leaf(3))),Branch(Branch(Leaf(0),Leaf(1)),Leaf(2)))

Puede representar el resultado como un árbol:

+ | /__________________ + + | /________ | / + + + 2 | / | / | / 0 + 0 3 0 1 | / 0 4

Puede ver que se crea un primer elemento cero en el nodo del controlador (a la izquierda del árbol) y, a continuación, los resultados de todas las particiones se fusionan uno por uno. También verá que si reemplaza 0 por 1 como lo hizo en su pregunta, agregará 1 a cada resultado en cada partición, y también agregará 1 al valor inicial en el controlador. Entonces, la cantidad total de tiempo que se usa el valor cero que se da es:

number of partitions + 1 .

Entonces, en tu caso, el resultado de

aggregate( (X, Y), (lambda acc, value: (acc[0] + value, acc[1] + 1)), (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

estarán:

(sum(elements) + (num_partitions + 1)*X, count(elements) + (num_partitions + 1)*Y)

La implementación de aggregate es bastante simple. Se define en RDD.scala, línea 1107 :

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope { // Clone the zero value since we will also be serializing it as part of tasks var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance()) val cleanSeqOp = sc.clean(seqOp) val cleanCombOp = sc.clean(combOp) val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp) val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult) sc.runJob(this, aggregatePartition, mergeResult) jobResult }