MapReduce - Particionador

Un particionador funciona como una condición para procesar un conjunto de datos de entrada. La fase de partición tiene lugar después de la fase Mapa y antes de la fase Reducir.

El número de particionadores es igual al número de reductores. Eso significa que un particionador dividirá los datos según el número de reductores. Por lo tanto, los datos transmitidos desde un solo particionador son procesados ​​por un solo Reductor.

Particionador

Un particionador divide los pares clave-valor de salidas Map intermedias. Divide los datos utilizando una condición definida por el usuario, que funciona como una función hash. El número total de particiones es el mismo que el número de tareas de Reductor para el trabajo. Tomemos un ejemplo para entender cómo funciona el particionador.

Implementación de MapReduce Partitioner

Por conveniencia, supongamos que tenemos una pequeña tabla llamada Empleado con los siguientes datos. Usaremos estos datos de muestra como nuestro conjunto de datos de entrada para demostrar cómo funciona el particionador.

Carné de identidad Nombre Años Género Salario
1201 gopal 45 Masculino 50.000
1202 manisha 40 Hembra 50.000
1203 khalil 34 Masculino 30.000
1204 prasanth 30 Masculino 30.000
1205 Kiran 20 Masculino 40.000
1206 laxmi 25 Hembra 35.000
1207 bhavya 20 Hembra 15.000
1208 reshma 19 Hembra 15.000
1209 kranthi 22 Masculino 22.000
1210 Satish 24 Masculino 25.000
1211 Krishna 25 Masculino 25.000
1212 Arshad 28 Masculino 20.000
1213 lavanya 18 Hembra 8.000

Tenemos que escribir una aplicación para procesar el conjunto de datos de entrada para encontrar el empleado con el salario más alto por género en diferentes grupos de edad (por ejemplo, menos de 20, entre 21 y 30, más de 30).

Los datos de entrada

Los datos anteriores se guardan como input.txt en el directorio “/ home / hadoop / hadoopPartitioner” y se proporciona como entrada.

1201 gopal 45 Masculino 50000
1202 manisha 40 Hembra 51000
1203 khaleel 34 Masculino 30000
1204 prasanth 30 Masculino 31000
1205 Kiran 20 Masculino 40000
1206 laxmi 25 Hembra 35000
1207 bhavya 20 Hembra 15000
1208 reshma 19 Hembra 14000
1209 kranthi 22 Masculino 22000
1210 Satish 24 Masculino 25000
1211 Krishna 25 Masculino 26000
1212 Arshad 28 Masculino 20000
1213 lavanya 18 Hembra 8000

Basado en la entrada dada, a continuación se muestra la explicación algorítmica del programa.

Tareas de mapas

La tarea de mapa acepta los pares clave-valor como entrada mientras tenemos los datos de texto en un archivo de texto. La entrada para esta tarea de mapa es la siguiente:

Input - La clave sería un patrón como "cualquier clave especial + nombre de archivo + número de línea" (ejemplo: clave = @ input1) y el valor serían los datos en esa línea (ejemplo: valor = 1201 \ t gopal \ t 45 \ t Hombre \ t 50000).

Method - El funcionamiento de esta tarea de mapa es el siguiente -

  • Leer el value (datos de registro), que viene como valor de entrada de la lista de argumentos en una cadena.

  • Usando la función de división, separe el género y almacénelo en una variable de cadena.

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • Envíe la información de género y los datos de registro value como par clave-valor de salida de la tarea de mapa al partition task.

context.write(new Text(gender), new Text(value));
  • Repita todos los pasos anteriores para todos los registros del archivo de texto.

Output - Obtendrá los datos de género y el valor de los datos de registro como pares clave-valor.

Tarea del particionador

La tarea del particionador acepta los pares clave-valor de la tarea de mapa como entrada. La partición implica dividir los datos en segmentos. De acuerdo con los criterios condicionales dados de las particiones, los datos emparejados de clave-valor de entrada se pueden dividir en tres partes según los criterios de edad.

Input - Todos los datos en una colección de pares clave-valor.

clave = Valor del campo de género en el registro.

valor = valor de datos de registro completo de ese género.

