apache spark - spark - ¿Cómo divido un RDD en dos o más RDD?
rdd spark (4)
Como otros carteles mencionados anteriormente, no existe una única transformación RDD nativa que divida los RDD, pero aquí hay algunas operaciones "multiplex" que pueden emular eficientemente una amplia variedad de "división" en los RDD, sin leer varias veces:
http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions
Algunos métodos específicos para la división aleatoria:
http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions
Los métodos están disponibles en el proyecto de código abierto de silex:
https://github.com/willb/silex
Una publicación de blog que explica cómo funcionan:
http://erikerlandson.github.io/blog/2016/02/08/efficient-multiplexing-for-spark-rdds/
def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U],
persist: StorageLevel): Seq[RDD[U]] = {
val mux = self.mapPartitionsWithIndex { case (id, itr) =>
Iterator.single(f(id, itr))
}.persist(persist)
Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } }
}
def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]],
persist: StorageLevel): Seq[RDD[U]] = {
val mux = self.mapPartitionsWithIndex { case (id, itr) =>
Iterator.single(f(id, itr))
}.persist(persist)
Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } }
}
Como se mencionó en otra parte, estos métodos implican un intercambio de memoria por velocidad, porque operan calculando los resultados completos de la partición "ansiosamente" en lugar de "perezosamente". Por lo tanto, es posible que estos métodos se encuentren con problemas de memoria en particiones grandes, donde las transformaciones diferidas más tradicionales no lo harán.
Estoy buscando una manera de dividir un RDD en dos o más RDD. ¿Lo más cerca que he visto es Scala Spark: Split colección en varios RDD? que sigue siendo un solo RDD.
Si está familiarizado con SAS, algo como esto:
data work.split1, work.split2;
set work.preSplit;
if (condition1)
output work.split1
else if (condition2)
output work.split2
run;
lo que resultó en dos conjuntos de datos distintos. Tendría que persistir de inmediato para obtener los resultados que pretendo ...
No es posible producir múltiples RDD a partir de una sola transformación *.
Si desea dividir un RDD, debe aplicar un
filter
para cada condición de división.
Por ejemplo:
def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
Si solo tiene una condición binaria y el cálculo es costoso, puede preferir algo como esto:
kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()
rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()
Significa solo un cálculo de predicado único, pero requiere un paso adicional sobre todos los datos.
Es importante tener en cuenta que, siempre que un RDD de entrada se almacene en caché correctamente y no haya supuestos adicionales con respecto a la distribución de datos, no hay una diferencia significativa cuando se trata de la complejidad del tiempo entre el filtro repetido y el bucle for con if-else anidado.
Con N elementos y M condiciones, el número de operaciones que debe realizar es claramente proporcional a N veces M. En caso de bucle for, debe estar más cerca de (N + MN) / 2 y el filtro repetido es exactamente NM pero al final de el día no es más que O (NM). Puedes ver mi discusión ** con Jason Lenderman para leer sobre algunos pros y contras.
En el nivel más alto, debe considerar dos cosas:
-
Las transformaciones de chispa son flojas, hasta que ejecutas una acción, tu RDD no se materializa
¿Por qué eso importa? Volviendo a mi ejemplo:
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
Si más tarde decido que solo necesito
rdd_odd
entonces no hay razón para materializarrdd_even
.Si observa su ejemplo de SAS para calcular
work.split2
, necesita materializar los datos de entrada ywork.split1
. -
Los RDD proporcionan una API declarativa. Cuando usa
filter
omap
, depende completamente del motor de Spark cómo se realiza esta operación. Siempre que las funciones pasadas a las transformaciones estén libres de efectos secundarios, crea múltiples posibilidades para optimizar una tubería completa.
Al final del día, este caso no es lo suficientemente especial como para justificar su propia transformación.
Este mapa con patrón de filtro se usa realmente en un núcleo de Spark.
Vea mi respuesta a
¿Cómo Sparks RDD.randomSplit realmente divide el RDD
y una
parte relevante
del método
randomSplit
?
Si el único objetivo es lograr una división en la entrada, es posible usar la cláusula
DataFrameWriter
para
DataFrameWriter
cuyo formato de salida de texto:
def makePairs(row: T): (String, String) = ???
data
.map(makePairs).toDF("key", "value")
.write.partitionBy($"key").format("text").save(...)
* Solo hay 3 tipos básicos de transformaciones en Spark:
- RDD [T] => RDD [T]
- RDD [T] => RDD [U]
- (RDD [T], RDD [U]) => RDD [W]
donde T, U, W pueden ser tipos atómicos o products / tuplas (K, V). Cualquier otra operación debe expresarse utilizando alguna combinación de lo anterior. Puede consultar el papel RDD original para obtener más detalles.
** http://chat..com/rooms/91928/discussion-between-zero323-and-jason-lenderman
*** Ver también Scala Spark: ¿División dividida en varios RDD?
Si divide un RDD utilizando la llamada a la API randomSplit , obtendrá una matriz de RDD.
Si desea que se devuelvan 5 RDD, pase 5 valores de peso.
p.ej
val sourceRDD = val sourceRDD = sc.parallelize(1 to 100, 4)
val seedValue = 5
val splitRDD = sourceRDD.randomSplit(Array(1.0,1.0,1.0,1.0,1.0), seedValue)
splitRDD(1).collect()
res7: Array[Int] = Array(1, 6, 11, 12, 20, 29, 40, 62, 64, 75, 77, 83, 94, 96, 100)
Una forma es usar un particionador personalizado para particionar los datos dependiendo de la condición de su filtro.
Esto se puede lograr extendiendo
Partitioner
e implementando algo similar al
RangePartitioner
.
Las particiones de un mapa se pueden usar para construir múltiples RDD a partir del RDD particionado sin leer todos los datos.
val filtered = partitioned.mapPartitions { iter => {
new Iterator[Int](){
override def hasNext: Boolean = {
if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) {
false
} else {
iter.hasNext
}
}
override def next():Int = iter.next()
}
Solo tenga en cuenta que el número de particiones en los RDD filtrados será el mismo que el número en el RDD particionado, por lo que se debe usar una fusión para reducir esto y eliminar las particiones vacías.