apache spark - examples - ¿Cómo se dividen las etapas en tareas en Spark?
apache spark wikipedia (3)
Supongamos por lo siguiente que solo se está ejecutando un trabajo de Spark en cada momento.
Lo que llego hasta ahora
Esto es lo que entiendo que sucede en Spark:
-
Cuando se crea un
SparkContext
, cada nodo de trabajo inicia un ejecutor. Los ejecutores son procesos separados (JVM), que se conectan nuevamente al programa del controlador. Cada ejecutor tiene el frasco del programa controlador. Renunciar a un conductor, apaga a los ejecutores. Cada ejecutor puede contener algunas particiones. - Cuando se ejecuta un trabajo, se crea un plan de ejecución de acuerdo con el gráfico de linaje.
- El trabajo de ejecución se divide en etapas, donde las etapas contienen tantas transformaciones y acciones vecinas (en el gráfico de linaje), pero no barajan. Así, las etapas están separadas por barajaduras.
Entiendo que
- Una tarea es un comando enviado desde el controlador a un ejecutor serializando el objeto Function.
- El ejecutor deserializa (con el controlador jar) el comando (tarea) y lo ejecuta en una partición.
pero
Pregunta (s)
¿Cómo divido el escenario en esas tareas?
Específicamente:
- ¿Las tareas están determinadas por las transformaciones y acciones o pueden ser múltiples transformaciones / acciones en una tarea?
- Son las tareas determinadas por la partición (por ejemplo, una tarea por etapa por partición).
- ¿Las tareas están determinadas por los nodos (por ejemplo, una tarea por etapa por nodo)?
Lo que pienso (solo respuesta parcial, incluso si es correcto)
En https://0x0fff.com/spark-architecture-shuffle , el shuffle se explica con la imagen
y tengo la impresión de que la regla es
cada etapa se divide en # tareas de número de particiones, sin tener en cuenta el número de nodos
Para mi primera imagen, diría que tendría 3 tareas de mapa y 3 tareas de reducción.
Para la imagen de 0x0fff, diría que hay 8 tareas de mapa y 3 tareas de reducción (suponiendo que solo haya tres archivos naranja y tres verde oscuro).
Preguntas abiertas en cualquier caso
¿Es eso correcto? Pero incluso si eso es correcto, mis preguntas anteriores no están todas respondidas, porque todavía está abierto, ya sea que varias operaciones (por ejemplo, múltiples mapas) estén dentro de una tarea o estén separadas en una tarea por operación.
Lo que otros dicen
¿Qué es una tarea en Spark? ¿Cómo ejecuta el trabajador Spark el archivo jar? y ¿Cómo divide el planificador Apache Spark los archivos en tareas? son similares, pero no sentí que mi pregunta fue respondida claramente allí.
Esto podría ayudarlo a comprender mejor las diferentes piezas:
- Etapa: es una colección de tareas. Mismo proceso que se ejecuta en diferentes subconjuntos de datos (particiones).
- Tarea: representa una unidad de trabajo en una partición de un conjunto de datos distribuido. Entonces, en cada etapa, número de tareas = número de particiones, o como dijiste "una tarea por etapa por partición".
- Cada ejecutador se ejecuta en un contenedor de hilo, y cada contenedor reside en un nodo.
- Cada etapa utiliza múltiples ejecutores, a cada ejecutor se le asignan múltiples vcores.
- Cada vcore puede ejecutar exactamente una tarea a la vez
- Entonces, en cualquier etapa, se pueden ejecutar múltiples tareas en paralelo. número de tareas en ejecución = número de vcores en uso.
Si entiendo correctamente, hay 2 cosas (relacionadas) que te confunden:
1) ¿Qué determina el contenido de una tarea?
2) ¿Qué determina el número de tareas a ejecutar?
El motor de Spark "pega" operaciones simples en rdds consecutivos, por ejemplo:
rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count
así que cuando se calcula (perezosamente) rdd3, spark generará una tarea por partición de rdd1 y cada tarea ejecutará tanto el filtro como el mapa por línea para dar como resultado rdd3.
El número de tareas está determinado por el número de particiones. Cada RDD tiene un número definido de particiones. Para un RDD de origen que se lee desde HDFS (usando sc.textFile (...) por ejemplo), el número de particiones es el número de divisiones generadas por el formato de entrada. Algunas operaciones en RDD (s) pueden resultar en un RDD con un número diferente de particiones:
rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).
Otro ejemplo es une:
rdd3 = rdd1.join( rdd2 , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).
(La mayoría) de las operaciones que cambian el número de particiones implican una combinación aleatoria, cuando hacemos, por ejemplo:
rdd2 = rdd1.repartition( 1000 )
lo que realmente sucede es que la tarea en cada partición de rdd1 necesita producir una salida final que pueda leerse en la siguiente etapa para que rdd2 tenga exactamente 1000 particiones (¿Cómo lo hacen? Hash u Sort ). Las tareas en este lado a veces se denominan "tareas de mapa (lado)". Una tarea que luego se ejecutará en rdd2 actuará en una partición (¡de rdd2!) Y tendría que descubrir cómo leer / combinar las salidas del lado del mapa relevantes para esa partición. Las tareas de este lado a veces se denominan "tareas de reducción (lateral)".
Las 2 preguntas están relacionadas: el número de tareas en una etapa es el número de particiones (común a los rdds consecutivos "pegados" juntos) y el número de particiones de un rdd puede cambiar entre etapas (especificando el número de particiones a algunos shuffle causando operación por ejemplo).
Una vez que comienza la ejecución de una etapa, sus tareas pueden ocupar espacios de tareas. El número de ranuras de tareas simultáneas es numExecutors * ExecutorCores. En general, estos pueden ser ocupados por tareas de diferentes etapas no dependientes.
Tienes un lindo resumen aquí. Para responder tu pregunta
-
Es necesario iniciar una
task
separada para cada partición de datos para cadastage
. Tenga en cuenta que es probable que cada partición resida en ubicaciones físicas distintas, por ejemplo, bloques en HDFS o directorios / volúmenes para un sistema de archivos local.
Tenga en cuenta que el envío de
Stage
s está dirigido por el
DAG Scheduler
.
Esto significa que las etapas que no son interdependientes pueden enviarse al clúster para su ejecución en paralelo: esto maximiza la capacidad de paralelización en el clúster.
Entonces, si las operaciones en nuestro flujo de datos pueden ocurrir simultáneamente, esperamos ver múltiples etapas lanzadas.
Podemos ver eso en acción en el siguiente ejemplo de juguete en el que hacemos los siguientes tipos de operaciones:
- cargar dos fuentes de datos
- realizar alguna operación de mapa en ambas fuentes de datos por separado
- Únete a ellos
- realizar algunas operaciones de mapa y filtro en el resultado
- guardar el resultado
Entonces, ¿con cuántas etapas terminaremos?
- 1 etapa cada una para cargar las dos fuentes de datos en paralelo = 2 etapas
-
Una tercera etapa que representa la
join
que depende de las otras dos etapas. - Nota: todas las operaciones de seguimiento que trabajan en los datos unidos se pueden realizar en la misma etapa porque deben suceder secuencialmente. No es beneficioso iniciar etapas adicionales porque no pueden comenzar a trabajar hasta que se complete la operación anterior.
Aquí está ese programa de juguetes
val sfi = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")
Y aquí está el DAG del resultado.
Ahora: ¿cuántas tareas ? El número de tareas debe ser igual a
Suma de (
Stage
*
#Partitions in the stage
)