tutorial spark example español python apache-spark aggregate average rdd

python - example - spark sql español



Cálculo de promedios para cada LLAVE en un RDD por pares(K, V) en Spark con Python (4)

Ahora, una forma mucho mejor de hacerlo es usar el método rdd.aggregateByKey() . Debido a que ese método está tan poco documentado en la documentación de Apache Spark with Python, y es por eso que escribí estas preguntas y respuestas , hasta hace poco había estado usando la secuencia de códigos anterior. Pero, de nuevo, es menos eficiente, así que evite hacerlo de esa manera a menos que sea necesario.

A continuación, se rdd.aggregateByKey() cómo hacer lo mismo con el método rdd.aggregateByKey() ( recomendado ) ...

Por KEY, simultáneamente calcule SUM (el numerador del promedio que queremos calcular) y COUNT (el denominador del promedio que queremos calcular):

>>> aTuple = (0,0) # As of Python3, you can''t pass a literal sequence to a function. >>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b, a[1] + 1), lambda a,b: (a[0] + b[0], a[1] + b[1]))

Donde lo siguiente es cierto sobre el significado de cada a pares b (para que pueda visualizar lo que está sucediendo):

First lambda expression for Within-Partition Reduction Step:: a: is a TUPLE that holds: (runningSum, runningCount). b: is a SCALAR that holds the next Value Second lambda expression for Cross-Partition Reduction Step:: a: is a TUPLE that holds: (runningSum, runningCount). b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).

Finalmente, calcule el promedio de cada CLAVE y recopile los resultados.

>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect() >>> print(finalResult) [(u''2013-09-09'', 11.235365503035176), (u''2013-09-01'', 23.39500642456595), (u''2013-09-03'', 13.53240060820617), (u''2013-09-05'', 13.141148418977687), ... snip ... ]

Espero que esta pregunta y respuesta con aggregateByKey() ayude.

Quiero compartir esta solución en particular Apache Spark con Python porque la documentación es bastante pobre.

Quería calcular el valor promedio de los pares K / V (almacenados en un RDD por pares), mediante KEY. Aquí se muestra el aspecto de los datos de muestra:

>>> rdd1.take(10) # Show a small sample. [(u''2013-10-09'', 7.60117302052786), (u''2013-10-10'', 9.322709163346612), (u''2013-10-10'', 28.264462809917358), (u''2013-10-07'', 9.664429530201343), (u''2013-10-07'', 12.461538461538463), (u''2013-10-09'', 20.76923076923077), (u''2013-10-08'', 11.842105263157894), (u''2013-10-13'', 32.32514177693762), (u''2013-10-13'', 26.249999999999996), (u''2013-10-13'', 10.693069306930692)]

Ahora la siguiente secuencia de código es una forma menos que óptima para hacerlo, pero funciona. Es lo que estaba haciendo antes de encontrar una mejor solución. No es terrible, pero, como verán en la sección de respuestas, hay una forma más concisa y eficiente.

>>> import operator >>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u''2013-09-09'': 215, u''2013-09-08'': 69, ... snip ...} >>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs). >>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by it''s denominator (i.e. COUNT) >>> print(rdd1.collect()) [(u''2013-10-09'', 11.235365503035176), (u''2013-10-07'', 23.39500642456595), ... snip ... ]


En mi opinión, un equivalente más legible a aggregateByKey con dos lambdas es:

rdd1 = rdd1 / .mapValues(lambda v: (v, 1)) / .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))

De esta forma, el cálculo promedio completo sería:

avg_by_key = rdd1 / .mapValues(lambda v: (v, 1)) / .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) / .mapValues(lambda v: v[0]/v[1]) / .collectAsMap()


Solo agregue una nota sobre una solución intuitiva y más corta (pero mala) para este problema. El libro Sam''s Teach Yourself Apache Spark en 24 horas ha explicado bien este problema en el último capítulo.

Usando groupByKey uno puede resolver el problema fácilmente así:

rdd = sc.parallelize([ (u''2013-10-09'', 10), (u''2013-10-09'', 10), (u''2013-10-09'', 13), (u''2013-10-10'', 40), (u''2013-10-10'', 45), (u''2013-10-10'', 50) ]) rdd / .groupByKey() / .mapValues(lambda x: sum(x) / len(x)) / .collect()

Salida:

[(''2013-10-10'', 45.0), (''2013-10-09'', 11.0)]

Esto es intuitivo y atractivo, ¡pero no lo use ! groupByKey no hace ninguna combinación en los mapeadores y trae todos los pares de valores clave individuales al reductor.

Evite groupByKey tanto como sea posible. Vaya con la solución reduceByKey como @ pat''s.


Una ligera mejora en la respuesta de prismalytics.io.

Podría haber un caso donde la computación de la suma podría desbordar el número porque estamos sumando un gran número de valores. En su lugar, podríamos mantener los valores promedio y seguir calculando el promedio del promedio y los conteos de dos partes que se reducen.

Si tiene dos partes que tienen promedio y cuenta como (a1, c1) y (a2, c2), el promedio general es: total / recuentos = (total1 + total2) / (recuento1 + recuentos2) = (a1 * c1 + a2 * c2) / (c1 + c2)

Si marcamos R = c2 / c1, se puede volver a escribir como a1 / (1 + R) + a2 * R / (1 + R) Si además marcamos Ri como 1 / (1 + R), podemos escríbelo como a1 * Ri + a2 * R * Ri

myrdd = sc.parallelize([1.1, 2.4, 5, 6.0, 2, 3, 7, 9, 11, 13, 10]) sumcount_rdd = myrdd.map(lambda n : (n, 1)) def avg(A, B): R = 1.0*B[1]/A[1] Ri = 1.0/(1+R); av = A[0]*Ri + B[0]*R*Ri return (av, B[1] + A[1]); (av, counts) = sumcount_rdd.reduce(avg) print(av)

Este enfoque se puede convertir para valor-clave simplemente usando mapValues ​​en lugar de map y reduceByKey en lugar de reducir.

Esto es de: https://www.knowbigdata.com/blog/interview-questions-apache-spark-part-2