spark que medicina común aws apache-spark emr amazon-emr bigdata

apache-spark - medicina - que es emr



“Contenedor matado por YARN por exceder los límites de memoria. 10.4 GB de 10.4 GB de memoria física utilizada "en un clúster EMR con 75 GB de memoria (5)

Estoy ejecutando un clúster Spark de 5 nodos en AWS EMR cada tamaño m3.xlarge (1 maestro 4 esclavos). Corrí con éxito a través de un archivo CSV comprimido bzip2 de 146Mb y terminé con un resultado perfectamente agregado.

Ahora estoy tratando de procesar un archivo CSV bzip2 de ~ 5GB en este clúster pero recibo este error:

16/11/23 17:29:53 WARN TaskSetManager: tarea perdida 49.2 en la etapa 6.0 (TID xxx, xxx.xxx.xxx.compute.internal): ExecutorLostFailure (ejecutor 16 finalizado por una de las tareas en ejecución) Reason: Container matado por YARN por exceder los límites de memoria. 10.4 GB de 10.4 GB de memoria física utilizada. Considera potenciar spark.yarn.executor.memoryOverhead.

Estoy confundido en cuanto a por qué obtengo un límite de memoria de ~ 10.5GB en un cluster de ~ 75GB (15GB por instancia de 3m.xlarge) ...

Aquí está mi configuración de EMR:

[ { "classification":"spark-env", "properties":{ }, "configurations":[ { "classification":"export", "properties":{ "PYSPARK_PYTHON":"python34" }, "configurations":[ ] } ] }, { "classification":"spark", "properties":{ "maximizeResourceAllocation":"true" }, "configurations":[ ] } ]

Por lo que he leído, configurar la propiedad maximarResourceAllocation debería decirle a EMR que configure Spark para utilizar completamente todos los recursos disponibles en el clúster. Es decir, debería tener ~ 75GB de memoria disponible ... Entonces, ¿por qué obtengo un error de límite de memoria de ~ 10.5GB? Aquí está el código que estoy ejecutando:

def sessionize(raw_data, timeout): # https://www.dataiku.com/learn/guide/code/reshaping_data/sessionization.html window = (pyspark.sql.Window.partitionBy("user_id", "site_id") .orderBy("timestamp")) diff = (pyspark.sql.functions.lag(raw_data.timestamp, 1) .over(window)) time_diff = (raw_data.withColumn("time_diff", raw_data.timestamp - diff) .withColumn("new_session", pyspark.sql.functions.when(pyspark.sql.functions.col("time_diff") >= timeout.seconds, 1).otherwise(0))) window = (pyspark.sql.Window.partitionBy("user_id", "site_id") .orderBy("timestamp") .rowsBetween(-1, 0)) sessions = (time_diff.withColumn("session_id", pyspark.sql.functions.concat_ws("_", "user_id", "site_id", pyspark.sql.functions.sum("new_session").over(window)))) return sessions def aggregate_sessions(sessions): median = pyspark.sql.functions.udf(lambda x: statistics.median(x)) aggregated = sessions.groupBy(pyspark.sql.functions.col("session_id")).agg( pyspark.sql.functions.first("site_id").alias("site_id"), pyspark.sql.functions.first("user_id").alias("user_id"), pyspark.sql.functions.count("id").alias("hits"), pyspark.sql.functions.min("timestamp").alias("start"), pyspark.sql.functions.max("timestamp").alias("finish"), median(pyspark.sql.functions.collect_list("foo")).alias("foo"), ) return aggregated spark_context = pyspark.SparkContext(appName="process-raw-data") spark_session = pyspark.sql.SparkSession(spark_context) raw_data = spark_session.read.csv(sys.argv[1], header=True, inferSchema=True) # Windowing doesn''t seem to play nicely with TimestampTypes. # # Should be able to do this within the ``spark.read.csv`` call, I''d # think. Need to look into it. convert_to_unix = pyspark.sql.functions.udf(lambda s: arrow.get(s).timestamp) raw_data = raw_data.withColumn("timestamp", convert_to_unix(pyspark.sql.functions.col("timestamp"))) sessions = sessionize(raw_data, SESSION_TIMEOUT) aggregated = aggregate_sessions(sessions) aggregated.foreach(save_session)

Básicamente, nada más que ventanas y un grupoBy para agregar los datos.

Comienza con algunos de esos errores, y hacia la detención aumenta en la cantidad del mismo error.

