java - org - mapreduce ejemplo
No estoy ejecutando mi clase de hadoop mapper al analizar xml en hadoop usando XMLInputFormat (1)
Soy nuevo en hadoop, uso la versión Hadoop 2.6.0 y trato de analizar un XML complejo. Después de buscar por un tiempo, sé que para el análisis XML necesitamos escribir un InputFormat personalizado que es el XMLInputFormat de mahout. También tomé una ayuda de este ejemplo
Pero cuando estoy ejecutando mi código después de la clase passig XMLInputformat, no llamará a mi propia clase Mapper y el archivo de salida tendrá 0 datos si utilizo XMLInputFormat en el ejemplo.
Sorprendentemente, si no paso mi clase XMLInputFormat a mi JOB, entonces mi mapper funciona bien y da salida correctamente. ¿Alguien ayudará aquí para señalar lo que me estoy perdiendo aquí?
Mi clase de configuración de trabajo es:
public static void runParserJob(String inputPath, String outputPath) throws IOException {
LOGGER.info("-----runParserJob()-----Start");
Configuration configuration = new Configuration(); configuration.set("xmlinput.start",Constants.XML_INPUT_START_TAG_PRODUCT);
configuration.set("xmlinput.end",Constants.XML_INPUT_END_TAG_PRODUCT);
configuration.set("io.serializations",Constants.XML_IO_SERIALIZATIONS);
Job job = new Job(configuration,Constants.JOB_TITLE);
FileInputFormat.setInputPaths(job, inputPath);
job.setJarByClass(ParserDriver.class);
job.setMapperClass(XMLMapper.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(XmlInputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
Path hdfsOutputPath = new Path(outputPath);
FileOutputFormat.setOutputPath(job, hdfsOutputPath);
FileSystem dfs = FileSystem.get(hdfsOutputPath.toUri(),configuration);
/**Using this condition it will create output at same location
* by deleting older data in that location**/
if(dfs.exists(hdfsOutputPath)){
dfs.delete(hdfsOutputPath,true);
}
try{
job.waitForCompletion(true);
}catch(InterruptedException ie){
LOGGER.error("-----Process interrupted in between Exception-----", ie);
}catch(ClassNotFoundException ce){
LOGGER.error("-----Class not found while running the job-----",ce);
}
}
Mi clase XMLInputFormat es:
public class XmlInputFormat extends TextInputFormat{
public static final String START_TAG_KEY = "xmlinput.start";
public static final String END_TAG_KEY = "xmlinput.end";
@Override
public RecordReader<LongWritable,Text> createRecordReader(InputSplit is, TaskAttemptContext tac) {
return new XmlRecordReader();
}
public static class XmlRecordReader extends RecordReader<LongWritable, Text>{
private byte[] startTag;
private byte[] endTag;
private long start;
private long end;
private FSDataInputStream fsin;
private DataOutputBuffer buffer = new DataOutputBuffer();
private LongWritable key = new LongWritable();
private Text value = new Text();
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit)inputSplit;
startTag = taskAttemptContext.getConfiguration().get(START_TAG_KEY).getBytes("utf-8");
endTag = taskAttemptContext.getConfiguration().get(END_TAG_KEY).getBytes("utf-8");
start = fileSplit.getStart();
end = start + fileSplit.getLength();
Path file = fileSplit.getPath();
FileSystem hdfs = file.getFileSystem(taskAttemptContext.getConfiguration());
fsin = hdfs.open(fileSplit.getPath());
fsin.seek(start);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if(fsin.getPos() < end){
if(readUntilMatch(startTag,false)){
try {
buffer.write(startTag);
if (readUntilMatch(endTag, true)) {
value.set(buffer.getData(), 0, buffer.getLength());
key.set(fsin.getPos());
return true;
}
} finally {
buffer.reset();
}
}
}
return false;
}
@Override
public void close() throws IOException {
}
@Override
public LongWritable getCurrentKey() throws IOException,InterruptedException {
return null;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return null;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException{
int i = 0;
while(true){
int b = fsin.read();
//If reaches to EOF
if(b == -1){
return false;
}
//If not then save into the buffer.
if(withinBlock){
buffer.write(b);
}
// check if we''re matching:
if (b == match[i]) {
i++;
if (i >= match.length) return true;
} else i = 0;
// see if we''ve passed the stop point:
if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
}
}
}
}
¿Puede alguien ayudarme aquí? Gracias por adelantado. Corrígeme si me estoy equivocando en cualquier parte.
No estoy seguro de cómo se ve tu estructura XML, pero por ejemplo si tienes una estructura XML:
<data>
<product id="101" itemCategory="BER" transaction="PUR">
<transaction-id>102A5RET</transaction-id>
<item-name>Blue-Moon-12-PK-BTTLE</item-name>
<item-purchased>2</item-purchased>
<item-price>12.99</item-price>
<time-stamp>2015-04-20 11:12:13 102301</time-stamp>
</product>
.
.
.
</data>
Su clase XMLInputFormat necesitaría saber con qué nodo XML desea trabajar:
configuration.set("xmlinput.start", "<product") //note only <product
configuration.set("xmlinput.end", "</product>") //note only </product>
Espero que esto ayude !!