scala - read - spark write csv
Fusionar archivos de salida CSV Spark con un solo encabezado (5)
- Genere el encabezado utilizando dataframe.schema (val header = dataDF.schema.fieldNames.reduce (_ + "," + _))
- crear un archivo con el encabezado en dsefs
- agregue todos los archivos de partición (sin encabezado) al archivo en el # 2 utilizando la API del sistema de archivos hadoop
Quiero crear una canalización de procesamiento de datos en AWS para usar los datos procesados para el aprendizaje automático.
Tengo un script de Scala que toma datos sin procesar de S3, los procesa y los escribe en HDFS o incluso en S3 con Spark-CSV . Creo que puedo usar múltiples archivos como entrada si quiero usar la herramienta AWS Machine Learning para entrenar un modelo de predicción. Pero si quiero usar otra cosa, supongo que es mejor si recibo un solo archivo de salida CSV.
Actualmente, como no quiero usar repartition (1) ni coalesce (1) para propósitos de rendimiento, he usado hadoop fs -getmerge para pruebas manuales, pero como simplemente combina el contenido de los archivos de salida del trabajo, me encuentro un pequeño problema. Necesito una sola fila de encabezados en el archivo de datos para entrenar el modelo de predicción.
Si uso .option("header","true")
para el spark-csv, escribe los encabezados en cada archivo de salida y, después de fusionar, tengo tantas líneas de encabezados en los datos como archivos de salida. Pero si la opción del encabezado es falsa, entonces no agrega ningún encabezado.
Ahora encontré una opción para combinar los archivos dentro del script de Scala con Hadoop API FileUtil.copyMerge
. Intenté esto en spark-shell
con el código de abajo.
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
val configuration = new Configuration();
val fs = FileSystem.get(configuration);
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")
Pero esta solución aún concatena los archivos uno encima del otro y no maneja encabezados. ¿Cómo puedo obtener un archivo de salida con solo una fila de encabezados?
Incluso intenté agregar df.columns.mkString(",")
como el último argumento para copyMerge
, pero esto agregó los encabezados aún varias veces, no una vez.
Intente especificar el esquema del encabezado y lea todo el archivo de la carpeta con la opción drop malformado de spark-csv. Esto debería permitirle leer todos los archivos en la carpeta manteniendo solo los encabezados (porque usted elimina el formato incorrecto). Ejemplo:
val headerSchema = List(
StructField("example1", StringType, true),
StructField("example2", StringType, true),
StructField("example3", StringType, true)
)
val header_DF =sqlCtx.read
.option("delimiter", ",")
.option("header", "false")
.option("mode","DROPMALFORMED")
.option("inferSchema","false")
.schema(StructType(headerSchema))
.format("com.databricks.spark.csv")
.load("folder containg the files")
En header_DF solo tendrá las filas de los encabezados, a partir de esto puede realizar la estructura de datos de la forma que necesite.
Para combinar archivos en una carpeta en un archivo:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}
Si desea combinar todos los archivos en un solo archivo, pero aún en la misma carpeta ( pero esto trae todos los datos al nodo del controlador):
dataFrame
.coalesce(1)
.write
.format("com.databricks.spark.csv")
.option("header", "true")
.save(out)
Otra solución sería usar la solución # 2 y luego mover el archivo dentro de la carpeta a otra ruta (con el nombre de nuestro archivo CSV).
def df2csv(df: DataFrame, fileName: String, sep: String = ",", header: Boolean = false): Unit = {
val tmpDir = "tmpDir"
df.repartition(1)
.write
.format("com.databricks.spark.csv")
.option("header", header.toString)
.option("delimiter", sep)
.save(tmpDir)
val dir = new File(tmpDir)
val tmpCsvFile = tmpDir + File.separatorChar + "part-00000"
(new File(tmpCsvFile)).renameTo(new File(fileName))
dir.listFiles.foreach( f => f.delete )
dir.delete
}
Puedes caminar así.
- 1.Cree un nuevo DataFrame (headerDF) que contenga los nombres de los encabezados.
- 2.Union con el DataFrame (dataDF) que contiene los datos.
- 3. Salga del DataFrame de unión al disco con la opción ("encabezado", "falso") .
- 4.merge los archivos de partición (part-0000 ** 0.csv) usando hadoop FileUtil
De esta manera, todas las particiones no tienen encabezado, a excepción de que el contenido de una sola partición tiene una fila de nombres de encabezado desde headerDF. Cuando todas las particiones se fusionan, hay un solo encabezado en la parte superior del archivo. Código de muestra son los siguientes
//dataFrame is the data to save on disk
//cast types of all columns to String
val dataDF = dataFrame.select(dataFrame.columns.map(c => dataFrame.col(c).cast("string")): _*)
//create a new data frame containing only header names
import scala.collection.JavaConverters._
val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)
//merge header names with data
headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)
//use hadoop FileUtil to merge all partition csv files into a single file
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true, spark.sparkContext.hadoopConfiguration, null)
// Convert JavaRDD to CSV and save as text file
outputDataframe.write()
.format("com.databricks.spark.csv")
// Header => true, will enable to have header in each file
.option("header", "true")
Siga el enlace con la prueba de integración sobre cómo escribir un solo encabezado
http://bytepadding.com/big-data/spark/write-a-csv-text-file-from-spark/