apache-spark rdd hortonworks-data-platform

apache spark - Spark leyó el archivo de S3 usando sc.textFile("s3n://...)



apache-spark rdd (12)

Intentando leer un archivo ubicado en S3 usando spark-shell:

scala> val myRdd = sc.textFile("s3n://myBucket/myFile1.log") lyrics: org.apache.spark.rdd.RDD[String] = s3n://myBucket/myFile1.log MappedRDD[55] at textFile at <console>:12 scala> myRdd.count java.io.IOException: No FileSystem for scheme: s3n at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2607) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2614) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) ... etc ...

La excepción IOException: No se ha producido ningún sistema de archivos para el esquema: error s3n con:

  • Spark 1.31 o 1.40 en la máquina de desarrollo (sin bibliotecas Hadoop)
  • Ejecutando desde Hortonworks Sandbox HDP v2.2.4 (Hadoop 2.60) que integra Spark 1.2.1 fuera de la caja
  • Usando el esquema s3: // o s3n: //

¿Cuál es la causa de este error? ¿Falta dependencia, falta configuración o mal uso de sc.textFile() ?

O puede deberse a un error que afecta a la construcción de Spark específica de Hadoop 2.60 como parece sugerir esta post . Voy a probar Spark para Hadoop 2.40 para ver si esto resuelve el problema.


A pesar de que esta pregunta ya tiene una respuesta aceptada, creo que todavía faltan los detalles exactos de por qué sucede esto. Así que creo que podría haber un lugar para una respuesta más.

Si agrega la hadoop-aws requerida, su código debería funcionar.

Al iniciar Hadoop 2.6.0, el conector s3 FS se ha movido a una biblioteca separada llamada hadoop-aws. También hay un Jira para eso: mover el código del conector FS relacionado con s3 a hadoop-aws .

Esto significa que cualquier versión de spark que se haya creado con Hadoop 2.6.0 o posterior tendrá que usar otra dependencia externa para poder conectarse al Sistema de archivos S3.
Aquí hay un ejemplo de sbt que probé y funciona como se esperaba usando Apache Spark 1.6.2 construido contra Hadoop 2.6.0:

libraryDependencies + = "org.apache.hadoop"% "hadoop-aws"% "2.6.0"

En mi caso, encontré algunos problemas de dependencias, así que resolví agregando exclusión:

libraryDependencies + = "org.apache.hadoop"% "hadoop-aws"% "2.6.0" exclude ("tomcat", "jasper-compiler") excludeAll ExclusionRule (organization = "javax.servlet")

En otra nota relacionada, todavía tengo que probarlo, pero se recomienda usar el sistema de archivos "s3a" y no "s3n" a partir de Hadoop 2.6.0.

La tercera generación, s3a: sistema de archivos. Diseñado para ser un conmutador en reemplazo de s3n :, este enlace de sistema de archivos admite archivos más grandes y promete un mayor rendimiento.


Confirmó que esto está relacionado con la compilación de Spark contra Hadoop 2.60. Acabo de instalar Spark 1.4.0 "Precompilado para Hadoop 2.4 y posterior" (en lugar de Hadoop 2.6). Y el código ahora funciona bien.

sc.textFile("s3n://bucketname/Filename") ahora genera otro error:

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).

El siguiente código utiliza el formato de URL S3 para mostrar que Spark puede leer el archivo S3. Usando la máquina de desarrollo (no Hadoop libs).

scala> val lyrics = sc.textFile("s3n://MyAccessKeyID:MySecretKey@zpub01/SafeAndSound_Lyrics.txt") lyrics: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21 scala> lyrics.count res1: Long = 9

Aún mejor : el código anterior con credenciales de AWS en línea en el URI de S3N se romperá si la clave secreta de AWS tiene un "/" hacia adelante. La configuración de las credenciales de AWS en SparkContext lo solucionará. El código funciona si el archivo S3 es público o privado.

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "BLABLA") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "....") // can contain "/" val myRDD = sc.textFile("s3n://myBucket/MyFilePattern") myRDD.count


Estaba enfrentando el mismo problema. Funcionó bien después de establecer el valor para fs.s3n.impl y agregar la dependencia hadoop-aws.

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", awsAccessKeyId) sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey) sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")


Este es un código de chispa de muestra que puede leer los archivos presentes en s3

