una tipos tamaño recomendadas que punto particiones particion para necesarias montaje duro disco scala hadoop apache-spark

scala - tipos - Cómo definir el particionador personalizado para Spark RDDs de igual tamaño de partición donde cada partición tiene la misma cantidad de elementos?



tipos de particiones en linux (2)

El trabajo del Partitioner asignando una clave a una partición. Necesitará conocimiento previo de la distribución de claves, o mirar todas las claves, para crear dicho particionador. Es por eso que Spark no te proporciona uno.

En general, no necesita ese particionador. De hecho, no puedo encontrar un caso de uso en el que necesite particiones de igual tamaño. ¿Qué pasa si la cantidad de elementos es impar?

De todos modos, digamos que tiene un RDD codificado por Int secuencial, y sabe cuántos en total. Entonces podrías escribir un Partitioner personalizado como este:

class ExactPartitioner[V]( partitions: Int, elements: Int) extends Partitioner { def getPartition(key: Any): Int = { val k = key.asInstanceOf[Int] // `k` is assumed to go continuously from 0 to elements-1. return k * partitions / elements } }

Soy nuevo en Spark. Tengo un gran conjunto de datos de elementos [RDD] y quiero dividirlo en dos particiones de igual tamaño manteniendo el orden de los elementos. Intenté usar RangePartitioner como

var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile))

Esto no da un resultado satisfactorio porque divide el orden de mantenimiento de los elementos aproximadamente pero no exactamente del mismo tamaño. Por ejemplo, si hay 64 elementos, usamos Rangepartitioner , luego se divide en 31 elementos y 33 elementos.

Necesito un particionador de tal manera que obtenga exactamente los primeros 32 elementos en una mitad y la otra mitad contenga el segundo conjunto de 32 elementos. ¿Podría ayudarme sugiriendo cómo usar un particionador personalizado de modo que obtenga dos mitades del mismo tamaño, manteniendo el orden de los elementos?


Esta respuesta tiene algo de inspiración de Daniel, pero proporciona una implementación completa (usando chimpancé mi patrón de biblioteca ) con un ejemplo para copiar y pegar las necesidades de las personas :)

import RDDConversions._ trait RDDWrapper[T] { def rdd: RDD[T] } // TODO View bounds are deprecated, should use context bounds // Might need to change ClassManifest for ClassTag in spark 1.0.0 case class RichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest]( rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] { // Here we use a single Long to try to ensure the sort is balanced, // but for really large dataset, we may want to consider // using a tuple of many Longs or even a GUID def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] = rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey() .grouped(numPartitions).map(t => (t._1._1, t._2)) } case class RichRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] { def grouped(size: Int): RDD[T] = { // TODO Version where withIndex is cached val withIndex = rdd.mapPartitions(_.zipWithIndex) val startValues = withIndex.mapPartitionsWithIndex((i, iter) => Iterator((i, iter.toIterable.last))).toArray().toList .sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L) withIndex.mapPartitionsWithIndex((i, iter) => iter.map { case (value, index) => (startValues(i) + index.toLong, value) }) .partitionBy(new Partitioner { def numPartitions: Int = size def getPartition(key: Any): Int = (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt }) .map(_._2) } }

Luego en otro archivo tenemos

// TODO modify above to be implicit class, rather than have implicit conversions object RDDConversions { implicit def toRichRDD[T: ClassManifest](rdd: RDD[T]): RichRDD[T] = new RichRDD[T](rdd) implicit def toRichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest]( rdd: RDD[(K, V)]): RichPairRDD[K, V] = RichPairRDD(rdd) implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd }

Entonces, para su caso de uso solo desea (suponiendo que ya esté ordenado)

import RDDConversions._ yourRdd.grouped(2)

Descargo de responsabilidad: No probado, simplemente escribió esto directamente en la respuesta SO