python - MRjob: ¿Puede un reductor realizar 2 operaciones?
mapreduce (3)
Intento ceder la probabilidad que tiene cada clave, par de valores generados por el mapeador.
Entonces, digamos que el mapeador cede:
a, (r, 5)
a, (e, 6)
a, (w, 7)
Necesito agregar 5 + 6 + 7 = 18 y luego hallar las probabilidades 5/18, 6/18, 7/18
por lo que la salida final del reductor se vería así:
a, [[r, 5, 0.278], [e, 6, 0.33], [w, 7, 0.389]]
hasta ahora, solo puedo obtener el reductor para sumar todos los enteros del valor. ¿Cómo puedo hacer para retroceder y dividir cada instancia entre la suma total?
¡Gracias!
Lo que está haciendo arriba también debería funcionar, pero esto supone que todos los datos de una sola clave encajarán en la memoria. Si lo hace, en Reducer puede mantener todos los valores en la memoria y luego calcular su total para luego calcular el marginal para cada par clave-valor. Esto se conoce comúnmente como el enfoque de "rayas".
Sin embargo, la mayoría de las veces esto podría ser cierto y los datos podrían no ajustarse a la memoria. En este caso, tendrá que encontrar una forma de enviar valores para calcular su total antes del par clave-valor real, de modo que cuando puedan ser utilizados para calcular el marginal y emitir el valor de inmediato.
Este es un candidato para el patrón de diseño de "orden de inversión". Es útil cuando necesitas calcular frecuencias relativas. La idea básica es al final del mapeador emitir 2 pares de clave-valor para cada uno de los datos intermedios donde uno de los pares clave-valor tendrá la misma clave común para todos los valores. Esto se usará para calcular el total.
Ejemplo:
For a, (r, 5) :
---------------
emit (a, r), 5
emit (a, *), 5
For a, (e, 6) :
---------------
emit (a, e), 6
emit (a, *), 6
For a, (w, 7) :
---------------
emit (a, w), 7
emit (a, *), 7
Una vez hecho esto, necesita un particionador que dividirá cada par intermedio de clave-valor utilizando solo el primer valor de la clave. En el ejemplo anterior usando "a".
También necesitará un orden de clasificación clave que siempre coloca la tecla que tiene * en la segunda parte de la clave sobre todo.
De esta forma, todas las teclas intermedias que tengan "a" en la primera parte de la tecla terminarán en el mismo reductor. Además, ordenarán de la manera que se muestra a continuación:
emit (a, *), 5
emit (a, *), 6
emit (a, *), 7
emit (a, e), 6
emit (a, r), 5
emit (a, w), 7
En el reductor a medida que recorre los pares clave-valor, simplemente deberá acumular los valores de las claves si tienen un * en la segunda parte de la clave. Luego puede usar el valor acumulado para calcular su margen para todos los otros pares clave-valor.
total = 0
for(value : values){
if (key.second == *)
total += value
else
emit (key.first , key.second, value, value/total)
}
Este patrón de diseño se conoce comúnmente como Orden de inversión que usa el enfoque de pares. Para obtener más información sobre este y otros patrones de diseño, sugiero leer el capítulo sobre los patrones de diseño de MapReduce en este libro: http://lintool.github.com/MapReduceAlgorithms/ . Muy bien explicado con ejemplos.
Simplemente puede contar la suma, como lo hace, y también mantener los pares en la memoria, para emitir las probabilidades que desee, de la siguiente manera:
reduce (key, list<values>):
int sum = 0;
for (value in values) {
sum = sum + value.frequency; //assuming you can extract two fields in each value: value.word and value.frequency
}
String outputValue = "[";
for (value in values) { //iterate over the values once more
outputValue = outputValue + "["+ value.word + ", " +value.frequency + ", "+ value.frequency/sum +"],"
}
outputValue = outputValue.replaceLast(",","]");
emit (key, outputValue);
Por supuesto, eso es solo un pseudocódigo, ya que no estoy acostumbrado a Python, pero espero que la transición sea bastante fácil.
La solución de Pai es técnicamente correcta, pero en la práctica esto te dará muchas dificultades, ya que establecer el reparto puede ser un gran problema (mira https://groups.google.com/forum/#!topic/mrjob/aV7bNn0sJ2k ).
Puede lograr esta tarea más fácilmente utilizando mrjob.step, y luego creando dos reductores, como en este ejemplo: https://github.com/Yelp/mrjob/blob/master/mrjob/examples/mr_next_word_stats.py
Para hacerlo en la misma línea que estás describiendo:
from mrjob.job import MRJob
import re
from mrjob.step import MRStep
from collections import defaultdict
wordRe = re.compile(r"[/w]+")
class MRComplaintFrequencyCount(MRJob):
def mapper(self, _, line):
self.increment_counter(''group'',''num_mapper_calls'',1)
#Issue is third column in csv
issue = line.split(",")[3]
for word in wordRe.findall(issue):
#Send all map outputs to same reducer
yield word.lower(), 1
def reducer(self, key, values):
self.increment_counter(''group'',''num_reducer_calls'',1)
wordCounts = defaultdict(int)
total = 0
for value in values:
word, count = value
total+=count
wordCounts[word]+=count
for k,v in wordCounts.iteritems():
# word, frequency, relative frequency
yield k, (v, float(v)/total)
def combiner(self, key, values):
self.increment_counter(''group'',''num_combiner_calls'',1)
yield None, (key, sum(values))
if __name__ == ''__main__'':
MRComplaintFrequencyCount.run()
Esto hace un recuento de palabras estándar y agrega principalmente en el combinador, luego usa "Ninguno" como la clave común, de modo que cada palabra indirectamente se envía al reductor bajo la misma tecla. En el reductor puede obtener el recuento total de palabras y calcular frecuencias relativas.