tutorial spark parallelize apache-spark partitioning hadoop-partitioning

apache-spark - tutorial - parallelize spark



En Apache Spark, ¿por qué RDD.union no conserva el particionador? (1)

union es una operación muy eficiente, porque no mueve ningún dato. Si rdd1 tiene 10 particiones y rdd2 tiene 20 particiones, entonces rdd1.union(rdd2) tendrá 30 particiones: las particiones de los dos RDD colocadas una detrás de la otra. Esto es solo un cambio de contabilidad, no hay barajadas.

Pero necesariamente descarta el particionador. Un particionador se construye para un número dado de particiones. El RDD resultante tiene un número de particiones que es diferente de rdd1 y rdd2 .

Después de tomar la unión, puede ejecutar la repartition para barajar los datos y organizarlos por clave.

Hay una excepción a lo anterior. Si rdd1 y rdd2 tienen el mismo particionador (con el mismo número de particiones), la union comporta de manera diferente. Unirá las particiones de los dos RDDs por pares, dándole el mismo número de particiones que tenía cada una de las entradas. Esto puede implicar mover datos alrededor (si las particiones no fueron coubicadas) pero no implicará un orden aleatorio. En este caso se conserva el particionador. (El código para esto está en PartitionerAwareUnionRDD.scala ).

Como todos saben, los particionadores en Spark tienen un gran impacto en el rendimiento en cualquier operación "amplia", por lo que generalmente se personalizan en las operaciones. Estaba experimentando con el siguiente código:

val rdd1 = sc.parallelize(1 to 50).keyBy(_ % 10) .partitionBy(new HashPartitioner(10)) val rdd2 = sc.parallelize(200 to 230).keyBy(_ % 13) val cogrouped = rdd1.cogroup(rdd2) println("cogrouped: " + cogrouped.partitioner) val unioned = rdd1.union(rdd2) println("union: " + unioned.partitioner)

Veo que por defecto cogroup() siempre produce un RDD con el particionador personalizado, pero union() no lo hace, siempre volverá a la configuración predeterminada. Esto es contrario a la intuición, ya que generalmente asumimos que un PairRDD debe usar su primer elemento como clave de partición. ¿Hay una manera de "forzar" a Spark a combinar 2 PairRDDs para usar la misma clave de partición?