scala - significado - rdd spark
Stackoverflow debido al largo linaje RDD (1)
En general, puede usar puntos de control para romper linajes largos. Algunos más o menos similares a esto deberían funcionar:
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
val checkpointInterval: Int = ???
def loadAndFilter(path: String) = sc.textFile(path)
.filter(_.startsWith("#####"))
.map((path, _))
def mergeWithLocalCheckpoint[T: ClassTag](interval: Int)
(acc: RDD[T], xi: (RDD[T], Int)) = {
if(xi._2 % interval == 0 & xi._2 > 0) xi._1.union(acc).localCheckpoint
else xi._1.union(acc)
}
val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)]
fileList.map(loadAndFilter).zipWithIndex
.foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval))
En esta situación particular, una solución mucho más simple debería ser usar el método SparkContext.union
:
val masterRDD = sc.union(
fileList.map(path => sc.textFile(path)
.filter(_.startsWith("#####"))
.map((path, _)))
)
Una diferencia entre estos métodos debería ser obvia cuando eche un vistazo al DAG generado por loop / reduce
:
y una sola union
:
Por supuesto, si los archivos son pequeños, puede combinar wholeTextFiles
con flatMap
y leer todos los archivos a la vez:
sc.wholeTextFiles(fileList.mkString(","))
.flatMap{case (path, text) =>
text.split("/n").filter(_.startsWith("#####")).map((path, _))}
Tengo miles de archivos pequeños en HDFS. Necesita procesar un subconjunto de archivos un poco más pequeño (que de nuevo es miles), fileList contiene una lista de rutas de archivos que deben procesarse.
// fileList == list of filepaths in HDFS
var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD
for (i <- 0 to fileList.size() - 1) {
val filePath = fileStatus.get(i)
val fileRDD = sparkContext.textFile(filePath)
val sampleRDD = fileRDD.filter(line => line.startsWith("#####")).map(line => (filePath, line))
masterRDD = masterRDD.union(sampleRDD)
}
masterRDD.first()
// Una vez fuera del ciclo, realizar cualquier acción da como resultado un error de stackoverflow debido al largo linaje de RDD
Exception in thread "main" java.lang.StackOverflowError
at scala.runtime.AbstractFunction1.<init>(AbstractFunction1.scala:12)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.<init>(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
=====================================================================
=====================================================================
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)