tutorial sqlcontext spark read example español scala apache-spark apache-spark-sql apache-spark-dataset

scala - sqlcontext - spark sql tutorial



API de Dataset Spark: únete (3)

Intento utilizar la API Spark Dataset, pero estoy teniendo algunos problemas al hacer una unión simple.

Digamos que tengo dos conjuntos de datos con campos: date | value date | value , entonces en el caso de DataFrame mi unión se vería así:

val dfA : DataFrame val dfB : DataFrame dfA.join(dfB, dfB("date") === dfA("date") )

Sin embargo, para Dataset existe el método .joinWith , pero el mismo enfoque no funciona:

val dfA : Dataset val dfB : Dataset dfA.joinWith(dfB, ? )

¿Cuál es el argumento requerido por .joinWith ?


Para usar joinWith primero tiene que crear un DataSet , y probablemente dos de ellos. Para crear un DataSet , necesita crear una clase de caso que coincida con su esquema y llamar a DataFrame.as[T] donde T es su clase de caso. Asi que:

case class KeyValue(key: Int, value: String) val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value") val ds = df.as[KeyValue] // org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]

También puede omitir la clase de caso y usar una tupla:

val tupDs = df.as[(Int,String)] // org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

Entonces, si tuviera otra clase de caso / DF, haga lo siguiente:

case class Nums(key: Int, num1: Double, num2: Long) val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2") val ds2 = df2.as[Nums] // org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]

Entonces, aunque la sintaxis de join y joinWith son similares, los resultados son diferentes:

df.join(df2, df.col("key") === df2.col("key")).show // +---+-----+---+----+----+ // |key|value|key|num1|num2| // +---+-----+---+----+----+ // | 1| asdf| 1| 7.7| 101| // | 2|34234| 2| 1.2| 10| // +---+-----+---+----+----+ ds.joinWith(ds2, df.col("key") === df2.col("key")).show // +---------+-----------+ // | _1| _2| // +---------+-----------+ // | [1,asdf]|[1,7.7,101]| // |[2,34234]| [2,1.2,10]| // +---------+-----------+

Como puede ver, joinWith deja los objetos intactos como partes de una tupla, mientras que join aplana las columnas en un solo espacio de nombres. (Lo cual causará problemas en el caso anterior porque se repite el nombre de la columna "clave").

Curiosamente, tengo que usar df.col("key") y df2.col("key") para crear las condiciones para unir ds y ds2 - si usa solo col("key") en cualquier lado lo hace no funciona, y ds.col(...) no existe. Sin embargo, usar el df.col("key") original df.col("key") es el truco.



En el ejemplo anterior, puede probar la opción siguiente:

  • Definir una clase de caso para su salida

    case class JoinOutput(key:Int, value:String, num1:Double, num2:Long)

  • Unir dos conjuntos de datos con "Seq (" clave ")", esto le ayudará a evitar dos columnas clave duplicadas en la salida. Lo cual ayudará a aplicar la clase de caso o buscar los datos en el próximo paso

    ds.join(ds2, Seq("key")).as[JoinOutput] res27: org.apache.spark.sql.Dataset[JoinOutput] = [key: int, value: string ... 2 more fields]

    scala> ds.join(ds2, Seq("key")).as[JoinOutput].show +---+-----+----+----+ |key|value|num1|num2| +---+-----+----+----+ | 1| asdf| 7.7| 101| | 2|34234| 1.2| 10| +---+-----+----+----+