apache spark - examples - ¿Por qué fallan los trabajos de Spark con org.apache.spark.shuffle.MetadataFetchFailedException: falta una ubicación de salida para shuffle 0 en modo de especulación?
apache spark wikipedia (7)
En mi caso (clúster independiente) se produjo la excepción porque el sistema de archivos de algunos esclavos Spark estaba lleno al 100%.
Eliminar todo en las carpetas de
spark/work
de los esclavos resolvió el problema.
Estoy ejecutando un trabajo de Spark en modo de especulación. Tengo alrededor de 500 tareas y alrededor de 500 archivos comprimidos de 1 GB gz. Sigo recibiendo en cada trabajo, para 1-2 tareas, el error adjunto donde se vuelve a ejecutar después decenas de veces (evitando que el trabajo se complete).
org.apache.spark.shuffle.MetadataFetchFailedException: falta una ubicación de salida para shuffle 0
¿Alguna idea de cuál es el significado del problema y cómo superarlo?
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
Esto me sucedió cuando le di más memoria al nodo trabajador de la que tiene. Como no tenía intercambio, la chispa se estrelló al tratar de almacenar objetos para barajar sin dejar más memoria.
La solución fue agregar el intercambio o configurar el trabajador / ejecutor para usar menos memoria además de usar el nivel de almacenamiento MEMORY_AND_DISK para varias persistencias.
Para mí, estaba haciendo algunas ventanas en datos grandes (aproximadamente 50B filas) y obtenía una carga de barco de
ExternalAppendOnlyUnsafeRowArray:54
- Se alcanzó el umbral de derrame de 4096 filas, cambiando aorg.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
En mis troncos. Obviamente 4096 puede ser pequeño en ese tamaño de datos ... esto me llevó a la siguiente JIRA:
https://issues.apache.org/jira/browse/SPARK-21595
Y finalmente a las siguientes dos opciones de configuración:
-
spark.sql.windowExec.buffer.spill.threshold
-
spark.sql.windowExec.buffer.in.memory.threshold
Ambos predeterminados a 4096; Los crié mucho más alto (2097152) y ahora las cosas parecen ir bien. No estoy 100% seguro de que esto sea lo mismo que el problema planteado aquí, pero es otra cosa que intentar.
Resolví este error aumentando la memoria asignada en executeorMemory y driverMemory. Puede hacer esto en HUE seleccionando el programa Spark que está causando el problema y en propiedades -> Lista de opciones puede agregar algo como esto:
--driver-memory 10G --executor-memory 10G --num-executors 50 --executor-cores 2
Por supuesto, los valores de los parámetros variarán según el tamaño de su clúster y sus necesidades.
Tengo el mismo problema en mi clúster YARN de 3 máquinas. Seguí cambiando RAM pero el problema persistió. Finalmente vi los siguientes mensajes en los registros:
17/02/20 13:11:02 WARN spark.HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 1006275 ms exceeds timeout 1000000 ms
17/02/20 13:11:02 ERROR cluster.YarnScheduler: Lost executor 2 on 1worker.com: Executor heartbeat timed out after 1006275 ms
y después de esto, hubo este mensaje:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 67
Modifiqué las propiedades en spark-defaults.conf de la siguiente manera:
spark.yarn.scheduler.heartbeat.interval-ms 7200000
spark.executor.heartbeatInterval 7200000
spark.network.timeout 7200000
¡Eso es! Mi trabajo se completó con éxito después de esto.
Tengo el mismo problema, pero busqué muchas respuestas que no pueden resolver mi problema.
finalmente, depuro mi código paso a paso.
Me parece que el problema causado por el tamaño de los datos no está equilibrado para cada partición, conducido a
MetadataFetchFailedException
que en la etapa de
map
no
reduce
etapa.
simplemente haga
df_rdd.repartition(nums)
antes de
reduceByKey()
Tuvimos un error similar con Spark, pero no estoy seguro de que esté relacionado con su problema.
Utilizamos
JavaPairRDD.repartitionAndSortWithinPartitions
en datos de 100 GB y seguía fallando de manera similar a su aplicación.
Luego observamos los registros de Yarn en los nodos específicos y descubrimos que tenemos algún tipo de problema de falta de memoria, por lo que el Yarn interrumpió la ejecución.
Nuestra solución fue cambiar / agregar
spark.shuffle.memoryFraction 0
en
.../spark/conf/spark-defaults.conf
.
Eso nos permitió manejar una cantidad de datos mucho más grande (pero desafortunadamente no infinita) de esta manera.