He intentado ejecutar spark-submit con --conf spark.yarn.executor.memoryOverhead pero eso tampoco parece resolver el problema.


Intenta repartir Funciona en mi caso.

El marco de datos no era tan grande al principio cuando se cargó con write.csv (). El archivo de datos fue de aproximadamente 10 MB, según se requiera, por ejemplo, un total de memoria de 100 MB para cada tarea de procesamiento en el ejecutor. Revisé el número de particiones para ser 2 en el momento. Luego crece como una bola de nieve durante las siguientes operaciones que se unen a otras tablas, agregando nuevas columnas. Y luego me encontré con la memoria que excede los límites en un paso determinado. Revisé el número de particiones, todavía es 2, derivado del marco de datos original, supongo. Así que intenté reparticionarlo desde el principio, y no hubo más problema de este tipo.

Todavía no leí muchos materiales sobre Spark y YARN. Lo que sé es que hay ejecutores en nodos, un ejecutor podría manejar muchas tareas dependiendo de los recursos. Mi conjetura es que una partición sería asignada atómicamente a una tarea. Y su volumen determina el uso de recursos. Spark no podría cortarlo si una partición crece demasiado.

Una estrategia razonable es determinar los nodos y la memoria del contenedor primero, ya sea 10GB o 5GB. Idealmente, ambos podrían servir cualquier trabajo de procesamiento de datos, solo una cuestión de tiempo. Dada la configuración de memoria de 5GB, la fila razonable para una partición que encuentre, digamos que es 1000 después de la prueba (no fallará ningún paso durante el procesamiento), podríamos hacerlo como el siguiente pseudo código:

RWS_PER_PARTITION = 1000 input_df = spark.write.csv("file_uri", *other_args) total_rows = input_df.count() original_num_partitions = input_df.getNumPartitions() numPartitions = max(total_rows/RWS_PER_PARTITION, original_num_partitions) input_df = input_df.repartition(numPartitions)

¡Deséenlo ayuda!


Si no está utilizando spark-submit y está buscando otra forma de especificar el yarn.nodemanager.vmem-check-enabled mencionado por Duff , aquí hay otras 2 formas:

Método 2

Si está utilizando un archivo de configuración JSON (que pasa a la CLI de AWS o al script de boto3), deberá agregar la siguiente configuración:

[{ "Classification": "yarn-site", "Properties": { "yarn.nodemanager.vmem-check-enabled": "false" } }]

Método 3

Si usa la consola EMR, agregue la siguiente configuración:

classification=yarn-site,properties=[yarn.nodemanager.vmem-check-enabled=false]


Siento tu dolor..

Tuvimos problemas similares de falta de memoria con Spark en YARN. Tenemos cinco máquinas virtuales de 64 GB, 16 núcleos y, independientemente de a qué configuramos spark.yarn.executor.memoryOverhead , no pudimos obtener suficiente memoria para estas tareas: eventualmente morirían sin importar cuánta memoria les demos. Y esto como una aplicación Spark relativamente sencilla que estaba causando que esto sucediera.

Nos dimos cuenta de que el uso de memoria física era bastante bajo en las máquinas virtuales, pero el uso de memoria virtual era extremadamente alto. Establecimos yarn.nodemanager.vmem-check-enabled en yarn-site.xml en false y nuestros contenedores ya no se eliminaron, y la aplicación pareció funcionar como se esperaba.

Haciendo más investigación, encontré la respuesta a por qué esto sucede aquí: https://www.mapr.com/blog/best-practices-yarn-resource-management

Dado que en Centos / RHEL 6 hay una asignación agresiva de memoria virtual debido al comportamiento del sistema operativo, debe deshabilitar el comprobador de memoria virtual o aumentar el índice yarn.nodemanager.vmem-pmem a un valor relativamente mayor.

Esa página tenía un enlace a una página muy útil de IBM: https://www.ibm.com/developerworks/community/blogs/kevgrig/entry/linux_glibc_2_10_rhel_6_malloc_may_show_excessive_virtual_memory_usage?lang=en

En resumen, glibc> 2.10 cambió su asignación de memoria. Y aunque la gran cantidad de memoria virtual asignada no es el fin del mundo, no funciona con la configuración predeterminada de YARN.

En lugar de configurar yarn.nodemanager.vmem-check-enabled en falso, también puede jugar configurando la variable de entorno MALLOC_ARENA_MAX en un número bajo en hadoop-env.sh .

Recomiendo leer ambas páginas, la información es muy útil.


