MapReduce - Implementación de Hadoop

MapReduce es un marco que se utiliza para escribir aplicaciones para procesar grandes volúmenes de datos en grandes grupos de hardware básico de manera confiable. Este capítulo lo lleva a través del funcionamiento de MapReduce en el marco de Hadoop usando Java.

Algoritmo MapReduce

Generalmente, el paradigma MapReduce se basa en el envío de programas de reducción de mapas a las computadoras donde residen los datos reales.

  • Durante un trabajo de MapReduce, Hadoop envía las tareas de Map y Reduce a los servidores apropiados en el clúster.

  • El marco gestiona todos los detalles del paso de datos, como la emisión de tareas, la verificación de la finalización de la tarea y la copia de datos en el clúster entre los nodos.

  • La mayor parte de la informática tiene lugar en los nodos con datos en discos locales que reducen el tráfico de la red.

  • Después de completar una tarea determinada, el clúster recopila y reduce los datos para formar un resultado apropiado y los envía de vuelta al servidor Hadoop.

Entradas y salidas (perspectiva Java)

El marco MapReduce opera en pares clave-valor, es decir, el marco ve la entrada al trabajo como un conjunto de pares clave-valor y produce un conjunto de pares clave-valor como la salida del trabajo, posiblemente de diferentes tipos.

Las clases de clave y valor deben ser serializables por el marco y, por lo tanto, es necesario implementar la interfaz de escritura. Además, las clases clave deben implementar la interfaz WritableComparable para facilitar la clasificación por marco.

Tanto el formato de entrada como el de salida de un trabajo de MapReduce tienen la forma de pares clave-valor:

(Entrada) <k1, v1> -> mapa -> <k2, v2> -> reducir -> <k3, v3> (Salida).

Entrada Salida
Mapa <k1, v1> lista (<k2, v2>)
Reducir <k2, lista (v2)> lista (<k3, v3>)

Implementación de MapReduce

La siguiente tabla muestra los datos relativos al consumo eléctrico de una organización. La tabla incluye el consumo eléctrico mensual y el promedio anual durante cinco años consecutivos.

ene feb mar abr Mayo jun jul ago sep oct nov dic Promedio
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Necesitamos escribir aplicaciones para procesar los datos de entrada en la tabla dada para encontrar el año de uso máximo, el año de uso mínimo, etc. Esta tarea es fácil para los programadores con una cantidad finita de registros, ya que simplemente escribirán la lógica para producir la salida requerida y pasarán los datos a la aplicación escrita.

Elevemos ahora la escala de los datos de entrada. Supongamos que tenemos que analizar el consumo eléctrico de todas las industrias a gran escala de un estado en particular. Cuando escribimos aplicaciones para procesar datos masivos,

  • Llevarán mucho tiempo ejecutarlos.

  • Habrá mucho tráfico en la red cuando transfiramos datos desde la fuente al servidor de red.

Para solucionar estos problemas, tenemos el framework MapReduce.

Los datos de entrada

Los datos anteriores se guardan como sample.txty dado como entrada. El archivo de entrada se ve como se muestra a continuación.

1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Programa de ejemplo

El siguiente programa para los datos de muestra utiliza el marco MapReduce.

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                   /*Input value Type*/
   Text,                   /*Output key Type*/
   IntWritable>            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
	
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
	
   //Main function
	
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
		
      conf.setJobName("max_eletricityunits");
		
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
		
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
		
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
		
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
      JobClient.runJob(conf);
   }
}

Guarde el programa anterior en ProcessUnits.java. A continuación se detalla la compilación y ejecución del programa.

Programa de compilación y ejecución de ProcessUnits

Supongamos que estamos en el directorio de inicio del usuario de Hadoop (por ejemplo, / home / hadoop).

Siga los pasos que se indican a continuación para compilar y ejecutar el programa anterior.

Step 1 - Utilice el siguiente comando para crear un directorio para almacenar las clases de Java compiladas.

$ mkdir units

Step 2- Descarga Hadoop-core-1.2.1.jar, que se utiliza para compilar y ejecutar el programa MapReduce. Descargue el archivo jar de mvnrepository.com . Supongamos que la carpeta de descarga es / home / hadoop /.

Step 3 - Los siguientes comandos se utilizan para compilar ProcessUnits.java programa y para crear un frasco para el programa.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

Step 4 - El siguiente comando se utiliza para crear un directorio de entrada en HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 - El siguiente comando se usa para copiar el archivo de entrada llamado sample.txt en el directorio de entrada de HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Step 6 - El siguiente comando se usa para verificar los archivos en el directorio de entrada

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 - El siguiente comando se usa para ejecutar la aplicación Eleunit_max tomando archivos de entrada del directorio de entrada.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Espere un momento hasta que se ejecute el archivo. Después de la ejecución, la salida contiene una serie de divisiones de entrada, tareas de mapa, tareas de reducción, etc.

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
	
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
	
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
	
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
	
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
	
   Input split bytes=208
   Combine input records=5
   Combine output records=5
	
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
	
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
	
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
	
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
	
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

Step 8 - El siguiente comando se utiliza para verificar los archivos resultantes en la carpeta de salida.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 - El siguiente comando se usa para ver la salida en Part-00000archivo. Este archivo es generado por HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

A continuación se muestra la salida generada por el programa MapReduce:

1981 34
1984 40
1985 45

Step 10 - El siguiente comando se utiliza para copiar la carpeta de salida de HDFS al sistema de archivos local.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop