tutorial que google example java hadoop mapreduce

java - que - mapreduce mongodb



Hadoop DistributedCache está en desuso: ¿cuál es la API preferida? (6)

Mis tareas de mapa necesitan algunos datos de configuración, que me gustaría distribuir a través de la memoria caché distribuida.

El Tutorial de Hadoop MapReduce muestra el usage de la clase DistributedCache, más o menos de la siguiente manera:

// In the driver JobConf conf = new JobConf(getConf(), WordCount.class); ... DistributedCache.addCacheFile(new Path(filename).toUri(), conf); // In the mapper Path[] myCacheFiles = DistributedCache.getLocalCacheFiles(job); ...

Sin embargo, DistributedCache está marcado como obsoleto en Hadoop 2.2.0.

¿Cuál es la nueva forma preferida de lograr esto? ¿Hay algún ejemplo o tutorial actualizado que cubra esta API?




Ninguna de las soluciones mencionadas funcionó para mí en forma completa. Podría porque la versión de Hadoop sigue cambiando. Estoy usando hadoop 2.6.4. Esencialmente, DistributedCache está en desuso, así que no quería usar eso. Como algunas de las publicaciones sugieren que usemos AddCacheFile (), sin embargo, ha cambiado un poco. Así es como me funcionó

job.addCacheFile(new URI("hdfs://X.X.X.X:9000/EnglishStop.txt#EnglishStop.txt"));

Aquí XXXX puede ser la dirección IP maestra o el host local. The EnglishStop.txt se almacenó en HDFS en / location.

hadoop fs -ls /

El resultado es

-rw-r--r-- 3 centos supergroup 1833 2016-03-12 20:24 /EnglishStop.txt drwxr-xr-x - centos supergroup 0 2016-03-12 19:46 /test

Divertido pero conveniente, # EnglishStop.txt significa que ahora podemos acceder a él como "EnglishStop.txt" en mapper. Aquí está el código para el mismo

public void setup(Context context) throws IOException, InterruptedException { File stopwordFile = new File("EnglishStop.txt"); FileInputStream fis = new FileInputStream(stopwordFile); BufferedReader reader = new BufferedReader(new InputStreamReader(fis)); while ((stopWord = reader.readLine()) != null) { // stopWord is a word read from Cache } }

Esto solo funcionó para mí. Puede leer la línea del archivo almacenado en HDFS


No utilicé job.addCacheFile (). En su lugar, utilicé la opción de archivos como "-files /path/to/myfile.txt#myfile" como antes. Luego en el mapeador o código reductor utilizo el siguiente método:

/** * This method can be used with local execution or HDFS execution. * * @param context * @param symLink * @param throwExceptionIfNotFound * @return * @throws IOException */ public static File findDistributedFileBySymlink(JobContext context, String symLink, boolean throwExceptionIfNotFound) throws IOException { URI[] uris = context.getCacheFiles(); if(uris==null||uris.length==0) { if(throwExceptionIfNotFound) throw new RuntimeException("Unable to find file with symlink ''"+symLink+"'' in distributed cache"); return null; } URI symlinkUri = null; for(URI uri: uris) { if(symLink.equals(uri.getFragment())) { symlinkUri = uri; break; } } if(symlinkUri==null) { if(throwExceptionIfNotFound) throw new RuntimeException("Unable to find file with symlink ''"+symLink+"'' in distributed cache"); return null; } //if we run this locally the file system URI scheme will be "file" otherwise it should be a symlink return "file".equalsIgnoreCase(FileSystem.get(context.getConfiguration()).getScheme())?(new File(symlinkUri.getPath())):new File(symLink); }

Luego en mapper / reductor:

@Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); File file = HadoopUtils.findDistributedFileBySymlink(context,"myfile",true); ... do work ... }

Tenga en cuenta que si utilicé "-files /path/to/myfile.txt" directamente, entonces necesito usar "myfile.txt" para acceder al archivo ya que ese es el nombre de enlace simbólico predeterminado.


Para expandir @jtravaglini, la forma preferida de usar DistributedCache para YARN / MapReduce 2 es la siguiente:

En su controlador, use Job.addCacheFile()

public int run(String[] args) throws Exception { Configuration conf = getConf(); Job job = Job.getInstance(conf, "MyJob"); job.setMapperClass(MyMapper.class); // ... // Mind the # sign after the absolute file location. // You will be using the name after the # sign as your // file name in your Mapper/Reducer job.addCacheFile(new URI("/user/yourname/cache/some_file.json#some")); job.addCacheFile(new URI("/user/yourname/cache/other_file.json#other")); return job.waitForCompletion(true) ? 0 : 1; }

Y en su Mapper / Reducer, anule el método de setup(Context context) :

@Override protected void setup( Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) { File some_file = new File("./some"); File other_file = new File("./other"); // Do things to these two files, like read them // or parse as JSON or whatever. } super.setup(context); }


Yo tuve el mismo problema. Y no solo se desaprovecha DistributedCach sino también getLocalCacheFiles y "nuevo trabajo". Entonces, lo que funcionó para mí es lo siguiente:

Conductor:

Configuration conf = getConf(); Job job = Job.getInstance(conf); ... job.addCacheFile(new Path(filename).toUri());

En la configuración de Mapper / Reducer:

@Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); URI[] files = context.getCacheFiles(); // getCacheFiles returns null Path file1path = new Path(files[0]) ... }