file - Cómo escribir a CSV en Spark
hadoop apache-spark (6)
Extendiendo la respuesta de a Spark 2.x y Scala 2.11
Usando Spark SQL podemos hacer esto en un trazador de líneas
//implicits for magic functions like .toDf
import spark.implicits._
val df = Seq(
("first", 2.0),
("choose", 7.0),
("test", 1.5)
).toDF("name", "vals")
//write DataFrame/DataSet to external storage
df.write
.format("csv")
.save("csv/file/location")
Entonces puedes ir a la cabeza y proceder con la respuesta de .
Estoy tratando de encontrar una forma efectiva de guardar el resultado de mi Spark Job como un archivo csv. Estoy usando Spark con Hadoop y hasta ahora todos mis archivos están guardados como part-00000
.
¿Alguna idea de cómo hacer que mi chispa ahorre para archivar con un nombre de archivo específico?
Dado que Spark usa la API Hadoop File System para escribir datos en archivos, esto es inevitable. Si lo haces
rdd.saveAsTextFile("foo")
Se guardará como " foo/part-XXXXX
" con un archivo de una parte * de cada partición en el RDD que está intentando guardar. La razón por la cual cada partición en el RDD se escribe en un archivo separado es para la tolerancia a fallas. Si la tarea de escribir la 3ra partición (es decir, a la part-00002
) falla, Spark simplemente vuelve a ejecutar la tarea y sobrescribe la parte parcialmente escrita / corrupta part-00002
, sin efecto en otras partes. Si todos escribieron en el mismo archivo, entonces es mucho más difícil recuperar una sola tarea por fallas.
Los archivos part-XXXXX
generalmente no son un problema si va a consumirlos nuevamente en marcos basados en Spark / Hadoop porque dado que todos usan la API HDFS, si les pide que lean "foo", todos leerán toda la part-XXXXX
archivos dentro de foo también.
Hay another enfoque basado en Hadoop FileSystem ops.
No es realmente una solución limpia, pero dentro de un foreachRDD
() básicamente puedes hacer lo que quieras, también crear un nuevo archivo.
En mi solución, esto es lo que hago: foreachRDD
el resultado en HDFS (por razones de tolerancia a fallas), y dentro de un foreachRDD
también creo un archivo TSV con estadísticas en una carpeta local.
Creo que probablemente puedas hacer lo mismo si eso es lo que necesitas.
http://spark.apache.org/docs/0.9.1/streaming-programming-guide.html#output-operations
Sugeriré hacerlo de esta manera (ejemplo de Java):
theRddToPrint.coalesce(1, true).saveAsTextFile(textFileName);
FileSystem fs = anyUtilClass.getHadoopFileSystem(rootFolder);
FileUtil.copyMerge(
fs, new Path(textFileName),
fs, new Path(textFileNameDestiny),
true, fs.getConf(), null);
Tengo una idea, pero no estoy listo fragmento de código. Internamente (como su nombre lo sugiere) Spark usa el formato de salida de Hadoop. (y también InputFormat
cuando lee desde HDFS).
En FileOutputFormat
de hadoop, está el miembro protegido setOutputFormat
, al que puede llamar desde la clase heredada para establecer otro nombre base.