tutorial spark hadoop apache-spark spark-streaming yarn cloudera

hadoop - spark - ¿Cuál es la forma correcta de iniciar/detener trabajos de chispa en el hilo?



apache spark vs hadoop (4)

El último elemento del rompecabezas es cómo detener la aplicación Spark Streaming implementada en YARN de una manera elegante. El método estándar para detener (o más bien matar) la aplicación YARN es usar una yarn application -kill [applicationId] comando yarn application -kill [applicationId] . Y este comando detiene la aplicación Spark Streaming, pero esto podría ocurrir en el medio de un lote. Por lo tanto, si el trabajo lee datos de Kafka, guarda los resultados de procesamiento en HDFS y finalmente compromete los desplazamientos de Kafka, debe esperar datos duplicados en HDFS cuando el trabajo se detuvo justo antes de comprometer las compensaciones.

El primer intento de resolver el problema de apagado elegante fue llamar al método de detención de contexto de transmisión de Spark en un gancho de cierre.

sys.addShutdownHook { streamingContext.stop(stopSparkContext = true, stopGracefully = true) }

Desafortunadamente, un gancho de apagado se llama demasiado tarde para terminar el lote iniciado y la aplicación Spark se elimina casi de inmediato. Además, no hay garantía de que JVM llame a un gancho de cierre.

En el momento de escribir esta publicación de blog, la única forma confirmada de cerrar correctamente la aplicación Spark Streaming en YARN es notificar de alguna manera la aplicación sobre el apagado planificado y luego detener el contexto de transmisión mediante programación (pero no desde el cierre). yarn application -kill comando: la yarn application -kill debe usarse solo como último recurso si la aplicación notificada no se detiene después del tiempo de espera definido.

Se puede notificar a la aplicación sobre el apagado planificado utilizando un archivo de marcador en HDFS (la manera más fácil) o usando el punto final Socket / HTTP simple expuesto en el controlador (manera sofisticada).

Como me gusta el principio de KISS, a continuación puede encontrar el pseudocódigo del script de shell para iniciar / detener la aplicación Spark Streaming utilizando el archivo de marcador:

start() { hdfs dfs -touchz /path/to/marker/my_job_unique_name spark-submit ... } stop() { hdfs dfs -rm /path/to/marker/my_job_unique_name force_kill=true application_id=$(yarn application -list | grep -oe "application_[0-9]*_[0-9]*"`) for i in `seq 1 10`; do application_status=$(yarn application -status ${application_id} | grep "State : /(RUNNING/|ACCEPTED/)") if [ -n "$application_status" ]; then sleep 60s else force_kill=false break fi done $force_kill && yarn application -kill ${application_id} }

En la aplicación Spark Streaming, el hilo de fondo debe monitorear el archivo de marcador, y cuando el archivo desaparece, detenga la llamada de contexto.

streamingContext.stop(stopSparkContext = true, stopGracefully = true)

También puede consultar http://blog.parseconsulting.com/2017/02/how-to-shutdown-spark-streaming-job.html

He estado experimentando y buscando en Google durante muchas horas, sin suerte.

Tengo una aplicación de streaming de chispa que funciona bien en un grupo de chispa local. Ahora necesito desplegarlo en cloudera 5.4.4. Necesito poder iniciarlo, ejecutarlo en segundo plano de forma continua y poder detenerlo.

Intenté esto:

$ spark-submit --master yarn-cluster --class MyMain my.jar myArgs

Pero solo imprime estas líneas sin fin.

15/07/28 17:58:18 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING) 15/07/28 17:58:19 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)

Pregunta número 1 : dado que es una aplicación de transmisión, necesita ejecutarse continuamente. Entonces, ¿cómo lo ejecuto en un modo de "fondo"? Todos los ejemplos que puedo encontrar de enviar trabajos de chispa en el hilo parecen suponer que la aplicación hará algún trabajo y terminará y, por lo tanto, querrá ejecutarlo en primer plano. Pero ese no es el caso para la transmisión.

El siguiente paso ... en este momento la aplicación no parece estar funcionando. Me imagino que podría ser un error o una mala configuración de mi parte, así que traté de mirar en los registros para ver qué está pasando:

$ yarn logs -applicationId application_1438092860895_012

Pero me dice:

/tmp/logs/hdfs/logs/application_1438092860895_0012does not have any log files.

Entonces pregunta número 2 : si la aplicación está EN EJECUCIÓN, ¿por qué no tiene archivos de registro?

Entonces eventualmente tuve que matarlo:

$ yarn application -kill application_1438092860895_012

Esto nos lleva a la pregunta número 3 : suponiendo que eventualmente pueda lanzar la aplicación y ejecutarla en segundo plano, ¿es la "forma preferida" de detener la aplicación de hilados?


  1. ¿Cuál es tu fuente de datos? Si es confiable, como el receptor directo de Kafka, el cierre de kill kill debería estar bien. Cuando la aplicación se reinicie, se leerá desde el último desplazamiento completo del lote. Si la fuente de datos no es confiable, o si usted mismo desea manejar un apagado elegante, debe implementar algún tipo de enlace externo en el contexto de transmisión. Me enfrenté al mismo problema y terminé implementando un pequeño truco para agregar una nueva pestaña en el webui que actúa como un botón de detención.

  1. Puedes cerrar la consola spark-submit . El trabajo se está ejecutando en segundo plano cuando se escribe en estado EN EJECUCIÓN .
  2. Los registros son visibles justo después de que se complete la aplicación . Durante el tiempo de ejecución, todos los registros se pueden acceder directamente en los nodos de trabajadores localmente (se puede ver en la interfaz de usuario web del administrador de recursos de YARN) y se agregan a HDFS una vez que el trabajo finaliza .
  3. yarn application -kill es probablemente la mejor manera de detener la aplicación de transmisión de Spark, pero no es perfecto. Sería mejor hacer un apagado elegante para detener todos los receptores de transmisión y detener el contexto de transmisión, pero personalmente no sé cómo hacerlo.

Finalmente encuentro una forma de cerrar con seguridad el trabajo de chispa.

  1. escribir un hilo de servidor de socket esperar para detener el contexto de transmisión

package xxx.xxx.xxx import java.io.{BufferedReader, InputStreamReader} import java.net.{ServerSocket, Socket} import org.apache.spark.streaming.StreamingContext object KillServer { class NetworkService(port: Int, ssc: StreamingContext) extends Runnable { val serverSocket = new ServerSocket(port) def run() { Thread.currentThread().setName("Zhuangdy | Waiting for graceful stop at port " + port) while (true) { val socket = serverSocket.accept() (new Handler(socket, ssc)).run() } } } class Handler(socket: Socket, ssc: StreamingContext) extends Runnable { def run() { val reader = new InputStreamReader(socket.getInputStream) val br = new BufferedReader(reader) if (br.readLine() == "kill") { ssc.stop(true, true) } br.close(); } } def run(port:Int, ssc: StreamingContext): Unit ={ (new NetworkService(port, ssc)).run } }

  1. en su método main donde inicia el contexto de transmisión, agregue el siguiente código

    ssc.start() KillServer.run(11212, ssc) ssc.awaitTermination()

  2. Escriba spark-submit para enviar trabajos al hilo, y dirija la salida a un archivo que usará más tarde

spark-submit --class "com.Mainclass" / --conf "spark.streaming.stopGracefullyOnShutdown=true" / --master yarn-cluster --queue "root" / --deploy-mode cluster / --executor-cores 4 --num-executors 8 --executor-memory 3G / hdfs:///xxx.jar > output 2>&1 &

  1. ¡Finalmente, el trabajo de encendido de chispa de apagado seguro sin pérdida de datos o resultado de cómputo no persiste! (El socket del servidor que está utilizando para detener el contexto de transmisión correctamente se está ejecutando en el controlador, por lo que grep el resultado del paso 3 para obtener el controlador addr, y el uso de echo nc para enviar un comando socket kill)

#!/bin/bash driver=`cat output | grep ApplicationMaster | grep -Po ''/d+./d+./d+./d+''` echo "kill" | nc $driver 11212 driverid=`yarn application -list 2>&1 | grep ad.Stat | grep -Po ''application_/d+_/d+''` yarn application -kill $driverid