started spark significado resilient parallelize lazy getting scala apache-spark rdd

scala - significado - Spark: forma eficiente de probar si un RDD está vacío



rdd spark (2)

A partir de RDD.isEmpty() el isEmpty() es parte de la api de RDD. Una corrección que estaba causando que se isEmpty se corrigió más tarde en Spark 1.4 .

Para DataFrames puedes hacer:

val df: DataFrame = ... df.rdd.isEmpty()

Aquí está pegar el código directamente de la implementación de RDD (a partir de 1.4.1).

/** * @note due to complications in the internal implementation, this method will raise an * exception if called on an RDD of `Nothing` or `Null`. This may be come up in practice * because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`. * (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.) * @return true if and only if the RDD contains no elements at all. Note that an RDD * may be empty even when it has at least 1 partition. */ def isEmpty(): Boolean = withScope { partitions.length == 0 || take(1).length == 0 }

No hay un método isEmpty en los RDD, entonces, ¿cuál es la forma más eficiente de probar si un RDD está vacío?


RDD.isEmpty() será parte de Spark 1.3.0.

En base a las sugerencias en este hilo de correo apache y más adelante algunos comentarios a esta respuesta, he realizado algunos pequeños experimentos locales. El mejor método es usar take(1).length==0 .

def isEmpty[T](rdd : RDD[T]) = { rdd.take(1).length == 0 }

Debería ejecutarse en O(1) excepto cuando el RDD está vacío, en cuyo caso es lineal en el número de particiones.

Gracias a Josh Rosen y Nick Chammas por señalarme esto.

Nota: Esto falla si el RDD es del tipo RDD[Nothing] por ejemplo, isEmpty(sc.parallelize(Seq())) , pero es probable que esto no sea un problema en la vida real. isEmpty(sc.parallelize(Seq[Any]())) funciona bien.

Ediciones:

  • Editar 1: método agregado de take(1)==0 , gracias a los comentarios.

Mi sugerencia original: utilizar mapPartitions .

def isEmpty[T](rdd : RDD[T]) = { rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_) }

Debería escalar en el número de particiones y no es tan limpio como take(1) . Sin embargo, es robusto para los RDD de tipo RDD[Nothing] .

Experimentos:

Usé este código para los tiempos.

def time(n : Long, f : (RDD[Long]) => Boolean): Unit = { val start = System.currentTimeMillis() val rdd = sc.parallelize(1L to n, numSlices = 100) val result = f(rdd) printf("Time: " + (System.currentTimeMillis() - start) + " Result: " + result) } time(1000000000L, rdd => rdd.take(1).length == 0L) time(1000000000L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_)) time(1000000000L, rdd => rdd.count() == 0L) time(1000000000L, rdd => rdd.takeSample(true, 1).isEmpty) time(1000000000L, rdd => rdd.fold(0)(_ + _) == 0L) time(1L, rdd => rdd.take(1).length == 0L) time(1L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_)) time(1L, rdd => rdd.count() == 0L) time(1L, rdd => rdd.takeSample(true, 1).isEmpty) time(1L, rdd => rdd.fold(0)(_ + _) == 0L) time(0L, rdd => rdd.take(1).length == 0L) time(0L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_)) time(0L, rdd => rdd.count() == 0L) time(0L, rdd => rdd.takeSample(true, 1).isEmpty) time(0L, rdd => rdd.fold(0)(_ + _) == 0L)

En mi máquina local con 3 núcleos de trabajo obtuve estos resultados

Time: 21 Result: false Time: 75 Result: false Time: 8664 Result: false Time: 18266 Result: false Time: 23836 Result: false Time: 113 Result: false Time: 101 Result: false Time: 68 Result: false Time: 221 Result: false Time: 46 Result: false Time: 79 Result: true Time: 93 Result: true Time: 79 Result: true Time: 100 Result: true Time: 64 Result: true