Hadoop - MapReduce

MapReduce es un marco mediante el cual podemos escribir aplicaciones para procesar grandes cantidades de datos, en paralelo, en grandes grupos de hardware básico de manera confiable.

¿Qué es MapReduce?

MapReduce es una técnica de procesamiento y un modelo de programa para computación distribuida basada en java. El algoritmo MapReduce contiene dos tareas importantes, a saber, Map y Reduce. Map toma un conjunto de datos y los convierte en otro conjunto de datos, donde los elementos individuales se dividen en tuplas (pares clave / valor). En segundo lugar, reduce la tarea, que toma la salida de un mapa como entrada y combina esas tuplas de datos en un conjunto más pequeño de tuplas. Como implica la secuencia del nombre MapReduce, la tarea de reducción siempre se realiza después del trabajo de mapa.

La principal ventaja de MapReduce es que es fácil escalar el procesamiento de datos en múltiples nodos informáticos. En el modelo MapReduce, las primitivas de procesamiento de datos se denominan mapeadores y reductores. A veces, descomponer una aplicación de procesamiento de datos en mapeadores y reductores no es trivial. Pero, una vez que escribimos una aplicación en el formulario MapReduce, escalar la aplicación para que se ejecute en cientos, miles o incluso decenas de miles de máquinas en un clúster es simplemente un cambio de configuración. Esta sencilla escalabilidad es lo que ha atraído a muchos programadores a utilizar el modelo MapReduce.

El algoritmo

  • En general, el paradigma de MapReduce se basa en enviar la computadora a donde residen los datos.

  • El programa MapReduce se ejecuta en tres etapas, a saber, etapa de mapa, etapa de reproducción aleatoria y etapa de reducción.

    • Map stage- El mapa o el trabajo del mapeador es procesar los datos de entrada. Generalmente, los datos de entrada están en forma de archivo o directorio y se almacenan en el sistema de archivos Hadoop (HDFS). El archivo de entrada se pasa a la función del asignador línea por línea. El asignador procesa los datos y crea varios fragmentos pequeños de datos.

    • Reduce stage - Esta etapa es la combinación del Shuffle escenario y el Reduceetapa. El trabajo del Reducer es procesar los datos que provienen del mapeador. Después del procesamiento, produce un nuevo conjunto de resultados, que se almacenará en HDFS.

  • Durante un trabajo de MapReduce, Hadoop envía las tareas de Map y Reduce a los servidores apropiados en el clúster.

  • El marco gestiona todos los detalles del paso de datos, como la emisión de tareas, la verificación de la finalización de la tarea y la copia de datos en el clúster entre los nodos.

  • La mayor parte de la informática tiene lugar en nodos con datos en discos locales que reducen el tráfico de la red.

  • Después de completar las tareas dadas, el clúster recopila y reduce los datos para formar un resultado apropiado y los envía de vuelta al servidor Hadoop.

Entradas y salidas (perspectiva Java)

El marco MapReduce opera en pares <clave, valor>, es decir, el marco ve la entrada al trabajo como un conjunto de pares <clave, valor> y produce un conjunto de pares <clave, valor> como resultado del trabajo , posiblemente de diferentes tipos.

La clave y las clases de valor deben estar serializadas por el marco y, por lo tanto, es necesario implementar la interfaz de escritura. Además, las clases clave tienen que implementar la interfaz Writable-Comparable para facilitar la clasificación por marco. Tipos de entrada y salida de unMapReduce job - (Entrada) <k1, v1> → mapa → <k2, v2> → reducir → <k3, v3> (Salida).

Entrada Salida
Mapa <k1, v1> lista (<k2, v2>)
Reducir <k2, lista (v2)> lista (<k3, v3>)

