
scala - Joining two ordinary RDDs with/without Spark SQL




I need to join two ordinary RDDs on one or more columns. Logically, this operation is equivalent to a database join of two tables. I wonder whether this is possible only through Spark SQL, or whether there are other ways to do it.

As a concrete example, consider RDD r1 with primary key ITEM_ID:

(ITEM_ID, ITEM_NAME, ITEM_UNIT, COMPANY_ID)

and RDD r2 with primary key COMPANY_ID:

(COMPANY_ID, COMPANY_NAME, COMPANY_CITY)

I want to join r1 and r2.

How can this be done?


Something like this should work.

scala> case class Item(id:String, name:String, unit:Int, companyId:String)
scala> case class Company(companyId:String, name:String, city:String)

scala> val i1 = Item("1", "first", 2, "c1")
scala> val i2 = i1.copy(id="2", name="second")
scala> val i3 = i1.copy(id="3", name="third", companyId="c2")
scala> val items = sc.parallelize(List(i1,i2,i3))
items: org.apache.spark.rdd.RDD[Item] = ParallelCollectionRDD[14] at parallelize at <console>:20

scala> val c1 = Company("c1", "company-1", "city-1")
scala> val c2 = Company("c2", "company-2", "city-2")
scala> val companies = sc.parallelize(List(c1,c2))

scala> val groupedItems = items.groupBy( x => x.companyId)
groupedItems: org.apache.spark.rdd.RDD[(String, Iterable[Item])] = ShuffledRDD[16] at groupBy at <console>:22

scala> val groupedComp = companies.groupBy(x => x.companyId)
groupedComp: org.apache.spark.rdd.RDD[(String, Iterable[Company])] = ShuffledRDD[18] at groupBy at <console>:20

scala> groupedItems.join(groupedComp).take(10).foreach(println)
14/12/12 00:52:32 INFO DAGScheduler: Job 5 finished: take at <console>:35, took 0.021870 s
(c1,(CompactBuffer(Item(1,first,2,c1), Item(2,second,2,c1)),CompactBuffer(Company(c1,company-1,city-1))))
(c2,(CompactBuffer(Item(3,third,2,c2)),CompactBuffer(Company(c2,company-2,city-2))))
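If you want one row per matching (Item, Company) pair instead of grouped buffers, the Iterables in the joined values can be flattened. A minimal sketch, reusing the groupedItems and groupedComp RDDs defined above:

// Flatten the (Iterable[Item], Iterable[Company]) values into individual pairs.
// Assumes the groupedItems/groupedComp RDDs from the snippet above.
val flattenedJoin = groupedItems.join(groupedComp).flatMap {
  case (companyId, (itemsForCompany, companiesForId)) =>
    for {
      item    <- itemsForCompany
      company <- companiesForId
    } yield (item, company)
}

flattenedJoin.collect().foreach(println)
// e.g. (Item(1,first,2,c1),Company(c1,company-1,city-1))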


Soumya Simanta gave a good answer. However, the values in the joined RDD are Iterable, so the results may not look much like an ordinary table join.

Alternatively, you can:

val mappedItems = items.map(item => (item.companyId, item))
val mappedComp = companies.map(comp => (comp.companyId, comp))
mappedItems.join(mappedComp).take(10).foreach(println)

The output would be:

(c1,(Item(1,first,2,c1),Company(c1,company-1,city-1)))
(c1,(Item(2,second,2,c1),Company(c1,company-1,city-1)))
(c2,(Item(3,third,2,c2),Company(c2,company-2,city-2)))
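The question also asks about joining on more than one column. With plain RDDs this can be done by keying on a tuple of the join columns. A minimal sketch, reusing the items RDD from above; the Stock case class and its data are hypothetical and only serve to illustrate the composite key:

// Join on a composite key (companyId, unit) -- the Stock RDD is made up for illustration.
case class Stock(companyId: String, unit: Int, quantity: Int)

val stocks = sc.parallelize(List(Stock("c1", 2, 100), Stock("c2", 2, 50)))

val itemsByCompositeKey  = items.map(i => ((i.companyId, i.unit), i))
val stocksByCompositeKey = stocks.map(s => ((s.companyId, s.unit), s))

itemsByCompositeKey.join(stocksByCompositeKey).collect().foreach(println)
// e.g. ((c1,2),(Item(1,first,2,c1),Stock(c1,2,100)))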


(Using Scala) Suppose you have two RDDs:

  • emp: (empid, ename, dept)

  • dept: (dname, dept)

Here is another way to do it:

// Option 1: re-key the tuples with map so the department id becomes the key
//val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)))
val emp = sc.parallelize(Seq(("jordan",10), ("ricky",20), ("matt",30), ("mince",35), ("rhonda",30)))
val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)))

//val shifted_fields_emp = emp.map(t => (t._3, t._1, t._2))
val shifted_fields_emp = emp.map(t => (t._2, t._1))
val shifted_fields_dept = dept.map(t => (t._2, t._1))

shifted_fields_emp.join(shifted_fields_dept)

// Option 2: use keyBy to mark the join key explicitly

// Create emp RDD
val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)))

// Create dept RDD
val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)))

// Establish that the third field is the key for the emp RDD
val manipulated_emp = emp.keyBy(t => t._3)

// Establish that the second field is the key for the dept RDD
val manipulated_dept = dept.keyBy(t => t._2)

// Inner join
val join_data = manipulated_emp.join(manipulated_dept)

// Left outer join
val left_outer_join_data = manipulated_emp.leftOuterJoin(manipulated_dept)

// Right outer join
val right_outer_join_data = manipulated_emp.rightOuterJoin(manipulated_dept)

// Full outer join
val full_outer_join_data = manipulated_emp.fullOuterJoin(manipulated_dept)

// Format the joined data for readability (using map)
val cleaned_joined_data = join_data.map(t => (t._2._1._1, t._2._1._2, t._1, t._2._2._1))

This will give the following output:

// Print the cleaned_joined_data output to the console

scala> cleaned_joined_data.collect()
res13: Array[(Int, String, Int, String)] = Array((3,matt,30,hive), (5,rhonda,30,hive), (2,ricky,20,spark), (1,jordan,10,hadoop))
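For the outer joins above, the non-driving side comes back wrapped in Option, so the formatting map has to unwrap it. A minimal sketch for left_outer_join_data; the "NO_DEPT" placeholder for unmatched rows is just an assumption:

// leftOuterJoin yields (key, (empTuple, Option[deptTuple])); unmatched employees get None.
val cleaned_left_outer_join_data = left_outer_join_data.map {
  case (deptId, (empRecord, maybeDept)) =>
    (empRecord._1, empRecord._2, deptId, maybeDept.map(_._1).getOrElse("NO_DEPT"))
}

cleaned_left_outer_join_data.collect().foreach(println)
// (4,mince,35,NO_DEPT) appears because dept id 35 has no match in the dept RDD.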


Spark SQL can perform joins on Spark RDDs.

The code below performs a SQL join on the Company and Item RDDs.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SparkSQLJoin {

  case class Item(id: String, name: String, unit: Int, companyId: String)
  case class Company(companyId: String, name: String, city: String)

  def main(args: Array[String]) {

    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD

    val i1 = Item("1", "first", 1, "c1")
    val i2 = Item("2", "second", 2, "c2")
    val i3 = Item("3", "third", 3, "c3")
    val c1 = Company("c1", "company-1", "city-1")
    val c2 = Company("c2", "company-2", "city-2")

    val companies = sc.parallelize(List(c1, c2))
    companies.registerAsTable("companies")

    val items = sc.parallelize(List(i1, i2, i3))
    items.registerAsTable("items")

    val result = sqlContext.sql("SELECT * FROM companies C JOIN items I ON C.companyId = I.companyId").collect()
    result.foreach(println)
  }
}

The output is shown as:

[c1,company-1,city-1,1,first,1,c1]
[c2,company-2,city-2,2,second,2,c2]
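The answer above uses the old Spark 1.x SQLContext / registerAsTable API. On newer Spark versions (2.x and later) the same join is usually written with SparkSession and Datasets; a rough sketch under that assumption:

import org.apache.spark.sql.SparkSession

object SparkSQLJoinModern {

  case class Item(id: String, name: String, unit: Int, companyId: String)
  case class Company(companyId: String, name: String, city: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("rdd-join").getOrCreate()
    import spark.implicits._

    // Turn the case-class collections (or RDDs of them) into Datasets.
    val items = Seq(Item("1", "first", 1, "c1"), Item("2", "second", 2, "c2")).toDS()
    val companies = Seq(Company("c1", "company-1", "city-1"), Company("c2", "company-2", "city-2")).toDS()

    companies.createOrReplaceTempView("companies")
    items.createOrReplaceTempView("items")

    spark.sql("SELECT * FROM companies C JOIN items I ON C.companyId = I.companyId").show()

    spark.stop()
  }
}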