spark outer example duplicate data columns scala join apache-spark dataset apache-spark-sql

outer - Realice una unión escrita a máquina en Scala con Spark Datasets



scala df join (1)

Me gustan los conjuntos de datos Spark ya que me dan errores de análisis y de sintaxis en tiempo de compilación y también me permiten trabajar con getters en lugar de nombres / números codificados. La mayoría de los cálculos se pueden realizar con las API de alto nivel de Dataset. Por ejemplo, es mucho más simple realizar operaciones de agg, select, sum, avg, map, filter o groupBy accediendo a un objeto de tipo Dataset que utilizando campos de datos de filas de RDD.

Sin embargo, falta la operación de unión, he leído que puedo hacer una unión como esta

ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")

Pero eso no es lo que quiero, ya que preferiría hacerlo a través de la interfaz de clase de caso, así que algo más como esto

ds1.joinWith(ds2, ds1.key === ds2.key, "inner")

La mejor alternativa por ahora parece crear un objeto al lado de la clase de caso y otorgar a esta función el nombre de columna correcto como cadena. Entonces utilizaría la primera línea de código pero pondría una función en lugar de un nombre de columna codificado. Pero eso no se siente lo suficientemente elegante ..

¿Puede alguien aconsejarme sobre otras opciones aquí? El objetivo es tener una abstracción de los nombres reales de las columnas y trabajar preferiblemente a través de los buscadores de la clase de casos.

Estoy usando Spark 1.6.1 y Scala 2.10


Observación

Spark SQL puede optimizar la unión solo si la condición de unión se basa en el operador de igualdad. Esto significa que podemos considerar equijoins y non-equijoins por separado.

Equijoin

Equijoin se puede implementar de forma segura asociando ambos Datasets de Datasets a tuplas (clave, valor), realizando una unión basada en claves y remodelando el resultado:

import org.apache.spark.sql.Encoder import org.apache.spark.sql.Dataset def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U]) (f: T => K, g: U => K) (implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = { val ds1_ = ds1.map(x => (f(x), x)) val ds2_ = ds2.map(x => (g(x), x)) ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2)) }

No equijoin

Puede expresarse utilizando operadores de álgebra relacional como R ⋈θ S = σθ (R × S) y convertirse directamente en código.

Spark 2.0

Habilite crossJoin y use joinWith con un predicado trivialmente igual:

spark.conf.set("spark.sql.crossJoin.enabled", true) def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U]) (p: (T, U) => Boolean) = { ds1.joinWith(ds2, lit(true)).filter(p.tupled) }

Spark 2.1

Use el método crossJoin :

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U]) (p: (T, U) => Boolean) (implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = { ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled) }

Ejemplos

case class LabeledPoint(label: String, x: Double, y: Double) case class Category(id: Long, name: String) val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS val points2 = Seq( LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0) ).toDS val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS safeEquiJoin(points1, categories)(_.label, _.name) safeNonEquiJoin(points1, points2)(_.x > _.x)

Notas

  • Cabe señalar que estos métodos son cualitativamente diferentes de los de una aplicación de joinWith directa y requieren costosas transformaciones DeserializeToObject / SerializeFromObject (en comparación con la joinWith uso de operaciones lógicas en los datos).

    Esto es similar al comportamiento descrito en Spark 2.0 Dataset vs DataFrame .

  • Si no está limitado a Spark SQL, la API frameless proporciona interesantes extensiones seguras tipo para Datasets de Datasets (a partir de hoy solo admite Spark 2.0):

    import frameless.TypedDataset val typedPoints1 = TypedDataset.create(points1) val typedPoints2 = TypedDataset.create(points2) typedPoints1.join(typedPoints2, typedPoints1(''x), typedPoints2(''x))

  • Dataset API del Dataset no es estable en 1.6, así que no creo que tenga sentido usarla allí.

  • Por supuesto, este diseño y los nombres descriptivos no son necesarios. Puede usar fácilmente la clase de tipo para agregar estos métodos implícitamente al Dataset de Dataset y no hay conflicto con las firmas incorporadas, por lo que ambos pueden llamarse joinWith .