scala - sqlcontext - spark sql tutorial
API de Dataset Spark: únete (3)
Intento utilizar la API Spark Dataset, pero estoy teniendo algunos problemas al hacer una unión simple.
Digamos que tengo dos conjuntos de datos con campos: date | value
date | value
, entonces en el caso de DataFrame
mi unión se vería así:
val dfA : DataFrame
val dfB : DataFrame
dfA.join(dfB, dfB("date") === dfA("date") )
Sin embargo, para Dataset
existe el método .joinWith
, pero el mismo enfoque no funciona:
val dfA : Dataset
val dfB : Dataset
dfA.joinWith(dfB, ? )
¿Cuál es el argumento requerido por .joinWith
?
Para usar joinWith
primero tiene que crear un DataSet
, y probablemente dos de ellos. Para crear un DataSet
, necesita crear una clase de caso que coincida con su esquema y llamar a DataFrame.as[T]
donde T
es su clase de caso. Asi que:
case class KeyValue(key: Int, value: String)
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]
También puede omitir la clase de caso y usar una tupla:
val tupDs = df.as[(Int,String)]
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]
Entonces, si tuviera otra clase de caso / DF, haga lo siguiente:
case class Nums(key: Int, num1: Double, num2: Long)
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]
Entonces, aunque la sintaxis de join
y joinWith
son similares, los resultados son diferentes:
df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// | 1| asdf| 1| 7.7| 101|
// | 2|34234| 2| 1.2| 10|
// +---+-----+---+----+----+
ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// | _1| _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+
Como puede ver, joinWith
deja los objetos intactos como partes de una tupla, mientras que join
aplana las columnas en un solo espacio de nombres. (Lo cual causará problemas en el caso anterior porque se repite el nombre de la columna "clave").
Curiosamente, tengo que usar df.col("key")
y df2.col("key")
para crear las condiciones para unir ds
y ds2
- si usa solo col("key")
en cualquier lado lo hace no funciona, y ds.col(...)
no existe. Sin embargo, usar el df.col("key")
original df.col("key")
es el truco.
Desde https://docs.cloud.databricks.com/docs/latest/databricks_guide/05%20Spark/1%20Intro%20Datasets.html
parece que podrías hacer
dfA.as("A").joinWith(dfB.as("B"), $"A.date" === $"B.date" )
En el ejemplo anterior, puede probar la opción siguiente:
Definir una clase de caso para su salida
case class JoinOutput(key:Int, value:String, num1:Double, num2:Long)
Unir dos conjuntos de datos con "Seq (" clave ")", esto le ayudará a evitar dos columnas clave duplicadas en la salida. Lo cual ayudará a aplicar la clase de caso o buscar los datos en el próximo paso
ds.join(ds2, Seq("key")).as[JoinOutput] res27: org.apache.spark.sql.Dataset[JoinOutput] = [key: int, value: string ... 2 more fields]
scala> ds.join(ds2, Seq("key")).as[JoinOutput].show +---+-----+----+----+ |key|value|num1|num2| +---+-----+----+----+ | 1| asdf| 7.7| 101| | 2|34234| 1.2| 10| +---+-----+----+----+