Method - El proceso de lógica de partición se ejecuta de la siguiente manera.

  • Lea el valor del campo de edad del par clave-valor de entrada.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • Verifique el valor de edad con las siguientes condiciones.

    • Edad menor o igual a 20
    • Edad mayor de 20 y menor o igual a 30.
    • Edad mayor de 30.
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output- Todos los datos de los pares clave-valor se segmentan en tres colecciones de pares clave-valor. El Reductor trabaja individualmente en cada colección.

Reducir tareas

El número de tareas del particionador es igual al número de tareas del reductor. Aquí tenemos tres tareas de partición y, por lo tanto, tenemos tres tareas de Reductor para ejecutar.

Input - Reducer se ejecutará tres veces con una colección diferente de pares clave-valor.

clave = valor del campo de género en el registro.

valor = todos los datos del registro de ese género.

Method - Se aplicará la siguiente lógica en cada colección.

  • Lea el valor del campo Salario de cada registro.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • Verifica el salario con la variable max. Si str [4] es el salario máximo, asigne str [4] a max; de lo contrario, omita el paso.

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • Repita los pasos 1 y 2 para cada colección de llaves (Masculino y Femenino son las colecciones de llaves). Después de ejecutar estos tres pasos, encontrará un salario máximo de la colección de llaves masculinas y un salario máximo de la colección de llaves femeninas.

context.write(new Text(key), new IntWritable(max));

Output- Finalmente, obtendrá un conjunto de datos de pares clave-valor en tres colecciones de diferentes grupos de edad. Contiene el salario máximo de la colección masculina y el salario máximo de la colección femenina en cada grupo de edad, respectivamente.

Después de ejecutar las tareas Mapa, Particionador y Reducir, las tres colecciones de datos de pares clave-valor se almacenan en tres archivos diferentes como salida.

Las tres tareas se tratan como trabajos de MapReduce. Los siguientes requisitos y especificaciones de estos trabajos deben especificarse en las Configuraciones:

  • Nombre del trabajo
  • Formatos de entrada y salida de claves y valores
  • Clases individuales para tareas de mapa, reducción y particionamiento
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Programa de ejemplo

El siguiente programa muestra cómo implementar los particionadores para los criterios dados en un programa MapReduce.

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

Guarde el código anterior como PartitionerExample.javaen "/ home / hadoop / hadoopPartitioner". A continuación se detalla la compilación y ejecución del programa.

Compilación y ejecución

Supongamos que estamos en el directorio de inicio del usuario de Hadoop (por ejemplo, / home / hadoop).

Siga los pasos que se indican a continuación para compilar y ejecutar el programa anterior.

Step 1- Descarga Hadoop-core-1.2.1.jar, que se utiliza para compilar y ejecutar el programa MapReduce. Puede descargar el archivo jar desde mvnrepository.com .

Supongamos que la carpeta descargada es "/ home / hadoop / hadoopPartitioner"

Step 2 - Los siguientes comandos se utilizan para compilar el programa PartitionerExample.java y creando un frasco para el programa.

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

Step 3 - Utilice el siguiente comando para crear un directorio de entrada en HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 - Utilice el siguiente comando para copiar el archivo de entrada llamado input.txt en el directorio de entrada de HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 - Utilice el siguiente comando para verificar los archivos en el directorio de entrada.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 - Utilice el siguiente comando para ejecutar la aplicación de salario superior tomando archivos de entrada del directorio de entrada.

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

Espere un momento hasta que se ejecute el archivo. Después de la ejecución, la salida contiene una serie de divisiones de entrada, tareas de mapa y tareas de Reductor.

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Step 7 - Utilice el siguiente comando para verificar los archivos resultantes en la carpeta de salida.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Encontrará la salida en tres archivos porque está utilizando tres particionadores y tres Reductores en su programa.

Step 8 - Utilice el siguiente comando para ver la salida en Part-00000archivo. Este archivo es generado por HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

Utilice el siguiente comando para ver la salida en Part-00001 archivo.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male    31000

Utilice el siguiente comando para ver la salida en Part-00002 archivo.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female  51000
Male   50000