spark retainedjobs jars deploy defaultcores cores hadoop apache-spark yarn

hadoop - retainedjobs - spark local ip



Apache Spark: la cantidad de nĂșcleos frente a la cantidad de ejecutores (7)

Para ayudar a que todo esto sea un poco más concreto, he aquí un ejemplo de cómo configurar una aplicación Spark para usar la mayor cantidad de clúster posible: Imagine un clúster con seis nodos ejecutando NodeManagers, cada uno equipado con 16 núcleos y 64 GB de memoria . Las capacidades de NodeManager, yarn.nodemanager.resource.memory-mb y yarn.nodemanager.resource.cpu-vcores, probablemente se deberían establecer en 63 * 1024 = 64512 (megabytes) y 15 respectivamente. Evitamos asignar el 100% de los recursos a los contenedores YARN porque el nodo necesita algunos recursos para ejecutar el sistema operativo y los daemons de Hadoop. En este caso, dejamos un gigabyte y un núcleo para estos procesos del sistema. Cloudera Manager ayuda al contabilizar estos y configurar estas propiedades de YARN automáticamente.

El primer impulso probable sería utilizar --num executors 6 --executor-cores 15 --executor-memory 63G . Sin embargo, este es el enfoque equivocado porque:

63 GB + la carga de memoria del ejecutor no cabe dentro de la capacidad de 63 GB de los NodeManagers. El maestro de aplicaciones tomará un núcleo en uno de los nodos, lo que significa que no habrá espacio para un ejecutor de 15 núcleos en ese nodo. 15 núcleos por ejecutor pueden conducir a un mal rendimiento de E / S HDFS.

Una mejor opción sería usar --num-executors 17 --executor-cores 5 --executor-memory 19G . ¿Por qué?

Esta configuración da como resultado tres ejecutores en todos los nodos, excepto el que tiene AM, que tendrá dos ejecutores. --executor-memory se derivó como (63/3 ejecutores por nodo) = 21. 21 * 0.07 = 1.47. 21 - 1.47 ~ 19.

La explicación fue dada en un artículo en el blog de cloudera blog.cloudera.com/blog/2015/03/…

Estoy tratando de entender la relación entre el número de núcleos y el número de ejecutores cuando ejecuto un trabajo Spark en YARN.

El entorno de prueba es el siguiente:

  • Número de nodos de datos: 3
  • Especificación de la máquina del nodo de datos:
    • CPU: Core i7-4790 (n. ° de núcleos: 4, n. ° de hilos: 8)
    • RAM: 32 GB (8 GB x 4)
    • HDD: 8TB (2TB x 4)
  • Red: 1Gb

  • Versión Spark: 1.0.0

  • Versión de Hadoop: 2.4.0 (Hortonworks HDP 2.1)

  • Flujo de trabajo de chispa: sc.textFile -> filtro -> mapa -> filtro -> mapToPair -> reduceByKey -> mapa -> saveAsTextFile

  • Datos de entrada

    • Tipo: archivo de texto único
    • Tamaño: 165GB
    • Número de líneas: 454,568,833
  • Salida

    • Número de líneas después del segundo filtro: 310,640,717
    • Número de líneas del archivo de resultados: 99,848,268
    • Tamaño del archivo de resultados: 41 GB

El trabajo se ejecutó con las siguientes configuraciones:

  1. --master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3 (ejecutores por nodo de datos, usa tanto como núcleos)

  2. --master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3 (nº de núcleos reducidos)

  3. --master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12 (menos core, más ejecutor)

Tiempos transcurridos:

  1. 50 min 15 seg

  2. 55 min 48 sec

  3. 31 min 23 seg

Para mi sorpresa, (3) fue mucho más rápido.
Pensé que (1) sería más rápido, ya que habría menos comunicación entre ejecutores al barajar.
Aunque # de núcleos de (1) es menor que (3), # de núcleos no es el factor clave ya que 2) funcionó bien.

(Las seguidas se agregaron después de la respuesta de pwilmot).

Para la información, la captura de pantalla del monitor de rendimiento es la siguiente:

  • Resumen del nodo de datos Ganglia para (1) - el trabajo comenzó a las 04:37.
  • Resumen del nodo de datos Ganglia para (3) - el trabajo comenzó a las 19:47. Por favor, ignore el gráfico antes de ese momento.

El gráfico se divide aproximadamente en 2 secciones:

  • Primero: desde el inicio hasta la reducciónByKey: intensivo de la CPU, sin actividad de la red
  • Segundo: después de reduceByKey: la CPU baja, la E / S de red está lista.

Como muestra el gráfico, (1) puede usar tanta potencia de CPU como se le dio. Por lo tanto, podría no ser el problema del número de subprocesos.

¿Cómo explicar este resultado?


Creo que hay un pequeño problema en las dos primeras configuraciones. Los conceptos de hilos y núcleos de la siguiente manera. El concepto de subprocesamiento es si los núcleos son ideales, entonces use ese núcleo para procesar los datos. Entonces, la memoria no se utiliza por completo en los dos primeros casos. Si desea marcar este ejemplo, elija las máquinas que tienen más de 10 núcleos en cada máquina. Luego haz la marca de referencia.

Pero no proporcione más de 5 núcleos por ejecutor, habrá cuello tapado en el rendimiento de E / S.

Entonces, las mejores máquinas para hacer esta marca de banco podrían ser los nodos de datos que tienen 10 núcleos.

Especificación de la máquina del nodo de datos: CPU: Core i7-4790 (n. ° de núcleos: 10, n. ° de hilos: 20) RAM: 32 GB (8 GB x 4) HDD: 8 TB (2 TB x 4)


Creo que una de las principales razones es la localidad. El tamaño del archivo de entrada es 165G, los bloques relacionados del archivo ciertamente se distribuyen en múltiples DataNodes, más ejecutores pueden evitar la copia de red.

Trate de establecer el número de bloques de ejecutores num igual, creo que puede ser más rápido.


Cuando ejecutas tu aplicación de chispa encima de HDFS, según blog.cloudera.com/blog/2015/03/…

Me he dado cuenta de que el cliente HDFS tiene problemas con toneladas de hilos concurrentes. Una suposición aproximada es que, como máximo, cinco tareas por ejecutor pueden lograr un rendimiento de escritura completo, por lo que es bueno mantener el número de núcleos por ejecutor por debajo de ese número.

Así que creo que su primera configuración es más lenta que la tercera porque tiene un mal rendimiento de E / S de HDFS


De los excelentes recursos disponibles en la página del paquete Sparklyr de RStudio :

DEFINICIÓN DE CHISPA :

Puede ser útil proporcionar algunas definiciones simples para la nomenclatura Spark:

Nodo : un servidor

Nodo Trabajador : un servidor que es parte del clúster y está disponible para ejecutar trabajos Spark

Nodo maestro : el servidor que coordina los nodos Worker.

Ejecutor : una especie de máquina virtual dentro de un nodo. Un nodo puede tener múltiples ejecutores.

Nodo de controlador : el nodo que inicia la sesión de Spark. Normalmente, este será el servidor donde se encuentra sparklyr.

Controlador (ejecutor) : el nodo de controlador también aparecerá en la lista Ejecutor.



No he jugado con estos ajustes yo mismo, así que esto es solo una especulación, pero si pensamos en este tema como núcleos normales y subprocesos en un sistema distribuido, entonces en su clúster puede usar hasta 12 núcleos (4 * 3 máquinas) y 24 subprocesos (8 * 3 máquinas). En los primeros dos ejemplos, le está dando a su trabajo una cantidad suficiente de núcleos (espacio de cálculo potencial) pero el número de subprocesos (trabajos) que se ejecutan en esos núcleos es tan limitado que no puede usar gran parte de la potencia de procesamiento asignada y por lo tanto el trabajo es más lento a pesar de que hay más recursos de computación asignados.

Mencionas que tu preocupación estaba en el paso aleatorio: aunque es bueno limitar la sobrecarga en el paso de mezcla, generalmente es mucho más importante utilizar la paralelización del clúster. Piensa en el caso extremo: un solo programa de subprocesos con cero aleatorio.