python - sqlcontext - spark write csv
Cargar archivo CSV con Spark (10)
Soy nuevo en Spark y estoy intentando leer datos CSV de un archivo con Spark. Esto es lo que estoy haciendo:
sc.textFile(''file.csv'')
.map(lambda line: (line.split('','')[0], line.split('','')[1]))
.collect()
Esperaría que esta llamada me diera una lista de las dos primeras columnas de mi archivo pero recibo este error:
File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
aunque mi archivo CSV como más de una columna.
¿Estás seguro de que todas las líneas tienen al menos 2 columnas? ¿Puedes probar algo así como solo para comprobar ?:
sc.textFile("file.csv") /
.map(lambda line: line.split(",")) /
.filter(lambda line: len(line)>1) /
.map(lambda line: (line[0],line[1])) /
.collect()
Alternativamente, puede imprimir el culpable (si corresponde):
sc.textFile("file.csv") /
.map(lambda line: line.split(",")) /
.filter(lambda line: len(line)<=1) /
.collect()
Ahora, también hay otra opción para cualquier archivo csv general: https://github.com/seahboonsiew/pyspark-csv siguiente manera:
Supongamos que tenemos el siguiente contexto
sc = SparkContext
sqlCtx = SQLContext or HiveContext
Primero, distribuya pyspark-csv.py a los ejecutores que usan SparkContext
import pyspark_csv as pycsv
sc.addPyFile(''pyspark_csv.py'')
Lea datos csv a través de SparkContext y conviértalo en DataFrame
plaintext_rdd = sc.textFile(''hdfs://x.x.x.x/blah.csv'')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
Esto está en línea con lo que JP Mercier sugirió inicialmente sobre el uso de pandas, pero con una modificación importante: si lees los datos en pandas en trozos, debería ser más maleable. Lo que significa que puede analizar un archivo mucho más grande que el que los Pandas pueden manejar como una sola pieza y pasarlo a Spark en tamaños más pequeños. (Esto también responde al comentario sobre por qué uno querría usar Spark si pueden cargar todo en Pandas de todos modos).
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext(''local'',''example'') # if using locally
sql_sc = SQLContext(sc)
Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)
for chunky in chunk_100k:
Spark_Full += sc.parallelize(chunky.values.tolist())
YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()
Si desea cargar csv como un marco de datos, puede hacer lo siguiente:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format(''com.databricks.spark.csv'') /
.options(header=''true'', inferschema=''true'') /
.load(''sampleFile.csv'') # this is your csv file
Funcionó bien para mí
Si su información csv no contiene nuevas líneas en ninguno de los campos, puede cargar sus datos con textFile()
y analizarlos
import csv
import StringIO
def loadRecord(line):
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames=["name1", "name2"])
return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
Simplemente dividir por comas también dividirá las comas que están dentro de los campos (por ejemplo a,b,"1,2,3",c
), por lo que no se recomienda. La respuesta de zero323 es buena si quiere usar la API de DataFrames, pero si quiere apegarse a Spark base, puede analizar csvs en Python base con el módulo csv :
# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))
EDITAR: como @muon se menciona en los comentarios, esto tratará el encabezado como cualquier otra fila, por lo que deberá extraerlo manualmente. Por ejemplo, header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
(asegúrese de no modificar el header
antes de que el filtro lo evalúe). Pero en este punto, probablemente sea mejor usar un analizador csv incorporado.
Y aún otra opción que consiste en leer el archivo CSV usando Pandas y luego importar el Pandas DataFrame en Spark.
Por ejemplo:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext(''local'',''example'') # if using locally
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv(''file.csv'') # assuming the file contains a header
# pandas_df = pd.read_csv(''file.csv'', names = [''column 1'',''column 2'']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
Spark 2.0.0+
Puede usar la fuente de datos csv integrada directamente:
spark.read.csv(
"some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)
o
(spark.read
.schema(schema)
.option("header", "true")
.option("mode", "DROPMALFORMED")
.csv("some_input_file.csv"))
sin incluir ninguna dependencia externa.
Spark <2.0.0 :
En lugar de un análisis manual, que no es trivial en un caso general, recomendaría spark-csv
:
Asegúrese de que Spark CSV esté incluido en la ruta ( --packages
, --jars
, --driver-class-path
)
Y carga tus datos de la siguiente manera:
(df = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferschema", "true")
.option("mode", "DROPMALFORMED")
.load("some_input_file.csv"))
Puede manejar la carga, la inferencia de esquema, descartar líneas mal formadas y no requiere pasar datos de Python a la JVM.
Nota :
Si conoce el esquema, es mejor evitar la inferencia de esquema y pasarlo a DataFrameReader
. Suponiendo que tiene tres columnas: entero, doble y cadena:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
StructField("A", IntegerType()),
StructField("B", DoubleType()),
StructField("C", StringType())
])
(sqlContext
.read
.format("com.databricks.spark.csv")
.schema(schema)
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("some_input_file.csv"))
from pyspark.sql import SparkSession
spark = SparkSession /
.builder /
.appName("Python Spark SQL basic example") /
.config("spark.some.config.option", "some-value") /
.getOrCreate()
df = spark.read.csv("/home/stp/test1.csv",header=True,separator="|");
print(df.collect())
import pandas as pd
data1 = pd.read_csv("test1.csv")
data2 = pd.read_csv("train1.csv")