scala - que - spark ejemplo
Spark: ¿cuál es la mejor estrategia para unir un RDD de 2 tuplas con RDD de una sola llave? (2)
Otra forma de hacerlo es crear un particionador personalizado y luego usar zipPartitions para unir sus RDD.
import org.apache.spark.HashPartitioner
class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) {
override def getPartition(key: Any): Int = key match {
case k: Tuple2[Int, String] => super.getPartition(k._1)
case _ => super.getPartition(key)
}
}
val numSplits = 8
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))).partitionBy(new HashPartitioner(numSplits))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((1, "AA"), 123), ((2, "Y"), 222), ((3, "X"), 333))).partitionBy(new RDD2Partitioner(numSplits))
val result = rdd2.zipPartitions(rdd1)(
(iter2, iter1) => {
val m = iter1.toMap
for {
((t: Int, w), u) <- iter2
if m.contains(t)
} yield ((t, w), (u, m.get(t).get))
}
).partitionBy(new HashPartitioner(numSplits))
result.glom.collect
Tengo dos RDD a los que deseo unirme y se ven así:
val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]
Ocurre que los valores clave de rdd1
son únicos y también que los valores de tupla-clave de rdd2
son únicos. Me gustaría unirme a los dos conjuntos de datos para obtener el siguiente rdd:
val rdd_joined:RDD[((T,W), (U,V))]
¿Cuál es la forma más eficiente de lograr esto? Aquí hay algunas ideas en las que he pensado.
Opción 1:
val m = rdd1.collectAsMap
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))})
Opcion 2:
val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2)
La Opción 1 recogerá todos los datos para dominar, ¿verdad? Entonces, eso no parece una buena opción si rdd1 es grande (es relativamente grande en mi caso, aunque un orden de magnitud menor que rdd2). La opción 2 hace un producto feo y cartesiano feo, que también parece ser muy ineficiente. Otra posibilidad que se me pasó por la cabeza (pero aún no lo he intentado) es hacer la opción 1 y transmitir el mapa, aunque sería mejor transmitir de manera "inteligente" para que las teclas del mapa estén ubicadas junto con el claves de rdd2
.
¿Alguien ha encontrado este tipo de situación antes? Estaría feliz de tener tus pensamientos.
¡Gracias!
Una opción es realizar una combinación de difusión recopilando rdd1
al controlador y transmitiéndolo a todos los rdd1
de rdd1
; hecho correctamente, esto nos permitirá evitar una costosa rdd2
RDD rdd2
grande:
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C")))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333)))
val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap())
val joined = rdd2.mapPartitions({ iter =>
val m = rdd1Broadcast.value
for {
((t, w), u) <- iter
if m.contains(t)
} yield ((t, w), (u, m.get(t).get))
}, preservesPartitioning = true)
El rdd2
preservesPartitioning = true
le dice a Spark que esta función de mapa no modifica las claves de rdd2
; esto permitirá que Spark evite volver a particionar rdd2
para cualquier operación posterior que se una sobre la base de la tecla (t, w)
.
Esta transmisión podría ser ineficiente ya que implica un cuello de botella de comunicaciones en el controlador. En principio, es posible transmitir un RDD a otro sin involucrar al controlador; Tengo un prototipo de esto que me gustaría generalizar y agregar a Spark.
Otra opción es volver a mapear las claves de rdd2
y usar el método de join
Spark; esto implicará una mezcla completa de rdd2
(y posiblemente rdd1
):
rdd1.join(rdd2.map {
case ((t, w), u) => (t, (w, u))
}).map {
case (t, (v, (w, u))) => ((t, w), (u, v))
}.collect()
En mi entrada de muestra, ambos métodos producen el mismo resultado:
res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C)))
Una tercera opción sería reestructurar rdd2
para que t
sea su clave, luego realice la unión anterior.