with tutorial spark example espaƱol data apache-spark pyspark

apache-spark - tutorial - spark python example



Obtener CSV a Spark dataframe (8)

Estoy usando python en Spark y me gustaría obtener un csv en un marco de datos.

La documentation para Spark SQL de forma extraña no proporciona explicaciones para CSV como fuente.

Encontré Spark-CSV , sin embargo tengo problemas con dos partes de la documentación:

  • "This package can be added to Spark using the --jars command line option. For example, to include it when starting the spark shell: $ bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3" ¿Realmente necesito agregar este argumento cada vez que lanzo pyspark o spark-submit? Parece muy poco elegante. ¿No hay una manera de importarlo en python en lugar de volver a descargarlo cada vez?

  • df = sqlContext.load(source="com.databricks.spark.csv", header="true", path = "cars.csv") Incluso si hago lo anterior, esto no funcionará. ¿Qué significa el argumento "fuente" en esta línea de código? ¿Cómo puedo simplemente cargar un archivo local en Linux, decir "/Spark_Hadoop/spark-1.3.1-bin-cdh4/cars.csv"?


Basado en la respuesta de Aravind, pero mucho más corto, por ejemplo:

lines = sc.textFile("/path/to/file").map(lambda x: x.split(",")) df = lines.toDF(["year", "month", "day", "count"])


Con las versiones más recientes de Spark (a partir de, creo, 1.4) esto se ha vuelto mucho más fácil. La expresión sqlContext.read le proporciona una instancia de DataFrameReader , con un método .csv() :

df = sqlContext.read.csv("/path/to/your.csv")

Tenga en cuenta que también puede indicar que el archivo csv tiene un encabezado agregando el argumento de palabra clave header=True a la llamada .csv() . Un puñado de otras opciones están disponibles, y se describen en el enlace de arriba.


Lea el archivo csv en un RDD y luego genere un RowRDD a partir del RDD original.

Cree el esquema representado por un StructType que coincida con la estructura de las filas en el RDD creado en el Paso 1.

Aplique el esquema al RDD de Filas mediante el método createDataFrame proporcionado por SQLContext.

lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) # Each line is converted to a tuple. people = parts.map(lambda p: (p[0], p[1].strip())) # The schema is encoded in a string. schemaString = "name age" fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] schema = StructType(fields) # Apply the schema to the RDD. schemaPeople = spark.createDataFrame(people, schema)

fuente: GUIA DE PROGRAMACION SPARK


Me encontré con un problema similar. La solución es agregar una variable de entorno llamada "PYSPARK_SUBMIT_ARGS" y establecer su valor en "--packages com.databricks: spark-csv_2.10: 1.4.0 pyspark-shell". Esto funciona con la shell interactiva Python de Spark.

Asegúrese de hacer coincidir la versión de spark-csv con la versión de Scala instalada. Con Scala 2.11, es spark-csv_2.11 y con Scala 2.10 o 2.10.5 es spark-csv_2.10.

Espero que funcione.


Si no le importa la dependencia del paquete adicional, puede usar Pandas para analizar el archivo CSV. Maneja las comas internas muy bien.

Dependencias:

from pyspark import SparkContext from pyspark.sql import SQLContext import pandas as pd

Lea todo el archivo a la vez en un Spark DataFrame:

sc = SparkContext(''local'',''example'') # if using locally sql_sc = SQLContext(sc) pandas_df = pd.read_csv(''file.csv'') # assuming the file contains a header # If no header: # pandas_df = pd.read_csv(''file.csv'', names = [''column 1'',''column 2'']) s_df = sql_sc.createDataFrame(pandas_df)

O, aún más consciente de los datos, puede dividir los datos en un Spark RDD y luego en el DF:

chunk_100k = pd.read_csv(''file.csv'', chunksize=100000) for chunky in chunk_100k: Spark_temp_rdd = sc.parallelize(chunky.values.tolist()) try: Spark_full_rdd += Spark_temp_rdd except NameError: Spark_full_rdd = Spark_temp_rdd del Spark_temp_rdd Spark_DF = Spark_full_rdd.toDF([''column 1'',''column 2''])


Siguiendo a Spark 2.0, se recomienda usar una sesión de Spark:

from pyspark.sql import SparkSession from pyspark.sql import Row # Create a SparkSession spark = SparkSession / .builder / .appName("basic example") / .config("spark.some.config.option", "some-value") / .getOrCreate() def mapper(line): fields = line.split('','') return Row(ID=int(fields[0]), field1=str(fields[1].encode("utf-8")), field2=int(fields[2]), field3=int(fields[3])) lines = spark.sparkContext.textFile("file.csv") df = lines.map(mapper) # Infer the schema, and register the DataFrame as a table. schemaDf = spark.createDataFrame(df).cache() schemaDf.createOrReplaceTempView("tablename")


para Pyspark, asumiendo que la primera fila del archivo csv contiene un encabezado

spark = SparkSession.builder.appName(''chosenName'').getOrCreate() df=spark.read.csv(''fileNameWithPath'', mode="DROPMALFORMED",inferSchema=True, header = True)


from pyspark.sql.types import StringType from pyspark import SQLContext sqlContext = SQLContext(sc) Employee_rdd = sc.textFile("/../Employee.csv") .map(lambda line: line.split(",")) Employee_df = Employee_rdd.toDF([''Employee_ID'',''Employee_name'']) Employee_df.show()