MapReduce - Particionador
Un particionador funciona como una condición para procesar un conjunto de datos de entrada. La fase de partición tiene lugar después de la fase Mapa y antes de la fase Reducir.
El número de particionadores es igual al número de reductores. Eso significa que un particionador dividirá los datos según el número de reductores. Por lo tanto, los datos transmitidos desde un solo particionador son procesados por un solo Reductor.
Particionador
Un particionador divide los pares clave-valor de salidas Map intermedias. Divide los datos utilizando una condición definida por el usuario, que funciona como una función hash. El número total de particiones es el mismo que el número de tareas de Reductor para el trabajo. Tomemos un ejemplo para entender cómo funciona el particionador.
Implementación de MapReduce Partitioner
Por conveniencia, supongamos que tenemos una pequeña tabla llamada Empleado con los siguientes datos. Usaremos estos datos de muestra como nuestro conjunto de datos de entrada para demostrar cómo funciona el particionador.
Carné de identidad | Nombre | Años | Género | Salario |
---|---|---|---|---|
1201 | gopal | 45 | Masculino | 50.000 |
1202 | manisha | 40 | Hembra | 50.000 |
1203 | khalil | 34 | Masculino | 30.000 |
1204 | prasanth | 30 | Masculino | 30.000 |
1205 | Kiran | 20 | Masculino | 40.000 |
1206 | laxmi | 25 | Hembra | 35.000 |
1207 | bhavya | 20 | Hembra | 15.000 |
1208 | reshma | 19 | Hembra | 15.000 |
1209 | kranthi | 22 | Masculino | 22.000 |
1210 | Satish | 24 | Masculino | 25.000 |
1211 | Krishna | 25 | Masculino | 25.000 |
1212 | Arshad | 28 | Masculino | 20.000 |
1213 | lavanya | 18 | Hembra | 8.000 |
Tenemos que escribir una aplicación para procesar el conjunto de datos de entrada para encontrar el empleado con el salario más alto por género en diferentes grupos de edad (por ejemplo, menos de 20, entre 21 y 30, más de 30).
Los datos de entrada
Los datos anteriores se guardan como input.txt en el directorio “/ home / hadoop / hadoopPartitioner” y se proporciona como entrada.
1201 | gopal | 45 | Masculino | 50000 |
1202 | manisha | 40 | Hembra | 51000 |
1203 | khaleel | 34 | Masculino | 30000 |
1204 | prasanth | 30 | Masculino | 31000 |
1205 | Kiran | 20 | Masculino | 40000 |
1206 | laxmi | 25 | Hembra | 35000 |
1207 | bhavya | 20 | Hembra | 15000 |
1208 | reshma | 19 | Hembra | 14000 |
1209 | kranthi | 22 | Masculino | 22000 |
1210 | Satish | 24 | Masculino | 25000 |
1211 | Krishna | 25 | Masculino | 26000 |
1212 | Arshad | 28 | Masculino | 20000 |
1213 | lavanya | 18 | Hembra | 8000 |
Basado en la entrada dada, a continuación se muestra la explicación algorítmica del programa.
Tareas de mapas
La tarea de mapa acepta los pares clave-valor como entrada mientras tenemos los datos de texto en un archivo de texto. La entrada para esta tarea de mapa es la siguiente:
Input - La clave sería un patrón como "cualquier clave especial + nombre de archivo + número de línea" (ejemplo: clave = @ input1) y el valor serían los datos en esa línea (ejemplo: valor = 1201 \ t gopal \ t 45 \ t Hombre \ t 50000).
Method - El funcionamiento de esta tarea de mapa es el siguiente -
Leer el value (datos de registro), que viene como valor de entrada de la lista de argumentos en una cadena.
Usando la función de división, separe el género y almacénelo en una variable de cadena.
String[] str = value.toString().split("\t", -3);
String gender=str[3];
Envíe la información de género y los datos de registro value como par clave-valor de salida de la tarea de mapa al partition task.
context.write(new Text(gender), new Text(value));
Repita todos los pasos anteriores para todos los registros del archivo de texto.
Output - Obtendrá los datos de género y el valor de los datos de registro como pares clave-valor.
Tarea del particionador
La tarea del particionador acepta los pares clave-valor de la tarea de mapa como entrada. La partición implica dividir los datos en segmentos. De acuerdo con los criterios condicionales dados de las particiones, los datos emparejados de clave-valor de entrada se pueden dividir en tres partes según los criterios de edad.
Input - Todos los datos en una colección de pares clave-valor.
clave = Valor del campo de género en el registro.
valor = valor de datos de registro completo de ese género.
Method - El proceso de lógica de partición se ejecuta de la siguiente manera.
- Lea el valor del campo de edad del par clave-valor de entrada.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
Verifique el valor de edad con las siguientes condiciones.
- Edad menor o igual a 20
- Edad mayor de 20 y menor o igual a 30.
- Edad mayor de 30.
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
Output- Todos los datos de los pares clave-valor se segmentan en tres colecciones de pares clave-valor. El Reductor trabaja individualmente en cada colección.
Reducir tareas
El número de tareas del particionador es igual al número de tareas del reductor. Aquí tenemos tres tareas de partición y, por lo tanto, tenemos tres tareas de Reductor para ejecutar.
Input - Reducer se ejecutará tres veces con una colección diferente de pares clave-valor.
clave = valor del campo de género en el registro.
valor = todos los datos del registro de ese género.
Method - Se aplicará la siguiente lógica en cada colección.
- Lea el valor del campo Salario de cada registro.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
Verifica el salario con la variable max. Si str [4] es el salario máximo, asigne str [4] a max; de lo contrario, omita el paso.
if(Integer.parseInt(str[4])>max)
{
max=Integer.parseInt(str[4]);
}
Repita los pasos 1 y 2 para cada colección de llaves (Masculino y Femenino son las colecciones de llaves). Después de ejecutar estos tres pasos, encontrará un salario máximo de la colección de llaves masculinas y un salario máximo de la colección de llaves femeninas.
context.write(new Text(key), new IntWritable(max));
Output- Finalmente, obtendrá un conjunto de datos de pares clave-valor en tres colecciones de diferentes grupos de edad. Contiene el salario máximo de la colección masculina y el salario máximo de la colección femenina en cada grupo de edad, respectivamente.
Después de ejecutar las tareas Mapa, Particionador y Reducir, las tres colecciones de datos de pares clave-valor se almacenan en tres archivos diferentes como salida.
Las tres tareas se tratan como trabajos de MapReduce. Los siguientes requisitos y especificaciones de estos trabajos deben especificarse en las Configuraciones:
- Nombre del trabajo
- Formatos de entrada y salida de claves y valores
- Clases individuales para tareas de mapa, reducción y particionamiento
Configuration conf = getConf();
//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);
//Number of Reducer tasks.
job.setNumReduceTasks(3);
//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Programa de ejemplo
El siguiente programa muestra cómo implementar los particionadores para los criterios dados en un programa MapReduce.
package partitionerexample;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends
Partitioner < Text, Text >
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
System.exit(0);
}
}
Guarde el código anterior como PartitionerExample.javaen "/ home / hadoop / hadoopPartitioner". A continuación se detalla la compilación y ejecución del programa.
Compilación y ejecución
Supongamos que estamos en el directorio de inicio del usuario de Hadoop (por ejemplo, / home / hadoop).
Siga los pasos que se indican a continuación para compilar y ejecutar el programa anterior.
Step 1- Descarga Hadoop-core-1.2.1.jar, que se utiliza para compilar y ejecutar el programa MapReduce. Puede descargar el archivo jar desde mvnrepository.com .
Supongamos que la carpeta descargada es "/ home / hadoop / hadoopPartitioner"
Step 2 - Los siguientes comandos se utilizan para compilar el programa PartitionerExample.java y creando un frasco para el programa.
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .
Step 3 - Utilice el siguiente comando para crear un directorio de entrada en HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 4 - Utilice el siguiente comando para copiar el archivo de entrada llamado input.txt en el directorio de entrada de HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
Step 5 - Utilice el siguiente comando para verificar los archivos en el directorio de entrada.
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 6 - Utilice el siguiente comando para ejecutar la aplicación de salario superior tomando archivos de entrada del directorio de entrada.
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
Espere un momento hasta que se ejecute el archivo. Después de la ejecución, la salida contiene una serie de divisiones de entrada, tareas de mapa y tareas de Reductor.
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=467
FILE: Number of bytes written=426777
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=480
HDFS: Number of bytes written=72
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
Job Counters
Launched map tasks=1
Launched reduce tasks=3
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=8212
Total time spent by all reduces in occupied slots (ms)=59858
Total time spent by all map tasks (ms)=8212
Total time spent by all reduce tasks (ms)=59858
Total vcore-seconds taken by all map tasks=8212
Total vcore-seconds taken by all reduce tasks=59858
Total megabyte-seconds taken by all map tasks=8409088
Total megabyte-seconds taken by all reduce tasks=61294592
Map-Reduce Framework
Map input records=13
Map output records=13
Map output bytes=423
Map output materialized bytes=467
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=467
Reduce input records=13
Reduce output records=6
Spilled Records=26
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=224
CPU time spent (ms)=3690
Physical memory (bytes) snapshot=553816064
Virtual memory (bytes) snapshot=3441266688
Total committed heap usage (bytes)=334102528
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=361
File Output Format Counters
Bytes Written=72
Step 7 - Utilice el siguiente comando para verificar los archivos resultantes en la carpeta de salida.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Encontrará la salida en tres archivos porque está utilizando tres particionadores y tres Reductores en su programa.
Step 8 - Utilice el siguiente comando para ver la salida en Part-00000archivo. Este archivo es generado por HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Output in Part-00000
Female 15000
Male 40000
Utilice el siguiente comando para ver la salida en Part-00001 archivo.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Output in Part-00001
Female 35000
Male 31000
Utilice el siguiente comando para ver la salida en Part-00002 archivo.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Output in Part-00002
Female 51000
Male 50000