write spark read example cargar scala apache-spark hadoop apache-spark-sql spark-dataframe hdfs

scala - read - Spark: ¿cargar el archivo CSV como DataFrame?



spark read option csv (11)

Analiza CSV y carga como DataFrame / DataSet con Spark 2.x

Primero inicialice el objeto SparkSession de forma predeterminada, estará disponible en shells como spark

val spark = org.apache.spark.sql.SparkSession.builder .master("local") .appName("Spark CSV Reader") .getOrCreate;

Utilice cualquiera de las siguientes formas de cargar CSV como DataFrame/DataSet

1. Hazlo de manera programática

val df = spark.read .format("csv") .option("header", "true") //first line in file has headers .option("mode", "DROPMALFORMED") .load("hdfs:///csv/file/dir/file.csv")

2. También puedes hacer esta manera SQL

val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")

Dependencias :

"org.apache.spark" % "spark-core_2.11" % 2.0.0, "org.apache.spark" % "spark-sql_2.11" % 2.0.0,

Versión Spark <2.0

val df = sqlContext.read .format("com.databricks.spark.csv") .option("header", "true") .option("mode", "DROPMALFORMED") .load("csv/file/path");

Dependencias:

"org.apache.spark" % "spark-sql_2.10" % 1.6.0, "com.databricks" % "spark-csv_2.10" % 1.6.0, "com.univocity" % "univocity-parsers" % LATEST,

Me gustaría leer un CSV en chispa y convertirlo como DataFrame y almacenarlo en HDFS con df.registerTempTable("table_name")

Yo he tratado:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

Error que obtuve:

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10] at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276) at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56) at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650) at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165) at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514) at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

¿Cuál es el comando correcto para cargar el archivo CSV como DataFrame en Apache Spark?


Carga un archivo CSV y devuelve el resultado como un DataFrame.

df=sparksession.read.option("header", true).csv("file_name.csv")

Dataframe trató un archivo como formato csv.


Con Spark 2.0, lo siguiente es cómo puedes leer CSV

val conf = new SparkConf().setMaster("local[2]").setAppName("my app") val sc = new SparkContext(conf) val sparkSession = SparkSession.builder .config(conf = conf) .appName("spark session example") .getOrCreate() val path = "/Users/xxx/Downloads/usermsg.csv" val base_df = sparkSession.read.option("header","true"). csv(path)


El ejemplo de Penny''s Spark 2 es la forma de hacerlo en spark2. Hay un truco más: generar ese encabezado para usted haciendo un escaneo inicial de los datos, estableciendo la opción inferSchema en true

Aquí, entonces, suponiendo que la spark es una sesión de chispa que ha configurado, es la operación para cargar en el archivo de índice CSV de todas las imágenes de Landsat que aloja Amazon en S3.

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ val csvdata = spark.read.options(Map( "header" -> "true", "ignoreLeadingWhiteSpace" -> "true", "ignoreTrailingWhiteSpace" -> "true", "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ", "inferSchema" -> "true", "mode" -> "FAILFAST")) .csv("s3a://landsat-pds/scene_list.gz")

La mala noticia es: esto desencadena un escaneo a través del archivo; para algo grande como este archivo CSV comprimido de más de 20 MB, que puede demorar 30 segundos en una conexión de larga distancia. Tenga esto en cuenta: es mejor que codifique manualmente el esquema una vez que lo haya introducido.

(fragmento de código Licencia de software Apache 2.0 con licencia para evitar toda ambigüedad; algo que he hecho como prueba de demostración / integración de integración S3)


El formato de archivo predeterminado es Parquet con spark.read ... y la lectura de archivos csv es la razón por la que obtiene la excepción. Especifique el formato csv con la api que está intentando usar


En Java 1.8, este fragmento de código funciona perfectamente para leer archivos CSV

POM.xml

<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>2.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.8</version> </dependency> <dependency> <groupId>com.databricks</groupId> <artifactId>spark-csv_2.10</artifactId> <version>1.4.0</version> </dependency>

Java

SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local"); // create Spark Context SparkContext context = new SparkContext(conf); // create spark Session SparkSession sparkSession = new SparkSession(context); Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv"); //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv"); System.out.println("========== Print Schema ============"); df.printSchema(); System.out.println("========== Print Data =============="); df.show(); System.out.println("========== Print title =============="); df.select("title").show();


En caso de que esté construyendo un jar con scala 2.11 y Apache 2.0 o superior.

No es necesario crear un objeto sqlContext o sparkContext . Solo un objeto SparkSession suficiente para todas las necesidades.

Lo siguiente es mycode que funciona bien:

import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession} import org.apache.log4j.{Level, LogManager, Logger} object driver { def main(args: Array[String]) { val log = LogManager.getRootLogger log.info("**********JAR EXECUTION STARTED**********") val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate() val df = spark.read.format("csv") .option("header", "true") .option("delimiter","|") .option("inferSchema","true") .load("d:/small_projects/spark/test.pos") df.show() } }

En caso de que se esté ejecutando en un clúster, simplemente cambie .master("local") a .master("yarn") mientras define el objeto sparkBuilder

El Spark Doc cubre esto: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html


Es para cuyo Hadoop es 2.6 y Spark es 1.6 y sin el paquete "databricks".

import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}; import org.apache.spark.sql.Row; val csv = sc.textFile("/path/to/file.csv") val rows = csv.map(line => line.split(",").map(_.trim)) val header = rows.first val data = rows.filter(_(0) != header(0)) val rdd = data.map(row => Row(row(0),row(1).toInt)) val schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("val", IntegerType, true)) val df = sqlContext.createDataFrame(rdd, schema)


Hay muchos desafíos para analizar un archivo CSV, se sigue sumando si el tamaño del archivo es mayor, si hay caracteres que no están en inglés / escape / separador / otros en los valores de la columna, que podrían causar errores de análisis.

La magia está en las opciones que se usan. Los que funcionaron para mí y espero que cubran la mayoría de los casos límite están en el siguiente código:

### Create a Spark Session spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate() ### Note the options that are used. You may have to tweak these in case of error html_df = spark.read.csv(html_csv_file_path, header=True, multiLine=True, ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True, encoding="UTF-8", sep='','', quote=''"'', escape=''"'', maxColumns=2, inferSchema=True)

Espero que ayude. Para obtener más información, consulte: Uso de PySpark 2 para leer CSV con código fuente HTML

Nota: El código anterior es de Spark 2 API, donde la API de lectura de archivos CSV viene incluida con paquetes integrados de Spark instalables.

Nota: PySpark es un contenedor de Python para Spark y comparte la misma API que Scala / Java.


Prueba esto si usas spark 2.0+

For non-hdfs file: df = spark.read.csv("file:///csvfile.csv") For hdfs file: df = spark.read.csv("hdfs:///csvfile.csv") For hdfs file (with different delimiter than comma: df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")

Nota: este trabajo para cualquier archivo delimitado. Simplemente use la opción ("delimitador",) para cambiar el valor.

Espero que esto sea útil.


spark-csv es parte de la funcionalidad central de Spark y no requiere una biblioteca separada. Entonces podrías hacer por ejemplo

df = spark.read.format("csv").option("header", "true").load("csvfile.csv")

En scala, (esto funciona para cualquier delimitador de formato en mención "," para csv, "/ t" para tsv, etc.) val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")