tutorial spark examples example apache-spark

apache spark - spark - Leer archivos enviados con chispa por el conductor.



apache spark wikipedia (5)

Estoy enviando un trabajo de Spark para ejecutar en un clúster remoto ejecutando

spark-submit ... --deploy-mode cluster --files some.properties ...

Quiero leer el contenido del archivo some.properties por el código del controlador , es decir, antes de crear el contexto Spark y lanzar tareas RDD. El archivo se copia al controlador remoto, pero no al directorio de trabajo del controlador.

Las formas de solucionar este problema que conozco son:

  1. Sube el archivo a HDFS
  2. Almacena el archivo en la aplicación jar

Ambos son inconvenientes ya que este archivo se cambia con frecuencia en la máquina de desarrollo de envío.

¿Hay alguna forma de leer el archivo que se cargó utilizando el indicador --files durante el método principal del código del controlador?


Después de la investigación, encontré una solución para el problema anterior. Envíe la configuración any.properties durante el envío de chispas y utilícela por el controlador de chispas antes y después de la inicialización de SparkSession. Espero que te ayude.

cualquier propiedad

spark.key=value spark.app.name=MyApp

SparkTest.java

import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; public class SparkTest{ public Static void main(String[] args){ String warehouseLocation = new File("spark-warehouse").getAbsolutePath(); Config conf = loadConf(); System.out.println(conf.getString("spark.key")); // Initialize SparkContext and use configuration from properties SparkConf sparkConf = new SparkConf(true).setAppName(conf.getString("spark.app.name")); SparkSession sparkSession = SparkSession.builder().config(sparkConf).config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport().getOrCreate(); JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext()); } public static Config loadConf() { String configFileName = "any.properties"; System.out.println(configFileName); Config configs = ConfigFactory.load(ConfigFactory.parseFile(new java.io.File(configFileName))); System.out.println(configs.getString("spark.key")); // get value from properties file return configs; } }

Chispa enviar:

spark-submit --class SparkTest --master yarn --deploy-mode client --files any.properties,yy-site.xml --jars ...........


Esta es una buena solución que desarrollé en Python Spark para integrar cualquier dato como un archivo externo a su plataforma de Big Data.

Que te diviertas.

# Load from the Spark driver any local text file and return a RDD (really useful in YARN mode to integrate new data at the fly) # (See https://community.hortonworks.com/questions/38482/loading-local-file-to-apache-spark.html) def parallelizeTextFileToRDD(sparkContext, localTextFilePath, splitChar): localTextFilePath = localTextFilePath.strip('' '') if (localTextFilePath.startswith("file://")): localTextFilePath = localTextFilePath[7:] import subprocess dataBytes = subprocess.check_output("cat " + localTextFilePath, shell=True) textRDD = sparkContext.parallelize(dataBytes.split(splitChar)) return textRDD # Usage example myRDD = parallelizeTextFileToRDD(sc, ''~/myTextFile.txt'', ''/n'') # Load my local file as a RDD myRDD.saveAsTextFile(''/user/foo/myTextFile'') # Store my data to HDFS


Las opciones --archivos y --archivos admiten la especificación de nombres de archivos con # similar a Hadoop. Por ejemplo, puede especificar: --files localtest.txt # appSees.txt y esto cargará el archivo que ha nombrado localmente localtest.txt en HDFS, pero estará vinculado por el nombre appSees.txt, y su aplicación debe usar el nombre como appSees.txt para referenciarlo cuando se ejecuta en YARN.

esto funciona para mi aplicación de transmisión de chispa en modo hilado / cliente y hilado / clúster. tal vez pueda ayudarte


Sí, puede acceder a los archivos cargados a través del argumento --files .

Así es como puedo acceder a los archivos pasados ​​a través de - --files :

./bin/spark-submit / --class com.MyClass / --master yarn-cluster / --files /path/to/some/file.ext / --jars lib/datanucleus-api-jdo-3.2.6.jar,lib/datanucleus-rdbms-3.2.9.jar,lib/datanucleus-core-3.2.10.jar / /path/to/app.jar file.ext

y en mi código Spark:

val filename = args(0) val linecount = Source.fromFile(filename).getLines.size

Creo que estos archivos se descargan en los trabajadores en el mismo directorio en el que se coloca el archivo, por lo que simplemente se pasa el nombre de archivo y no la ruta absoluta a Source.fromFile .


Una forma de solucionar el problema es que puede crear un SparkContext temporal simplemente llamando a SparkContext.getOrCreate() y luego leer el archivo que pasó en los --files con la ayuda de SparkFiles.get(''FILE'') .

Una vez que lea el archivo, recupere toda la configuración necesaria que necesita en una variable SparkConf() .

Después de que llame a esta función:

SparkContext.stop(SparkContext.getOrCreate())

Esto destruirá el SparkContext existente y en la siguiente línea simplemente inicializará un nuevo SparkContext con las configuraciones necesarias como esta.

sc = SparkContext(conf=conf).getOrCreate()

Te conseguiste un SparkContext con la configuración deseada