scala - ¿Cuáles son las posibles razones para recibir TimeoutException ?: Futures expiró después de
apache-spark apache-spark-sql (4)
Pregunta: Me preguntaba ¿qué puede causar esta excepción?
Responder :
spark.network.timeout
Tiempo de espera predeterminado para todas las interacciones de red ...spark.network.timeout (spark.rpc.askTimeout)
,spark.sql.broadcastTimeout
,spark.kryoserializer.buffer.max
(si está utilizando la serialización de kryo), etc. .se ajustan con valores mayores que los predeterminados para manejar consultas complejas. Puede comenzar con estos valores y ajustarlos según sus cargas de trabajo SQL.
Nota: Doc dice que
Las siguientes opciones (ver spark.sql. Propiedades) también se pueden utilizar para ajustar el rendimiento de la ejecución de la consulta. Es posible que estas opciones sean obsoletas en futuras versiones a medida que se realicen más optimizaciones automáticamente. *
Además, para su mejor comprensión, puede ver BroadCastHashJoin donde el método de ejecución es el punto de activación para el seguimiento de la pila anterior.
protected override def doExecute(): RDD[Row] = {
val broadcastRelation = Await.result(broadcastFuture, timeout)
streamedPlan.execute().mapPartitions { streamedIter =>
hashJoin(streamedIter, broadcastRelation.value)
}
}
Esta pregunta ya tiene una respuesta aquí:
Estoy trabajando en un programa Spark SQL y recibo la siguiente excepción:
16/11/07 15:58:25 ERROR yarn.ApplicationMaster: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)
at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.Union.doExecute(basicOperators.scala:144)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryColumnarTableScan.scala:129)
at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryColumnarTableScan.scala:118)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryColumnarTableScan.scala:41)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:93)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:60)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:84)
at org.apache.spark.sql.DataFrame.persist(DataFrame.scala:1581)
at org.apache.spark.sql.DataFrame.cache(DataFrame.scala:1590)
at com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56)
at com.somecompany.ml.modeling.NewModel.generateArtifacts(FlowForNewModel.scala:32)
at com.somecompany.ml.modeling.Flow$class.run(Flow.scala:52)
at com.somecompany.ml.modeling.lowForNewModel.run(FlowForNewModel.scala:15)
at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54)
at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54)
at scala.Option.getOrElse(Option.scala:121)
at com.somecompany.ml.Main$.main(Main.scala:46)
at com.somecompany.ml.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
16/11/07 15:58:25 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds])
La última parte de mi código que reconozco del seguimiento de la pila es
com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56)
que me lleva a esta línea:
profilesDF.cache()
Antes del almacenamiento en caché, realizo un unión entre 2 marcos de datos.
He visto una respuesta sobre la persistencia de ambos marcos de datos antes de la unión
here
. Todavía necesito almacenar en caché el marco de datos unido ya que lo estoy usando en varias de mis transformaciones
¿Y me preguntaba qué puede causar esta excepción? Su búsqueda me llevó a un enlace relacionado con la excepción de tiempo de espera de rpc o algunos problemas de seguridad que no es mi problema. Si también tiene alguna idea sobre cómo resolverlo, obviamente lo agradecería, pero incluso entender el problema me ayudará a resolverlo.
Gracias por adelantado
Es bueno saber que la sugerencia de Ram funciona en algunos casos. Me gustaría mencionar que me topé con esta excepción un par de veces (incluida la que se describe here ).
La mayor parte del tiempo, se debió a OOM casi silenciosos en algún ejecutor. Verifique en SparkUI las tareas fallidas, última columna de esta tabla: Puede notar mensajes OOM.
Si comprende bien los aspectos internos de la chispa, los datos transmitidos pasan a través del controlador. Por lo tanto, el controlador tiene algún mecanismo de subproceso para recopilar los datos de los ejecutores y enviarlos de vuelta a todos. Si en algún momento falla un ejecutor, puede terminar con estos tiempos de espera.
Había establecido
master as local[n]
cuando envié el trabajo a
Yarn-cluster
.
No establezca el código maestro en el código cuando se ejecuta en el clúster, en su lugar use
--master
.
Si habilitó dynamicAllocation, intente deshabilitar esta configuración (spark.dynamicAllocation.enabled = false). Puede establecer esta configuración de chispa en conf / spark-defaults.conf, como --conf o dentro del código.
Ver también: