scala - read - Spark: ¿cargar el archivo CSV como DataFrame?
spark read option csv (11)
Analiza CSV y carga como DataFrame / DataSet con Spark 2.x
Primero inicialice el objeto
SparkSession
de forma predeterminada, estará disponible en shells como
spark
val spark = org.apache.spark.sql.SparkSession.builder
.master("local")
.appName("Spark CSV Reader")
.getOrCreate;
Utilice cualquiera de las siguientes formas de cargar CSV como
DataFrame/DataSet
1. Hazlo de manera programática
val df = spark.read
.format("csv")
.option("header", "true") //first line in file has headers
.option("mode", "DROPMALFORMED")
.load("hdfs:///csv/file/dir/file.csv")
2. También puedes hacer esta manera SQL
val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")
Dependencias :
"org.apache.spark" % "spark-core_2.11" % 2.0.0,
"org.apache.spark" % "spark-sql_2.11" % 2.0.0,
Versión Spark <2.0
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("csv/file/path");
Dependencias:
"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,
Me gustaría leer un CSV en chispa y convertirlo como DataFrame y almacenarlo en HDFS con
df.registerTempTable("table_name")
Yo he tratado:
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
Error que obtuve:
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
¿Cuál es el comando correcto para cargar el archivo CSV como DataFrame en Apache Spark?
Carga un archivo CSV y devuelve el resultado como un DataFrame.
df=sparksession.read.option("header", true).csv("file_name.csv")
Dataframe trató un archivo como formato csv.
Con Spark 2.0, lo siguiente es cómo puedes leer CSV
val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
.config(conf = conf)
.appName("spark session example")
.getOrCreate()
val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header","true").
csv(path)
El ejemplo de Penny''s Spark 2 es la forma de hacerlo en spark2.
Hay un truco más: generar ese encabezado para usted haciendo un escaneo inicial de los datos, estableciendo la opción
inferSchema
en
true
Aquí, entonces, suponiendo que la
spark
es una sesión de chispa que ha configurado, es la operación para cargar en el archivo de índice CSV de todas las imágenes de Landsat que aloja Amazon en S3.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
val csvdata = spark.read.options(Map(
"header" -> "true",
"ignoreLeadingWhiteSpace" -> "true",
"ignoreTrailingWhiteSpace" -> "true",
"timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
"inferSchema" -> "true",
"mode" -> "FAILFAST"))
.csv("s3a://landsat-pds/scene_list.gz")
La mala noticia es: esto desencadena un escaneo a través del archivo; para algo grande como este archivo CSV comprimido de más de 20 MB, que puede demorar 30 segundos en una conexión de larga distancia. Tenga esto en cuenta: es mejor que codifique manualmente el esquema una vez que lo haya introducido.
(fragmento de código Licencia de software Apache 2.0 con licencia para evitar toda ambigüedad; algo que he hecho como prueba de demostración / integración de integración S3)
El formato de archivo predeterminado es Parquet con spark.read ... y la lectura de archivos csv es la razón por la que obtiene la excepción. Especifique el formato csv con la api que está intentando usar
En Java 1.8, este fragmento de código funciona perfectamente para leer archivos CSV
POM.xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.4.0</version>
</dependency>
Java
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);
Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
//("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();
En caso de que esté construyendo un jar con scala 2.11 y Apache 2.0 o superior.
No es necesario crear un objeto
sqlContext
o
sparkContext
.
Solo un objeto
SparkSession
suficiente para todas las necesidades.
Lo siguiente es mycode que funciona bien:
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.log4j.{Level, LogManager, Logger}
object driver {
def main(args: Array[String]) {
val log = LogManager.getRootLogger
log.info("**********JAR EXECUTION STARTED**********")
val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
val df = spark.read.format("csv")
.option("header", "true")
.option("delimiter","|")
.option("inferSchema","true")
.load("d:/small_projects/spark/test.pos")
df.show()
}
}
En caso de que se esté ejecutando en un clúster, simplemente cambie
.master("local")
a
.master("yarn")
mientras define el objeto
sparkBuilder
El Spark Doc cubre esto: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
Es para cuyo Hadoop es 2.6 y Spark es 1.6 y sin el paquete "databricks".
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;
val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0),row(1).toInt))
val schema = new StructType()
.add(StructField("id", StringType, true))
.add(StructField("val", IntegerType, true))
val df = sqlContext.createDataFrame(rdd, schema)
Hay muchos desafíos para analizar un archivo CSV, se sigue sumando si el tamaño del archivo es mayor, si hay caracteres que no están en inglés / escape / separador / otros en los valores de la columna, que podrían causar errores de análisis.
La magia está en las opciones que se usan. Los que funcionaron para mí y espero que cubran la mayoría de los casos límite están en el siguiente código:
### Create a Spark Session
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()
### Note the options that are used. You may have to tweak these in case of error
html_df = spark.read.csv(html_csv_file_path,
header=True,
multiLine=True,
ignoreLeadingWhiteSpace=True,
ignoreTrailingWhiteSpace=True,
encoding="UTF-8",
sep='','',
quote=''"'',
escape=''"'',
maxColumns=2,
inferSchema=True)
Espero que ayude. Para obtener más información, consulte: Uso de PySpark 2 para leer CSV con código fuente HTML
Nota: El código anterior es de Spark 2 API, donde la API de lectura de archivos CSV viene incluida con paquetes integrados de Spark instalables.
Nota: PySpark es un contenedor de Python para Spark y comparte la misma API que Scala / Java.
Prueba esto si usas spark 2.0+
For non-hdfs file:
df = spark.read.csv("file:///csvfile.csv")
For hdfs file:
df = spark.read.csv("hdfs:///csvfile.csv")
For hdfs file (with different delimiter than comma:
df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")
Nota: este trabajo para cualquier archivo delimitado. Simplemente use la opción ("delimitador",) para cambiar el valor.
Espero que esto sea útil.
spark-csv es parte de la funcionalidad central de Spark y no requiere una biblioteca separada. Entonces podrías hacer por ejemplo
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
En scala, (esto funciona para cualquier delimitador de formato en mención "," para csv, "/ t" para tsv, etc.)
val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")