val hadoopConf = sparkContext.hadoopConfiguration hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") hadoopConf.set("fs.s3.awsAccessKeyId", s3Key) hadoopConf.set("fs.s3.awsSecretAccessKey", s3Secret) var jobInput = sparkContext.textFile("s3://" + s3_location)


Hay un Spark JIRA, SPARK-7481 , abierto a partir de hoy, 20 de octubre de 2016, para agregar un módulo de nube de chispas que incluye dependencias transitivas de todo s3a y azul wasb: necesidad, junto con pruebas.

Y un Spark PR a juego. Así es como obtengo soporte s3a en mis versiones de chispa

Si lo hace a mano, debe obtener el JAR de hadoop-aws de la versión exacta que tienen el resto de sus JARS de hadoop, y una versión de los JAR de AWS 100% sincronizada con la que se compiló Hws. Para Hadoop 2.7. {1, 2, 3, ...}

hadoop-aws-2.7.x.jar aws-java-sdk-1.7.4.jar joda-time-2.9.3.jar + jackson-*-2.6.5.jar

Pegue todo esto en SPARK_HOME/jars . Ejecute spark con sus credenciales configuradas en Env vars o en spark-default.conf

la prueba más simple es si puedes hacer un recuento de líneas de un archivo CSV

val landsatCSV = "s3a://landsat-pds/scene_list.gz" val lines = sc.textFile(landsatCSV) val lineCount = lines.count()

Consigue un número: todo está bien. Obtenga un seguimiento de la pila. Malas noticias.


Para Spark 1.4.x "Precompilado para Hadoop 2.6 y posterior":

Acabo de copiar los paquetes necesarios S3, S3native de hadoop-aws-2.6.0.jar a spark-assembly-1.4.1-hadoop2.6.0.jar.

Después de eso reinicié Spark Cluster y funciona. No olvide verificar el propietario y el modo de la jarra de ensamblaje.


Probablemente tenga que usar el esquema s3a: / en lugar de s3: / o s3n: / Sin embargo, no está funcionando de manera predeterminada (para mí) para el shell de chispa. Veo el siguiente stacktrace:

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074) at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781) at org.apache.spark.rdd.RDD.count(RDD.scala:1099) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31) at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35) at $iwC$$iwC$$iwC.<init>(<console>:37) at $iwC$$iwC.<init>(<console>:39) at $iwC.<init>(<console>:41) at <init>(<console>:43) at .<init>(<console>:47) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980) at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2072) ... 68 more

Lo que creo: debe agregar manualmente la dependencia hadoop-aws manualmente http://search.maven.org/#artifactdetails|org.apache.hadoop|hadoop-aws|2.7.1|jar Pero no tengo idea de cómo agréguelo a la carcasa de chispa correctamente.


Puede agregar el parámetro --packages con el jar apropiado: a su envío:

bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0 code.py



Se encontró con el mismo problema en Spark 2.0.2. Lo resolvió alimentándolo con los frascos. Esto es lo que corrí:

$ spark-shell --jars aws-java-sdk-1.7.4.jar,hadoop-aws-2.7.3.jar,jackson-annotations-2.7.0.jar,jackson-core-2.7.0.jar,jackson-databind-2.7.0.jar,joda-time-2.9.6.jar scala> val hadoopConf = sc.hadoopConfiguration scala> hadoopConf.set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem") scala> hadoopConf.set("fs.s3.awsAccessKeyId",awsAccessKeyId) scala> hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey) scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) scala> sqlContext.read.parquet("s3://your-s3-bucket/")

obviamente, necesitas tener los frascos en el camino desde donde estás ejecutando spark-shell


Tuve que copiar los archivos jar de una descarga de hadoop en el $SPARK_HOME/jars . El uso de la bandera --jars o la bandera --packages para spark-submit no funcionó.

Detalles:

  • Spark 2.3.0
  • Hadoop descargado fue 2.7.6
  • Dos archivos jar copiados fueron de (hadoop dir)/share/hadoop/tools/lib/
    • aws-java-sdk-1.7.4.jar
    • hadoop-aws-2.7.6.jar

Use s3a en lugar de s3n. Tuve un problema similar en un trabajo de Hadoop. Después de cambiar de s3n a s3a funcionó.

p.ej

s3a: //myBucket/myFile1.log