hadoop streaming chaining

Hadoop Streaming: encadenando trabajos



chaining (1)

Esta respuesta es lo que el indagador en realidad puso en la pregunta. Normalmente lo citaré pero me abstendré de esto ya que es muy grande.

Esta es una documentación sobre cómo encadenar dos o más trabajos de transmisión, utilizando Hadoop Streaming (actualmente 1.0.3).

Para entender el código final que hará el encadenamiento y poder escribir cualquier otro trabajo de cadena, se requiere una teoría preliminar pero práctica.

Antes que nada, ¿qué es un trabajo en Hadoop? Un trabajo de Hadoop es

hadoopJob = Configuration + Execution

dónde,

Configuración : toda la configuración que hace posible la ejecución.

Ejecución : el conjunto de archivos ejecutables o de script que logran la tarea deseada. En otras palabras, el mapa y reducir los pasos de nuestra tarea.

Configuration = hadoopEnvironment + userEnvironment

dónde,

hadoopEnvironment : es la configuración del entorno general de Hadoop. Este entorno general se define a partir de recursos, es decir, archivos xml que se encuentran en el directorio $ HADOOP_HOME / conf. Por ejemplo, algunos recursos son core-site.xml, mapred-site.xml y hadoop-site.xml, que definen el directorio temporal de hdfs, el rastreador de trabajos y el número de nodos de clúster, respectivamente.

userEnvironment : son los argumentos especificados por el usuario cuando se ejecuta un trabajo. En Hadoop, estos argumentos se llaman opciones.

userEnvironment = genericOptions + streamingOptions

dónde,

genericOptions : son generales en el sentido de que apelan a cada trabajo de transmisión, independientemente del trabajo. Se manejan desde GenericsOptionsParser.

StreamOptions : son específicos del trabajo en el sentido de que apelan a un determinado trabajo. Por ejemplo, cada trabajo tiene sus propios directorios o archivos de entrada y salida. Se manejan desde StreamJob.

Esquemáticamente

hadoopJob // / / / / / / / / Configuration Execution // | / / | / / executable or script files / / / / / / hadoopEnvironment userEnvironment | // | / / | / / $HADOOP_HOME/conf / / / / genericOptions streamingOptions | | | | GenericOptionsParser StreamJob

Como cualquiera puede ver, todo lo anterior es una serie de configuraciones. Una parte de esto es para el administrador del clúster (hadoopEnvironment) y la otra parte es para el usuario (userEnvironment) del clúster. Para concluir, un trabajo es principalmente una configuración en un nivel abstracto, si olvidamos por el momento la parte de ejecución.

Nuestro código debe encargarse de todo lo anterior. Ahora estamos listos para escribir código.

Antes que nada, ¿qué es un trabajo de Hadoop en el nivel de código? Es un archivo jar. Cuando enviamos un trabajo, enviamos un archivo jar con algunos argumentos de línea de comando. Por ejemplo, cuando ejecutamos un solo trabajo de transmisión, ejecutamos el comando

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar -D mapred.reduce.tasks=1 -mapper m.py -reducer r.py -input /in.txt -output /out/

dónde,

nuestro trabajo es el hadoop-streaming-1.0.3.jar

con argumentos de línea de comandos -D mapred.reduce.tasks = 1 -mapper m.py -reducer r.py -input /in.txt -output / out /

Dentro de este tarro está nuestra clase que se encarga de todo correctamente.

Entonces, abrimos un nuevo archivo java, digamos TestChain.java,

// import everything needed public class TestChain { //code here public static void main( String[] args) throws Exception { //code here }//end main }//end TestChain

Para manejar hadoopEnvironment, nuestra clase debe heredar la clase configurada . Class Configured nos da acceso al entorno y a los parámetros de Hadoop, es decir, a los recursos mencionados anteriormente. Los recursos son archivos xml que contienen datos en forma de par nombre / valor.

En el futuro, cada interfaz es más o menos un medio entre el mundo externo y la tarea que el mundo quiere lograr. Dicho esto, una interfaz es la herramienta que utilizamos para realizar nuestra tarea. Nuestra clase en consecuencia es una herramienta. Para esto, nuestra clase debe implementar la interfaz de la herramienta que declara un método run (). Este método define el comportamiento de nuestra herramienta, cuando la interfaz se implementa, por supuesto. Finalmente, para usar nuestra herramienta, utilizamos la clase ToolRunner . ToolRunner, a través de la clase GenericOptionsParser , también ayuda a manejar genericOptions desde userEnvironment.