Terminología

  • PayLoad - Las aplicaciones implementan las funciones Mapa y Reducir y forman el núcleo del trabajo.

  • Mapper - Mapper asigna los pares clave / valor de entrada a un conjunto de pares clave / valor intermedios.

  • NamedNode - Nodo que administra el sistema de archivos distribuido de Hadoop (HDFS).

  • DataNode - Nodo donde los datos se presentan por adelantado antes de que se lleve a cabo cualquier procesamiento.

  • MasterNode - Nodo donde se ejecuta JobTracker y que acepta solicitudes de trabajo de los clientes.

  • SlaveNode - Nodo donde se ejecuta el programa Map and Reduce.

  • JobTracker - Programa trabajos y rastrea los trabajos asignados al rastreador de tareas.

  • Task Tracker - Realiza un seguimiento de la tarea e informa el estado a JobTracker.

  • Job - Un programa es una ejecución de Mapper y Reducer en un conjunto de datos.

  • Task - Una ejecución de un Mapper o un Reducer en un segmento de datos.

  • Task Attempt - Una instancia particular de un intento de ejecutar una tarea en un SlaveNode.

Escenario de ejemplo

A continuación se muestran los datos sobre el consumo eléctrico de una organización. Contiene el consumo eléctrico mensual y el promedio anual de varios años.

ene feb mar abr Mayo jun jul ago sep oct nov dic Promedio
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Si los datos anteriores se proporcionan como entrada, tenemos que escribir aplicaciones para procesarlos y producir resultados como encontrar el año de uso máximo, el año de uso mínimo, etc. Este es un paso adelante para los programadores con un número finito de registros. Simplemente escribirán la lógica para producir la salida requerida y pasarán los datos a la aplicación escrita.

Pero piense en los datos que representan el consumo eléctrico de todas las industrias a gran escala de un estado en particular, desde su formación.

Cuando escribimos aplicaciones para procesar datos masivos,

  • Llevarán mucho tiempo ejecutarlos.

  • Habrá un tráfico de red pesado cuando transfiramos datos de la fuente al servidor de red, etc.

Para solucionar estos problemas, tenemos el framework MapReduce.

Los datos de entrada

Los datos anteriores se guardan como sample.txty dado como entrada. El archivo de entrada se ve como se muestra a continuación.

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45

Programa de ejemplo

A continuación se muestra el programa para los datos de muestra utilizando el marco MapReduce.

package hadoop; 

import java.util.*; 

import java.io.IOException; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits {
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements 
   Mapper<LongWritable ,/*Input key Type */ 
   Text,                /*Input value Type*/ 
   Text,                /*Output key Type*/ 
   IntWritable>        /*Output value Type*/ 
   {
      //Map function 
      public void map(LongWritable key, Text value, 
      OutputCollector<Text, IntWritable> output,   
      
      Reporter reporter) throws IOException { 
         String line = value.toString(); 
         String lasttoken = null; 
         StringTokenizer s = new StringTokenizer(line,"\t"); 
         String year = s.nextToken(); 
         
         while(s.hasMoreTokens()) {
            lasttoken = s.nextToken();
         }
         int avgprice = Integer.parseInt(lasttoken); 
         output.collect(new Text(year), new IntWritable(avgprice)); 
      } 
   }
   
   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
   
      //Reduce function 
      public void reduce( Text key, Iterator <IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
         int maxavg = 30; 
         int val = Integer.MIN_VALUE; 
            
         while (values.hasNext()) { 
            if((val = values.next().get())>maxavg) { 
               output.collect(key, new IntWritable(val)); 
            } 
         }
      } 
   }

   //Main function 
   public static void main(String args[])throws Exception { 
      JobConf conf = new JobConf(ProcessUnits.class); 
      
      conf.setJobName("max_eletricityunits"); 
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class); 
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 
      
      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
      
      JobClient.runJob(conf); 
   } 
}

Guarde el programa anterior como ProcessUnits.java. La compilación y ejecución del programa se explica a continuación.

Programa de Compilación y Ejecución de Unidades de Procesos

