scala - true - spark write csv
Escribir un solo archivo CSV usando spark-csv (9)
Estoy usando https://github.com/databricks/spark-csv , estoy tratando de escribir un solo CSV, pero no puedo, está haciendo una carpeta.
Necesita una función Scala que tomará parámetros como ruta y nombre de archivo y escribirá ese archivo CSV.
Está creando una carpeta con múltiples archivos, porque cada partición se guarda individualmente.
Si necesita un solo archivo de salida (todavía en una carpeta), puede
repartition
(se prefiere si los datos ascendentes son grandes, pero requiere una combinación aleatoria):
df
.repartition(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
o
coalesce
:
df
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
marco de datos antes de guardar:
Todos los datos se escribirán en
mydata.csv/part-00000
.
Antes de utilizar esta opción,
asegúrese de comprender qué está sucediendo y cuál es el costo de transferir todos los datos a un solo trabajador
.
Si usa un sistema de archivos distribuido con replicación, los datos se transferirán varias veces: primero se obtienen a un solo trabajador y luego se distribuyen a través de nodos de almacenamiento.
Alternativamente, puede dejar su código como está y usar herramientas de propósito general como
cat
o
HDFS
getmerge
para simplemente fusionar todas las partes después.
Hay una forma más de usar Java
import java.io._
def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit)
{
val p = new java.io.PrintWriter(f);
try { op(p) }
finally { p.close() }
}
printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}
La API
df.write()
de
df.write()
creará múltiples archivos de partes dentro de la ruta dada ... para forzar que spark escriba solo un archivo de partes, use
df.coalesce(1).write.csv(...)
lugar de
df.repartition(1).write.csv(...)
ya que la fusión es una transformación estrecha, mientras que la repartición es una transformación amplia, vea
Spark - repartition () vs coalesce ()
df.coalesce(1).write.csv(filepath,header=True)
creará la carpeta en la
part-0001-...-c000.csv
archivo dada con un uso de archivo de
part-0001-...-c000.csv
cat filepath/part-0001-...-c000.csv > filename_you_want.csv
tener un nombre de archivo fácil de usar
Podría llegar un poco tarde al juego aquí, pero el uso de la
coalesce(1)
o
repartition(1)
puede funcionar para conjuntos de datos pequeños, pero los conjuntos de datos grandes se arrojarían a una partición en un nodo.
Es probable que esto arroje errores OOM o, en el mejor de los casos, que se procese lentamente.
Recomiendo encarecidamente que utilice la función
FileUtil.copyMerge()
de la API de Hadoop.
Esto combinará las salidas en un solo archivo.
EDITAR - Esto efectivamente lleva los datos al controlador en lugar de a un nodo ejecutor.
Coalesce()
estaría bien si un solo ejecutor tiene más RAM para usar que el controlador.
EDIT 2:
copyMerge()
se está eliminando en Hadoop 3.0.
Consulte el siguiente artículo de desbordamiento de pila para obtener más información sobre cómo trabajar con la versión más reciente:
Hadoop cómo hacer CopyMerge en Hadoop 3.0
Si está ejecutando Spark con HDFS, he estado resolviendo el problema escribiendo archivos csv normalmente y aprovechando HDFS para hacer la fusión. Lo estoy haciendo en Spark (1.6) directamente:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
// the "true" setting deletes the source files once they are merged into the new output
}
val newData = << create your dataframe >>
val outputfile = "/user/feeds/project/outputs/subject"
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob = outputFileName
newData.write
.format("com.databricks.spark.csv")
.option("header", "false")
.mode("overwrite")
.save(outputFileName)
merge(mergeFindGlob, mergedFileName )
newData.unpersist()
No recuerdo dónde aprendí este truco, pero podría funcionar para ti.
Si está utilizando Databricks y puede ajustar todos los datos en la RAM en un trabajador (y, por lo tanto, puede usar
.coalesce(1)
), puede usar dbfs para buscar y mover el archivo CSV resultante:
val fileprefix= "/mnt/aws/path/file-prefix"
dataset
.coalesce(1)
.write
//.mode("overwrite") // I usually don''t use this, but you may want to.
.option("header", "true")
.option("delimiter","/t")
.csv(fileprefix+".tmp")
val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
.filter(file=>file.name.endsWith(".csv"))(0).path
dbutils.fs.cp(partition_path,fileprefix+".tab")
dbutils.fs.rm(fileprefix+".tmp",recurse=true)
Si su archivo no cabe en la RAM del trabajador, puede considerar la sugerencia de chaotic3quilibrium de usar FileUtils.copyMerge () . No he hecho esto, y todavía no sé si es posible o no, por ejemplo, en S3.
Esta respuesta se basa en respuestas anteriores a esta pregunta, así como en mis propias pruebas del fragmento de código proporcionado. Originalmente lo publiqué en Databricks y lo vuelvo a publicar aquí.
La mejor documentación para la opción recursiva de dbfs rm que he encontrado está en un foro de Databricks .
Una solución que funciona para S3 modificada de Minkymorgan.
Simplemente pase la ruta temporal del directorio particionado (con un nombre diferente al de la ruta final) como
srcPath
y csv / txt final único como
destPath
Especifique también
deleteSource
si desea eliminar el directorio original.
/**
* Merges multiple partitions of spark text file output into single file.
* @param srcPath source directory of partitioned files
* @param dstPath output path of individual path
* @param deleteSource whether or not to delete source directory after merging
* @param spark sparkSession
*/
def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit = {
import org.apache.hadoop.fs.FileUtil
import java.net.URI
val config = spark.sparkContext.hadoopConfiguration
val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
FileUtil.copyMerge(
fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
)
}
puede usar
rdd.coalesce(1, true).saveAsTextFile(path)
almacenará datos como archivo individual en la ruta / parte-00000
repartition / coalesce a 1 partición antes de guardar (todavía obtendría una carpeta pero tendría un archivo de parte en ella)