import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.Tool; // import everything else needed public class TestChain extends Configured implements Tool { public int run( String[] args) throws Exception { //code here return 0; }//end run public static void main( String[] args) throws Exception { // ToolRunner handles generic command line options int res = ToolRunner.run( new Configuration(), new TestChain(), args); System.exit( res); }//end main }//end TestChain

Para completar la imagen, el método run () también se denomina controlador, configura el trabajo, incluida la inicialización y la configuración del trabajo. Observe arriba que hemos delegado a ToolRunner que trata con hadoopEnnvironment con el primer parámetro ''nueva configuración '' del método ToolRunner.run ().

¿Qué hicimos hasta ahora? Simplemente configuramos el entorno en el que funcionará nuestra herramienta. Ahora tenemos que definir nuestra herramienta, es decir, hacer el encadenamiento.

En cuanto a cada trabajo de cadena es un trabajo de transmisión, creamos cada uno de ellos como tal. Hacemos esto usando el método StreamJob.createJob (String [] args) de la clase StreamJob . La matriz args de Strings contiene los argumentos de "línea de comando" de cada trabajo. Estos argumentos de línea de comando se refieren a las opciones de transmisión (trabajo específico) de userEnvironment. Además, estos argumentos están en forma de par parámetro / valor. Por ejemplo, si nuestro trabajo tiene el archivo .txt como entrada, / salida / como directorio de salida, m.py como mapper y r.py como reductor, entonces,

String[] example = new String[] { "-mapper" , "m.py" "-reducer" , "r.py" "-input" , "in.txt" "-output" , "/out/" }

Tienes que tener cuidado en dos cosas. Primero, el "-" es necesario. Es esa pequeña cosa que distingue los parámetros de los valores. Aquí, mapper es un parámetro y m.py es su valor. La diferencia se entiende desde "-". En segundo lugar, si agrega un espacio entre la izquierda "y" - "de un parámetro, este parámetro se ignorará. Si tenemos" -mapper ", entonces" -mapper "no se considera como un parámetro. Cuando StreamJob analiza la matriz de args se ve para pares de parámetro / valor. Una última cosa, recuerde que un trabajo es más o menos una configuración. Esperamos que StreamJob.creatJob () devuelva una configuración o algo similar a eso. De hecho, StreamJob.createJob () devuelve un objeto JobConf Un breve resumen de JobConf es una descripción de un trabajo de mapreduce específico que Hadoop entiende y puede ejecutar por supuesto.

Suponiendo que tenemos tres trabajos para encadenar,

import org.apache.hadoop.util.Tool; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.streaming.StreamJob; // import everything else needed public class TestChain extends Configured implements Tool { public int run( String[] args) throws Exception { String[] job1 = new String[] { "-mapper" , "m1.py" "-reducer" , "r1.py" "-input" , "in1.txt" "-output" , "/out1/" } JobConf job1Conf = new StreamJob.createJob( job1); //code here String[] job2 = new String[] { "-mapper" , "m2.py" "-reducer" , "r2.py" "-input" , "in2.txt" "-output" , "/out2/" } JobConf job2Conf = new StreamJob.createJob( job2); //code here String[] job3 = new String[] { "-mapper" , "m3.py" "-reducer" , "r3.py" "-input" , "in3.txt" "-output" , "/out3/" } JobConf job3Conf = new StreamJob.createJob( job3); //code here return 0; }//end run public static void main( String[] args) throws Exception { // ToolRunner handles generic command line options int res = ToolRunner.run( new Configuration(), new TestChain(), args); System.exit( res); }//end main }//end TestChain

En este punto establecemos el entorno en el que nuestra herramienta va a operar y definimos su comportamiento. Sin embargo, no lo hemos puesto en acción. ToolRunner no es suficiente. ToolRunner, ejecuta nuestra herramienta como un todo. No ejecuta los trabajos de la cadena individual. Tenemos que hacer esto.

Hay dos formas de hacer esto. La primera forma es usar JobClient y la segunda es usar JobControl .

First Way - JobClient

Con JobClient ejecutamos trabajos en cadena como una secuencia, un trabajo se ejecuta después de otro llamando a un cliente de trabajo para cada trabajo. El método que ejecuta cada trabajo individual es JobClient.runJob (jobtorun) donde jobtorun es un objeto JobConf.

