outer - spark java tutorial
Apache Spark se une al ejemplo con Java (1)
Cuando hace la combinación externa izquierda y la combinación externa derecha, puede tener valores nulos. ¡derecho!
Así que la chispa devuelve el objeto opcional. después de obtener ese resultado, puede asignar ese resultado a su propio formato.
puedes usar el método isPresent () de Opcional para mapear tus datos.
Aquí está el ejemplo:
JavaPairRDD<String,String> firstRDD = ....
JavaPairRDD<String,String> secondRDD =....
// join both rdd using left outerjoin
JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> rddWithJoin = firstRDD.leftOuterJoin(secondRDD);
// mapping of join result
JavaPairRDD<String, String> mappedRDD = rddWithJoin
.mapToPair(tuple -> {
if (tuple._2()._2().isPresent()) {
//do your operation and return
return new Tuple2<String, String>(tuple._1(), tuple._2()._1());
} else {
return new Tuple2<String, String>(tuple._1(), "not present");
}
});
Soy muy nuevo en Apache Spark. De hecho, me gustaría centrarme en la especificación Spark API básica y quiero entender y escribir algunos programas usando Spark API. He escrito un programa de Java usando Apache Spark para implementar el concepto de uniones.
Cuando uso Left Outer Join - leftOuterJoin () o Right Outer Join - rightOuterJoin (), ambos métodos devuelven una JavaPairRDD que contiene un tipo especial Google Options. Pero no sé cómo extraer los valores originales del tipo Opcional.
De todos modos, me gustaría saber si puedo usar los mismos métodos de unión que devuelven los datos en mi propio formato. No encontré ninguna manera de hacer eso. Lo que quiero decir es que cuando uso Apache Spark, no puedo personalizar el código en mi propio estilo ya que ya han dado todas las cosas predefinidas.
Por favor encuentre el código a continuación
my 2 sample input datasets
customers_data.txt:
4000001,Kristina,Chung,55,Pilot
4000002,Paige,Chen,74,Teacher
4000003,Sherri,Melton,34,Firefighter
and
trasaction_data.txt
00000551,12-30-2011,4000001,092.88,Games,Dice & Dice Sets,Buffalo,New York,credit
00004811,11-10-2011,4000001,180.35,Outdoor Play Equipment,Water Tables,Brownsville,Texas,credit
00034388,09-11-2011,4000002,020.55,Team Sports,Beach Volleyball,Orange,California,cash
00008996,11-21-2011,4000003,121.04,Outdoor Recreation,Fishing,Colorado Springs,Colorado,credit
00009167,05-24-2011,4000003,194.94,Exercise & Fitness,Foam Rollers,El Paso,Texas,credit
Aquí está mi código de Java
**SparkJoins.java:**
public class SparkJoins {
@SuppressWarnings("serial")
public static void main(String[] args) throws FileNotFoundException {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local"));
JavaRDD<String> customerInputFile = sc.textFile("C:/path/customers_data.txt");
JavaPairRDD<String, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String s) {
String[] customerSplit = s.split(",");
return new Tuple2<String, String>(customerSplit[0], customerSplit[1]);
}
}).distinct();
JavaRDD<String> transactionInputFile = sc.textFile("C:/path/transactions_data.txt");
JavaPairRDD<String, String> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String s) {
String[] transactionSplit = s.split(",");
return new Tuple2<String, String>(transactionSplit[2], transactionSplit[3]+","+transactionSplit[1]);
}
});
//Default Join operation (Inner join)
JavaPairRDD<String, Tuple2<String, String>> joinsOutput = customerPairs.join(transactionPairs);
System.out.println("Joins function Output: "+joinsOutput.collect());
//Left Outer join operation
JavaPairRDD<String, Iterable<Tuple2<String, Optional<String>>>> leftJoinOutput = customerPairs.leftOuterJoin(transactionPairs).groupByKey().sortByKey();
System.out.println("LeftOuterJoins function Output: "+leftJoinOutput.collect());
//Right Outer join operation
JavaPairRDD<String, Iterable<Tuple2<Optional<String>, String>>> rightJoinOutput = customerPairs.rightOuterJoin(transactionPairs).groupByKey().sortByKey();
System.out.println("RightOuterJoins function Output: "+rightJoinOutput.collect());
sc.close();
}
}
Y aquí el resultado que obtengo
Joins function Output: [(4000001,(Kristina,092.88,12-30-2011)), (4000001,(Kristina,180.35,11-10-2011)), (4000003,(Sherri,121.04,11-21-2011)), (4000003,(Sherri,194.94,05-24-2011)), (4000002,(Paige,020.55,09-11-2011))]
LeftOuterJoins function Output: [(4000001,[(Kristina,Optional.of(092.88,12-30-2011)), (Kristina,Optional.of(180.35,11-10-2011))]), (4000002,[(Paige,Optional.of(020.55,09-11-2011))]), (4000003,[(Sherri,Optional.of(121.04,11-21-2011)), (Sherri,Optional.of(194.94,05-24-2011))])]
RightOuterJoins function Output: [(4000001,[(Optional.of(Kristina),092.88,12-30-2011), (Optional.of(Kristina),180.35,11-10-2011)]), (4000002,[(Optional.of(Paige),020.55,09-11-2011)]), (4000003,[(Optional.of(Sherri),121.04,11-21-2011), (Optional.of(Sherri),194.94,05-24-2011)])]
Estoy ejecutando este programa en la plataforma de Windows
Por favor, observe el resultado anterior y ayúdeme a extraer los valores del tipo Opcional
Gracias por adelantado