significado - Apache Spark RDD filtra en dos RDDs
rdd significado (5)
El punto es que no desea hacer un filtro, sino un mapa.
(T) -> (Boolean, T)
Lo siento, soy ineficiente en la sintaxis de Scala. Pero la idea es que usted divide su conjunto de respuestas al asignarlo a los pares de Clave / Valor. La clave puede ser un valor booleano que indica que se marchita o no que pasa el predicado ''Filtro''.
Necesito dividir un RDD en 2 partes:
1 parte que satisface una condición; Otra parte que no lo hace. Puedo filter
dos veces en el RDD original pero parece ineficiente. ¿Hay alguna manera de hacer lo que estoy buscando? No encuentro nada en la API ni en la literatura.
Puede usar la subtract function
(si la operación del filtro es demasiado costosa).
Código PySpark:
rdd1 = data.filter(filterFunction)
rdd2 = data.subtract(rdd1)
Si está de acuerdo con una T
lugar de una RDD[T]
, puede hacer esto . De lo contrario, tal vez podrías hacer algo como esto:
val data = sc.parallelize(1 to 100)
val splitData = data.mapPartitions{iter => {
val splitList = (iter.toList).partition(_%2 == 0)
Tuple1(splitList).productIterator
}
}.map(_.asInstanceOf[Tuple2[List[Int],List[Int]]])
Y, entonces, probablemente deba reducir esto para fusionar las listas cuando vaya a realizar una acción
Spark RDD no tiene tal api.
Aquí hay una versión basada en una solicitud de extracción para rdd.span que debería funcionar:
import scala.reflect.ClassTag
import org.apache.spark.rdd._
def split[T:ClassTag](rdd: RDD[T], p: T => Boolean): (RDD[T], RDD[T]) = {
val splits = rdd.mapPartitions { iter =>
val (left, right) = iter.partition(p)
val iterSeq = Seq(left, right)
iterSeq.iterator
}
val left = splits.mapPartitions { iter => iter.next().toIterator}
val right = splits.mapPartitions { iter =>
iter.next()
iter.next().toIterator
}
(left, right)
}
val rdd = sc.parallelize(0 to 10, 2)
val (first, second) = split[Int](rdd, _ % 2 == 0 )
first.collect
// Array[Int] = Array(0, 2, 4, 6, 8, 10)
Spark no soporta esto por defecto. Filtrar en los mismos datos dos veces no es tan malo si lo almacena de antemano y el filtrado en sí mismo es rápido.
Si realmente son solo dos tipos diferentes, puedes usar un método auxiliar:
implicit class RDDOps[T](rdd: RDD[T]) {
def partitionBy(f: T => Boolean): (RDD[T], RDD[T]) = {
val passes = rdd.filter(f)
val fails = rdd.filter(e => !f(e)) // Spark doesn''t have filterNot
(passes, fails)
}
}
val (matches, matchesNot) = sc.parallelize(1 to 100).cache().partitionBy(_ % 2 == 0)
Pero tan pronto como tenga varios tipos de datos, simplemente asigne el filtrado a un nuevo valor.