spark que introduccion examples ejemplo datos batch arquitectura scala apache-spark

scala - que - spark ejemplo



Spark: ¿cuál es la mejor estrategia para unir un RDD de 2 tuplas con RDD de una sola llave? (2)

Otra forma de hacerlo es crear un particionador personalizado y luego usar zipPartitions para unir sus RDD.

import org.apache.spark.HashPartitioner class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) { override def getPartition(key: Any): Int = key match { case k: Tuple2[Int, String] => super.getPartition(k._1) case _ => super.getPartition(key) } } val numSplits = 8 val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))).partitionBy(new HashPartitioner(numSplits)) val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((1, "AA"), 123), ((2, "Y"), 222), ((3, "X"), 333))).partitionBy(new RDD2Partitioner(numSplits)) val result = rdd2.zipPartitions(rdd1)( (iter2, iter1) => { val m = iter1.toMap for { ((t: Int, w), u) <- iter2 if m.contains(t) } yield ((t, w), (u, m.get(t).get)) } ).partitionBy(new HashPartitioner(numSplits)) result.glom.collect

Tengo dos RDD a los que deseo unirme y se ven así:

val rdd1:RDD[(T,U)] val rdd2:RDD[((T,W), V)]

Ocurre que los valores clave de rdd1 son únicos y también que los valores de tupla-clave de rdd2 son únicos. Me gustaría unirme a los dos conjuntos de datos para obtener el siguiente rdd:

val rdd_joined:RDD[((T,W), (U,V))]

¿Cuál es la forma más eficiente de lograr esto? Aquí hay algunas ideas en las que he pensado.

Opción 1:

val m = rdd1.collectAsMap val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))})

Opcion 2:

val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2)

La Opción 1 recogerá todos los datos para dominar, ¿verdad? Entonces, eso no parece una buena opción si rdd1 es grande (es relativamente grande en mi caso, aunque un orden de magnitud menor que rdd2). La opción 2 hace un producto feo y cartesiano feo, que también parece ser muy ineficiente. Otra posibilidad que se me pasó por la cabeza (pero aún no lo he intentado) es hacer la opción 1 y transmitir el mapa, aunque sería mejor transmitir de manera "inteligente" para que las teclas del mapa estén ubicadas junto con el claves de rdd2 .

¿Alguien ha encontrado este tipo de situación antes? Estaría feliz de tener tus pensamientos.

¡Gracias!


Una opción es realizar una combinación de difusión recopilando rdd1 al controlador y transmitiéndolo a todos los rdd1 de rdd1 ; hecho correctamente, esto nos permitirá evitar una costosa rdd2 RDD rdd2 grande:

val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))) val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333))) val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap()) val joined = rdd2.mapPartitions({ iter => val m = rdd1Broadcast.value for { ((t, w), u) <- iter if m.contains(t) } yield ((t, w), (u, m.get(t).get)) }, preservesPartitioning = true)

El rdd2 preservesPartitioning = true le dice a Spark que esta función de mapa no modifica las claves de rdd2 ; esto permitirá que Spark evite volver a particionar rdd2 para cualquier operación posterior que se una sobre la base de la tecla (t, w) .

Esta transmisión podría ser ineficiente ya que implica un cuello de botella de comunicaciones en el controlador. En principio, es posible transmitir un RDD a otro sin involucrar al controlador; Tengo un prototipo de esto que me gustaría generalizar y agregar a Spark.

Otra opción es volver a mapear las claves de rdd2 y usar el método de join Spark; esto implicará una mezcla completa de rdd2 (y posiblemente rdd1 ):

rdd1.join(rdd2.map { case ((t, w), u) => (t, (w, u)) }).map { case (t, (v, (w, u))) => ((t, w), (u, v)) }.collect()

En mi entrada de muestra, ambos métodos producen el mismo resultado:

res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C)))

Una tercera opción sería reestructurar rdd2 para que t sea ​​su clave, luego realice la unión anterior.