scala apache-spark apache-spark-sql spark-dataframe

scala - ¿Cuáles son las posibles razones para recibir TimeoutException ?: Futures expiró después de



apache-spark apache-spark-sql (4)

Pregunta: Me preguntaba ¿qué puede causar esta excepción?

Responder :

spark.sql.broadcastTimeout 300 Tiempo de espera en segundos para el tiempo de espera de transmisión en uniones de transmisión

spark.network.timeout Tiempo de espera predeterminado para todas las interacciones de red ... spark.network.timeout (spark.rpc.askTimeout) , spark.sql.broadcastTimeout , spark.kryoserializer.buffer.max (si está utilizando la serialización de kryo), etc. .se ajustan con valores mayores que los predeterminados para manejar consultas complejas. Puede comenzar con estos valores y ajustarlos según sus cargas de trabajo SQL.

Nota: Doc dice que

Las siguientes opciones (ver spark.sql. Propiedades) también se pueden utilizar para ajustar el rendimiento de la ejecución de la consulta. Es posible que estas opciones sean obsoletas en futuras versiones a medida que se realicen más optimizaciones automáticamente. *

Además, para su mejor comprensión, puede ver BroadCastHashJoin donde el método de ejecución es el punto de activación para el seguimiento de la pila anterior.

protected override def doExecute(): RDD[Row] = { val broadcastRelation = Await.result(broadcastFuture, timeout) streamedPlan.execute().mapPartitions { streamedIter => hashJoin(streamedIter, broadcastRelation.value) } }

Estoy trabajando en un programa Spark SQL y recibo la siguiente excepción:

16/11/07 15:58:25 ERROR yarn.ApplicationMaster: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds] java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:190) at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144) at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) at scala.collection.immutable.List.map(List.scala:285) at org.apache.spark.sql.execution.Union.doExecute(basicOperators.scala:144) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryColumnarTableScan.scala:129) at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryColumnarTableScan.scala:118) at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryColumnarTableScan.scala:41) at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:93) at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:60) at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:84) at org.apache.spark.sql.DataFrame.persist(DataFrame.scala:1581) at org.apache.spark.sql.DataFrame.cache(DataFrame.scala:1590) at com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56) at com.somecompany.ml.modeling.NewModel.generateArtifacts(FlowForNewModel.scala:32) at com.somecompany.ml.modeling.Flow$class.run(Flow.scala:52) at com.somecompany.ml.modeling.lowForNewModel.run(FlowForNewModel.scala:15) at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54) at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54) at scala.Option.getOrElse(Option.scala:121) at com.somecompany.ml.Main$.main(Main.scala:46) at com.somecompany.ml.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542) 16/11/07 15:58:25 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds])

La última parte de mi código que reconozco del seguimiento de la pila es com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56) que me lleva a esta línea: profilesDF.cache() Antes del almacenamiento en caché, realizo un unión entre 2 marcos de datos. He visto una respuesta sobre la persistencia de ambos marcos de datos antes de la unión here . Todavía necesito almacenar en caché el marco de datos unido ya que lo estoy usando en varias de mis transformaciones

¿Y me preguntaba qué puede causar esta excepción? Su búsqueda me llevó a un enlace relacionado con la excepción de tiempo de espera de rpc o algunos problemas de seguridad que no es mi problema. Si también tiene alguna idea sobre cómo resolverlo, obviamente lo agradecería, pero incluso entender el problema me ayudará a resolverlo.

Gracias por adelantado


Es bueno saber que la sugerencia de Ram funciona en algunos casos. Me gustaría mencionar que me topé con esta excepción un par de veces (incluida la que se describe here ).

La mayor parte del tiempo, se debió a OOM casi silenciosos en algún ejecutor. Verifique en SparkUI las tareas fallidas, última columna de esta tabla: Puede notar mensajes OOM.

Si comprende bien los aspectos internos de la chispa, los datos transmitidos pasan a través del controlador. Por lo tanto, el controlador tiene algún mecanismo de subproceso para recopilar los datos de los ejecutores y enviarlos de vuelta a todos. Si en algún momento falla un ejecutor, puede terminar con estos tiempos de espera.


Había establecido master as local[n] cuando envié el trabajo a Yarn-cluster .

No establezca el código maestro en el código cuando se ejecuta en el clúster, en su lugar use --master .