import org.apache.hadoop.util.Tool; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.streaming.StreamJob; public class TestChain extends Configured implements Tool { public int run( String[] args) throws Exception { String[] job1 = new String[] { "-mapper" , "m1.py" "-reducer" , "r1.py" "-input" , "in1.txt" "-output" , "/out1/" } JobConf job1Conf = new StreamJob.createJob( job1); JobClient.runJob( job1Conf); String[] job2 = new String[] { "-mapper" , "m2.py" "-reducer" , "r2.py" "-input" , "in2.txt" "-output" , "/out2/" } JobConf job2Conf = new StreamJob.createJob( job2); JobClient.runJob( job2Conf); String[] job3 = new String[] { "-mapper" , "m3.py" "-reducer" , "r3.py" "-input" , "in3.txt" "-output" , "/out3/" } JobConf job3Conf = new StreamJob.createJob( job3); JobClient.runJob( job3Conf); return 0; }//end run public static void main( String[] args) throws Exception { // ToolRunner handles generic command line options int res = ToolRunner.run( new Configuration(), new TestChain(), args); System.exit( res); }//end main }//end TestChain

Una ventaja de esta manera, al utilizar JobClient, es que el progreso del trabajo se imprime en la salida estándar.

Una desventaja de JobClient es que no puede ocuparse de las dependencias entre trabajos.

Segunda forma - JobControl

Con JobControl, todos los trabajos de la cadena son parte de un grupo de trabajos. Aquí, cada trabajo se ejecuta en el marco de ese grupo. Esto implica que cada trabajo de la cadena tiene que ser agregado en el grupo al principio y luego el grupo es el que se ejecuta. El grupo es un FIFO o la ejecución de cada trabajo en el grupo sigue el esquema FCFS (First Come First Served). Cada trabajo se agrega en el grupo con el método JobControl.addJob (jobtoadd).

JobControl puede manejar dependencias a través del método x.addDependingJob (y) donde el trabajo x depende del trabajo y. Eso significa que el trabajo x no puede ejecutarse hasta que finalice el trabajo y. Si el trabajo x es dependiente de ambos trabajos y y z y z es independiente de y, entonces con x.addDependingJob (y) y x.addDependingJob (z) podemos expresar estas dependencias.

JobControl en contradicción con JobClient, "funciona" con objetos Job . Cuando llamamos, por ejemplo, al método x.addDependingJob (y), x, y son objetos Job. Lo mismo vale para JobControl.addJob (jobtoadd), jobtoadd es un objeto de trabajo. Cada objeto Job se crea a partir de un objeto JobConf. Volviendo al código que tenemos,

import org.apache.hadoop.util.Tool; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.streaming.StreamJob; public class TestChain extends Configured implements Tool { public int run( String[] args) throws Exception { //TestChain below is an arbitrary name for the group JobControl jobc = new JobControl( "TestChain"); String[] job1 = new String[] { "-mapper" , "m1.py" "-reducer" , "r1.py" "-input" , "in1.txt" "-output" , "/out1/" } JobConf job1Conf = new StreamJob.createJob( job1); Job job1 = new Job( job1conf); jobc.addJob( job1); String[] job2 = new String[] { "-mapper" , "m2.py" "-reducer" , "r2.py" "-input" , "in2.txt" "-output" , "/out2/" } JobConf job2Conf = new StreamJob.createJob( job2); Job job2 = new Job( job2conf); jobc.addJob( job2); String[] job3 = new String[] { "-mapper" , "m3.py" "-reducer" , "r3.py" "-input" , "/out2/par*" "-output" , "/out3/" } JobConf job3Conf = new StreamJob.createJob( job3); Job job3 = new Job( job3conf); job3.addDependingJob( job2); jobc.addJob( job3); //code here return 0; }//end run public static void main( String[] args) throws Exception { // ToolRunner handles generic command line options int res = ToolRunner.run( new Configuration(), new TestChain(), args); System.exit( res); }//end main }//end TestChain

En el código anterior, observe que job3 depende de job2. Como puede ver, la entrada de job3 es la salida de job2. Este hecho es una dependencia. job3 espera hasta que job2 finalice.

Hasta ahora solo agregamos trabajos de cadena en el grupo y describimos su dependencia. Necesitamos una última cosa para ejecutar este grupo de trabajos.

La fuerza bruta dice que simplemente llame al método JobControl.run (). Aunque este enfoque funciona, es problemático. Cuando se finalizan los trabajos de la cadena, todo el trabajo se ejecuta para siempre. Un enfoque que funciona correctamente es definir un nuevo subproceso de ejecución de nuestro trabajo Subproceso que ya existe (cuando se ejecuta el trabajo). Entonces podemos esperar hasta que los trabajos de la cadena estén listos y luego salir. Mientras tanto, la ejecución de los trabajos de la cadena puede solicitar información sobre la ejecución del trabajo, por ejemplo, cuántos trabajos han finalizado o podemos encontrar si un trabajo está en estado inválido y en qué consiste.

Una ventaja de esta forma de utilizar JobControl es que puede ocuparse de las numerosas dependencias que pueden existir entre trabajos.

Una desventaja de JobControl es que el progreso del trabajo no se imprime en la salida estándar, no se presenta directamente. Si un trabajo falla o tiene éxito, no se imprime nada útil. Debe verificar a través de la IU web de Hadoop o agregar algún código en el ciclo while a continuación para rastrear el estado del trabajo o lo que sea necesario. Finalmente,

import org.apache.hadoop.util.Tool; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.streaming.StreamJob; public class TestChain extends Configured implements Tool { public int run( String[] args) throws Exception { //TestChain below is an arbitrary name for the group JobControl jobc = new JobControl( "TestChain"); String[] job1 = new String[] { "-mapper" , "m1.py" "-reducer" , "r1.py" "-input" , "in1.txt" "-output" , "/out1/" } JobConf job1Conf = new StreamJob.createJob( job1); Job job1 = new Job( job1conf); jobc.addJob( job1); String[] job2 = new String[] { "-mapper" , "m2.py" "-reducer" , "r2.py" "-input" , "in2.txt" "-output" , "/out2/" } JobConf job2Conf = new StreamJob.createJob( job2); Job job2 = new Job( job2conf); jobc.addJob( job2); String[] job3 = new String[] { "-mapper" , "m3.py" "-reducer" , "r3.py" "-input" , "/out2/par*" "-output" , "/out3/" } JobConf job3Conf = new StreamJob.createJob( job3); Job job3 = new Job( job3conf); job3.addDependingJob( job2); jobc.addJob( job3); Thread runjobc = new Thread( jobc); runjobc.start(); while( !jobc.allFinished()) { //do whatever you want; just wait or ask for job information } return 0; }//end run public static void main( String[] args) throws Exception { // ToolRunner handles generic command line options int res = ToolRunner.run( new Configuration(), new TestChain(), args); System.exit( res); }//end main }//end TestChain

ERRORES

Esta sección discute algunos errores que pueden ocurrir. En los mensajes de error a continuación hay una clase OptimizingJoins. Esta clase es una clase solo para demostrar los diversos errores y no tiene relación con esta discusión.

Un paquete no existe mientras intentas compilar.

Esta es una cuestión de classpath. Compile como (para agregar el paquete hadoop-streaming-1.0.3.jar por ejemplo),

javac -classpath /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar:/usr/local/hadoop/hadoop-core-1.0.3.jar TestChain.java

y agregue cualquier paquete faltante.

java.lang.NoClassDefFoundError: org / apache / hadoop / streaming / StreamJob

El error total es,

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/streaming/StreamJob at OptimizingJoins.run(OptimizingJoins.java:135) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65) at OptimizingJoins.main(OptimizingJoins.java:248) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.hadoop.util.RunJar.main(RunJar.java:156) Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.streaming.StreamJob at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:423) at java.lang.ClassLoader.loadClass(ClassLoader.java:356) ... 8 more

Esta es una cuestión de archivo manifiesto de nuestro archivo jar. Cuando compilamos nuestro trabajo en el camino anterior, todo está bien. El compilador de Java encuentra lo que necesita. Pero cuando ejecutamos nuestro trabajo en Hadoop a través del comando

$HADOOP_HOME/bin/hadoop jar /home/hduser/TestChain.jar TestChain

entonces la JVM que ejecuta nuestro jar no puede encontrar StreamJob. Para resolver esto, cuando creamos el archivo jar, ponemos en el jar un archivo de manifiesto que contiene la ruta de clases de StreamJob. Prácticamente,

MANIFEST.MF Manifest-Version: 1.0 Class-Path: /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar Created-By: 1.7.0_07 (Oracle Corporation)

Tenga en cuenta que un archivo MANIFEST.MF siempre termina con una línea en blanco. Nuestro archivo MANIFEST.MF tiene 4 líneas, no 3. Entonces creamos archivos jar, como,

jar cmf META-INF/MANIFEST.MF TestChain.jar TestChain.class

ERROR streaming.StreamJob: opción no reconocida: -D

Este error ocurre porque StreamJob no puede analizar la opción -D. StreamJob solo puede analizar secuencias, opciones específicas del trabajo, -D es una opción genérica.

Hay dos soluciones para este problema. La primera solución es usar la opción -jobconf en lugar de -D. La segunda solución es analizar la opción -D a través de un objeto GenericOptionsParser. En la segunda solución, por supuesto, debe eliminar la opción -D de StreamJob.createJob () args.

Para dar un ejemplo, una implementación de código "limpio" de la segunda solución es,

import org.apache.hadoop.util.Tool; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.streaming.StreamJob; public class TestChain { public class Job1 extends Configured implements Tool { public int run( String[] args) throws Exception { String[] job1 = new String[] { "-mapper" , "m1.py" "-reducer" , "r1.py" "-input" , "in1.txt" "-output" , "/out1/" } JobConf job1Conf = new StreamJob.createJob( job1); JobClient.runJob( job1Conf); return 0; }//end run } public class Job2 extends Configured implements Tool { public int run( String[] args) throws Exception { String[] job2 = new String[] { "-mapper" , "m2.py" "-reducer" , "r2.py" "-input" , "in2.txt" "-output" , "/out2/" } JobConf job2Conf = new StreamJob.createJob( job2); JobClient.runJob( job2Conf); return 0; }//end run } public class Job3 extends Configured implements Tool { public int run( String[] args) throws Exception { String[] job3 = new String[] { "-mapper" , "m3.py" "-reducer" , "r3.py" "-input" , "in3.txt" "-output" , "/out3/" } JobConf job3Conf = new StreamJob.createJob( job3); JobClient.runJob( job3Conf); return 0; }//end run } public static void main( String[] args) throws Exception { TestChain tc = new TestChain(); //Domination String[] j1args = new String[] { "-D", "mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator", "-D", "mapred.text.key.comparator.options=-k1,1" , "-D", "mapred.reduce.tasks=1" }; // Let ToolRunner handle generic command-line options int j1res = ToolRunner.run( new Configuration(), tc.new Job1(), j1args); //Cost evaluation String[] j2rgs = new String[] { "-D", "mapred.reduce.tasks=12 " , "-D", "mapred.text.key,partitioner.options=-k1,1" }; // Let ToolRunner handle generic command-line options int j2res = ToolRunner.run( new Configuration(), tc.new Job2(), j2args); //Minimum Cost String[] j3args = new String[] { "-D", "mapred.reduce.tasks=1" }; // Let ToolRunner handle generic command-line options int j3res = ToolRunner.run( new Configuration(), tc.new Job1(), j3args); System.exit( mres); } }//end TestChain

En el código anterior, definimos una clase global TestChain que encapsula los trabajos de la cadena. Luego definimos cada trabajo de cadena por separado, es decir, definimos su método de ejecución. Cada trabajo de cadena es una clase que hereda la Herramienta configurada e implementada. Finalmente, desde el método principal de TestChain ejecutamos cada trabajo secuencialmente. Tenga en cuenta que antes de ejecutar cualquier trabajo de cadena definimos sus opciones genéricas.

Compilar

javac -classpath /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar:/usr/local/hadoop/hadoop-core-1.0.3.jar TestChain.java

Tarro

jar cmf META-INF/MANIFEST.MF TestChain.jar TestChain.class TestChain/$Dom.class TestChain/$Cost.class TestChain/$Min.class

ERROR security.UserGroupInformation: PriviledgedActionException como: hduser cause: org.apache.hadoop.mapred.InvalidInputException: Patrón de entrada hdfs: // localhost: 54310 / user / hduser / whateverFile coincide con 0 archivos

Este error ocurre cuando usamos JobControl. Por ejemplo, si un trabajo tiene como entrada la salida de un trabajo anterior, entonces si este archivo de entrada - salida ya no existe, se produce este error. JobControl ejecuta todos los trabajos independientes en "paralelo", y no uno por uno como lo hace JobClient. Entonces, Jobcontrol intenta ejecutar un trabajo cuyos archivos de entrada no existen y por esa razón falla.

Para evitar esta situación, declaramos que existe una dependencia entre estos dos trabajos utilizando x.addDependingJob (y), job x depende del trabajo y. Ahora, JobControl no intenta ejecutarse en trabajos dependientes paralelos.

La pregunta es cómo encadenar trabajos en Hadoop, usando Hadoop Streaming (solo).