apache spark - spark - Leer archivos enviados con chispa por el conductor.
apache spark wikipedia (5)
Estoy enviando un trabajo de Spark para ejecutar en un clúster remoto ejecutando
spark-submit ... --deploy-mode cluster --files some.properties ...
Quiero leer el contenido del archivo some.properties
por el código del controlador , es decir, antes de crear el contexto Spark y lanzar tareas RDD. El archivo se copia al controlador remoto, pero no al directorio de trabajo del controlador.
Las formas de solucionar este problema que conozco son:
- Sube el archivo a HDFS
- Almacena el archivo en la aplicación jar
Ambos son inconvenientes ya que este archivo se cambia con frecuencia en la máquina de desarrollo de envío.
¿Hay alguna forma de leer el archivo que se cargó utilizando el indicador --files
durante el método principal del código del controlador?
Después de la investigación, encontré una solución para el problema anterior. Envíe la configuración any.properties durante el envío de chispas y utilícela por el controlador de chispas antes y después de la inicialización de SparkSession. Espero que te ayude.
cualquier propiedad
spark.key=value
spark.app.name=MyApp
SparkTest.java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
public class SparkTest{
public Static void main(String[] args){
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
Config conf = loadConf();
System.out.println(conf.getString("spark.key"));
// Initialize SparkContext and use configuration from properties
SparkConf sparkConf = new SparkConf(true).setAppName(conf.getString("spark.app.name"));
SparkSession sparkSession =
SparkSession.builder().config(sparkConf).config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport().getOrCreate();
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
}
public static Config loadConf() {
String configFileName = "any.properties";
System.out.println(configFileName);
Config configs = ConfigFactory.load(ConfigFactory.parseFile(new java.io.File(configFileName)));
System.out.println(configs.getString("spark.key")); // get value from properties file
return configs;
}
}
Chispa enviar:
spark-submit --class SparkTest --master yarn --deploy-mode client --files any.properties,yy-site.xml --jars ...........
Esta es una buena solución que desarrollé en Python Spark para integrar cualquier dato como un archivo externo a su plataforma de Big Data.
Que te diviertas.
# Load from the Spark driver any local text file and return a RDD (really useful in YARN mode to integrate new data at the fly)
# (See https://community.hortonworks.com/questions/38482/loading-local-file-to-apache-spark.html)
def parallelizeTextFileToRDD(sparkContext, localTextFilePath, splitChar):
localTextFilePath = localTextFilePath.strip('' '')
if (localTextFilePath.startswith("file://")):
localTextFilePath = localTextFilePath[7:]
import subprocess
dataBytes = subprocess.check_output("cat " + localTextFilePath, shell=True)
textRDD = sparkContext.parallelize(dataBytes.split(splitChar))
return textRDD
# Usage example
myRDD = parallelizeTextFileToRDD(sc, ''~/myTextFile.txt'', ''/n'') # Load my local file as a RDD
myRDD.saveAsTextFile(''/user/foo/myTextFile'') # Store my data to HDFS
Las opciones --archivos y --archivos admiten la especificación de nombres de archivos con # similar a Hadoop. Por ejemplo, puede especificar: --files localtest.txt # appSees.txt y esto cargará el archivo que ha nombrado localmente localtest.txt en HDFS, pero estará vinculado por el nombre appSees.txt, y su aplicación debe usar el nombre como appSees.txt para referenciarlo cuando se ejecuta en YARN.
esto funciona para mi aplicación de transmisión de chispa en modo hilado / cliente y hilado / clúster. tal vez pueda ayudarte
Sí, puede acceder a los archivos cargados a través del argumento --files
.
Así es como puedo acceder a los archivos pasados a través de - --files
:
./bin/spark-submit /
--class com.MyClass /
--master yarn-cluster /
--files /path/to/some/file.ext /
--jars lib/datanucleus-api-jdo-3.2.6.jar,lib/datanucleus-rdbms-3.2.9.jar,lib/datanucleus-core-3.2.10.jar /
/path/to/app.jar file.ext
y en mi código Spark:
val filename = args(0)
val linecount = Source.fromFile(filename).getLines.size
Creo que estos archivos se descargan en los trabajadores en el mismo directorio en el que se coloca el archivo, por lo que simplemente se pasa el nombre de archivo y no la ruta absoluta a Source.fromFile
.
Una forma de solucionar el problema es que puede crear un SparkContext
temporal simplemente llamando a SparkContext.getOrCreate()
y luego leer el archivo que pasó en los --files
con la ayuda de SparkFiles.get(''FILE'')
.
Una vez que lea el archivo, recupere toda la configuración necesaria que necesita en una variable SparkConf()
.
Después de que llame a esta función:
SparkContext.stop(SparkContext.getOrCreate())
Esto destruirá el SparkContext
existente y en la siguiente línea simplemente inicializará un nuevo SparkContext
con las configuraciones necesarias como esta.
sc = SparkContext(conf=conf).getOrCreate()
Te conseguiste un SparkContext
con la configuración deseada