scala join apache-spark dataframe timeoutexception

scala - ¿Por qué falla la unión con “java.util.concurrent.TimeoutException: los futuros expiran después de[300 segundos]”?



join apache-spark (2)

Esto sucede porque Spark intenta realizar Broadcast Hash Join y uno de los DataFrames es muy grande, por lo que enviarlo consume mucho tiempo.

Usted puede:

  1. Establezca una mayor spark.sql.broadcastTimeout para aumentar el tiempo de espera - spark.conf.set("spark.sql.broadcastTimeout", newValueForExample36000)
  2. persist() ambos DataFrames, luego Spark usará Shuffle Join - referencia desde here

Estoy usando Spark 1.5.

Tengo dos marcos de datos de la forma:

scala> libriFirstTable50Plus3DF res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int] scala> linkPersonItemLessThan500DF res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int]

libriFirstTable50Plus3DF tiene 766,151 registros, mientras que linkPersonItemLessThan500DF tiene 26,694,353 registros . Tenga en cuenta que estoy usando la repartition(number) en linkPersonItemLessThan500DF ya que tengo la intención de unirme a estos dos más adelante. Estoy siguiendo el código anterior con:

val userTripletRankDF = linkPersonItemLessThan500DF .join(libriFirstTable50Plus3DF, Seq("family_id")) .take(20) .foreach(println(_))

Para lo cual estoy obteniendo esta salida:

16/12/13 15:07:10 INFO scheduler.TaskSetManager: Finished task 172.0 in stage 3.0 (TID 473) in 520 ms on mlhdd01.mondadori.it (199/200) java.util.concurrent.TimeoutException: Futures timed out after [300 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala: at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:63) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190) at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207) at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386) at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904) at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385) at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315) at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378) at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:77) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:79) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:83) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:85) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:87) at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:91) at $iwC$$iwC$$iwC.<init>(<console>:93) at $iwC$$iwC.<init>(<console>:95) at $iwC.<init>(<console>:97) at <init>(<console>:99) at .<init>(<console>:103) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Y no entiendo cual es el problema. ¿Es tan simple como aumentar el tiempo de espera? ¿La unión es demasiado intensiva? ¿Necesito más memoria? ¿El shufffling es intensivo? ¿Alguien puede ayudar?


Solo para agregar un poco de contexto de código a la respuesta muy concisa de @T. Gawęda .

En su aplicación Spark, Spark SQL eligió una combinación hash de difusión para la unión porque "libriFirstTable50Plus3DF tiene 766,151 registros" que resultó ser menor que el llamado umbral de difusión (predeterminado en 10 MB).

Puede controlar el umbral de difusión utilizando la propiedad de configuración spark.sql.autoBroadcastJoinThreshold .

spark.sql.autoBroadcastJoinThreshold Configura el tamaño máximo en bytes para una tabla que se difundirá a todos los nodos de trabajo cuando se realice una unión. Al establecer este valor en -1 se puede deshabilitar la transmisión. Tenga en cuenta que actualmente las estadísticas solo son compatibles con las tablas de Hive Metastore en las que se ha ejecutado el comando ANALIZAR TABLA ESTADÍSTICAS COMPUTARIAS.

Puede encontrar ese tipo particular de unión en el seguimiento de pila:

org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute (BroadcastHashJoin.scala: 110)

BroadcastHashJoin operador físico BroadcastHashJoin en Spark SQL utiliza una variable de difusión para distribuir el conjunto de datos más pequeño a los ejecutores de Spark (en lugar de enviar una copia de él con cada tarea).

Si utilizó la explain para revisar el plan de consulta física, notará que la consulta utiliza el operador físico BroadcastExchangeExec . Aquí es donde puede ver la maquinaria subyacente para transmitir la tabla más pequeña (y el tiempo de espera).

override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = { ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]] }

doExecuteBroadcast es parte del contrato SparkPlan que todos los operadores físicos en Spark SQL siguen y permiten la transmisión si es necesario. BroadcastExchangeExec pasa a necesitarlo.

El parámetro de timeout es lo que está buscando.

private val timeout: Duration = { val timeoutValue = sqlContext.conf.broadcastTimeout if (timeoutValue < 0) { Duration.Inf } else { timeoutValue.seconds } }

Como puede ver, puede deshabilitarlo completamente (usando un valor negativo) que implicaría esperar a que la variable de difusión se sqlContext.conf.broadcastTimeout a los ejecutores por tiempo indefinido o usar sqlContext.conf.broadcastTimeout que es exactamente la propiedad de configuración spark.sql.broadcastTimeout . El valor predeterminado es 5 * 60 segundos, que puede ver en el seguimiento de pila:

java.util.concurrent.TimeoutException: los futuros se agotaron después de [300 segundos]