java - tutorial - MultipleOutputFormat en hadoop
mapreduce mongodb (3)
Soy un novato en Hadoop. Estoy probando el programa Wordcount.
Ahora para probar múltiples archivos de salida, utilizo MultipleOutputFormat
. este enlace me ayudó a hacerlo. http://hadoop.apache.org/common/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html
en mi clase de manejo que tuve
MultipleOutputs.addNamedOutput(conf, "even",
org.apache.hadoop.mapred.TextOutputFormat.class, Text.class,
IntWritable.class);
MultipleOutputs.addNamedOutput(conf, "odd",
org.apache.hadoop.mapred.TextOutputFormat.class, Text.class,
IntWritable.class);`
y mi clase reducida se convirtió en esto
public static class Reduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
MultipleOutputs mos = null;
public void configure(JobConf job) {
mos = new MultipleOutputs(job);
}
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
if (sum % 2 == 0) {
mos.getCollector("even", reporter).collect(key, new IntWritable(sum));
}else {
mos.getCollector("odd", reporter).collect(key, new IntWritable(sum));
}
//output.collect(key, new IntWritable(sum));
}
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
mos.close();
}
}
Las cosas funcionaron, pero recibo MUCHOS archivos (uno impar y uno incluso para cada mapa).
La pregunta es: ¿cómo puedo tener solo 2 archivos de salida (impar y par) para que cada salida impar de cada map-reduce se escriba en ese archivo impar, y lo mismo para par.
Cada reductor utiliza un OutputFormat para escribir registros en. Es por eso que obtienes un conjunto de archivos pares e impares por reductor. Esto es por diseño para que cada reductor pueda realizar escrituras en paralelo.
Si solo desea un único archivo par e impar, deberá establecer mapred.reduce.tasks en 1. Pero el rendimiento se verá afectado, ya que todos los correlacionadores se alimentarán en un solo reductor.
Otra opción es cambiar el proceso de lectura de estos archivos para aceptar múltiples archivos de entrada, o escribir un proceso separado que combine estos archivos.
Se generarán múltiples archivos de salida en función del número de reductores.
Puede usar hadoop dfs -getmerge para salidas fusionadas
Escribí una clase para hacer esto. Solo use su trabajo:
job.setOutputFormatClass(m_customOutputFormatClass);
Esta es mi clase:
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* TextOutputFormat extension which enables writing the mapper/reducer''s output in multiple files.<br>
* <p>
* <b>WARNING</b>: The number of different folder shuoldn''t be large for one mapper since we keep an
* {@link RecordWriter} instance per folder name.
* </p>
* <p>
* In this class the folder name is defined by the written entry''s key.<br>
* To change this behavior simply extend this class and override the
* {@link HdMultipleFileOutputFormat#getFolderNameExtractor()} method and create your own
* {@link FolderNameExtractor} implementation.
* </p>
*
*
* @author ykesten
*
* @param <K> - Keys type
* @param <V> - Values type
*/
public class HdMultipleFileOutputFormat<K, V> extends TextOutputFormat<K, V> {
private String folderName;
private class MultipleFilesRecordWriter extends RecordWriter<K, V> {
private Map<String, RecordWriter<K, V>> fileNameToWriter;
private FolderNameExtractor<K, V> fileNameExtractor;
private TaskAttemptContext job;
public MultipleFilesRecordWriter(FolderNameExtractor<K, V> fileNameExtractor, TaskAttemptContext job) {
fileNameToWriter = new HashMap<String, RecordWriter<K, V>>();
this.fileNameExtractor = fileNameExtractor;
this.job = job;
}
@Override
public void write(K key, V value) throws IOException, InterruptedException {
String fileName = fileNameExtractor.extractFolderName(key, value);
RecordWriter<K, V> writer = fileNameToWriter.get(fileName);
if (writer == null) {
writer = createNewWriter(fileName, fileNameToWriter, job);
if (writer == null) {
throw new IOException("Unable to create writer for path: " + fileName);
}
}
writer.write(key, value);
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
for (Entry<String, RecordWriter<K, V>> entry : fileNameToWriter.entrySet()) {
entry.getValue().close(context);
}
}
}
private synchronized RecordWriter<K, V> createNewWriter(String folderName,
Map<String, RecordWriter<K, V>> fileNameToWriter, TaskAttemptContext job) {
try {
this.folderName = folderName;
RecordWriter<K, V> writer = super.getRecordWriter(job);
this.folderName = null;
fileNameToWriter.put(folderName, writer);
return writer;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
@Override
public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
Path path = super.getDefaultWorkFile(context, extension);
if (folderName != null) {
String newPath = path.getParent().toString() + "/" + folderName + "/" + path.getName();
path = new Path(newPath);
}
return path;
}
@Override
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
return new MultipleFilesRecordWriter(getFolderNameExtractor(), job);
}
public FolderNameExtractor<K, V> getFolderNameExtractor() {
return new KeyFolderNameExtractor<K, V>();
}
public interface FolderNameExtractor<K, V> {
public String extractFolderName(K key, V value);
}
private static class KeyFolderNameExtractor<K, V> implements FolderNameExtractor<K, V> {
public String extractFolderName(K key, V value) {
return key.toString();
}
}
}