java - que - mapreduce mongodb
Hadoop DistributedCache está en desuso: ¿cuál es la API preferida? (6)
Mis tareas de mapa necesitan algunos datos de configuración, que me gustaría distribuir a través de la memoria caché distribuida.
El Tutorial de Hadoop MapReduce muestra el usage de la clase DistributedCache, más o menos de la siguiente manera:
// In the driver
JobConf conf = new JobConf(getConf(), WordCount.class);
...
DistributedCache.addCacheFile(new Path(filename).toUri(), conf);
// In the mapper
Path[] myCacheFiles = DistributedCache.getLocalCacheFiles(job);
...
Sin embargo, DistributedCache
está marcado como obsoleto en Hadoop 2.2.0.
¿Cuál es la nueva forma preferida de lograr esto? ¿Hay algún ejemplo o tutorial actualizado que cubra esta API?
La nueva API DistributedCache para YARN / MR2 se encuentra en la clase org.apache.hadoop.mapreduce.Job
.
Job.addCacheFile()
Desafortunadamente, todavía no hay muchos ejemplos completos de este tipo de tutoriales.
Las API para el caché distribuido se pueden encontrar en la clase de trabajo en sí. Consulte la documentación aquí: http://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Job.html El código debería ser algo así como
Job job = new Job();
...
job.addCacheFile(new Path(filename).toUri());
En tu código mapper:
Path[] localPaths = context.getLocalCacheFiles();
...
Ninguna de las soluciones mencionadas funcionó para mí en forma completa. Podría porque la versión de Hadoop sigue cambiando. Estoy usando hadoop 2.6.4. Esencialmente, DistributedCache está en desuso, así que no quería usar eso. Como algunas de las publicaciones sugieren que usemos AddCacheFile (), sin embargo, ha cambiado un poco. Así es como me funcionó
job.addCacheFile(new URI("hdfs://X.X.X.X:9000/EnglishStop.txt#EnglishStop.txt"));
Aquí XXXX puede ser la dirección IP maestra o el host local. The EnglishStop.txt se almacenó en HDFS en / location.
hadoop fs -ls /
El resultado es
-rw-r--r-- 3 centos supergroup 1833 2016-03-12 20:24 /EnglishStop.txt
drwxr-xr-x - centos supergroup 0 2016-03-12 19:46 /test
Divertido pero conveniente, # EnglishStop.txt significa que ahora podemos acceder a él como "EnglishStop.txt" en mapper. Aquí está el código para el mismo
public void setup(Context context) throws IOException, InterruptedException
{
File stopwordFile = new File("EnglishStop.txt");
FileInputStream fis = new FileInputStream(stopwordFile);
BufferedReader reader = new BufferedReader(new InputStreamReader(fis));
while ((stopWord = reader.readLine()) != null) {
// stopWord is a word read from Cache
}
}
Esto solo funcionó para mí. Puede leer la línea del archivo almacenado en HDFS
No utilicé job.addCacheFile (). En su lugar, utilicé la opción de archivos como "-files /path/to/myfile.txt#myfile" como antes. Luego en el mapeador o código reductor utilizo el siguiente método:
/**
* This method can be used with local execution or HDFS execution.
*
* @param context
* @param symLink
* @param throwExceptionIfNotFound
* @return
* @throws IOException
*/
public static File findDistributedFileBySymlink(JobContext context, String symLink, boolean throwExceptionIfNotFound) throws IOException
{
URI[] uris = context.getCacheFiles();
if(uris==null||uris.length==0)
{
if(throwExceptionIfNotFound)
throw new RuntimeException("Unable to find file with symlink ''"+symLink+"'' in distributed cache");
return null;
}
URI symlinkUri = null;
for(URI uri: uris)
{
if(symLink.equals(uri.getFragment()))
{
symlinkUri = uri;
break;
}
}
if(symlinkUri==null)
{
if(throwExceptionIfNotFound)
throw new RuntimeException("Unable to find file with symlink ''"+symLink+"'' in distributed cache");
return null;
}
//if we run this locally the file system URI scheme will be "file" otherwise it should be a symlink
return "file".equalsIgnoreCase(FileSystem.get(context.getConfiguration()).getScheme())?(new File(symlinkUri.getPath())):new File(symLink);
}
Luego en mapper / reductor:
@Override
protected void setup(Context context) throws IOException, InterruptedException
{
super.setup(context);
File file = HadoopUtils.findDistributedFileBySymlink(context,"myfile",true);
... do work ...
}
Tenga en cuenta que si utilicé "-files /path/to/myfile.txt" directamente, entonces necesito usar "myfile.txt" para acceder al archivo ya que ese es el nombre de enlace simbólico predeterminado.
Para expandir @jtravaglini, la forma preferida de usar DistributedCache
para YARN / MapReduce 2 es la siguiente:
En su controlador, use Job.addCacheFile()
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = Job.getInstance(conf, "MyJob");
job.setMapperClass(MyMapper.class);
// ...
// Mind the # sign after the absolute file location.
// You will be using the name after the # sign as your
// file name in your Mapper/Reducer
job.addCacheFile(new URI("/user/yourname/cache/some_file.json#some"));
job.addCacheFile(new URI("/user/yourname/cache/other_file.json#other"));
return job.waitForCompletion(true) ? 0 : 1;
}
Y en su Mapper / Reducer, anule el método de setup(Context context)
:
@Override
protected void setup(
Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
if (context.getCacheFiles() != null
&& context.getCacheFiles().length > 0) {
File some_file = new File("./some");
File other_file = new File("./other");
// Do things to these two files, like read them
// or parse as JSON or whatever.
}
super.setup(context);
}
Yo tuve el mismo problema. Y no solo se desaprovecha DistributedCach sino también getLocalCacheFiles y "nuevo trabajo". Entonces, lo que funcionó para mí es lo siguiente:
Conductor:
Configuration conf = getConf();
Job job = Job.getInstance(conf);
...
job.addCacheFile(new Path(filename).toUri());
En la configuración de Mapper / Reducer:
@Override
protected void setup(Context context) throws IOException, InterruptedException
{
super.setup(context);
URI[] files = context.getCacheFiles(); // getCacheFiles returns null
Path file1path = new Path(files[0])
...
}