tutorial spark software learn for data big scala apache-spark

software - spark scala tutorial



Spark transmitiendo DStream RDD para obtener el nombre del archivo (2)

Spark streaming textFileStream y fileStream pueden monitorear un directorio y procesar los nuevos archivos en un DDR RDD.

¿Cómo obtener los nombres de archivo que DStream RDD está procesando en ese intervalo particular?


fileStream produce UnionRDD de NewHadoopRDD s. Lo bueno de NewHadoopRDD s creado por sc.newAPIHadoopFile es que sus name se establecen en sus rutas.

Aquí está el ejemplo de lo que puedes hacer con ese conocimiento:

def namedTextFileStream(ssc: StreamingContext, directory: String): DStream[String] = ssc.fileStream[LongWritable, Text, TextInputFormat](directory) .transform( rdd => new UnionRDD(rdd.context, rdd.dependencies.map( dep => dep.rdd.asInstanceOf[RDD[(LongWritable, Text)]].map(_._2.toString).setName(dep.rdd.name) ) ) ) def transformByFile[U: ClassTag](unionrdd: RDD[String], transformFunc: String => RDD[String] => RDD[U]): RDD[U] = { new UnionRDD(unionrdd.context, unionrdd.dependencies.map{ dep => if (dep.rdd.isEmpty) None else { val filename = dep.rdd.name Some( transformFunc(filename)(dep.rdd.asInstanceOf[RDD[String]]) .setName(filename) ) } }.flatten ) } def main(args: Array[String]) = { val conf = new SparkConf() .setAppName("Process by file") .setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(30)) val dstream = namesTextFileStream(ssc, "/some/directory") def byFileTransformer(filename: String)(rdd: RDD[String]): RDD[(String, String)] = rdd.map(line => (filename, line)) val transformed = dstream. transform(rdd => transformByFile(rdd, byFileTransformer)) // Do some stuff with transformed ssc.start() ssc.awaitTermination() }


Para aquellos que quieren un poco de código Java en lugar de Scala:

JavaPairInputDStream<LongWritable, Text> textFileStream = jsc.fileStream( inputPath, LongWritable.class, Text.class, TextInputFormat.class, FileInputDStream::defaultFilter, false ); JavaDStream<Tuple2<String, String>> namedTextFileStream = textFileStream.transform((pairRdd, time) -> { UnionRDD<Tuple2<LongWritable, Text>> rdd = (UnionRDD<Tuple2<LongWritable, Text>>) pairRdd.rdd(); List<RDD<Tuple2<LongWritable, Text>>> deps = JavaConverters.seqAsJavaListConverter(rdd.rdds()).asJava(); List<RDD<Tuple2<String, String>>> collectedRdds = deps.stream().map( depRdd -> { if (depRdd.isEmpty()) { return null; } JavaRDD<Tuple2<LongWritable, Text>> depJavaRdd = depRdd.toJavaRDD(); String filename = depRdd.name(); JavaPairRDD<String, String> newDep = JavaPairRDD.fromJavaRDD(depJavaRdd).mapToPair(t -> new Tuple2<String, String>(filename, t._2().toString())).setName(filename); return newDep.rdd(); }).filter(t -> t != null).collect(Collectors.toList()); Seq<RDD<Tuple2<String, String>>> rddSeq = JavaConverters.asScalaBufferConverter(collectedRdds).asScala().toIndexedSeq(); ClassTag<Tuple2<String, String>> classTag = scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class); return new UnionRDD<Tuple2<String, String>>(rdd.sparkContext(), rddSeq, classTag).toJavaRDD(); });