spark medicina aws amazon-ec2 amazon-s3 apache-spark

amazon ec2 - medicina - Cómo leer la entrada de S3 en una aplicación de clúster Spark Streaming EC2



hadoop (8)

Estoy tratando de hacer que mi aplicación Spark Streaming lea su entrada desde un directorio S3, pero sigo obteniendo esta excepción después de iniciarla con el script spark-submit:

Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively). at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:66) at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:49) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59) at org.apache.hadoop.fs.s3native.$Proxy6.initialize(Unknown Source) at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187) at org.apache.spark.streaming.StreamingContext.checkpoint(StreamingContext.scala:195) at MainClass$.main(MainClass.scala:1190) at MainClass.main(MainClass.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Estoy estableciendo esas variables a través de este bloque de código como se sugiere aquí http://spark.apache.org/docs/latest/ec2-scripts.html (parte inferior de la página):

val ssc = new org.apache.spark.streaming.StreamingContext( conf, Seconds(60)) ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId",args(2)) ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey",args(3))

args (2) y args (3) son mi AWS Access Key ID y Secrete Access Key, por supuesto.

¿Por qué sigue diciendo que no están establecidos?

EDITAR: Lo intenté también de esta manera, pero recibo la misma excepción:

val lines = ssc.textFileStream("s3n://"+ args(2) +":"+ args(3) + "@<mybucket>/path/")


Aumentando la respuesta de @nealmcb, la forma más directa de hacerlo es definir

HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

en conf/spark-env.sh o exportar esa variable env en ~/.bashrc o ~/.bash_profile .

Eso funcionará siempre que pueda acceder a s3 a través de hadoop. Por ejemplo, si puedes correr

hadoop fs -ls s3n://path/

entonces hadoop puede ver la ruta s3.

Si hadoop no puede ver la ruta, siga los consejos que figuran en ¿Cómo puedo acceder a S3 / S3n desde una instalación local de Hadoop 2.6?


En AWS EMR, las sugerencias anteriores no funcionaron. En cambio, actualicé las siguientes propiedades en conf / core-site.xml:

fs.s3n.awsAccessKeyId y fs.s3n.awsSecretAccessKey con sus credenciales de S3.


Impar. Intenta también hacer un .set en el sparkContext . Intente también exportar variables env antes de iniciar la aplicación:

export AWS_ACCESS_KEY_ID=<your access> export AWS_SECRET_ACCESS_KEY=<your secret>

^^ así es como lo hacemos.

ACTUALIZACIÓN: de acuerdo con @tribbloid, lo anterior se rompió en 1.3.0, ahora tienes que desfigurar por edades y edades con hdfs-site.xml, o puedes hacerlo (y esto funciona en un spark-shell):

val hadoopConf = sc.hadoopConfiguration; hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey) hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)


La siguiente configuración funciona para mí, asegúrese de configurar también "fs.s3.impl":

val conf = new SparkConf().setAppName("Simple Application").setMaster("local") val sc = new SparkContext(conf) val hadoopConf=sc.hadoopConfiguration; hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") hadoopConf.set("fs.s3.awsAccessKeyId",myAccessKey) hadoopConf.set("fs.s3.awsSecretAccessKey",mySecretKey)


Las últimas versiones de EMR (probadas en 4.6.0) requieren la siguiente configuración:

val sc = new SparkContext(conf) val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.s3.impl", "com.amazon.ws.emr.hadoop.fs.EmrFileSystem") hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey) hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)

Aunque en la mayoría de los casos, la configuración debe funcionar, esto es si tiene diferentes credenciales S3 de las que lanzó con el clúster.



Quería poner las credenciales de forma más segura en un archivo de configuración en una de mis particiones cifradas. Así que export HADOOP_CONF_DIR=~/Private/.aws/hadoop_conf antes de ejecutar mi aplicación de chispa, y puse un archivo en ese directorio (encriptado a través de ecryptfs ) llamado core-site.xml contiene las credenciales como esta:

<?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>fs.s3n.awsAccessKeyId</name> <value>my_aws_access_key_id_here</value> </property> <property> <name>fs.s3n.awsSecretAccessKey</name> <value>my_aws_secret_access_key_here</value> </property> </configuration>

HADOOP_CONF_DIR también se puede configurar en conf/spark-env.sh .


esto funciona para mí en shell 1.4.1:

val conf = sc.getConf conf.set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") conf.set("spark.hadoop.fs.s3.awsAccessKeyId", <your access key>) conf.set("spark.hadoop.fs.s3.awsSecretAccessKey", <your secret key>) SparkHadoopUtil.get.conf.addResource(SparkHadoopUtil.get.newConfiguration(conf)) ... sqlContext.read.parquet("s3://...")