outer - Realice una unión escrita a máquina en Scala con Spark Datasets
scala df join (1)
Me gustan los conjuntos de datos Spark ya que me dan errores de análisis y de sintaxis en tiempo de compilación y también me permiten trabajar con getters en lugar de nombres / números codificados. La mayoría de los cálculos se pueden realizar con las API de alto nivel de Dataset. Por ejemplo, es mucho más simple realizar operaciones de agg, select, sum, avg, map, filter o groupBy accediendo a un objeto de tipo Dataset que utilizando campos de datos de filas de RDD.
Sin embargo, falta la operación de unión, he leído que puedo hacer una unión como esta
ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")
Pero eso no es lo que quiero, ya que preferiría hacerlo a través de la interfaz de clase de caso, así que algo más como esto
ds1.joinWith(ds2, ds1.key === ds2.key, "inner")
La mejor alternativa por ahora parece crear un objeto al lado de la clase de caso y otorgar a esta función el nombre de columna correcto como cadena. Entonces utilizaría la primera línea de código pero pondría una función en lugar de un nombre de columna codificado. Pero eso no se siente lo suficientemente elegante ..
¿Puede alguien aconsejarme sobre otras opciones aquí? El objetivo es tener una abstracción de los nombres reales de las columnas y trabajar preferiblemente a través de los buscadores de la clase de casos.
Estoy usando Spark 1.6.1 y Scala 2.10
Observación
Spark SQL puede optimizar la unión solo si la condición de unión se basa en el operador de igualdad. Esto significa que podemos considerar equijoins y non-equijoins por separado.
Equijoin
Equijoin se puede implementar de forma segura asociando ambos Datasets
de Datasets
a tuplas (clave, valor), realizando una unión basada en claves y remodelando el resultado:
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Dataset
def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
(f: T => K, g: U => K)
(implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
val ds1_ = ds1.map(x => (f(x), x))
val ds2_ = ds2.map(x => (g(x), x))
ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
}
No equijoin
Puede expresarse utilizando operadores de álgebra relacional como R ⋈θ S = σθ (R × S) y convertirse directamente en código.
Spark 2.0
Habilite crossJoin
y use joinWith
con un predicado trivialmente igual:
spark.conf.set("spark.sql.crossJoin.enabled", true)
def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
(p: (T, U) => Boolean) = {
ds1.joinWith(ds2, lit(true)).filter(p.tupled)
}
Spark 2.1
Use el método crossJoin
:
def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
(p: (T, U) => Boolean)
(implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
}
Ejemplos
case class LabeledPoint(label: String, x: Double, y: Double)
case class Category(id: Long, name: String)
val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
val points2 = Seq(
LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
).toDS
val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS
safeEquiJoin(points1, categories)(_.label, _.name)
safeNonEquiJoin(points1, points2)(_.x > _.x)
Notas
Cabe señalar que estos métodos son cualitativamente diferentes de los de una aplicación de
joinWith
directa y requieren costosas transformacionesDeserializeToObject
/SerializeFromObject
(en comparación con lajoinWith
uso de operaciones lógicas en los datos).Esto es similar al comportamiento descrito en Spark 2.0 Dataset vs DataFrame .
Si no está limitado a Spark SQL, la API
frameless
proporciona interesantes extensiones seguras tipo paraDatasets
deDatasets
(a partir de hoy solo admite Spark 2.0):import frameless.TypedDataset val typedPoints1 = TypedDataset.create(points1) val typedPoints2 = TypedDataset.create(points2) typedPoints1.join(typedPoints2, typedPoints1(''x), typedPoints2(''x))
Dataset
API delDataset
no es estable en 1.6, así que no creo que tenga sentido usarla allí.Por supuesto, este diseño y los nombres descriptivos no son necesarios. Puede usar fácilmente la clase de tipo para agregar estos métodos implícitamente al
Dataset
deDataset
y no hay conflicto con las firmas incorporadas, por lo que ambos pueden llamarsejoinWith
.