write true spark read example scala csv apache-spark spark-csv

scala - true - spark write csv



Escribir un solo archivo CSV usando spark-csv (9)

Estoy usando https://github.com/databricks/spark-csv , estoy tratando de escribir un solo CSV, pero no puedo, está haciendo una carpeta.

Necesita una función Scala que tomará parámetros como ruta y nombre de archivo y escribirá ese archivo CSV.


Está creando una carpeta con múltiples archivos, porque cada partición se guarda individualmente. Si necesita un solo archivo de salida (todavía en una carpeta), puede repartition (se prefiere si los datos ascendentes son grandes, pero requiere una combinación aleatoria):

df .repartition(1) .write.format("com.databricks.spark.csv") .option("header", "true") .save("mydata.csv")

o coalesce :

df .coalesce(1) .write.format("com.databricks.spark.csv") .option("header", "true") .save("mydata.csv")

marco de datos antes de guardar:

Todos los datos se escribirán en mydata.csv/part-00000 . Antes de utilizar esta opción, asegúrese de comprender qué está sucediendo y cuál es el costo de transferir todos los datos a un solo trabajador . Si usa un sistema de archivos distribuido con replicación, los datos se transferirán varias veces: primero se obtienen a un solo trabajador y luego se distribuyen a través de nodos de almacenamiento.

Alternativamente, puede dejar su código como está y usar herramientas de propósito general como cat o HDFS getmerge para simplemente fusionar todas las partes después.


Hay una forma más de usar Java

import java.io._ def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) { val p = new java.io.PrintWriter(f); try { op(p) } finally { p.close() } } printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}


La API df.write() de df.write() creará múltiples archivos de partes dentro de la ruta dada ... para forzar que spark escriba solo un archivo de partes, use df.coalesce(1).write.csv(...) lugar de df.repartition(1).write.csv(...) ya que la fusión es una transformación estrecha, mientras que la repartición es una transformación amplia, vea Spark - repartition () vs coalesce ()

df.coalesce(1).write.csv(filepath,header=True)

creará la carpeta en la part-0001-...-c000.csv archivo dada con un uso de archivo de part-0001-...-c000.csv

cat filepath/part-0001-...-c000.csv > filename_you_want.csv

tener un nombre de archivo fácil de usar


Podría llegar un poco tarde al juego aquí, pero el uso de la coalesce(1) o repartition(1) puede funcionar para conjuntos de datos pequeños, pero los conjuntos de datos grandes se arrojarían a una partición en un nodo. Es probable que esto arroje errores OOM o, en el mejor de los casos, que se procese lentamente.

Recomiendo encarecidamente que utilice la función FileUtil.copyMerge() de la API de Hadoop. Esto combinará las salidas en un solo archivo.

EDITAR - Esto efectivamente lleva los datos al controlador en lugar de a un nodo ejecutor. Coalesce() estaría bien si un solo ejecutor tiene más RAM para usar que el controlador.

EDIT 2: copyMerge() se está eliminando en Hadoop 3.0. Consulte el siguiente artículo de desbordamiento de pila para obtener más información sobre cómo trabajar con la versión más reciente: Hadoop cómo hacer CopyMerge en Hadoop 3.0


Si está ejecutando Spark con HDFS, he estado resolviendo el problema escribiendo archivos csv normalmente y aprovechando HDFS para hacer la fusión. Lo estoy haciendo en Spark (1.6) directamente:

import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ def merge(srcPath: String, dstPath: String): Unit = { val hadoopConfig = new Configuration() val hdfs = FileSystem.get(hadoopConfig) FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) // the "true" setting deletes the source files once they are merged into the new output } val newData = << create your dataframe >> val outputfile = "/user/feeds/project/outputs/subject" var filename = "myinsights" var outputFileName = outputfile + "/temp_" + filename var mergedFileName = outputfile + "/merged_" + filename var mergeFindGlob = outputFileName newData.write .format("com.databricks.spark.csv") .option("header", "false") .mode("overwrite") .save(outputFileName) merge(mergeFindGlob, mergedFileName ) newData.unpersist()

No recuerdo dónde aprendí este truco, pero podría funcionar para ti.


Si está utilizando Databricks y puede ajustar todos los datos en la RAM en un trabajador (y, por lo tanto, puede usar .coalesce(1) ), puede usar dbfs para buscar y mover el archivo CSV resultante:

val fileprefix= "/mnt/aws/path/file-prefix" dataset .coalesce(1) .write //.mode("overwrite") // I usually don''t use this, but you may want to. .option("header", "true") .option("delimiter","/t") .csv(fileprefix+".tmp") val partition_path = dbutils.fs.ls(fileprefix+".tmp/") .filter(file=>file.name.endsWith(".csv"))(0).path dbutils.fs.cp(partition_path,fileprefix+".tab") dbutils.fs.rm(fileprefix+".tmp",recurse=true)

Si su archivo no cabe en la RAM del trabajador, puede considerar la sugerencia de chaotic3quilibrium de usar FileUtils.copyMerge () . No he hecho esto, y todavía no sé si es posible o no, por ejemplo, en S3.

Esta respuesta se basa en respuestas anteriores a esta pregunta, así como en mis propias pruebas del fragmento de código proporcionado. Originalmente lo publiqué en Databricks y lo vuelvo a publicar aquí.

La mejor documentación para la opción recursiva de dbfs rm que he encontrado está en un foro de Databricks .


Una solución que funciona para S3 modificada de Minkymorgan.

Simplemente pase la ruta temporal del directorio particionado (con un nombre diferente al de la ruta final) como srcPath y csv / txt final único como destPath Especifique también deleteSource si desea eliminar el directorio original.

/** * Merges multiple partitions of spark text file output into single file. * @param srcPath source directory of partitioned files * @param dstPath output path of individual path * @param deleteSource whether or not to delete source directory after merging * @param spark sparkSession */ def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit = { import org.apache.hadoop.fs.FileUtil import java.net.URI val config = spark.sparkContext.hadoopConfiguration val fs: FileSystem = FileSystem.get(new URI(srcPath), config) FileUtil.copyMerge( fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null ) }


puede usar rdd.coalesce(1, true).saveAsTextFile(path)

almacenará datos como archivo individual en la ruta / parte-00000


repartition / coalesce a 1 partición antes de guardar (todavía obtendría una carpeta pero tendría un archivo de parte en ella)