una tipos terapia tecnologia tareas spark software science restricciones restriccion qué que prohibidas para medicas medica libro las laborales laboral ideas espacio ejercicios ejemplos edición dentro definicion data breve actividades scala apache-spark hadoop apache-spark-sql task

scala - tipos - ¿Por qué tantas tareas en mi trabajo de chispa? Obtención de 200 tareas por defecto



tecnologia spark (2)

Esta es una pregunta clásica de Chispa.

Las dos tareas utilizadas para la lectura (Stage Id 0 en la segunda figura) es la configuración defaultMinPartitions que se establece en 2. Puede obtener este parámetro leyendo el valor en las sc.defaultMinPartitions REPL sc.defaultMinPartitions . También debería estar visible en la interfaz de usuario de Spark bajo el botón "Entorno".

Puede echar un vistazo al code de github para ver que esto es exactamente lo que está sucediendo. Si desea que se usen más particiones en la lectura, solo agréguela como un parámetro, por ejemplo, sc.textFile("a.txt", 20) .

Ahora, la parte interesante proviene de las 200 particiones que vienen en la segunda etapa (Etapa Id 1 en la segunda figura). Bueno, cada vez que se produce un orden aleatorio, Spark debe decidir cuántas particiones tendrá el RDD aleatorio. Como puedes imaginar, el valor predeterminado es 200.

Puedes cambiar eso usando:

sqlContext.setConf("spark.sql.shuffle.partitions", "4”)

Si ejecuta su código con esta configuración, verá que las 200 particiones ya no estarán allí. Cómo configurar este parámetro es una especie de arte. Tal vez elija 2 veces el número de núcleos que tiene (o lo que sea).

Creo que Spark 2.0 tiene una forma de inferir automáticamente el mejor número de particiones para mezclar RDDs. ¡Deseando que llegue!

Finalmente, la cantidad de trabajos que obtiene tiene que ver con la cantidad de acciones RDD que dio como resultado el código de Dataframe optimizado resultante. Si lees las especificaciones de Spark, dice que cada acción RDD activará un trabajo. Cuando la acción involucra un Dataframe o SparkSQL, el optimizador Catalyst descubrirá un plan de ejecución y generará un código basado en RDD para ejecutarlo. Es difícil decir exactamente por qué utiliza dos acciones en su caso. Es posible que deba ver el plan de consulta optimizado para ver exactamente lo que está haciendo.

Tengo un trabajo de chispa que toma un archivo con 8 registros de hdfs, hace una simple agregación y lo guarda de nuevo en hdfs. Me doy cuenta de que hay como cientos de tareas cuando hago esto.

Tampoco estoy seguro de por qué hay varios trabajos para esto? Pensé que un trabajo era más como cuando ocurría una acción. Puedo especular sobre por qué, pero mi entendimiento fue que dentro de este código debería ser un trabajo y debería dividirse en etapas, no en varios trabajos. ¿Por qué no se divide en etapas, por qué se rompe en puestos de trabajo?

En cuanto a las más de 200 tareas, ya que la cantidad de datos y la cantidad de nodos es minúscula, no tiene sentido que haya como 25 tareas para cada fila de datos cuando solo hay una agregación y un par de filtros. ¿Por qué no tendría una sola tarea por partición por operación atómica?

Aquí está el código relevante de Scala:

import org.apache.spark.sql._ import org.apache.spark.sql.types._ import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object TestProj {object TestProj { def main(args: Array[String]) { /* set the application name in the SparkConf object */ val appConf = new SparkConf().setAppName("Test Proj") /* env settings that I don''t need to set in REPL*/ val sc = new SparkContext(appConf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt") /*the below rdd will have schema defined in Record class*/ val rddCase = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt") .map(x=>x.split(" ")) //file record into array of strings based spaces .map(x=>Record( x(0).toInt, x(1).asInstanceOf[String], x(2).asInstanceOf[String], x(3).toInt)) /* the below dataframe groups on first letter of first name and counts it*/ val aggDF = rddCase.toDF() .groupBy($"firstName".substr(1,1).alias("firstLetter")) .count .orderBy($"firstLetter") /* save to hdfs*/ aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg") } case class Record(id: Int , firstName: String , lastName: String , quantity:Int) }

A continuación se muestra la captura de pantalla después de hacer clic en la aplicación

A continuación se muestran las etapas que se muestran al ver el "trabajo" específico de la identificación 0

A continuación se muestra la primera parte de la pantalla al hacer clic en el escenario con más de 200 tareas.

Esta es la segunda parte de la pantalla dentro del escenario.

Abajo está después de hacer clic en la pestaña "ejecutores"

Según lo solicitado, aquí están las etapas para la ID de trabajo 1

Aquí están los detalles para la etapa en el ID de trabajo 1 con 200 tareas


Estoy teniendo un problema similar. Pero en mi caso, la colección que estoy paralelizando tiene menos elementos que la cantidad de tareas programadas por Spark (lo que hace que la chispa se comporte de manera extraña a veces). Usando el número de partición forzada pude arreglar este problema.

Fue algo como esto:

collection = range(10) # In the real scenario it was a complex collection sc.parallelize(collection).map(lambda e: e + 1) # also a more complex operation in the real scenario

Entonces, vi en el registro de Spark:

INFO YarnClusterScheduler: Adding task set 0.0 with 512 tasks