tutorial spark que leer con archivo scala apache-spark spark-streaming

scala - leer - que es apache spark



Cómo guardar datos RDD en archivos json, no en carpetas (2)

AFAIK no hay opción para guardarlo como un archivo. Debido a que es un marco de procesamiento distribuido y no es una buena práctica, escriba en un solo archivo en lugar de que cada partición escriba sus propios archivos en la ruta especificada.

Solo podemos pasar el directorio de salida donde queríamos guardar los datos. OutputWriter creará archivo (s) (depende de las particiones) dentro de la ruta especificada con prefijo de nombre de archivo part- .

Estoy recibiendo los datos de transmisión myDStream ( DStream[String] ) que quiero guardar en S3 (básicamente, para esta pregunta, no importa dónde exactamente quiero guardar los resultados, pero lo menciono por si acaso )

El siguiente código funciona bien, pero guarda las carpetas con los nombres como jsonFile-19-45-46.json , y luego dentro de las carpetas guarda los archivos _SUCCESS y part-00000 .

¿Es posible guardar cada dato RDD[String] (estos son cadenas JSON) en los archivos JSON, no en las carpetas? Pensé que la repartition(1) tenía que hacer este truco, pero no fue así.

myDStream.foreachRDD { rdd => // datetimeString = .... rdd.repartition(1).saveAsTextFile("s3n://mybucket/keys/jsonFile-"+datetimeString+".json") }


Como alternativa a rdd.collect.mkString("/n") puede usar la biblioteca del sistema de archivos hadoop para limpiar la salida moviendo el archivo part-00000 a su lugar. El código siguiente funciona perfectamente en el sistema de archivos local y HDFS, pero no puedo probarlo con S3:

val outputPath = "path/to/some/file.json" rdd.saveAsTextFile(outputPath + "-tmp") import org.apache.hadoop.fs.Path val fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration) fs.rename(new Path(outputPath + "-tmp/part-00000"), new Path(outputPath)) fs.delete(new Path(outputPath + "-tmp"), true)