java - tutorial - reduce mapreduce
Clase de clave incorrecta: el texto no es IntWritable (1)
Tu problema es que configuras la clase Reducir como un combinador
conf.setCombinerClass(Reduce.class);
Los combinados se ejecutan en la fase del mapa y necesitan emitir el mismo tipo de clave / valor (IntWriteable, IntWritable en su caso) elimine esta línea y usted debería estar bien.
Esto puede parecer una pregunta estúpida, pero no veo el problema en mis tipos en mi código mapreduce para hadoop
Como se indicó en la pregunta, el problema es que está esperando IntWritable pero le estoy pasando un objeto de texto en collector.collect del reductor.
La configuración de mi trabajo tiene las siguientes clases de salida del asignador:
conf.setMapOutputKeyClass(IntWritable.class);
conf.setMapOutputValueClass(IntWritable.class);
Y las siguientes clases de salida del reductor:
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
Mi clase de mapeo tiene la siguiente definición:
public static class Reduce extends MapReduceBase implements Reducer<IntWritable, IntWritable, Text, IntWritable>
con la función requerida:
public void reduce(IntWritable key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter)
Y luego falla cuando llamo:
output.collect(new Text(),new IntWritable());
Soy bastante nuevo en la reducción de mapas, pero parece que todos los tipos parecen coincidir, se compila pero falla en esa línea y dice que espera una IntWritable como clave para la clase de reducción. Si importa, estoy usando la versión 0.21 de Hadoop
Aquí está mi clase de mapa:
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, IntWritable> {
private IntWritable node = new IntWritable();
private IntWritable edge = new IntWritable();
public void map(LongWritable key, Text value, OutputCollector<IntWritable, IntWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
node.set(Integer.parseInt(tokenizer.nextToken()));
edge.set(Integer.parseInt(tokenizer.nextToken()));
if(node.get() < edge.get())
output.collect(node, edge);
}
}
}
y mi clase de reducción:
public static class Reduce extends MapReduceBase implements Reducer<IntWritable, IntWritable, Text, IntWritable> {
IntWritable $ = new IntWritable(Integer.MAX_VALUE);
Text keyText = new Text();
public void reduce(IntWritable key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
ArrayList<IntWritable> valueList = new ArrayList<IntWritable>();
//outputs original edge pair as key and $ for value
while (values.hasNext()) {
IntWritable value = values.next();
valueList.add(value);
keyText.set(key.get() + ", " + value.get());
output.collect(keyText, $);
}
//outputs all the 2 length pairs
for(int i = 0; i < valueList.size(); i++)
for(int j = i+1; i < valueList.size(); j++)
output.collect(new Text(valueList.get(i).get() + ", " + valueList.get(j).get()), key);
}
}
y mi configuración de trabajo:
JobConf conf = new JobConf(Triangles.class);
conf.setJobName("mapred1");
conf.setMapOutputKeyClass(IntWritable.class);
conf.setMapOutputValueClass(IntWritable.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path("mapred1"));
JobClient.runJob(conf);