Tuve el mismo problema en un clúster pequeño que ejecuta un trabajo relativamente pequeño en la chispa 2.3.1. El trabajo lee el archivo de parquet, elimina duplicados usando groupBy / agg / first luego ordena y escribe un nuevo parquet. Procesó 51 GB de archivos de parquet en 4 nodos (4 vcores, 32Gb RAM).

El trabajo estaba fallando constantemente en la etapa de agregación. Escribí el uso de memoria de los ejecutores de bash script watch y descubrí que en la mitad de la etapa, un ejecutor aleatorio comienza a tomar memoria doble durante unos segundos. Cuando correlacioné el tiempo de este momento con los registros de GC, coincidió con el GC completo que vacía gran cantidad de memoria.

Por fin entendí que el problema está relacionado de alguna manera con GC. ParallelGC y G1 causan este problema constantemente, pero ConcMarkSweepGC mejora la situación. El problema aparece solo con una pequeña cantidad de particiones. OpenJDK 64-Bit (build 25.171-b10) el trabajo en EMR donde se instaló OpenJDK 64-Bit (build 25.171-b10) . No sé la causa raíz del problema, podría estar relacionado con la JVM o el sistema operativo. Pero definitivamente no está relacionado con el uso de pilas o de otras en mi caso.

ACTUALIZACIÓN1

Intentado Oracle HotSpot, el problema se reproduce.


Ver,

Tuve el mismo problema en un grupo enorme en el que estoy trabajando ahora. El problema no se resolverá al agregar memoria al trabajador. A veces, en el proceso de agregación, la chispa usará más memoria de la que tiene y los trabajos de la chispa comenzarán a usar la memoria fuera del montón.

Un ejemplo simple es:

Si tiene un conjunto de datos que necesita para reduceByKey , a veces agregará más datos en un trabajador que en otro, y si estos datos pasan a la memoria de un trabajador, aparece ese mensaje de error.

Agregar la opción spark.yarn.executor.memoryOverhead te ayudará si configuras el 50% de la memoria utilizada para el trabajador (solo para prueba, y ver si funciona, puedes agregar menos con más pruebas).

Pero debe comprender cómo funciona Spark con la asignación de memoria en el clúster:

  1. La forma más común en que Spark utiliza el 75% de la memoria de la máquina. El resto va a SO.
  2. Spark tiene dos tipos de memoria durante la ejecución. Una parte es para ejecución y la otra es el almacenamiento. La ejecución se utiliza para Shuffles, Joins, Aggregations y Etc. El almacenamiento se utiliza para almacenar en caché y propagar datos en todo el clúster.

Una cosa buena acerca de la asignación de memoria, si no está usando caché en su ejecución, puede configurar la chispa para usar ese espacio de almacenamiento para trabajar con la ejecución para evitar, en parte, el error OOM. Como se puede ver en la documentación de chispa:

Este diseño asegura varias propiedades deseables. Primero, las aplicaciones que no usan el almacenamiento en caché pueden usar todo el espacio para la ejecución, evitando derrames innecesarios en el disco. En segundo lugar, las aplicaciones que utilizan el almacenamiento en caché pueden reservar un espacio mínimo de almacenamiento (R) donde sus bloques de datos son inmunes a ser desalojados. Por último, este enfoque proporciona un rendimiento razonable inmediato para una variedad de cargas de trabajo sin requerir la experiencia del usuario de cómo se divide internamente la memoria.

Pero, ¿cómo podemos usar eso?

Puede cambiar algunas configuraciones. Agregue la configuración de MemoryOverhead a su llamada de trabajo, pero considere agregar esto también: cambio de spark.memory.fraction para 0.8 o 0.85 y reduzca el spark.memory.storageFraction a 0.35 o 0.2.

Otras configuraciones pueden ayudar, pero debe verificar en su caso. here toda esta configuración.

Ahora, lo que ayuda en mi caso.

Tengo un cluster con 2.5K trabajadores y 2.5TB de RAM. Y nos enfrentábamos a un error de OOM como el tuyo. Solo aumentamos el spark.yarn.executor.memoryOverhead a 2048. Y habilitamos la asignación dinámica . Y cuando llamamos al trabajo, no establecemos la memoria para los trabajadores, dejamos eso para que la Chispa decida. Acabamos de configurar el Overhead.

Pero para algunas pruebas para mi clúster pequeño, cambiar el tamaño de la memoria de ejecución y almacenamiento. Eso solucionó el problema.