write sqlcontext spark read example cargar python csv apache-spark pyspark

python - sqlcontext - spark write csv



Cargar archivo CSV con Spark (10)

Soy nuevo en Spark y estoy intentando leer datos CSV de un archivo con Spark. Esto es lo que estoy haciendo:

sc.textFile(''file.csv'') .map(lambda line: (line.split('','')[0], line.split('','')[1])) .collect()

Esperaría que esta llamada me diera una lista de las dos primeras columnas de mi archivo pero recibo este error:

File "<ipython-input-60-73ea98550983>", line 1, in <lambda> IndexError: list index out of range

aunque mi archivo CSV como más de una columna.


¿Estás seguro de que todas las líneas tienen al menos 2 columnas? ¿Puedes probar algo así como solo para comprobar ?:

sc.textFile("file.csv") / .map(lambda line: line.split(",")) / .filter(lambda line: len(line)>1) / .map(lambda line: (line[0],line[1])) / .collect()

Alternativamente, puede imprimir el culpable (si corresponde):

sc.textFile("file.csv") / .map(lambda line: line.split(",")) / .filter(lambda line: len(line)<=1) / .collect()


Ahora, también hay otra opción para cualquier archivo csv general: https://github.com/seahboonsiew/pyspark-csv siguiente manera:

Supongamos que tenemos el siguiente contexto

sc = SparkContext sqlCtx = SQLContext or HiveContext

Primero, distribuya pyspark-csv.py a los ejecutores que usan SparkContext

import pyspark_csv as pycsv sc.addPyFile(''pyspark_csv.py'')

Lea datos csv a través de SparkContext y conviértalo en DataFrame

plaintext_rdd = sc.textFile(''hdfs://x.x.x.x/blah.csv'') dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)


Esto está en línea con lo que JP Mercier sugirió inicialmente sobre el uso de pandas, pero con una modificación importante: si lees los datos en pandas en trozos, debería ser más maleable. Lo que significa que puede analizar un archivo mucho más grande que el que los Pandas pueden manejar como una sola pieza y pasarlo a Spark en tamaños más pequeños. (Esto también responde al comentario sobre por qué uno querría usar Spark si pueden cargar todo en Pandas de todos modos).

from pyspark import SparkContext from pyspark.sql import SQLContext import pandas as pd sc = SparkContext(''local'',''example'') # if using locally sql_sc = SQLContext(sc) Spark_Full = sc.emptyRDD() chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000) # if you have headers in your csv file: headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns) for chunky in chunk_100k: Spark_Full += sc.parallelize(chunky.values.tolist()) YourSparkDataFrame = Spark_Full.toDF(headers) # if you do not have headers, leave empty instead: # YourSparkDataFrame = Spark_Full.toDF() YourSparkDataFrame.show()


Si desea cargar csv como un marco de datos, puede hacer lo siguiente:

from pyspark.sql import SQLContext sqlContext = SQLContext(sc) df = sqlContext.read.format(''com.databricks.spark.csv'') / .options(header=''true'', inferschema=''true'') / .load(''sampleFile.csv'') # this is your csv file

Funcionó bien para mí


Si su información csv no contiene nuevas líneas en ninguno de los campos, puede cargar sus datos con textFile() y analizarlos

import csv import StringIO def loadRecord(line): input = StringIO.StringIO(line) reader = csv.DictReader(input, fieldnames=["name1", "name2"]) return reader.next() input = sc.textFile(inputFile).map(loadRecord)


Simplemente dividir por comas también dividirá las comas que están dentro de los campos (por ejemplo a,b,"1,2,3",c ), por lo que no se recomienda. La respuesta de zero323 es buena si quiere usar la API de DataFrames, pero si quiere apegarse a Spark base, puede analizar csvs en Python base con el módulo csv :

# works for both python 2 and 3 import csv rdd = sc.textFile("file.csv") rdd = rdd.mapPartitions(lambda x: csv.reader(x))

EDITAR: como @muon se menciona en los comentarios, esto tratará el encabezado como cualquier otra fila, por lo que deberá extraerlo manualmente. Por ejemplo, header = rdd.first(); rdd = rdd.filter(lambda x: x != header) header = rdd.first(); rdd = rdd.filter(lambda x: x != header) (asegúrese de no modificar el header antes de que el filtro lo evalúe). Pero en este punto, probablemente sea mejor usar un analizador csv incorporado.


Y aún otra opción que consiste en leer el archivo CSV usando Pandas y luego importar el Pandas DataFrame en Spark.

Por ejemplo:

from pyspark import SparkContext from pyspark.sql import SQLContext import pandas as pd sc = SparkContext(''local'',''example'') # if using locally sql_sc = SQLContext(sc) pandas_df = pd.read_csv(''file.csv'') # assuming the file contains a header # pandas_df = pd.read_csv(''file.csv'', names = [''column 1'',''column 2'']) # if no header s_df = sql_sc.createDataFrame(pandas_df)


Spark 2.0.0+

Puede usar la fuente de datos csv integrada directamente:

spark.read.csv( "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema )

o

(spark.read .schema(schema) .option("header", "true") .option("mode", "DROPMALFORMED") .csv("some_input_file.csv"))

sin incluir ninguna dependencia externa.

Spark <2.0.0 :

En lugar de un análisis manual, que no es trivial en un caso general, recomendaría spark-csv :

Asegúrese de que Spark CSV esté incluido en la ruta ( --packages , --jars , --driver-class-path )

Y carga tus datos de la siguiente manera:

(df = sqlContext .read.format("com.databricks.spark.csv") .option("header", "true") .option("inferschema", "true") .option("mode", "DROPMALFORMED") .load("some_input_file.csv"))

Puede manejar la carga, la inferencia de esquema, descartar líneas mal formadas y no requiere pasar datos de Python a la JVM.

Nota :

Si conoce el esquema, es mejor evitar la inferencia de esquema y pasarlo a DataFrameReader . Suponiendo que tiene tres columnas: entero, doble y cadena:

from pyspark.sql.types import StructType, StructField from pyspark.sql.types import DoubleType, IntegerType, StringType schema = StructType([ StructField("A", IntegerType()), StructField("B", DoubleType()), StructField("C", StringType()) ]) (sqlContext .read .format("com.databricks.spark.csv") .schema(schema) .option("header", "true") .option("mode", "DROPMALFORMED") .load("some_input_file.csv"))


from pyspark.sql import SparkSession spark = SparkSession / .builder / .appName("Python Spark SQL basic example") / .config("spark.some.config.option", "some-value") / .getOrCreate() df = spark.read.csv("/home/stp/test1.csv",header=True,separator="|"); print(df.collect())


import pandas as pd data1 = pd.read_csv("test1.csv") data2 = pd.read_csv("train1.csv")