scala - significado - Spark: forma eficiente de probar si un RDD está vacío
rdd spark (2)
A partir de RDD.isEmpty() el isEmpty()
es parte de la api de RDD. Una corrección que estaba causando que se isEmpty
se corrigió más tarde en Spark 1.4 .
Para DataFrames puedes hacer:
val df: DataFrame = ...
df.rdd.isEmpty()
Aquí está pegar el código directamente de la implementación de RDD (a partir de 1.4.1).
/**
* @note due to complications in the internal implementation, this method will raise an
* exception if called on an RDD of `Nothing` or `Null`. This may be come up in practice
* because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`.
* (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.)
* @return true if and only if the RDD contains no elements at all. Note that an RDD
* may be empty even when it has at least 1 partition.
*/
def isEmpty(): Boolean = withScope {
partitions.length == 0 || take(1).length == 0
}
No hay un método isEmpty
en los RDD, entonces, ¿cuál es la forma más eficiente de probar si un RDD está vacío?
RDD.isEmpty()
será parte de Spark 1.3.0.
En base a las sugerencias en este hilo de correo apache y más adelante algunos comentarios a esta respuesta, he realizado algunos pequeños experimentos locales. El mejor método es usar take(1).length==0
.
def isEmpty[T](rdd : RDD[T]) = {
rdd.take(1).length == 0
}
Debería ejecutarse en O(1)
excepto cuando el RDD está vacío, en cuyo caso es lineal en el número de particiones.
Gracias a Josh Rosen y Nick Chammas por señalarme esto.
Nota: Esto falla si el RDD es del tipo RDD[Nothing]
por ejemplo, isEmpty(sc.parallelize(Seq()))
, pero es probable que esto no sea un problema en la vida real. isEmpty(sc.parallelize(Seq[Any]()))
funciona bien.
Ediciones:
- Editar 1: método agregado de
take(1)==0
, gracias a los comentarios.
Mi sugerencia original: utilizar mapPartitions
.
def isEmpty[T](rdd : RDD[T]) = {
rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_)
}
Debería escalar en el número de particiones y no es tan limpio como take(1)
. Sin embargo, es robusto para los RDD de tipo RDD[Nothing]
.
Experimentos:
Usé este código para los tiempos.
def time(n : Long, f : (RDD[Long]) => Boolean): Unit = {
val start = System.currentTimeMillis()
val rdd = sc.parallelize(1L to n, numSlices = 100)
val result = f(rdd)
printf("Time: " + (System.currentTimeMillis() - start) + " Result: " + result)
}
time(1000000000L, rdd => rdd.take(1).length == 0L)
time(1000000000L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1000000000L, rdd => rdd.count() == 0L)
time(1000000000L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1000000000L, rdd => rdd.fold(0)(_ + _) == 0L)
time(1L, rdd => rdd.take(1).length == 0L)
time(1L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1L, rdd => rdd.count() == 0L)
time(1L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1L, rdd => rdd.fold(0)(_ + _) == 0L)
time(0L, rdd => rdd.take(1).length == 0L)
time(0L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(0L, rdd => rdd.count() == 0L)
time(0L, rdd => rdd.takeSample(true, 1).isEmpty)
time(0L, rdd => rdd.fold(0)(_ + _) == 0L)
En mi máquina local con 3 núcleos de trabajo obtuve estos resultados
Time: 21 Result: false
Time: 75 Result: false
Time: 8664 Result: false
Time: 18266 Result: false
Time: 23836 Result: false
Time: 113 Result: false
Time: 101 Result: false
Time: 68 Result: false
Time: 221 Result: false
Time: 46 Result: false
Time: 79 Result: true
Time: 93 Result: true
Time: 79 Result: true
Time: 100 Result: true
Time: 64 Result: true