scala apache-spark functional-programming scala-collections

scala - La función devuelve una lista vacía en Spark



apache-spark functional-programming (1)

Ocurre porque filesInZip no se comparte entre los trabajadores. foreach opera en una copia local de filesInZip y cuando termina esta copia simplemente se descarta y se recolecta la basura. Si desea conservar los resultados, debe usar la transformación (muy probablemente un flatMap ) y devolver los valores agregados recopilados.

def listFiles(stream: PortableDataStream): TraversableOnce[String] = ??? zipInputStream.flatMap(listFiles)

Puede obtener más información de Comprender los cierres

A continuación se muestra el código para obtener la lista de nombres de archivo en un archivo comprimido

def getListOfFilesInRepo(zipFileRDD : RDD[(String,PortableDataStream)]) : (List[String]) = { val zipInputStream = zipFileRDD.values.map(x => new ZipInputStream(x.open)) val filesInZip = new ArrayBuffer[String]() var ze : Option[ZipEntry] = None zipInputStream.foreach(stream =>{ do{ ze = Option(stream.getNextEntry); ze.foreach{ze => if(ze.getName.endsWith("java") && !ze.isDirectory()){ var fileName:String = ze.getName.substring(ze.getName.lastIndexOf("/")+1,ze.getName.indexOf(".java")) filesInZip += fileName } } stream.closeEntry() } while(ze.isDefined) println(filesInZip.toList.length) // print 889 (correct) }) println(filesInZip.toList.length) // print 0 (WHY..?) (filesInZip.toList) }

Ejecuto el código anterior de la siguiente manera:

scala> val zipFileRDD = sc.binaryFiles("./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip") zipFileRDD: org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)] = ./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip BinaryFileRDD[17] at binaryFiles at <console>:25 scala> getListOfFilesInRepo(zipRDD) 889 0 res12: List[String] = List()

¿Por qué no obtengo 889 y en su lugar obtengo 0?