apache-spark - tutorial - parallelize spark
En Apache Spark, ¿por qué RDD.union no conserva el particionador? (1)
union
es una operación muy eficiente, porque no mueve ningún dato. Si rdd1
tiene 10 particiones y rdd2
tiene 20 particiones, entonces rdd1.union(rdd2)
tendrá 30 particiones: las particiones de los dos RDD colocadas una detrás de la otra. Esto es solo un cambio de contabilidad, no hay barajadas.
Pero necesariamente descarta el particionador. Un particionador se construye para un número dado de particiones. El RDD resultante tiene un número de particiones que es diferente de rdd1
y rdd2
.
Después de tomar la unión, puede ejecutar la repartition
para barajar los datos y organizarlos por clave.
Hay una excepción a lo anterior. Si rdd1
y rdd2
tienen el mismo particionador (con el mismo número de particiones), la union
comporta de manera diferente. Unirá las particiones de los dos RDDs por pares, dándole el mismo número de particiones que tenía cada una de las entradas. Esto puede implicar mover datos alrededor (si las particiones no fueron coubicadas) pero no implicará un orden aleatorio. En este caso se conserva el particionador. (El código para esto está en PartitionerAwareUnionRDD.scala ).
Como todos saben, los particionadores en Spark tienen un gran impacto en el rendimiento en cualquier operación "amplia", por lo que generalmente se personalizan en las operaciones. Estaba experimentando con el siguiente código:
val rdd1 =
sc.parallelize(1 to 50).keyBy(_ % 10)
.partitionBy(new HashPartitioner(10))
val rdd2 =
sc.parallelize(200 to 230).keyBy(_ % 13)
val cogrouped = rdd1.cogroup(rdd2)
println("cogrouped: " + cogrouped.partitioner)
val unioned = rdd1.union(rdd2)
println("union: " + unioned.partitioner)
Veo que por defecto cogroup()
siempre produce un RDD con el particionador personalizado, pero union()
no lo hace, siempre volverá a la configuración predeterminada. Esto es contrario a la intuición, ya que generalmente asumimos que un PairRDD debe usar su primer elemento como clave de partición. ¿Hay una manera de "forzar" a Spark a combinar 2 PairRDDs para usar la misma clave de partición?