wordcount tutorial que google example java hadoop mapreduce

java - tutorial - reduce mapreduce



Clase de clave incorrecta: el texto no es IntWritable (1)

Tu problema es que configuras la clase Reducir como un combinador

conf.setCombinerClass(Reduce.class);

Los combinados se ejecutan en la fase del mapa y necesitan emitir el mismo tipo de clave / valor (IntWriteable, IntWritable en su caso) elimine esta línea y usted debería estar bien.

Esto puede parecer una pregunta estúpida, pero no veo el problema en mis tipos en mi código mapreduce para hadoop

Como se indicó en la pregunta, el problema es que está esperando IntWritable pero le estoy pasando un objeto de texto en collector.collect del reductor.

La configuración de mi trabajo tiene las siguientes clases de salida del asignador:

conf.setMapOutputKeyClass(IntWritable.class); conf.setMapOutputValueClass(IntWritable.class);

Y las siguientes clases de salida del reductor:

conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class);

Mi clase de mapeo tiene la siguiente definición:

public static class Reduce extends MapReduceBase implements Reducer<IntWritable, IntWritable, Text, IntWritable>

con la función requerida:

public void reduce(IntWritable key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter)

Y luego falla cuando llamo:

output.collect(new Text(),new IntWritable());

Soy bastante nuevo en la reducción de mapas, pero parece que todos los tipos parecen coincidir, se compila pero falla en esa línea y dice que espera una IntWritable como clave para la clase de reducción. Si importa, estoy usando la versión 0.21 de Hadoop

Aquí está mi clase de mapa:

public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, IntWritable> { private IntWritable node = new IntWritable(); private IntWritable edge = new IntWritable(); public void map(LongWritable key, Text value, OutputCollector<IntWritable, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { node.set(Integer.parseInt(tokenizer.nextToken())); edge.set(Integer.parseInt(tokenizer.nextToken())); if(node.get() < edge.get()) output.collect(node, edge); } } }

y mi clase de reducción:

public static class Reduce extends MapReduceBase implements Reducer<IntWritable, IntWritable, Text, IntWritable> { IntWritable $ = new IntWritable(Integer.MAX_VALUE); Text keyText = new Text(); public void reduce(IntWritable key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { ArrayList<IntWritable> valueList = new ArrayList<IntWritable>(); //outputs original edge pair as key and $ for value while (values.hasNext()) { IntWritable value = values.next(); valueList.add(value); keyText.set(key.get() + ", " + value.get()); output.collect(keyText, $); } //outputs all the 2 length pairs for(int i = 0; i < valueList.size(); i++) for(int j = i+1; i < valueList.size(); j++) output.collect(new Text(valueList.get(i).get() + ", " + valueList.get(j).get()), key); } }

y mi configuración de trabajo:

JobConf conf = new JobConf(Triangles.class); conf.setJobName("mapred1"); conf.setMapOutputKeyClass(IntWritable.class); conf.setMapOutputValueClass(IntWritable.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(Map.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path("mapred1")); JobClient.runJob(conf);