out of memory - Spark java.lang.OutOfMemoryError: espacio de pila de Java
out-of-memory apache-spark (5)
Mi clúster: 1 maestro, 11 esclavos, cada nodo tiene 6 GB de memoria.
Mi configuración:
spark.executor.memory=4g, Dspark.akka.frameSize=512
Aquí está el problema:
Primero , leo algunos datos (2,19 GB) de HDFS a RDD:
val imageBundleRDD = sc.newAPIHadoopFile(...)
Segundo , haz algo en este RDD:
val res = imageBundleRDD.map(data => {
val desPoints = threeDReconstruction(data._2, bg)
(data._1, desPoints)
})
Por último , salida a HDFS:
res.saveAsNewAPIHadoopFile(...)
Cuando ejecuto mi programa, muestra:
.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since ''akka.jvm-exit-on-fatal-error'' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space
Hay demasiadas tareas?
PD : Todo está bien cuando los datos de entrada son de aproximadamente 225 MB.
¿Como puedó resolver esté problema?
Debe aumentar la memoria del controlador. En tu carpeta $ SPARK_HOME / conf, deberías encontrar el archivo spark-defaults.conf
, editar y configurar spark.driver.memory 4000m
dependiendo de la memoria de tu master, creo. Esto es lo que me solucionó el problema y todo funciona sin problemas
Eche un vistazo a las secuencias de comandos de inicio , allí se establece un tamaño de almacenamiento dinámico de Java, parece que no está configurando esto antes de ejecutar Spark worker.
# Set SPARK_MEM if it isn''t already set since we also use it for this process
SPARK_MEM=${SPARK_MEM:-512m}
export SPARK_MEM
# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"
Puede encontrar la documentación para implementar scripts here .
La ubicación para establecer el tamaño del almacenamiento dinámico (al menos en spark-1.0.0) está en conf / spark-env. Las variables relevantes son SPARK_EXECUTOR_MEMORY
y SPARK_DRIVER_MEMORY
. Más documentos están en la guía de implementación
Además, no olvide copiar el archivo de configuración a todos los nodos esclavos.
Para agregar un caso de uso a esto que a menudo no se discute, presentaré una solución al enviar una aplicación Spark
través de spark-submit
en modo local .
Según el gitbook Mastering Apache Spark de Jacek Laskowski :
Puede ejecutar Spark en modo local. En este modo de despliegue JVM único no distribuido, Spark genera todos los componentes de ejecución (controlador, ejecutor, servidor y maestro) en la misma JVM. Este es el único modo en el que se usa un controlador para la ejecución.
Por lo tanto, si está experimentando errores OOM
con el heap
, basta con ajustar la driver-memory
del driver-memory
lugar de la executor-memory
del executor-memory
.
Aquí hay un ejemplo:
spark-1.6.1/bin/spark-submit
--class "MyClass"
--driver-memory 12g
--master local[*]
target/scala-2.10/simple-project_2.10-1.0.jar
Tengo algunas sugerencias:
- Si sus nodos están configurados para tener un máximo de 6g para Spark (y están dejando un poco para otros procesos), entonces use 6g en lugar de 4g,
spark.executor.memory=6g
. Asegúrate de estar usando la mayor cantidad de memoria posible marcando la IU (indicará la cantidad de memoria que estás usando) - Intenta usar más particiones, debes tener de 2 a 4 por CPU. IME aumentar la cantidad de particiones a menudo es la forma más fácil de hacer que un programa sea más estable (y, a menudo más rápido). Para grandes cantidades de datos puede que necesite mucho más de 4 por CPU, ¡en algunos casos he tenido que usar 8000 particiones!
- Disminuya la fracción de memoria reservada para el almacenamiento en caché , utilizando
spark.storage.memoryFraction
. Si no usacache()
opersist
en su código, esto también podría ser 0. Su valor predeterminado es 0.6, lo que significa que solo obtiene 0.4 * 4g de memoria para su pila. El IME reduce la fracción de memoria a menudo hace que los OOM desaparezcan. ACTUALIZACIÓN: a partir de la chispa 1.6 aparentemente ya no necesitaremos jugar con estos valores, la chispa los determinará automáticamente. - Similar a la fracción de memoria aleatoria anterior. Si su trabajo no necesita mucha memoria aleatoria, configúrelo en un valor inferior (esto podría causar que sus mezclas se derramen en el disco, lo que puede tener un impacto catastrófico en la velocidad). A veces, cuando se trata de una operación aleatoria que es OOM, tienes que hacer lo contrario, es decir, configurarlo en algo grande, como 0,8, o asegurarte de permitir que tus mezclas se derramen en el disco (es el valor predeterminado desde 1.0.0).
- Tenga cuidado con las pérdidas de memoria , a menudo son causadas por el cierre accidental de objetos que no necesita en su lambda. La forma de diagnosticar es buscar la "tarea serializada como XXX bytes" en los registros, si XXX es mayor que unas pocas k o más que un MB, es posible que tenga una pérdida de memoria. Ver https://.com/a/25270600/1586965
- Relacionado con arriba; use variables de difusión si realmente necesita objetos grandes.
- Si está almacenando en caché RDD grandes y puede sacrificar parte del tiempo de acceso, considere la serialización del RDD http://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage . O incluso almacenarlas en caché en un disco (que a veces no es tan malo si se usan SSD).
- ( Avanzado ) Relacionado con lo anterior, evite
String
y estructuras fuertemente anidadas (comoMap
y clases de casos anidados). Si es posible, intente utilizar solo tipos primitivos e indexe todos los no primitivos, especialmente si espera muchos duplicados. ElijaWrappedArray
sobre estructuras anidadas siempre que sea posible. O incluso despliegue su propia serialización: USTED tendrá la mayor información sobre cómo respaldar eficientemente sus datos en bytes, ¡ ÚSELO ! - ( bit hacky ) De nuevo, cuando guardes en caché, considera utilizar un
Dataset
para almacenar en caché tu estructura, ya que usará una serialización más eficiente. Esto debe considerarse como un truco cuando se compara con el punto anterior. Desarrollar el conocimiento de su dominio en su algoritmo / serialización puede minimizar la memoria / espacio en caché en 100x o 1000x, mientras que todo unDataset
probablemente dará 2x - 5x en la memoria y 10x comprimido (parquet) en el disco.
http://spark.apache.org/docs/1.2.1/configuration.html
EDITAR: (Así que puedo googlearme más fácilmente) Lo siguiente también es indicativo de este problema:
java.lang.OutOfMemoryError : GC overhead limit exceeded