scheduling apache-spark

scheduling - ¿Qué factores deciden el número de ejecutores en un modo independiente?



apache-spark (4)

Dada una aplicación Spark

  1. ¿Qué factores deciden el número de ejecutores en un modo independiente? En Mesos y YARN de acuerdo con this documentos, podemos especificar el número de ejecutores / núcleos y memoria.

  2. Una vez que una serie de ejecutores se inician. ¿Spark inicia las tareas de forma rotatoria o es lo suficientemente inteligente como para ver si algunos de los ejecutores están inactivos / ocupados y luego programan las tareas en consecuencia?

  3. Además, ¿cómo decide Spark el número de tareas? write un programa simple de temperatura máxima con un pequeño conjunto de datos y Spark generó dos tareas en un solo ejecutor. Esto está en el modo autónomo Spark.


Contestando tus preguntas:

  1. El modo independiente utiliza la misma variable de configuración que los modos Mesos e Hilo para establecer el número de ejecutores. La variable spark.cores.max define el número máximo de núcleos utilizados en el contexto de la chispa. El valor predeterminado es infinito, por lo que Spark utilizará todos los núcleos del clúster. La variable spark.task.cpus define cuántas CPU Spark asignará para una sola tarea, el valor predeterminado es 1. Con estas dos variables, puede definir el número máximo de tareas paralelas en su clúster.

  2. Cuando crea una subclase RDD, puede definir en qué máquinas ejecutar su tarea. Esto se define en el método getPreferredLocations . Pero como las firmas del método sugieren que esto es solo una preferencia, entonces si Spark detecta que una máquina no está ocupada, iniciará la tarea en esta máquina inactiva. Sin embargo, no conozco el mecanismo utilizado por Spark para saber qué máquinas están inactivas. Para lograr la localidad, nosotros (Stratio) decidimos hacer cada Partions más pequeñas para que la tarea requiera menos tiempo y alcance la localidad.

  3. El número de tareas de cada operación de Spark se define de acuerdo con la longitud de las particiones del RDD. Este vector es el resultado del método getPartitions que debe anular si desea desarrollar una nueva subclase RDD. Este método devuelve cómo se divide un RDD, donde está la información y las particiones. Cuando une dos o más RDD que utilizan, por ejemplo, operaciones de unión o unión, el número de tareas del RDD resultante es el número máximo de tareas de los RDD involucrados en la operación. Por ejemplo: si se une a RDD1 que tiene 100 tareas y RDD2 que tiene 1000 tareas, la siguiente operación del RDD resultante tendrá 1000 tareas. Tenga en cuenta que un alto número de particiones no es necesariamente sinónimo de más datos.

Espero que esto sea de ayuda.


Estoy de acuerdo con @jlopezmat sobre cómo Spark elige su configuración. Con respecto a su código de prueba, está viendo dos tareas debido a la forma en que se implementa textFile . Desde SparkContext.scala :

/** * Read a text file from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI, and return it as an RDD of Strings. */ def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString) }

y si comprobamos cuál es el valor de defaultMinPartitions :

/** Default min number of partitions for Hadoop RDDs when not given by user */ def defaultMinPartitions: Int = math.min(defaultParallelism, 2)


Respondiendo algunos puntos que no fueron abordados en respuestas anteriores:

  • en el modo Independiente, debe jugar con --executor-cores y --max-executor-cores para establecer el número de ejecutores que se iniciarán (siempre que tenga suficiente memoria para ajustarse a ese número si especifica --executor-memory )

  • Spark no asigna la tarea de forma rotatoria, sino que utiliza un mecanismo llamado " Programación de retardo ", que es una técnica basada en la extracción que permite a cada ejecutor ofrecer su disponibilidad al maestro, que decidirá si enviar o no una tarea. en eso.


Spark elige el número de tareas en función del número de particiones en el conjunto de datos original. Si está utilizando HDFS como su fuente de datos, entonces la cantidad de particiones será igual a la cantidad de bloques HDFS, de forma predeterminada. Puede cambiar el número de particiones de diferentes maneras. Los dos primeros: como argumento adicional al método SparkContext.textFile ; llamando al método RDD.repartion .