apache-spark - examples - apache spark wikipedia
¿Cuándo son realmente confiables los acumuladores? (3)
Creo que Matei respondió esto en la documentación referida:
Como se discutió en here es bastante difícil proporcionar una buena semántica para el caso general (actualizaciones del acumulador dentro de las etapas sin resultado), por las siguientes razones:
Un RDD puede ser computado como parte de múltiples etapas. Por ejemplo, si actualiza un acumulador dentro de un MappedRDD y luego lo mezcla, puede ser una etapa. Pero si luego vuelves a llamar a map () en MappedRDD y barajas el resultado de eso, obtienes una segunda etapa en la que ese mapa está en proceso. ¿Desea contar la actualización de este acumulador dos veces o no?
Es posible reenviar etapas completas si los archivos aleatorios son eliminados por el limpiador periódico o se pierden debido a una falla de nodo, por lo que todo lo que rastrea los RDD deberá hacerlo durante largos periodos de tiempo (siempre que el RDD sea referenciable en el programa de usuario ), que sería bastante complicado de implementar.
Así que voy a marcar esto como "no arreglaré" por ahora, excepto por la parte de las etapas de resultados realizadas en SPARK-3628.
Quiero usar un acumulador para reunir algunas estadísticas sobre los datos que estoy manipulando en un trabajo de Spark. Idealmente, lo haría mientras el trabajo computa las transformaciones requeridas, pero como Spark volvería a calcular las tareas en diferentes casos, los acumuladores no reflejarían las métricas reales. Así es como la documentación describe esto:
Para las actualizaciones de acumuladores realizadas solo dentro de las acciones, Spark garantiza que la actualización de cada tarea al acumulador solo se aplicará una vez, es decir, las tareas reiniciadas no actualizarán el valor. En las transformaciones, los usuarios deben ser conscientes de que la actualización de cada tarea se puede aplicar más de una vez si las tareas o las etapas del trabajo se vuelven a ejecutar.
Esto es confuso ya que la mayoría de las acciones no permiten ejecutar código personalizado (donde se pueden usar los acumuladores), en su mayoría toman los resultados de transformaciones previas (de forma perezosa). La documentación también muestra esto:
val acc = sc.accumulator(0)
data.map(x => acc += x; f(x))
// Here, acc is still 0 because no actions have cause the `map` to be computed.
Pero si agregamos data.count()
al final, ¿se garantizará que sea correcto (no tenga duplicados) o no? Claramente, acc
no se usa "solo dentro de acciones", ya que el mapa es una transformación. Entonces no debería ser garantizado
Por otro lado, la discusión sobre las entradas relacionadas de Jira habla de "tareas de resultado" en lugar de "acciones". Por ejemplo here y here . Esto parece indicar que, de hecho, se garantizará que el resultado es correcto, ya que estamos utilizando acc
inmediatamente antes y acción y, por lo tanto, debería computarse como una sola etapa.
Supongo que este concepto de "tarea de resultado" tiene que ver con el tipo de operaciones involucradas, siendo el último que incluye una acción, como en este ejemplo, que muestra cómo varias operaciones se dividen en etapas (en magenta, imagen tomada desde here ):
Entonces, hipotéticamente, una acción count()
al final de esa cadena sería parte de la misma etapa final, y me garantizarían que los acumuladores utilizados en el último mapa no incluirán ningún duplicado.
¡La aclaración sobre este tema sería genial! Gracias.
Las actualizaciones del acumulador se envían nuevamente al controlador cuando una tarea se completa con éxito. Por lo tanto, se garantiza que los resultados de su acumulador serán correctos cuando esté seguro de que cada tarea se habrá ejecutado exactamente una vez y cada tarea funcionó como esperaba.
Prefiero depender de reduce
y aggregate
lugar de acumuladores porque es bastante difícil enumerar todas las formas en que se pueden ejecutar las tareas.
- Una acción inicia tareas.
- Si una acción depende de una etapa anterior y los resultados de esa etapa no están (completamente) en la memoria caché, entonces se iniciarán las tareas de la etapa anterior.
- La ejecución especulativa inicia tareas duplicadas cuando se detectan un número pequeño de tareas lentas.
Dicho esto, hay muchos casos simples en los que los acumuladores pueden ser plenamente confiables.
val acc = sc.accumulator(0)
val rdd = sc.parallelize(1 to 10, 2)
val accumulating = rdd.map { x => acc += 1; x }
accumulating.count
assert(acc == 10)
¿Se garantizaría que esto sea correcto (no tiene duplicados)?
Sí, si la ejecución especulativa está deshabilitada. El map
y el count
serán una sola etapa, así que, como dices, no hay forma de que una tarea se pueda ejecutar con éxito más de una vez.
Pero un acumulador se actualiza como un efecto secundario. Por lo tanto, debe tener mucho cuidado al pensar cómo se ejecutará el código. Considere esto en lugar de accumulating.count
:
// Same setup as before.
accumulating.mapPartitions(p => Iterator(p.next)).collect
assert(acc == 2)
Esto también creará una tarea para cada partición, y se garantizará que cada tarea se ejecute exactamente una vez. Pero el código en el map
no se ejecutará en todos los elementos, solo el primero en cada partición.
El acumulador es como una variable global. Si comparte una referencia al RDD que puede incrementar el acumulador, entonces otro código (otros hilos) también puede hacer que se incremente.
// Same setup as before.
val x = new X(accumulating) // We don''t know what X does.
// It may trigger the calculation
// any number of times.
accumulating.count
assert(acc >= 10)
Para responder a la pregunta "¿Cuándo son realmente confiables los acumuladores?"
Respuesta: cuando están presentes en una operación de acción .
Según la documentación en la Tarea de Acción, incluso si hay tareas reiniciadas, actualizará el Acumulador solo una vez.
Para las actualizaciones de acumuladores realizadas solo dentro de las acciones, Spark garantiza que la actualización de cada tarea al acumulador solo se aplicará una vez, es decir, las tareas reiniciadas no actualizarán el valor. En las transformaciones, los usuarios deben ser conscientes de que la actualización de cada tarea se puede aplicar más de una vez si las tareas o las etapas del trabajo se vuelven a ejecutar.
Y la acción permite ejecutar código personalizado.
Por Ej.
val accNotEmpty = sc.accumulator(0)
ip.foreach(x=>{
if(x!=""){
accNotEmpty += 1
}
})
Pero, ¿por qué Map + Action viz. Resultado ¿Las operaciones de tareas no son confiables para una operación de Acumulador?
- La tarea falló debido a alguna excepción en el código. Spark intentará 4 veces (cantidad predeterminada de intentos). Si la tarea falla, siempre dará una excepción. Si por casualidad tiene éxito, Spark continuará y simplemente actualizará el valor del acumulador para el estado exitoso y los valores del acumulador de estados fallidos serán ignorados.
Veredicto: manejado correctamente - Etapa fallida: si un nodo ejecutor falla, no es culpa del usuario, sino un fallo de hardware. Y si el nodo desciende en la fase de mezcla. Como la salida aleatoria se almacena localmente, si un nodo falla, la salida aleatoria desaparecerá. Así que Spark se va volver a la etapa que generó la salida aleatoria, mira qué tareas deben volverse a ejecutar y las ejecuta en uno de los nodos que aún está activo. Después de regenerar la salida aleatoria faltante, la etapa que generó la salida del mapa ejecutó algunas de sus tareas varias veces. El parque cuenta las actualizaciones del acumulador de todos ellos.
Veredicto: no se maneja en la Tarea de resultados. El acumulador dará resultados incorrectos. - Si una tarea se ejecuta lentamente, Spark puede iniciar una copia especulativa de esa tarea en otro nodo.
Veredicto: no manejado. El acumulador dará salida incorrecta. - El RDD que se almacena en caché es enorme y no puede residir en la memoria. Por lo tanto, cada vez que se utiliza el RDD, se volverá a ejecutar la operación de asignación para obtener el RDD y de nuevo se actualizará el acumulador.
Veredicto: no manejado. El acumulador dará salida incorrecta.
Por lo tanto, es posible que la misma función se ejecute varias veces en los mismos datos. Por lo tanto, Spark no ofrece ninguna garantía para que el acumulador se actualice debido a la operación del Mapa.
Por lo tanto, es mejor usar la operación Acumulador en Acción en Spark.
Para saber más sobre Acumulador y sus problemas, consulte esta publicación del blog - Por Imran Rashid.