apache-spark - cores - properties spark
FetchFailedException o MetadataFetchFailedException al procesar grandes conjuntos de datos (4)
Cuando ejecuto el código de análisis con un conjunto de datos de 1 GB, se completa sin ningún error. Pero cuando intento 25 gb de datos a la vez, obtengo errores por debajo. Estoy tratando de entender cómo puedo evitar las fallas a continuación. Feliz de escuchar cualquier sugerencia o idea.
Diferentes errores,
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
org.apache.spark.shuffle.FetchFailedException: Failed to connect to ip-xxxxxxxx
org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/mnt/yarn/nm/usercache/xxxx/appcache/application_1450751731124_8446/blockmgr-8a7b17b8-f4c3-45e7-aea8-8b0a7481be55/08/shuffle_0_224_0.data, offset=12329181, length=2104094}
Detalles del cluster:
Hilo: 8 Nodos
Total de núcleos: 64
Memoria: 500 GB
Versión de chispa: 1.5
Spark enviar declaración:
spark-submit --master yarn-cluster /
--conf spark.dynamicAllocation.enabled=true /
--conf spark.shuffle.service.enabled=true /
--executor-memory 4g /
--driver-memory 16g /
--num-executors 50 /
--deploy-mode cluster /
--executor-cores 1 /
--class my.parser /
myparser.jar /
-input xxx /
-output xxxx /
Uno de la pila de seguimiento:
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Además de los problemas de configuración de red y memoria descritos anteriormente, vale la pena señalar que para tablas grandes (por ejemplo, varios TB aquí), org.apache.spark.shuffle.FetchFailedException puede ocurrir debido a un tiempo de espera para recuperar particiones aleatorias. Para solucionar este problema, puede configurar lo siguiente:
SET spark.reducer.maxReqsInFlight=1; -- Only pull one file at a time to use full network bandwidth.
SET spark.shuffle.io.retryWait=60s; -- Increase the time to wait while retrieving shuffle partitions before retrying. Longer times are necessary for larger files.
SET spark.shuffle.io.maxRetries=10;
Bueno, es un hilo viejo y hay bastantes respuestas en torno a , pero perdí un par de días por este error y creo que compartir la historia podría ayudar.
En realidad, hay un par de formas en que esto puede suceder. Como mencionó la gran respuesta de Glennie, este es probablemente un problema de memoria, así que asegúrate de tener suficiente memoria para todo . Hay configuraciones de memoria de contenedor, memoria de AM, memoria de mapa, memoria de reducción, etc. a tener en cuenta. Leer this puede ser de mucha ayuda para encontrar las configuraciones correctas. Debería elegir los números usted mismo, pero aquí hay algunas propiedades que he establecido.
hilo-sitio.xml
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>32768</value>
</property>
<property>
<name>yarn.app.mapreduce.am.resource.mb</name>
<value>4096</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>4096</value>
</property>
mapred-site.xml
<property>
<name>mapreduce.map.memory.mb</name>
<value>4096</value>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>4096</value>
</property>
Estos pueden corregir algunos otros errores con los que podría encontrarse, como el bloqueo de PySpark en el inicio. Pero en mi caso, aunque algunos errores desaparecieron (como los errores de MetadataFetchFailed), el problema persistió. El error exacto fue:
org.apache.spark.shuffle.FetchFailedException: Error al conectarse a DB-ETA-C / xxxx: 34085
Después de jugar con todas las propiedades posibles de YARN y Spark, desde los tiempos de espera de Spark hasta el servicio shuffle de YARN, me di cuenta de que en los registros de errores el contenedor netstat -tulpn | grep <PORT NUM>
está buscando xxxx
, la IP local (interna) mientras netstat -tulpn | grep <PORT NUM>
netstat -tulpn | grep <PORT NUM>
NÚMERO DE netstat -tulpn | grep <PORT NUM>
devolvió yyyy: 34085 en el que yyyy es la dirección IP externa . No era un problema de memoria, era simplemente un problema de configuración de red.
El servicio Spark se vinculaba solo a la interfaz externa, porque el nombre de host estaba asociado con la IP externa en /etc/hosts
. Después de actualizar el /etc/hosts
, el problema se solucionó.
En pocas palabras: el error obviamente dice que algún contenedor no puede alcanzar otro. Por lo general, esto se debe a la falla de los contenedores debido a problemas de memoria, pero también puede ser un problema de red, así que tenga cuidado con ellos, especialmente si tiene varias interfaces en sus nodos.
Es casi seguro que este error sea causado por problemas de memoria en sus ejecutores. Puedo pensar en un par de maneras de abordar este tipo de problemas.
1) Podría intentar ejecutar con más particiones (haga una partición en su dataframe
). Los problemas de memoria normalmente surgen cuando una o más particiones contienen más datos de los que caben en la memoria.
2) Me doy cuenta de que no ha establecido explícitamente spark.yarn.executor.memoryOverhead
, por lo que se establecerá de forma predeterminada en max(386, 0.10* executorMemory)
que en su caso será de 400MB. Eso me suena bajo. Intentaría aumentarlo para decir 1 GB (tenga en cuenta que si aumenta la memoria Sobresaliente a 1 GB, debe reducir --executor-memory
a 3GB)
3) Busque en los archivos de registro en los nodos que fallan. Quieres buscar el texto "Contenedor de matanza". Si ve que el texto "va más allá de los límites de la memoria física", el aumento de la memoria en la parte superior, según mi experiencia, resolverá el problema.
También he tenido algunos buenos resultados al aumentar el tiempo de espera de chispa spark.network.timeout
a un valor mayor como 800. Los 120 segundos predeterminados harán que muchos de tus ejecutores se spark.network.timeout
bajo una carga pesada.