tutorial spark science español data java scala apache-spark bigdata

java - science - spark python



¿Cómo saber qué etapa de un trabajo se está ejecutando actualmente en Apache Spark? (2)

Considera que tengo un trabajo como sigo en Spark;

Archivo CSV ==> Filtrar por una columna ==> Tomar muestra ==> Guardar como JSON

Ahora mi requisito es ¿cómo puedo saber qué paso ( Fetching file o Filtering o Sampling ) del trabajo se está ejecutando programáticamente (Preferiblemente utilizando Java API)? ¿Hay alguna manera de esto?

Puedo rastrear Trabajo, Etapa y Tarea usando la clase SparkListener . Y se puede hacer como rastrear un Id de etapa. Pero cómo saber qué etapa Id es para qué paso en la cadena de trabajo.

Lo que deseo enviar una notificación al usuario cuando considere Filtrar por una columna se completa. Para eso hice una clase que amplía la clase SparkListener. Pero no puedo averiguar de dónde puedo obtener el nombre de la ejecución del nombre de la transformación. ¿Es posible rastrear en absoluto?

public class ProgressListener extends SparkListener{ @Override public void onJobStart(SparkListenerJobStart jobStart) { } @Override public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { //System.out.println("Stage Name : "+stageSubmitted.stageInfo().getStatusString()); giving action name only } @Override public void onTaskStart(SparkListenerTaskStart taskStart) { //no such method like taskStart.name() } }


¿Consideró esta opción: http://spark.apache.org/docs/latest/monitoring.html
Parece que puede usar la siguiente API de reposo para obtener un determinado estado de trabajo / applications / [id-de-aplicación] / jobs / [job-id]

Puede establecer JobGroupId y JobGroupDescription para que pueda rastrear qué grupo de trabajo se está manejando. es decir, setJobGroup

Asumiendo que llamarás a JobGroupId "prueba"

sc.setJobGroup("1", "Test job")

Cuando llame al http: // localhost: 4040 / api / v1 / applications / [id-de-aplicación] / jobs / [job-id]

Obtendrás un JSON con un nombre descriptivo para ese trabajo:

{ "jobId" : 3, "name" : "count at <console>:25", "description" : "Test Job", "submissionTime" : "2017-02-22T05:52:03.145GMT", "completionTime" : "2017-02-22T05:52:13.429GMT", "stageIds" : [ 3 ], "jobGroup" : "1", "status" : "SUCCEEDED", "numTasks" : 4, "numActiveTasks" : 0, "numCompletedTasks" : 4, "numSkippedTasks" : 0, "numFailedTasks" : 0, "numActiveStages" : 0, "numCompletedStages" : 1, "numSkippedStages" : 0, "numFailedStages" : 0 }


No se puede saber exactamente cuándo, por ejemplo, la operación de filtro comienza o termina.

Eso es porque tienes transformaciones ( filter , map , ...) y acciones ( count , foreach , ...). Spark pondrá tantas operaciones en una etapa como sea posible. Luego, el escenario se ejecuta en paralelo en las diferentes particiones de su entrada. Y aquí viene el problema.

Supongamos que tiene varios trabajadores y el siguiente programa

LOAD ==> MAP ==> FILTER ==> GROUP BY + Agregación

Este programa probablemente tendrá dos etapas: la primera etapa cargará el archivo y aplicará el map y el filter . Luego, la salida se barajará para crear los grupos. En la segunda etapa, se realizará la agregación.

Ahora, el problema es que tiene varios trabajadores y cada uno procesará una parte de sus datos de entrada en paralelo. Es decir, cada ejecutor en su clúster recibirá una copia de su programa (la etapa actual) y lo ejecutará en la partición asignada.

Verá, tendrá múltiples instancias de su map y operadores de filter que se ejecutan en paralelo, pero no necesariamente al mismo tiempo. En un caso extremo, el trabajador 1 terminará con la etapa 1 antes de que el trabajador 20 haya comenzado (y por lo tanto termine con su operación de filter antes que el trabajador 20).

Para los RDD, Spark usa el modelo de iterador dentro de un escenario. Sin embargo, para los conjuntos de datos en la última versión de Spark, crean un solo bucle sobre la partición y ejecutan las transformaciones. Esto significa que, en este caso, Spark no sabe realmente cuándo finalizó un operador de transformación para una sola tarea.

Larga historia corta:

  1. No puede saber cuándo finaliza una operación dentro de una etapa
  2. Incluso si pudieras, hay varias instancias que terminarán en diferentes momentos.

Entonces, ahora ya tenía el mismo problema:

En nuestro proyecto Piglet (permita un poco de publicidad ;-)) generamos el código Spark de los scripts de Pig Latin y queremos crear un perfil de los scripts. Terminé insertando el operador mapPartition entre todos los operadores de usuario que enviarán la identificación de la partición y la hora actual a un servidor que evaluará los mensajes. Sin embargo, esta solución también tiene sus limitaciones ... y aún no estoy completamente satisfecho.

Sin embargo, a menos que pueda modificar los programas, me temo que no puede lograr lo que quiere.