java - tutorial - Buscar amigo#de todos los usuarios: ¿cómo implementarlo con Hadoop Mapreduce?
mapreduce mongodb (4)
1) Introducción / Problema
Antes de seguir adelante con el controlador del trabajo, es importante comprender que, con un enfoque simple, los valores de los reductores deben clasificarse en orden ascendente. La primera idea es pasar la lista de valores sin clasificar y hacer una clasificación en el reductor por tecla. Esto tiene dos desventajas:
1) Probablemente no sea eficiente para grandes listas de valores
y
2) ¿Cómo sabrá el marco si (1,4) es igual a (4,1) si estos pares se procesan en diferentes partes del clúster?
2) Solución en teoría
La forma de hacerlo en Hadoop es "burlarse" del framework creando una clave sintética .
Entonces nuestra función de mapa en lugar de "conceptualmente más apropiada" (si puedo decir eso)
map(k1, v1) -> list(k2, v2)
es el siguiente:
map(k1, v1) -> list(ksynthetic, null)
Como nota, descartamos el uso de valores (el reductor aún obtiene una lista de valores null
, pero realmente no nos importan). Lo que ocurre aquí es que estos valores están realmente incluidos en ksynthetic
. Aquí hay un ejemplo del problema en cuestión:
`map(1, 2) -> list([1,2], null)
Sin embargo, es necesario realizar algunas operaciones más para que las claves se agrupen y particionen de forma adecuada y logremos el resultado correcto en el reductor.
3) Implementación de Hadoop
Implementaremos una clase llamada FFGroupKeyComparator
y una clase FindFriendPartitioner
.
Aquí está nuestro FFGroupKeyComparator
:
public static class FFGroupComparator extends WritableComparator
{
protected FFGroupComparator()
{
super(Text.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2)
{
Text t1 = (Text) w1;
Text t2 = (Text) w2;
String[] t1Items = t1.toString().split(",");
String[] t2Items = t2.toString().split(",");
String t1Base = t1Items[0];
String t2Base = t2Items[0];
int comp = t1Base.compareTo(t2Base); // We compare using "real" key part of our synthetic key
return comp;
}
}
Esta clase actuará como nuestra clase de comparación de agrupamiento. Controla qué teclas se agrupan para una sola llamada a Reducer.reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
Esto es muy importante ya que garantiza que cada reductor obtenga las claves sintéticas apropiadas (a juzgar por la verdadera clave).
Debido al hecho de que Hadoop se ejecuta en un clúster con muchos nodos, es importante asegurarse de que haya tantas tareas de reducción como particiones. Su número debe ser el mismo que el de las claves reales (no sintéticas). Entonces, generalmente hacemos esto con valores hash. En nuestro caso, lo que tenemos que hacer es calcular la partición a la que pertenece una clave sintética basada en el valor hash de la clave real (antes de la coma). Entonces nuestro FindFriendPartitioner
es el siguiente:
public static class FindFriendPartitioner extends Partitioner implements Configurable
{
@Override
public int getPartition(Text key, Text NullWritable, int numPartitions)
{
String[] keyItems = key.toString().split(",");
String keyBase = keyItems[0];
int part = keyBase.hashCode() % numPartitions;
return part;
}
Entonces ahora estamos listos para escribir el trabajo real y resolver nuestro problema.
Supongo que su archivo de entrada se ve así:
1,2
2,1
1,3
3,2
2,4
4,1
Utilizaremos TextInputFormat
.
Aquí está el código para el controlador de trabajo que usa Hadoop 1.0.4:
public class FindFriendTwo
{
public static class FindFriendMapper extends Mapper<Object, Text, Text, NullWritable> {
public void map(Object, Text value, Context context) throws IOException, InterruptedException
{
context.write(value, new NullWritable() );
String tempStrings[] = value.toString().split(",");
Text value2 = new Text(tempStrings[1] + "," + tempStrings[0]); //reverse relationship
context.write(value2, new NullWritable());
}
}
Tenga en cuenta que también pasamos las relaciones inversas en la función de map
.
Por ejemplo, si la cadena de entrada es (1,4) no debemos olvidar (4,1).
public static class FindFriendReducer extends Reducer<Text, NullWritable, IntWritable, IntWritable> {
private Set<String> friendsSet;
public void setup(Context context)
{
friendSet = new LinkedHashSet<String>();
}
public void reduce(Text syntheticKey, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
String tempKeys[] = syntheticKey.toString().split(",");
friendsSet.add(tempKeys[1]);
if( friendsList.size() == 2 )
{
IntWritable key = Integer.parseInt(tempKeys[0]);
IntWritable value = Integer.parseInt(tempKeys[1]);
write(key, value);
}
}
}
Finalmente, debemos recordar incluir lo siguiente en nuestra clase principal, para que el marco utilice nuestras clases.
jobConf.setGroupingComparatorClass(FFGroupComparator.class);
jobConf.setPartitionerClass(FindFriendPartitioner.class);
Digamos que tengo una entrada de la siguiente manera:
(1,2)(2,1)(1,3)(3,2)(2,4)(4,1)
Ouput se espera de la siguiente manera:
(1,(2,3,4)) -> (1,3) //second index is total friend #
(2,(1,3,4)) -> (2,3)
(3,(1,2)) -> (3,2)
(4,(1,2)) -> (4,2)
Sé cómo hacer esto con hashset en java. Pero no sé cómo funciona esto con el modelo mapreduce. ¿Alguien puede arrojar alguna idea o código de muestra sobre este problema? Apreciaré esto.
-------------------------------------------------- ----------------------------------
Aquí está mi solución ingenua: 1 mapeador, dos reductor. El mapeador organizará la entrada (1,2), (2,1), (1,3);
Organice la salida como
* (1, hashset <2>), (2, hashSet <1>), (1, hashset <2>), (2, hashset <1>), (1, hashset <3>), (3, hashset <1>). *
Reducer1 :
tomar la salida de mapper como entrada y salida como:
* (1, hashset <2,3>), (3, hashset <1>) y (2, hashset <1>) *
Reducer2 :
tomar la salida del reducer1 como entrada y salida como:
* (1,2), (3,1) y (2,1) *
Esta es solo mi solución ingenua. No estoy seguro de si esto puede hacerse por el código de hadoop.
Creo que debería haber una manera fácil de resolver este problema.
Mapper Input: (1,2)(2,1)(1,3)(3,2)(2,4)(4,1)
Simplemente emita dos registros por cada par como este:
Mapper Output/ Reducer Input:
Key => Value
1 => 2
2 => 1
2 => 1
1 => 2
1 => 3
3 => 1
3 => 2
2 => 3
2 => 4
4 => 2
4 => 1
1 => 1
En el lado del reductor, obtendrás 4 grupos diferentes como este:
Reducer Output:
Key => Values
1 => [2,3,4]
2 => [1,3,4]
3 => [1,2]
4 => [1,2]
Ahora, eres bueno para formatear tu resultado como quieras. :) Avíseme si alguien puede ver algún problema en este enfoque
Me acercaría a este problema de la siguiente manera.
- Asegúrate de tener todas las relaciones y tenerlas exactamente una vez cada una.
- Simplemente cuente el
Notas sobre mi enfoque:
- Mi notación para pares de valores clave es: K -> V
- Tanto la clave como el valor son casi siempre una estructura de datos (no solo una cadena o int)
- Nunca uso la clave para datos. La clave está SOLAMENTE allí para controlar el flujo de los cartógrafos hacia el reductor derecho. En todos los demás lugares, no miro la clave en absoluto. El marco requiere una clave en todas partes. Con ''()'' quiero decir que hay una clave que ignoro por completo.
- La clave de mi enfoque es que nunca necesita "todos los amigos" en la memoria en el mismo momento (por lo que también funciona en las situaciones realmente grandes).
Comenzamos con una gran cantidad de
(x,y)
y sabemos que no tenemos todas las relaciones en el conjunto de datos.
Mapper: Crea todas las relaciones
Input: () -> (x,y)
Output: (x,y) -> (x,y)
(y,x) -> (y,x)
Reducer: elimina los duplicados (simplemente solo saca el primero del iterador)
Input: (x,y) -> [(x,y),(x,y),(x,y),(x,y),.... ]
Output: () -> (x,y)
Mapper: "Wordcount"
Input: () -> (x,y)
Output: (x) -> (x,1)
Reductor: cuentelos
Input: (x) -> [(x,1),(x,1),(x,1),(x,1),.... ]
Output: () -> (x,N)
Con la ayuda de tantos excelentes ingenieros, finalmente probé la solución.
Solo un Mapper y un Reducer. Sin combinador aquí.
entrada de Mapper:
1,2
2,1
1,3
3,1
3,2
3,4
5,1
Salida de Mapper:
1,2
2,1
1,2
2,1
1,3
3,1
1,3
3,1
4,3
3,4
1,5
5,1
Salida del reductor:
1 3
2 2
3 3
4 1
5 1
El primer col es usuario, el segundo es friend #.
En la etapa de reducción, agrego hashSet al análisis del asistente. Gracias @Artem Tsikiridis @Ashish Tu respuesta me dio una buena pista.
Editado:
Código agregado:
// mapper
public static class TokenizerMapper extends
Mapper<Object, Text, Text, Text> {
private Text word1 = new Text();
private Text word2 = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line,",");
if(itr.hasMoreElements()){
word1.set(itr.nextToken().toLowerCase());
}
if(itr.hasMoreElements()){
word2.set(itr.nextToken().toLowerCase());
}
context.write(word1, word2);
context.write(word2, word1);
//
}}
// reductor
public static class IntSumReducer extends
Reducer<Text, Text, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
HashSet<Text> set = new HashSet<Text>();
int sum = 0;
for (Text val : values) {
if(!set.contains(val)){
set.add(val);
sum++;
}
}
result.set(sum);
context.write(key, result);
}
}