wordcount spark google example java hadoop

java - spark - Implementación para CombineFileInputFormat Hadoop 0.20.205



mapreduce spark (1)

¿Puede alguien señalar dónde podría encontrar una implementación para CombineFileInputFormat (utilizando org Höchoop 0.20.205? Esto es para crear divisiones grandes a partir de archivos de registro muy pequeños (texto en líneas) utilizando EMR.

Es sorprendente que Hadoop no tenga una implementación predeterminada para esta clase hecha específicamente para este propósito y que en Google parezca que no soy el único confundido por esto. Necesito compilar la clase y agruparla en un jar para hadoop-streaming, con un conocimiento limitado de Java esto es un desafío.

Editar: Ya probé el ejemplo yetitrails, con las importaciones necesarias, pero obtengo un error de compilación para el siguiente método.


Aquí hay una implementación que tengo para ti:

import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.LineRecordReader; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.lib.CombineFileInputFormat; import org.apache.hadoop.mapred.lib.CombineFileRecordReader; import org.apache.hadoop.mapred.lib.CombineFileSplit; @SuppressWarnings("deprecation") public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> { @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException { return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class); } public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> { private final LineRecordReader linerecord; public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException { FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations()); linerecord = new LineRecordReader(conf, filesplit); } @Override public void close() throws IOException { linerecord.close(); } @Override public LongWritable createKey() { // TODO Auto-generated method stub return linerecord.createKey(); } @Override public Text createValue() { // TODO Auto-generated method stub return linerecord.createValue(); } @Override public long getPos() throws IOException { // TODO Auto-generated method stub return linerecord.getPos(); } @Override public float getProgress() throws IOException { // TODO Auto-generated method stub return linerecord.getProgress(); } @Override public boolean next(LongWritable key, Text value) throws IOException { // TODO Auto-generated method stub return linerecord.next(key, value); } } }

En su trabajo primero configure el parámetro mapred.max.split.size acuerdo con el tamaño en el que desea que se combinen los archivos de entrada. Haz algo como lo siguiente en tu ejecución () :

... if (argument != null) { conf.set("mapred.max.split.size", argument); } else { conf.set("mapred.max.split.size", "134217728"); // 128 MB } ... conf.setInputFormat(CombinedInputFormat.class); ...