hadoop - true - spark write csv
Cómo exportar datos de Spark SQL a CSV (6)
Con la ayuda de spark-csv podemos escribir en un archivo CSV.
val dfsql = sqlContext.sql("select * from tablename")
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")`
Este comando funciona con HiveQL:
insert overwrite directory ''/data/home.csv'' select * from testtable;
Pero con Spark SQL obtengo un error con un rastro de pila org.apache.spark.sql.hive.HiveQl
:
java.lang.RuntimeException: Unsupported language features in query:
insert overwrite directory ''/data/home.csv'' select * from testtable
Guíeme para escribir la exportación a la función CSV en Spark SQL.
Dado que Spark 2.X
spark-csv
está integrado como fuente de datos nativa . Por lo tanto, la declaración necesaria se simplifica a (ventanas)
df.write
.option("header", "true")
.csv("file:///C:/out.csv")
o UNIX
df.write
.option("header", "true")
.csv("/var/out.csv")
El mensaje de error sugiere que esta no es una característica compatible en el lenguaje de consulta. Pero puede guardar un DataFrame en cualquier formato, como de costumbre, a través de la interfaz RDD ( df.rdd.saveAsTextFile
). O puede consultar spark-csv .
La forma más simple es mapear el RDD del DataFrame y usar mkString:
df.rdd.map(x=>x.mkString(","))
A partir de Spark 1.5 (o incluso antes) df.map(r=>r.mkString(","))
haría lo mismo si quiere que CSV escape, puede usar apache commons lang para eso. por ejemplo, aquí está el código que estamos usando
def DfToTextFile(path: String,
df: DataFrame,
delimiter: String = ",",
csvEscape: Boolean = true,
partitions: Int = 1,
compress: Boolean = true,
header: Option[String] = None,
maxColumnLength: Option[Int] = None) = {
def trimColumnLength(c: String) = {
val col = maxColumnLength match {
case None => c
case Some(len: Int) => c.take(len)
}
if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
}
def rowToString(r: Row) = {
val st = r.mkString("~-~").replaceAll("[//p{C}|//uFFFD]", "") //remove control characters
st.split("~-~").map(trimColumnLength).mkString(delimiter)
}
def addHeader(r: RDD[String]) = {
val rdd = for (h <- header;
if partitions == 1; //headers only supported for single partitions
tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
rdd.getOrElse(r)
}
val rdd = df.map(rowToString).repartition(partitions)
val headerRdd = addHeader(rdd)
if (compress)
headerRdd.saveAsTextFile(path, classOf[GzipCodec])
else
headerRdd.saveAsTextFile(path)
}
La respuesta anterior con spark-csv es correcta, pero hay un problema: la biblioteca crea varios archivos basados en el particionamiento del marco de datos. Y esto no es lo que generalmente necesitamos. Entonces, puedes combinar todas las particiones en una:
df.coalesce(1).
write.
format("com.databricks.spark.csv").
option("header", "true").
save("myfile.csv")
y cambie el nombre de la salida de la lib (nombre "part-00000") a un nombre de archivo deseado.
Esta publicación de blog proporciona más detalles: https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/
Puede usar la declaración a continuación para escribir el contenido del marco de datos en formato CSV df.write.csv("/data/home/csv")
Si necesita escribir el marco de datos completo en un solo archivo CSV, use df.coalesce(1).write.csv("/data/home/sample.csv")
Para spark 1.x, puede usar spark-csv para escribir los resultados en archivos CSV
A continuación, el fragmento de Scala ayudaría
import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.write.format("com.databricks.spark.csv").save("/data/home/csv")
Para escribir los contenidos en un solo archivo
import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")