hadoop - para - spark streaming example
¿Cómo hacer que Spark Streaming escriba su salida para que Impala pueda leerlo? (1)
Tengo el siguiente problema con la API de Spark Streaming. Actualmente estoy transmitiendo datos de entrada a través de Flume a Spark Streaming, con el cual planeo hacer un preprocesamiento de los datos. Luego, me gustaría guardar los datos en el sistema de archivos de Hadoop y consultarlos con Impala. Sin embargo, Spark está escribiendo los archivos de datos en directorios separados y se genera un nuevo directorio para cada RDD.
Esto es un problema porque, en primer lugar, las tablas externas en Impala no pueden detectar subdirectorios, sino solo archivos, dentro del directorio al que apuntan, a menos que estén particionados. En segundo lugar, los nuevos directorios se agregan tan rápido por Spark que sería muy malo para el rendimiento crear una nueva partición periódicamente en Impala para cada directorio generado. Por otro lado, si elijo aumentar el intervalo de rotación de las escrituras en Spark, para que los directorios se generen con menos frecuencia, habrá un retraso adicional hasta que Impala pueda leer los datos entrantes. Esto no es aceptable ya que mi sistema tiene que admitir aplicaciones en tiempo real. En Hive, pude configurar las tablas externas para detectar también los subdirectorios sin necesidad de particionar, usando estas configuraciones:
set hive.mapred.supports.subdirectories=true;
set mapred.input.dir.recursive=true;
Pero para mi entender, Impala no tiene una función como esta.
Actualmente estoy usando el siguiente código para leer los datos de Flume y escribirlos en HDFS:
val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8"))).saveAsTextFiles(path)
Aquí, la ruta variable determina el prefijo del directorio, al cual se agregan los archivos de texto (parte-0000, etc.) y el resto del nombre del directorio es una marca de tiempo generada por Spark. Podría cambiar el código a algo como esto:
val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
val mapStream = stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8")))
mapStream.foreachRDD(rdd => rdd.saveAsTextFile(path))
En este caso, los archivos se agregarán al mismo directorio determinado por la ruta, pero dado que siempre se denominan part-00000, part-00001, part-00002, etc., los archivos generados previamente se sobrescribirán. Mientras examinaba el código fuente de Spark, noté que los nombres de los archivos están determinados por una línea en el método open () de SparkHadoopWriter:
val outputName = "part-" + numfmt.format(splitID)
Y me parece que no hay forma de manipular el splitID a través de Spark API. Para resumir, mis preguntas son las siguientes:
- ¿Hay algún método para hacer que las tablas externas en Impala detecten subdirectorios?
- Si no es así, ¿hay algún método para que Spark escriba sus archivos de salida en un solo directorio o de otra forma en un formulario que pueda ser leído de forma instantánea por Impala?
- Si no, ¿hay algún tipo de actualización esperada con Spark para solucionar este problema o debería simplemente ramificar mi propia versión de Spark con la cual puedo decidir los nombres de los archivos que escribo?
No puedo hablar por Impala.
part-xxxxx es una convención de hadoop que sigue Spark. La mayoría de las herramientas entienden este formato y supongo que Spark no puede hacer mucho al respecto. Los archivos de parte deben ser únicos y agregar un número de partición al nombre del archivo es una técnica común.
Buscaría en Impala para ver cómo leer el archivo de pieza, ya que la mayoría de las herramientas de hadoop lo generan de esta manera.
Si uno quiere personalizar la estructura del directorio, aunque esa no es su pregunta, se puede lograr fácilmente, digamos para cambiar el formato de prefix-timestamp-suffix
. Spark Steaming utiliza RDD.saveAsTextFiles(..)
de Spark RDD.saveAsTextFiles(..)
debajo del capó, que se puede personalizar. Aquí está el código de DStream.scala:
def saveAsTextFiles(prefix: String, suffix: String = "") {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsTextFile(file)
}
this.foreachRDD(saveFunc)
}