file csv hadoop apache-spark distributed-computing

file - Cómo escribir a CSV en Spark



hadoop apache-spark (6)

Extendiendo la respuesta de a Spark 2.x y Scala 2.11

Usando Spark SQL podemos hacer esto en un trazador de líneas

//implicits for magic functions like .toDf import spark.implicits._ val df = Seq( ("first", 2.0), ("choose", 7.0), ("test", 1.5) ).toDF("name", "vals") //write DataFrame/DataSet to external storage df.write .format("csv") .save("csv/file/location")

Entonces puedes ir a la cabeza y proceder con la respuesta de .

Estoy tratando de encontrar una forma efectiva de guardar el resultado de mi Spark Job como un archivo csv. Estoy usando Spark con Hadoop y hasta ahora todos mis archivos están guardados como part-00000 .

¿Alguna idea de cómo hacer que mi chispa ahorre para archivar con un nombre de archivo específico?


Dado que Spark usa la API Hadoop File System para escribir datos en archivos, esto es inevitable. Si lo haces

rdd.saveAsTextFile("foo")

Se guardará como " foo/part-XXXXX " con un archivo de una parte * de cada partición en el RDD que está intentando guardar. La razón por la cual cada partición en el RDD se escribe en un archivo separado es para la tolerancia a fallas. Si la tarea de escribir la 3ra partición (es decir, a la part-00002 ) falla, Spark simplemente vuelve a ejecutar la tarea y sobrescribe la parte parcialmente escrita / corrupta part-00002 , sin efecto en otras partes. Si todos escribieron en el mismo archivo, entonces es mucho más difícil recuperar una sola tarea por fallas.

Los archivos part-XXXXX generalmente no son un problema si va a consumirlos nuevamente en marcos basados ​​en Spark / Hadoop porque dado que todos usan la API HDFS, si les pide que lean "foo", todos leerán toda la part-XXXXX archivos dentro de foo también.


Hay another enfoque basado en Hadoop FileSystem ops.


No es realmente una solución limpia, pero dentro de un foreachRDD () básicamente puedes hacer lo que quieras, también crear un nuevo archivo.

En mi solución, esto es lo que hago: foreachRDD el resultado en HDFS (por razones de tolerancia a fallas), y dentro de un foreachRDD también creo un archivo TSV con estadísticas en una carpeta local.

Creo que probablemente puedas hacer lo mismo si eso es lo que necesitas.

http://spark.apache.org/docs/0.9.1/streaming-programming-guide.html#output-operations


Sugeriré hacerlo de esta manera (ejemplo de Java):

theRddToPrint.coalesce(1, true).saveAsTextFile(textFileName); FileSystem fs = anyUtilClass.getHadoopFileSystem(rootFolder); FileUtil.copyMerge( fs, new Path(textFileName), fs, new Path(textFileNameDestiny), true, fs.getConf(), null);


Tengo una idea, pero no estoy listo fragmento de código. Internamente (como su nombre lo sugiere) Spark usa el formato de salida de Hadoop. (y también InputFormat cuando lee desde HDFS).

En FileOutputFormat de hadoop, está el miembro protegido setOutputFormat , al que puede llamar desde la clase heredada para establecer otro nombre base.