python - ¿Cómo exportar un marco de datos de tabla en PySpark a csv?
sql apache-spark (5)
Estoy usando Spark 1.3.1 (PySpark) y he generado una tabla usando una consulta SQL.
Ahora tengo un objeto que es un
DataFrame
.
Quiero exportar este objeto
DataFrame
(lo he llamado "tabla") a un archivo csv para poder manipularlo y trazar las columnas.
¿Cómo exporto la "tabla" de
DataFrame
a un archivo csv?
¡Gracias!
¿Qué tal esto (en que no quieres un trazador de líneas)?
for row in df.collect():
d = row.asDict()
s = "%d/t%s/t%s/n" % (d["int_column"], d["string_column"], d["string_column"])
f.write(s)
f es un descriptor de archivo abierto. Además, el separador es un TAB char, pero es fácil de cambiar a lo que quieras.
Debe volver a particionar el Dataframe en una sola partición y luego definir el formato, la ruta y otros parámetros del archivo en formato de sistema de archivos Unix y aquí tiene,
df.repartition(1).write.format(''com.databricks.spark.csv'').save("/path/to/file/myfile.csv",header = ''true'')
Lea más sobre la función de repartición Lea más sobre la función de guardar
Sin embargo, la repartición es una función costosa y toPandas () es peor. Intente usar .coalesce (1) en lugar de .repartition (1) en la sintaxis anterior para un mejor rendimiento.
Lea más sobre las funciones de repartición vs fusión .
Para Apache Spark 2+, para guardar el marco de datos en un solo archivo csv. Utilice el siguiente comando
query.repartition(1).write.csv("cc_out.csv", sep=''|'')
Aquí indico que necesito solo una partición de csv. Puede cambiarlo según sus requisitos.
Si el marco de datos cabe en la memoria de un controlador y desea guardarlo en el sistema de archivos local, puede convertir
Spark DataFrame
en
Pandas DataFrame
local usando el método
toPandas
y luego simplemente usar
to_csv
:
df.toPandas().to_csv(''mycsv.csv'')
De lo contrario, puede usar spark-csv :
-
Chispa 1.3
df.save(''mycsv.csv'', ''com.databricks.spark.csv'')
-
Spark 1.4+
df.write.format(''com.databricks.spark.csv'').save(''mycsv.csv'')
En Spark 2.0+ puede usar la fuente de datos
csv
directamente:
df.write.csv(''mycsv.csv'')
Si no puede usar spark-csv, puede hacer lo siguiente:
df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv")
Si necesita manejar cadenas con saltos de línea o coma, eso no funcionará. Utilizar esta:
import csv
import cStringIO
def row2csv(row):
buffer = cStringIO.StringIO()
writer = csv.writer(buffer)
writer.writerow([str(s).encode("utf-8") for s in row])
buffer.seek(0)
return buffer.read().strip()
df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv")