
scala - Joining Spark DataFrames on a key




I have built two DataFrames. How can we join multiple Spark DataFrames?

For example:

PersonDf and ProfileDf, with a common column personId as the (key). Now, how can we get one DataFrame that combines PersonDf and ProfileDf?


Alias approach using Scala (this is an example given for an older version of Spark; for Spark 2.x see my other answer):

You can use a case class to prepare the sample dataset ... which is optional here: you can also get a DataFrame from hiveContext.sql, for example, as sketched below ...
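A minimal sketch of that alternative (the Hive table name person_table is hypothetical, just for illustration):

// hypothetical: load an existing Hive table as a DataFrame
// instead of building one from case classes
val df1 = hiveContext.sql("SELECT name, age, personid FROM person_table")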

import org.apache.spark.sql.functions.col

case class Person(name: String, age: Int, personid: Int)
case class Profile(name: String, personid: Int, profileDescription: String)

val df1 = sqlContext.createDataFrame(
  Person("Bindu", 20, 2)
    :: Person("Raphel", 25, 5)
    :: Person("Ram", 40, 9)
    :: Nil)

val df2 = sqlContext.createDataFrame(
  Profile("Spark", 2, "SparkSQLMaster")
    :: Profile("Spark", 5, "SparkGuru")
    :: Profile("Spark", 9, "DevHunter")
    :: Nil)

// you can alias the DataFrames and refer to columns through the aliases to increase readability
val df_asPerson = df1.as("dfperson")
val df_asProfile = df2.as("dfprofile")

val joined_df = df_asPerson.join(
  df_asProfile
  , col("dfperson.personid") === col("dfprofile.personid")
  , "inner")

joined_df.select(
  col("dfperson.name")
  , col("dfperson.age")
  , col("dfprofile.name")
  , col("dfprofile.profileDescription"))
  .show

Temp table approach example, which I personally don't like ...

The reason for using the registerTempTable( tableName ) method on a DataFrame is so that, in addition to using the methods Spark provides on a DataFrame, you can also issue SQL queries via sqlContext.sql( sqlQuery ) that use that DataFrame as an SQL table. The tableName parameter specifies the table name to use for that DataFrame in the SQL queries.

df_asPerson.registerTempTable("dfperson")
df_asProfile.registerTempTable("dfprofile")

sqlContext.sql("""SELECT dfperson.name, dfperson.age, dfprofile.profileDescription
                  FROM dfperson JOIN dfprofile
                  ON dfperson.personid == dfprofile.personid""")

If you want to know more about joins, please see this nice post: beyond-traditional-join-with-apache-spark

Note:

1) As mentioned by @RaphaelRoth,

val resultDf = PersonDf.join(ProfileDf, Seq("personId"))

is a good approach, since it does not duplicate the key column from both sides when you use an inner join on the same key (see the sketch after these notes).

2) A Spark 2.x example is available in my other answer, updated with the full set of join operations supported by Spark 2.x, with examples + results.
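To illustrate note 1), here is a small sketch of the difference between the two join styles, using the PersonDf / ProfileDf frames from the question:

// joining on a Seq of column names keeps a single personId column in the result
PersonDf.join(ProfileDf, Seq("personId")).printSchema()

// joining on a column expression keeps personId from both sides,
// so the result carries two personId columns (one per input DataFrame)
PersonDf.join(ProfileDf, PersonDf("personId") === ProfileDf("personId")).printSchema()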

TIP:

Also, an important thing in joins: the broadcast function can help to give a hint; please see my answer, and the sketch below.
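A minimal sketch of the broadcast hint, assuming ProfileDf is the smaller of the two frames:

import org.apache.spark.sql.functions.broadcast

// hint Spark to ship the smaller DataFrame to every executor,
// so the join can execute as a broadcast (map-side) join
val resultDf = PersonDf.join(broadcast(ProfileDf), Seq("personId"))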


In addition to my answer above, I tried to demonstrate all the Spark joins with the same case classes using Spark 2.x; here is my link to the article, with the full set of examples and explanations.

All join types: Default inner. Must be one of: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, left_anti.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

/**
 * @author : Ram Ghadiyaram
 */
object SparkJoinTypesDemo extends App {

  private[this] implicit val spark = SparkSession.builder().master("local[*]").getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")

  case class Person(name: String, age: Int, personid: Int)
  case class Profile(profileName: String, personid: Int, profileDescription: String)

  /**
   * @param joinType Type of join to perform. Default `inner`. Must be one of:
   *                 `inner`, `cross`, `outer`, `full`, `full_outer`, `left`, `left_outer`,
   *                 `right`, `right_outer`, `left_semi`, `left_anti`.
   */
  val joinTypes = Seq(
    "inner"
    , "outer"
    , "full"
    , "full_outer"
    , "left"
    , "left_outer"
    , "right"
    , "right_outer"
    , "left_semi"
    , "left_anti"
    //, "cross"
  )

  val df1 = spark.sqlContext.createDataFrame(
    Person("Nataraj", 45, 2)
      :: Person("Srinivas", 45, 5)
      :: Person("Ashik", 22, 9)
      :: Person("Deekshita", 22, 8)
      :: Person("Siddhika", 22, 4)
      :: Person("Madhu", 22, 3)
      :: Person("Meghna", 22, 2)
      :: Person("Snigdha", 22, 2)
      :: Person("Harshita", 22, 6)
      :: Person("Ravi", 42, 0)
      :: Person("Ram", 42, 9)
      :: Person("Chidananda Raju", 35, 9)
      :: Person("Sreekanth Doddy", 29, 9)
      :: Nil)

  val df2 = spark.sqlContext.createDataFrame(
    Profile("Spark", 2, "SparkSQLMaster")
      :: Profile("Spark", 5, "SparkGuru")
      :: Profile("Spark", 9, "DevHunter")
      :: Profile("Spark", 3, "Evangelist")
      :: Profile("Spark", 0, "Committer")
      :: Profile("Spark", 1, "All Rounder")
      :: Nil)

  val df_asPerson = df1.as("dfperson")
  val df_asProfile = df2.as("dfprofile")

  val joined_df = df_asPerson.join(
    df_asProfile
    , col("dfperson.personid") === col("dfprofile.personid")
    , "inner")

  println("First example inner join ")
  // you can do alias to refer column name with aliases to increase readability
  joined_df.select(
    col("dfperson.name")
    , col("dfperson.age")
    , col("dfprofile.profileName")
    , col("dfprofile.profileDescription"))
    .show

  println("all joins in a loop")
  joinTypes foreach { joinType =>
    println(s"${joinType.toUpperCase()} JOIN")
    df_asPerson.join(right = df_asProfile, usingColumns = Seq("personid"), joinType = joinType)
      .orderBy("personid")
      .show()
  }

  println(
    """
      |Till 1.x cross join is : df_asPerson.join(df_asProfile)
      |
      | Explicit Cross Join in 2.x :
      | http://blog.madhukaraphatak.com/migrating-to-spark-two-part-4/
      | Cartesian joins are very expensive without an extra filter that can be pushed down.
      |
      | cross join or cartesian product
      |
      | """.stripMargin)

  val crossJoinDf = df_asPerson.crossJoin(right = df_asProfile)
  crossJoinDf.show(200, false)
  println(crossJoinDf.explain())
  println(crossJoinDf.count)

  println("createOrReplaceTempView example ")
  println(
    """
      |Creates a local temporary view using the given name. The lifetime of this
      | temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
    """.stripMargin)
  df_asPerson.createOrReplaceTempView("dfperson")
  df_asProfile.createOrReplaceTempView("dfprofile")

  val sql =
    s"""
       |SELECT dfperson.name
       |, dfperson.age
       |, dfprofile.profileDescription
       |  FROM dfperson JOIN dfprofile
       | ON dfperson.personid == dfprofile.personid
    """.stripMargin
  println(s"createOrReplaceTempView sql $sql")
  val sqldf = spark.sql(sql)
  sqldf.show

  println(
    """
      |
      |**** EXCEPT DEMO ***
      |
    """.stripMargin)
  println(" df_asPerson.except(df_asProfile) Except demo")
  df_asPerson.except(df_asProfile).show
  println(" df_asProfile.except(df_asPerson) Except demo")
  df_asProfile.except(df_asPerson).show
}

Result:

First example inner join
+---------------+---+-----------+------------------+
|           name|age|profileName|profileDescription|
+---------------+---+-----------+------------------+
|        Nataraj| 45|      Spark|    SparkSQLMaster|
|       Srinivas| 45|      Spark|         SparkGuru|
|          Ashik| 22|      Spark|         DevHunter|
|          Madhu| 22|      Spark|        Evangelist|
|         Meghna| 22|      Spark|    SparkSQLMaster|
|        Snigdha| 22|      Spark|    SparkSQLMaster|
|           Ravi| 42|      Spark|         Committer|
|            Ram| 42|      Spark|         DevHunter|
|Chidananda Raju| 35|      Spark|         DevHunter|
|Sreekanth Doddy| 29|      Spark|         DevHunter|
+---------------+---+-----------+------------------+

all joins in a loop

INNER JOIN
+--------+---------------+---+-----------+------------------+
|personid|           name|age|profileName|profileDescription|
+--------+---------------+---+-----------+------------------+
|       0|           Ravi| 42|      Spark|         Committer|
|       2|        Snigdha| 22|      Spark|    SparkSQLMaster|
|       2|         Meghna| 22|      Spark|    SparkSQLMaster|
|       2|        Nataraj| 45|      Spark|    SparkSQLMaster|
|       3|          Madhu| 22|      Spark|        Evangelist|
|       5|       Srinivas| 45|      Spark|         SparkGuru|
|       9|            Ram| 42|      Spark|         DevHunter|
|       9|          Ashik| 22|      Spark|         DevHunter|
|       9|Chidananda Raju| 35|      Spark|         DevHunter|
|       9|Sreekanth Doddy| 29|      Spark|         DevHunter|
+--------+---------------+---+-----------+------------------+

OUTER JOIN
+--------+---------------+----+-----------+------------------+
|personid|           name| age|profileName|profileDescription|
+--------+---------------+----+-----------+------------------+
|       0|           Ravi|  42|      Spark|         Committer|
|       1|           null|null|      Spark|       All Rounder|
|       2|        Nataraj|  45|      Spark|    SparkSQLMaster|
|       2|        Snigdha|  22|      Spark|    SparkSQLMaster|
|       2|         Meghna|  22|      Spark|    SparkSQLMaster|
|       3|          Madhu|  22|      Spark|        Evangelist|
|       4|       Siddhika|  22|       null|              null|
|       5|       Srinivas|  45|      Spark|         SparkGuru|
|       6|       Harshita|  22|       null|              null|
|       8|      Deekshita|  22|       null|              null|
|       9|          Ashik|  22|      Spark|         DevHunter|
|       9|            Ram|  42|      Spark|         DevHunter|
|       9|Chidananda Raju|  35|      Spark|         DevHunter|
|       9|Sreekanth Doddy|  29|      Spark|         DevHunter|
+--------+---------------+----+-----------+------------------+

FULL JOIN
+--------+---------------+----+-----------+------------------+
|personid|           name| age|profileName|profileDescription|
+--------+---------------+----+-----------+------------------+
|       0|           Ravi|  42|      Spark|         Committer|
|       1|           null|null|      Spark|       All Rounder|
|       2|        Nataraj|  45|      Spark|    SparkSQLMaster|
|       2|         Meghna|  22|      Spark|    SparkSQLMaster|
|       2|        Snigdha|  22|      Spark|    SparkSQLMaster|
|       3|          Madhu|  22|      Spark|        Evangelist|
|       4|       Siddhika|  22|       null|              null|
|       5|       Srinivas|  45|      Spark|         SparkGuru|
|       6|       Harshita|  22|       null|              null|
|       8|      Deekshita|  22|       null|              null|
|       9|          Ashik|  22|      Spark|         DevHunter|
|       9|            Ram|  42|      Spark|         DevHunter|
|       9|Sreekanth Doddy|  29|      Spark|         DevHunter|
|       9|Chidananda Raju|  35|      Spark|         DevHunter|
+--------+---------------+----+-----------+------------------+

FULL_OUTER JOIN
+--------+---------------+----+-----------+------------------+
|personid|           name| age|profileName|profileDescription|
+--------+---------------+----+-----------+------------------+
|       0|           Ravi|  42|      Spark|         Committer|
|       1|           null|null|      Spark|       All Rounder|
|       2|        Nataraj|  45|      Spark|    SparkSQLMaster|
|       2|         Meghna|  22|      Spark|    SparkSQLMaster|
|       2|        Snigdha|  22|      Spark|    SparkSQLMaster|
|       3|          Madhu|  22|      Spark|        Evangelist|
|       4|       Siddhika|  22|       null|              null|
|       5|       Srinivas|  45|      Spark|         SparkGuru|
|       6|       Harshita|  22|       null|              null|
|       8|      Deekshita|  22|       null|              null|
|       9|          Ashik|  22|      Spark|         DevHunter|
|       9|            Ram|  42|      Spark|         DevHunter|
|       9|Chidananda Raju|  35|      Spark|         DevHunter|
|       9|Sreekanth Doddy|  29|      Spark|         DevHunter|
+--------+---------------+----+-----------+------------------+

LEFT JOIN
+--------+---------------+---+-----------+------------------+
|personid|           name|age|profileName|profileDescription|
+--------+---------------+---+-----------+------------------+
|       0|           Ravi| 42|      Spark|         Committer|
|       2|        Snigdha| 22|      Spark|    SparkSQLMaster|
|       2|         Meghna| 22|      Spark|    SparkSQLMaster|
|       2|        Nataraj| 45|      Spark|    SparkSQLMaster|
|       3|          Madhu| 22|      Spark|        Evangelist|
|       4|       Siddhika| 22|       null|              null|
|       5|       Srinivas| 45|      Spark|         SparkGuru|
|       6|       Harshita| 22|       null|              null|
|       8|      Deekshita| 22|       null|              null|
|       9|            Ram| 42|      Spark|         DevHunter|
|       9|          Ashik| 22|      Spark|         DevHunter|
|       9|Chidananda Raju| 35|      Spark|         DevHunter|
|       9|Sreekanth Doddy| 29|      Spark|         DevHunter|
+--------+---------------+---+-----------+------------------+

LEFT_OUTER JOIN
+--------+---------------+---+-----------+------------------+
|personid|           name|age|profileName|profileDescription|
+--------+---------------+---+-----------+------------------+
|       0|           Ravi| 42|      Spark|         Committer|
|       2|        Nataraj| 45|      Spark|    SparkSQLMaster|
|       2|         Meghna| 22|      Spark|    SparkSQLMaster|
|       2|        Snigdha| 22|      Spark|    SparkSQLMaster|
|       3|          Madhu| 22|      Spark|        Evangelist|
|       4|       Siddhika| 22|       null|              null|
|       5|       Srinivas| 45|      Spark|         SparkGuru|
|       6|       Harshita| 22|       null|              null|
|       8|      Deekshita| 22|       null|              null|
|       9|Chidananda Raju| 35|      Spark|         DevHunter|
|       9|Sreekanth Doddy| 29|      Spark|         DevHunter|
|       9|          Ashik| 22|      Spark|         DevHunter|
|       9|            Ram| 42|      Spark|         DevHunter|
+--------+---------------+---+-----------+------------------+

RIGHT JOIN
+--------+---------------+----+-----------+------------------+
|personid|           name| age|profileName|profileDescription|
+--------+---------------+----+-----------+------------------+
|       0|           Ravi|  42|      Spark|         Committer|
|       1|           null|null|      Spark|       All Rounder|
|       2|        Snigdha|  22|      Spark|    SparkSQLMaster|
|       2|         Meghna|  22|      Spark|    SparkSQLMaster|
|       2|        Nataraj|  45|      Spark|    SparkSQLMaster|
|       3|          Madhu|  22|      Spark|        Evangelist|
|       5|       Srinivas|  45|      Spark|         SparkGuru|
|       9|Sreekanth Doddy|  29|      Spark|         DevHunter|
|       9|Chidananda Raju|  35|      Spark|         DevHunter|
|       9|            Ram|  42|      Spark|         DevHunter|
|       9|          Ashik|  22|      Spark|         DevHunter|
+--------+---------------+----+-----------+------------------+

RIGHT_OUTER JOIN
+--------+---------------+----+-----------+------------------+
|personid|           name| age|profileName|profileDescription|
+--------+---------------+----+-----------+------------------+
|       0|           Ravi|  42|      Spark|         Committer|
|       1|           null|null|      Spark|       All Rounder|
|       2|         Meghna|  22|      Spark|    SparkSQLMaster|
|       2|        Snigdha|  22|      Spark|    SparkSQLMaster|
|       2|        Nataraj|  45|      Spark|    SparkSQLMaster|
|       3|          Madhu|  22|      Spark|        Evangelist|
|       5|       Srinivas|  45|      Spark|         SparkGuru|
|       9|Sreekanth Doddy|  29|      Spark|         DevHunter|
|       9|          Ashik|  22|      Spark|         DevHunter|
|       9|Chidananda Raju|  35|      Spark|         DevHunter|
|       9|            Ram|  42|      Spark|         DevHunter|
+--------+---------------+----+-----------+------------------+

LEFT_SEMI JOIN
+--------+---------------+---+
|personid|           name|age|
+--------+---------------+---+
|       0|           Ravi| 42|
|       2|        Nataraj| 45|
|       2|         Meghna| 22|
|       2|        Snigdha| 22|
|       3|          Madhu| 22|
|       5|       Srinivas| 45|
|       9|Chidananda Raju| 35|
|       9|Sreekanth Doddy| 29|
|       9|            Ram| 42|
|       9|          Ashik| 22|
+--------+---------------+---+

LEFT_ANTI JOIN
+--------+---------+---+
|personid|     name|age|
+--------+---------+---+
|       4| Siddhika| 22|
|       6| Harshita| 22|
|       8|Deekshita| 22|
+--------+---------+---+

Till 1.x cross join is : df_asPerson.join(df_asProfile)

 Explicit Cross Join in 2.x :
 http://blog.madhukaraphatak.com/migrating-to-spark-two-part-4/
 Cartesian joins are very expensive without an extra filter that can be pushed down.

 cross join or cartesian product

+---------------+---+--------+-----------+--------+------------------+
|name           |age|personid|profileName|personid|profileDescription|
+---------------+---+--------+-----------+--------+------------------+
|Nataraj        |45 |2       |Spark      |2       |SparkSQLMaster    |
|Nataraj        |45 |2       |Spark      |5       |SparkGuru         |
|Nataraj        |45 |2       |Spark      |9       |DevHunter         |
|Nataraj        |45 |2       |Spark      |3       |Evangelist        |
|Nataraj        |45 |2       |Spark      |0       |Committer         |
|Nataraj        |45 |2       |Spark      |1       |All Rounder       |
|Srinivas       |45 |5       |Spark      |2       |SparkSQLMaster    |
|Srinivas       |45 |5       |Spark      |5       |SparkGuru         |
|Srinivas       |45 |5       |Spark      |9       |DevHunter         |
|Srinivas       |45 |5       |Spark      |3       |Evangelist        |
|Srinivas       |45 |5       |Spark      |0       |Committer         |
|Srinivas       |45 |5       |Spark      |1       |All Rounder       |
|Ashik          |22 |9       |Spark      |2       |SparkSQLMaster    |
|Ashik          |22 |9       |Spark      |5       |SparkGuru         |
|Ashik          |22 |9       |Spark      |9       |DevHunter         |
|Ashik          |22 |9       |Spark      |3       |Evangelist        |
|Ashik          |22 |9       |Spark      |0       |Committer         |
|Ashik          |22 |9       |Spark      |1       |All Rounder       |
|Deekshita      |22 |8       |Spark      |2       |SparkSQLMaster    |
|Deekshita      |22 |8       |Spark      |5       |SparkGuru         |
|Deekshita      |22 |8       |Spark      |9       |DevHunter         |
|Deekshita      |22 |8       |Spark      |3       |Evangelist        |
|Deekshita      |22 |8       |Spark      |0       |Committer         |
|Deekshita      |22 |8       |Spark      |1       |All Rounder       |
|Siddhika       |22 |4       |Spark      |2       |SparkSQLMaster    |
|Siddhika       |22 |4       |Spark      |5       |SparkGuru         |
|Siddhika       |22 |4       |Spark      |9       |DevHunter         |
|Siddhika       |22 |4       |Spark      |3       |Evangelist        |
|Siddhika       |22 |4       |Spark      |0       |Committer         |
|Siddhika       |22 |4       |Spark      |1       |All Rounder       |
|Madhu          |22 |3       |Spark      |2       |SparkSQLMaster    |
|Madhu          |22 |3       |Spark      |5       |SparkGuru         |
|Madhu          |22 |3       |Spark      |9       |DevHunter         |
|Madhu          |22 |3       |Spark      |3       |Evangelist        |
|Madhu          |22 |3       |Spark      |0       |Committer         |
|Madhu          |22 |3       |Spark      |1       |All Rounder       |
|Meghna         |22 |2       |Spark      |2       |SparkSQLMaster    |
|Meghna         |22 |2       |Spark      |5       |SparkGuru         |
|Meghna         |22 |2       |Spark      |9       |DevHunter         |
|Meghna         |22 |2       |Spark      |3       |Evangelist        |
|Meghna         |22 |2       |Spark      |0       |Committer         |
|Meghna         |22 |2       |Spark      |1       |All Rounder       |
|Snigdha        |22 |2       |Spark      |2       |SparkSQLMaster    |
|Snigdha        |22 |2       |Spark      |5       |SparkGuru         |
|Snigdha        |22 |2       |Spark      |9       |DevHunter         |
|Snigdha        |22 |2       |Spark      |3       |Evangelist        |
|Snigdha        |22 |2       |Spark      |0       |Committer         |
|Snigdha        |22 |2       |Spark      |1       |All Rounder       |
|Harshita       |22 |6       |Spark      |2       |SparkSQLMaster    |
|Harshita       |22 |6       |Spark      |5       |SparkGuru         |
|Harshita       |22 |6       |Spark      |9       |DevHunter         |
|Harshita       |22 |6       |Spark      |3       |Evangelist        |
|Harshita       |22 |6       |Spark      |0       |Committer         |
|Harshita       |22 |6       |Spark      |1       |All Rounder       |
|Ravi           |42 |0       |Spark      |2       |SparkSQLMaster    |
|Ravi           |42 |0       |Spark      |5       |SparkGuru         |
|Ravi           |42 |0       |Spark      |9       |DevHunter         |
|Ravi           |42 |0       |Spark      |3       |Evangelist        |
|Ravi           |42 |0       |Spark      |0       |Committer         |
|Ravi           |42 |0       |Spark      |1       |All Rounder       |
|Ram            |42 |9       |Spark      |2       |SparkSQLMaster    |
|Ram            |42 |9       |Spark      |5       |SparkGuru         |
|Ram            |42 |9       |Spark      |9       |DevHunter         |
|Ram            |42 |9       |Spark      |3       |Evangelist        |
|Ram            |42 |9       |Spark      |0       |Committer         |
|Ram            |42 |9       |Spark      |1       |All Rounder       |
|Chidananda Raju|35 |9       |Spark      |2       |SparkSQLMaster    |
|Chidananda Raju|35 |9       |Spark      |5       |SparkGuru         |
|Chidananda Raju|35 |9       |Spark      |9       |DevHunter         |
|Chidananda Raju|35 |9       |Spark      |3       |Evangelist        |
|Chidananda Raju|35 |9       |Spark      |0       |Committer         |
|Chidananda Raju|35 |9       |Spark      |1       |All Rounder       |
|Sreekanth Doddy|29 |9       |Spark      |2       |SparkSQLMaster    |
|Sreekanth Doddy|29 |9       |Spark      |5       |SparkGuru         |
|Sreekanth Doddy|29 |9       |Spark      |9       |DevHunter         |
|Sreekanth Doddy|29 |9       |Spark      |3       |Evangelist        |
|Sreekanth Doddy|29 |9       |Spark      |0       |Committer         |
|Sreekanth Doddy|29 |9       |Spark      |1       |All Rounder       |
+---------------+---+--------+-----------+--------+------------------+

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Cross
:- LocalTableScan [name#0, age#1, personid#2]
+- BroadcastExchange IdentityBroadcastMode
   +- LocalTableScan [profileName#7, personid#8, profileDescription#9]
()
78

createOrReplaceTempView example

Creates a local temporary view using the given name. The lifetime of this
 temporary view is tied to the [[SparkSession]] that was used to create this Dataset.

createOrReplaceTempView sql
SELECT dfperson.name
, dfperson.age
, dfprofile.profileDescription
  FROM dfperson JOIN dfprofile
 ON dfperson.personid == dfprofile.personid

+---------------+---+------------------+
|           name|age|profileDescription|
+---------------+---+------------------+
|        Nataraj| 45|    SparkSQLMaster|
|       Srinivas| 45|         SparkGuru|
|          Ashik| 22|         DevHunter|
|          Madhu| 22|        Evangelist|
|         Meghna| 22|    SparkSQLMaster|
|        Snigdha| 22|    SparkSQLMaster|
|           Ravi| 42|         Committer|
|            Ram| 42|         DevHunter|
|Chidananda Raju| 35|         DevHunter|
|Sreekanth Doddy| 29|         DevHunter|
+---------------+---+------------------+

**** EXCEPT DEMO ***

 df_asPerson.except(df_asProfile) Except demo
+---------------+---+--------+
|           name|age|personid|
+---------------+---+--------+
|          Ashik| 22|       9|
|       Harshita| 22|       6|
|          Madhu| 22|       3|
|            Ram| 42|       9|
|           Ravi| 42|       0|
|Chidananda Raju| 35|       9|
|       Siddhika| 22|       4|
|       Srinivas| 45|       5|
|Sreekanth Doddy| 29|       9|
|      Deekshita| 22|       8|
|         Meghna| 22|       2|
|        Snigdha| 22|       2|
|        Nataraj| 45|       2|
+---------------+---+--------+

 df_asProfile.except(df_asPerson) Except demo
+-----------+--------+------------------+
|profileName|personid|profileDescription|
+-----------+--------+------------------+
|      Spark|       5|         SparkGuru|
|      Spark|       9|         DevHunter|
|      Spark|       2|    SparkSQLMaster|
|      Spark|       3|        Evangelist|
|      Spark|       0|         Committer|
|      Spark|       1|       All Rounder|
+-----------+--------+------------------+

As discussed above, these are the Venn diagrams of all the joins (figure not reproduced here).


One way

// join type can be inner, left, right, fullouter
val mergedDf = df1.join(df2, Seq("keyCol"), "inner")

// there can be multiple join key columns, given as a Seq of column names
val mergedDf = df1.join(df2, Seq("keyCol1", "keyCol2"), "left")

Another way

import spark.implicits._

val mergedDf = df1.as("d1").join(df2.as("d2"), ($"d1.colName" === $"d2.colName"))

// to select specific columns as output
val mergedDf = df1.as("d1").join(df2.as("d2"), ($"d1.colName" === $"d2.colName"))
  .select($"d1.*", $"d2.anotherColName")


From https://spark.apache.org/docs/1.5.1/api/java/org/apache/spark/sql/DataFrame.html , use join:

Inner equi-join with another DataFrame using the given column.

PersonDf.join(ProfileDf, $"personId")

Or

PersonDf.join(ProfileDf, PersonDf("personId") === ProfileDf("personId"))

Update:

You can also register the DFs as temp tables using df.registerTempTable("tableName") and then write SQL queries against them using sqlContext, as sketched below.
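A minimal sketch of that variant (the table names person and profile are arbitrary):

// register both DataFrames as SQL tables (Spark 1.x API)
PersonDf.registerTempTable("person")
ProfileDf.registerTempTable("profile")

// then join them with a plain SQL query
val joinedDf = sqlContext.sql(
  "SELECT p.*, pr.profileDescription FROM person p JOIN profile pr ON p.personId = pr.personId")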


You can use

val resultDf = PersonDf.join(ProfileDf, PersonDf("personId") === ProfileDf("personId"))

or, shorter and more flexible (since you can easily specify more than one column to join on):

val resultDf = PersonDf.join(ProfileDf, Seq("personId"))
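For example, assuming both frames also shared a second (hypothetical) key column groupId, a multi-column join would be:

// join on two key columns at once; each key column appears only once in the result
val resultDf = PersonDf.join(ProfileDf, Seq("personId", "groupId"))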