sqlcontext read python csv apache-spark pyspark file-writing

python - sqlcontext - pyspark read csv



Cómo escribir el RDD resultante en un archivo csv en Spark python (3)

Tengo un RDD resultante labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions) . Esto tiene salida en este formato:

[(0.0, 0.08482142857142858), (0.0, 0.11442786069651742),.....]

Lo que quiero es crear un archivo CSV con una columna para las labels (la primera parte de la tupla en la salida anterior) y una para las predictions (la segunda parte de la salida de la tupla). Pero no sé cómo escribir en un archivo CSV en Spark usando Python.

¿Cómo puedo crear un archivo CSV con la salida anterior?


No es bueno simplemente unir con comas porque si los campos contienen comas, no se citarán correctamente, por ejemplo '',''.join([''a'', ''b'', ''1,2,3'', ''c'']) le da a,b,1,2,3,c cuando querría a,b,"1,2,3",c . En su lugar, debe usar el módulo csv de Python para convertir cada lista en el RDD a una cadena csv con el formato correcto:

# python 3 import csv, io def list_to_csv_str(x): """Given a list of strings, returns a properly-csv-formatted string.""" output = io.StringIO("") csv.writer(output).writerow(x) return output.getvalue().strip() # remove extra newline # ... do stuff with your rdd ... rdd = rdd.map(list_to_csv_str) rdd.saveAsTextFile("output_directory")

Como el módulo csv solo escribe en objetos de archivo, tenemos que crear un "archivo" vacío con io.StringIO("") y decirle a csv.writer que escriba la cadena con formato csv en él. Luego, usamos output.getvalue() para obtener la cadena que acabamos de escribir en el "archivo". Para hacer que este código funcione con Python 2, simplemente reemplace io con el módulo StringIO.

Si está utilizando la API de Spark DataFrames, también puede buscar en la función de guardar DataBricks , que tiene un formato csv.


Sé que este es un post viejo. Pero para ayudar a alguien que busca lo mismo, así es como escribo un RDD de dos columnas en un solo archivo CSV en PySpark 1.6.2

El RDD:

>>> rdd.take(5) [(73342, u''cells''), (62861, u''cell''), (61714, u''studies''), (61377, u''aim''), (60168, u''clinical'')]

Ahora el código:

# First I convert the RDD to dataframe from pyspark import SparkContext df = sqlContext.createDataFrame(rdd, [''count'', ''word''])

El DF:

>>> df.show() +-----+-----------+ |count| word| +-----+-----------+ |73342| cells| |62861| cell| |61714| studies| |61377| aim| |60168| clinical| |59275| 2| |59221| 1| |58274| data| |58087|development| |56579| cancer| |50243| disease| |49817| provided| |49216| specific| |48857| health| |48536| study| |47827| project| |45573|description| |45455| applicant| |44739| program| |44522| patients| +-----+-----------+ only showing top 20 rows

Ahora escribe a CSV

# Write CSV (I have HDFS storage) df.coalesce(1).write.format(''com.databricks.spark.csv'').options(header=''true'').save(''file:///home/username/csv_out'')

PD: Solo soy un principiante que aprende de las publicaciones aquí en . Así que no sé si esta es la mejor manera. ¡Pero funcionó para mí y espero que ayude a alguien!


Simplemente labelsAndPredictions las líneas del RDD ( labelsAndPredictions ) en cadenas (las líneas del CSV) luego use rdd.saveAsTextFile() .

def toCSVLine(data): return '',''.join(str(d) for d in data) lines = labelsAndPredictions.map(toCSVLine) lines.saveAsTextFile(''hdfs://my-node:9000/tmp/labels-and-predictions.csv'')