started spark significado parallelize getting scala apache-spark rdd

scala - significado - rdd spark



Stackoverflow debido al largo linaje RDD (1)

En general, puede usar puntos de control para romper linajes largos. Algunos más o menos similares a esto deberían funcionar:

import org.apache.spark.rdd.RDD import scala.reflect.ClassTag val checkpointInterval: Int = ??? def loadAndFilter(path: String) = sc.textFile(path) .filter(_.startsWith("#####")) .map((path, _)) def mergeWithLocalCheckpoint[T: ClassTag](interval: Int) (acc: RDD[T], xi: (RDD[T], Int)) = { if(xi._2 % interval == 0 & xi._2 > 0) xi._1.union(acc).localCheckpoint else xi._1.union(acc) } val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)] fileList.map(loadAndFilter).zipWithIndex .foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval))

En esta situación particular, una solución mucho más simple debería ser usar el método SparkContext.union :

val masterRDD = sc.union( fileList.map(path => sc.textFile(path) .filter(_.startsWith("#####")) .map((path, _))) )

Una diferencia entre estos métodos debería ser obvia cuando eche un vistazo al DAG generado por loop / reduce :

y una sola union :

Por supuesto, si los archivos son pequeños, puede combinar wholeTextFiles con flatMap y leer todos los archivos a la vez:

sc.wholeTextFiles(fileList.mkString(",")) .flatMap{case (path, text) => text.split("/n").filter(_.startsWith("#####")).map((path, _))}

Tengo miles de archivos pequeños en HDFS. Necesita procesar un subconjunto de archivos un poco más pequeño (que de nuevo es miles), fileList contiene una lista de rutas de archivos que deben procesarse.

// fileList == list of filepaths in HDFS var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD for (i <- 0 to fileList.size() - 1) { val filePath = fileStatus.get(i) val fileRDD = sparkContext.textFile(filePath) val sampleRDD = fileRDD.filter(line => line.startsWith("#####")).map(line => (filePath, line)) masterRDD = masterRDD.union(sampleRDD) } masterRDD.first()

// Una vez fuera del ciclo, realizar cualquier acción da como resultado un error de stackoverflow debido al largo linaje de RDD

Exception in thread "main" java.lang.StackOverflowError at scala.runtime.AbstractFunction1.<init>(AbstractFunction1.scala:12) at org.apache.spark.rdd.UnionRDD$$anonfun$1.<init>(UnionRDD.scala:66) at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) ===================================================================== ===================================================================== at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)