spark apache-spark rdd

apache-spark - spark rdd take



¿Cómo selecciono un rango de elementos en Spark RDD? (4)

Me gustaría seleccionar una gama de elementos en un Spark RDD. Por ejemplo, tengo un RDD con cien elementos, y necesito seleccionar elementos del 60 al 80. ¿Cómo hago eso?

Veo que RDD tiene un método take (i: int), que devuelve los primeros i elementos. Pero no hay un método correspondiente para tomar los últimos i elementos, o i elementos del medio que comienzan en un cierto índice.


¿Qué tan grande es su conjunto de datos? Es posible que pueda hacer lo que necesita con:

data.take(80).drop(59)

Esto parece ineficiente, pero para datos pequeños y medianos, debería funcionar.

¿Es posible resolver esto de otra manera? ¿Cuál es el caso para elegir exactamente un cierto rango de la mitad de sus datos? ¿ takeSample te serviría mejor?


El siguiente debería ser capaz de obtener el rango. Tenga en cuenta que la memoria caché le ahorrará algo de sobrecarga, porque internamente zipWithIndex necesita escanear la partición RDD para obtener la cantidad de elementos en cada partición.

scala>val r1 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g"), 3).cache scala>val r2 = r1.zipWithIndex scala>val r3 = r2.filter(x=> {x._2>2 && x._2 < 4}).map(x=>x._1) scala>r3.foreach(println) d


No creo que haya un método eficiente para hacer esto todavía. Pero la manera más fácil es usar filter() , digamos que tiene un RDD, se pairs con pares de valores clave y solo quiere elementos de 60 a 80 inclusive.

val 60to80 = pairs.filter { _ match { case (k,v) => k >= 60 && k <= 80 case _ => false //incase of invalid input } }

Creo que es posible que esto se pueda hacer de manera más eficiente en el futuro, utilizando sortByKey y guardando información sobre el rango de valores asignados a cada partición. Tenga en cuenta que este enfoque solo ahorraría algo si planea consultar el rango varias veces porque el tipo es obviamente costoso.

Al mirar la fuente de chispa definitivamente sería posible hacer consultas de rango eficientes usando RangePartitioner :

// An array of upper bounds for the first (partitions - 1) partitions private val rangeBounds: Array[K] = {

Este es un miembro privado de RangePartitioner con el conocimiento de todos los límites superiores de las particiones, sería fácil consultar solo las particiones necesarias. Parece que esto es algo que los usuarios de chispas pueden ver en el futuro: SPARK-911

ACTUALIZACIÓN: Mejor respuesta, basada en la solicitud de extracción Estoy escribiendo para SPARK-911. Funcionará eficientemente si el RDD está ordenado y lo consulta varias veces.

val sorted = sc.parallelize((1 to 100).map(x => (x, x))).sortByKey().cache() val p: RangePartitioner[Int, Int] = sorted.partitioner.get.asInstanceOf[RangePartitioner[Int, Int]]; val (lower, upper) = (10, 20) val range = p.getPartition(lower) to p.getPartition(upper) println(range) val rangeFilter = (i: Int, iter: Iterator[(Int, Int)]) => { if (range.contains(i)) for ((k, v) <- iter if k >= lower && k <= upper) yield (k, v) else Iterator.empty } for((k,v) <- sorted.mapPartitionsWithIndex(rangeFilter, preservesPartitioning = true).collect()) println(s"$k, $v")

Si tener toda la partición en la memoria es aceptable, incluso podrías hacer algo como esto.
val glommedAndCached = sorted.glom()cache(); glommedAndCached.map(a => a.slice(a.search(lower),a.search(upper)+1)).collect()

search no es miembro Por cierto, hice una clase implícita que tiene una función de búsqueda binaria, no se muestra aquí


Para aquellos que tropiezan con esta pregunta buscando una respuesta compatible con Spark 2.x, puedes usar filterByRange