tutorial for español ejemplo dummies common aprender hadoop rewrite fileoutputstream

hadoop - for - hdfs



Cómo sobrescribir/reutilizar la ruta de salida existente para los trabajos de Hadoop nuevamente y nuevamente (7)

¿Qué hay de eliminar el directorio antes de ejecutar el trabajo?

Puedes hacerlo a través de shell:

hadoop fs -rmr /path/to/your/output/

o a través de la API de Java:

// configuration should contain reference to your namenode FileSystem fs = FileSystem.get(new Configuration()); // true stands for recursively deleting the folder you gave fs.delete(new Path("/path/to/your/output"), true);

Quiero sobrescribir / reutilizar el directorio de salida existente cuando ejecuto mi trabajo de Hadoop a diario. En realidad, el directorio de salida almacenará la salida resumida de los resultados de la ejecución de cada día. Si especifico el mismo directorio de salida, aparece el error "el directorio de salida ya existe".

¿Cómo pasar por alto esta validación?


El TextInputFormat de Hadoop (que supongo que estás usando) no permite sobrescribir un directorio existente. Probablemente para disculparte el dolor de descubrir que borraste por error algo en lo que (y tu grupo) trabajaste muy duro.

Sin embargo, si está seguro de que desea que su carpeta de salida se sobrescriba con el trabajo, creo que la forma más sencilla es cambiar TextOutputFormat un poco como esto:

public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V> { public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); boolean isCompressed = getCompressOutput(job); String keyValueSeparator= conf.get("mapred.textoutputformat.separator","/t"); CompressionCodec codec = null; String extension = ""; if (isCompressed) { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); extension = codec.getDefaultExtension(); } Path file = getDefaultWorkFile(job, extension); FileSystem fs = file.getFileSystem(conf); FSDataOutputStream fileOut = fs.create(file, true); if (!isCompressed) { return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); } else { return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator); } } }

Ahora está creando FSDataOutputStream ( fs.create(file, true) ) con overwrite = true.


Hadoop ya admite el efecto que parece estar intentando lograr al permitir múltiples rutas de entrada a un trabajo. En lugar de tratar de tener un solo directorio de archivos al que agrega más archivos, tenga un directorio de directorios al que agregar nuevos directorios. Para usar el resultado agregado como entrada, simplemente especifique el globo de entrada como un comodín sobre los subdirectorios (por ejemplo, my-aggregate-output/* ). Para "agregar" nuevos datos al agregado como salida, simplemente especifique un nuevo subdirectorio único del agregado como el directorio de salida, generalmente utilizando una marca de tiempo o algún número de secuencia derivado de sus datos de entrada (por ejemplo, my-aggregate-output/20140415154424 ).


La respuesta de Jungblut es tu solución directa. Como nunca confío en los procesos automatizados para eliminar cosas (personalmente), sugeriré una alternativa:

En lugar de intentar sobrescribir, le sugiero que haga que el nombre de salida de su trabajo sea dinámico, incluido el momento en que se ejecutó.

Algo como " /path/to/your/output-2011-10-09-23-04/ ". De esta manera, puede mantener el rendimiento de su antiguo trabajo en caso de que necesite volver a visitarlo. En mi sistema, que ejecuta más de 10 trabajos diarios, estructuramos el resultado para que sea: /output/job1/2011/10/09/job1out/part-r-xxxxx , /output/job1/2011/10/10/job1out/part-r-xxxxx , etc.


Puede crear un subdirectorio de salida para cada ejecución por tiempo. Por ejemplo, digamos que está esperando un directorio de salida del usuario y luego configúrelo de la siguiente manera:

FileOutputFormat.setOutputPath(job, new Path(args[1]);

Cambia esto por las siguientes líneas:

String timeStamp = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss", Locale.US).format(new Timestamp(System.currentTimeMillis())); FileOutputFormat.setOutputPath(job, new Path(args[1] + "/" + timeStamp));


Si uno está cargando el archivo de entrada (con, por ejemplo, entradas adjuntas) del sistema de archivos local para hadoop sistema de archivos distribuido como tal:

hdfs dfs -put /mylocalfile /user/cloudera/purchase

Entonces, también se podría sobrescribir / reutilizar el directorio de salida existente con -f . No es necesario eliminar o volver a crear la carpeta

hdfs dfs -put -f /updated_mylocalfile /user/cloudera/purchase


Tuve un caso de uso similar, uso MultipleOutputs para resolver esto.

Por ejemplo, si quiero que diferentes trabajos de MapReduce escriban en el mismo directorio /outputDir/ . El trabajo 1 escribe en /outputDir/job1-part1.txt , el trabajo 2 escribe en /outputDir/job1-part2.txt (sin eliminar los archivos que salen).

En general, establezca el directorio de salida en uno aleatorio (se puede eliminar antes de que se ejecute un nuevo trabajo)

FileInputFormat.addInputPath(job, new Path("/randomPath"));

En el reductor / asignador, use MultipleOutputs y configure el escritor para que escriba en el directorio deseado:

public void setup(Context context) { MultipleOutputs mos = new MultipleOutputs(context); }

y:

mos.write(key, value, "/outputDir/fileOfJobX.txt")

Sin embargo, mi caso de uso fue un poco más complicado que eso. Si es solo para escribir en el mismo directorio plano, puede escribir en un directorio diferente y ejecutar un script para migrar los archivos, como: hadoop fs -mv /tmp/* /outputDir

En mi caso de uso, cada trabajo de MapReduce se escribe en diferentes subdirectorios según el valor del mensaje que se está escribiendo. La estructura del directorio puede ser multicapa como:

/outputDir/ messageTypeA/ messageSubTypeA1/ job1Output/ job1-part1.txt job1-part2.txt ... job2Output/ job2-part1.txt ... messageSubTypeA2/ ... messageTypeB/ ...

Cada trabajo de Mapreduce puede escribir en miles de subdirectorios. Y el costo de escribir en un directorio tmp y mover cada archivo al directorio correcto es alto.