Supongamos que estamos en el directorio de inicio de un usuario de Hadoop (por ejemplo, / home / hadoop).

Siga los pasos que se indican a continuación para compilar y ejecutar el programa anterior.

Paso 1

El siguiente comando es crear un directorio para almacenar las clases java compiladas.

$ mkdir units

Paso 2

Descargar Hadoop-core-1.2.1.jar,que se utiliza para compilar y ejecutar el programa MapReduce. Visite el siguiente enlace mvnrepository.com para descargar el archivo jar. Supongamos que la carpeta descargada es/home/hadoop/.

Paso 3

Los siguientes comandos se utilizan para compilar ProcessUnits.java programa y creando un frasco para el programa.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
$ jar -cvf units.jar -C units/ .

Etapa 4

El siguiente comando se utiliza para crear un directorio de entrada en HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Paso 5

El siguiente comando se usa para copiar el archivo de entrada llamado sample.txten el directorio de entrada de HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Paso 6

El siguiente comando se usa para verificar los archivos en el directorio de entrada.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Paso 7

El siguiente comando se usa para ejecutar la aplicación Eleunit_max tomando los archivos de entrada del directorio de entrada.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Espere un momento hasta que se ejecute el archivo. Después de la ejecución, como se muestra a continuación, la salida contendrá el número de divisiones de entrada, el número de tareas del mapa, el número de tareas reductoras, etc.

INFO mapreduce.Job: Job job_1414748220717_0002 
completed successfully 
14/10/31 06:02:52 
INFO mapreduce.Job: Counters: 49 
   File System Counters 
 
FILE: Number of bytes read = 61 
FILE: Number of bytes written = 279400 
FILE: Number of read operations = 0 
FILE: Number of large read operations = 0   
FILE: Number of write operations = 0 
HDFS: Number of bytes read = 546 
HDFS: Number of bytes written = 40 
HDFS: Number of read operations = 9 
HDFS: Number of large read operations = 0 
HDFS: Number of write operations = 2 Job Counters 


   Launched map tasks = 2  
   Launched reduce tasks = 1 
   Data-local map tasks = 2  
   Total time spent by all maps in occupied slots (ms) = 146137 
   Total time spent by all reduces in occupied slots (ms) = 441   
   Total time spent by all map tasks (ms) = 14613 
   Total time spent by all reduce tasks (ms) = 44120 
   Total vcore-seconds taken by all map tasks = 146137 
   Total vcore-seconds taken by all reduce tasks = 44120 
   Total megabyte-seconds taken by all map tasks = 149644288 
   Total megabyte-seconds taken by all reduce tasks = 45178880 
   
Map-Reduce Framework 
 
   Map input records = 5  
   Map output records = 5   
   Map output bytes = 45  
   Map output materialized bytes = 67  
   Input split bytes = 208 
   Combine input records = 5  
   Combine output records = 5 
   Reduce input groups = 5  
   Reduce shuffle bytes = 6  
   Reduce input records = 5  
   Reduce output records = 5  
   Spilled Records = 10  
   Shuffled Maps  = 2  
   Failed Shuffles = 0  
   Merged Map outputs = 2  
   GC time elapsed (ms) = 948  
   CPU time spent (ms) = 5160  
   Physical memory (bytes) snapshot = 47749120  
   Virtual memory (bytes) snapshot = 2899349504  
   Total committed heap usage (bytes) = 277684224
     
File Output Format Counters 
 
   Bytes Written = 40

Paso 8

El siguiente comando se utiliza para verificar los archivos resultantes en la carpeta de salida.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Paso 9

El siguiente comando se usa para ver la salida en Part-00000 archivo. Este archivo es generado por HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

A continuación se muestra el resultado generado por el programa MapReduce.

1981    34 
1984    40 
1985    45

Paso 10

El siguiente comando se utiliza para copiar la carpeta de salida de HDFS al sistema de archivos local para su análisis.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop

Comandos importantes

Todos los comandos de Hadoop son invocados por el $HADOOP_HOME/bin/hadoopmando. La ejecución del script de Hadoop sin argumentos imprime la descripción de todos los comandos.

Usage - COMANDO hadoop [--config confdir]

La siguiente tabla enumera las opciones disponibles y su descripción.

No Señor. Opción y descripción
1

namenode -format

Formatea el sistema de archivos DFS.

2

secondarynamenode

Ejecuta el nodo de nombre secundario DFS.

3

namenode

Ejecuta el nodo de nombre DFS.

4

datanode

Ejecuta un nodo de datos DFS.

5

dfsadmin

Ejecuta un cliente de administración DFS.

6

mradmin

Ejecuta un cliente de administración Map-Reduce.

7

fsck

Ejecuta una utilidad de verificación del sistema de archivos DFS.

8

fs

Ejecuta un cliente de usuario de sistema de archivos genérico.

9

balancer

Ejecuta una utilidad de equilibrio de clústeres.

10

oiv

Aplica el visor de fsimage sin conexión a un fsimage.

11

fetchdt

Obtiene un token de delegación del NameNode.

12

jobtracker

Ejecuta el nodo de seguimiento de trabajos de MapReduce.

13

pipes

Ejecuta un trabajo de tuberías.

14

tasktracker

Ejecuta un nodo de seguimiento de tareas de MapReduce.

15

historyserver

Ejecuta servidores de historial de trabajos como un demonio independiente.

dieciséis

job

Manipula los trabajos de MapReduce.

17

queue

Obtiene información sobre JobQueues.

18

version

Imprime la versión.

19

jar <jar>

Ejecuta un archivo jar.

20

distcp <srcurl> <desturl>

Copia archivos o directorios de forma recursiva.

21

distcp2 <srcurl> <desturl>

DistCp versión 2.

22

archive -archiveName NAME -p <parent path> <src>* <dest>

Crea un archivo hadoop.

23

classpath

Imprime la ruta de clases necesaria para obtener el jar de Hadoop y las bibliotecas necesarias.

24

daemonlog

Obtener / establecer el nivel de registro para cada demonio

Cómo interactuar con trabajos de MapReduce

Uso: trabajo de hadoop [GENERIC_OPTIONS]

Las siguientes son las opciones genéricas disponibles en un trabajo de Hadoop.

No Señor. GENERIC_OPTION y descripción
1

-submit <job-file>

Envía el trabajo.

2

-status <job-id>

Imprime el mapa y reduce el porcentaje de finalización y todos los contadores de trabajos.

3

-counter <job-id> <group-name> <countername>

Imprime el valor del contador.

4

-kill <job-id>

Mata el trabajo.

5

-events <job-id> <fromevent-#> <#-of-events>

Imprime los detalles de los eventos recibidos por jobtracker para el rango dado.

6

-history [all] <jobOutputDir> - history < jobOutputDir>

Imprime detalles del trabajo, detalles de propinas fallidas y eliminadas. Se pueden ver más detalles sobre el trabajo, como las tareas exitosas y los intentos realizados para cada tarea, especificando la opción [todos].

7

-list[all]

Muestra todos los trabajos. -La lista muestra solo los trabajos que aún no se han completado.

8

-kill-task <task-id>

Mata la tarea. Las tareas eliminadas NO se cuentan como intentos fallidos.

9

-fail-task <task-id>

Fracasa la tarea. Las tareas fallidas se contabilizan como intentos fallidos.

10

-set-priority <job-id> <priority>

Cambia la prioridad del trabajo. Los valores de prioridad permitidos son MUY_ALTA, ALTA, NORMAL, BAJA, MUY_BAJA

Para ver el estado del trabajo

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004

Para ver el historial del directorio de salida del trabajo

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output

Para matar el trabajo

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004