viajes temperatura tarda tablero spark pero motor marcha largos indicador fallas enciende comunes chevrolet arranque arrancar apache-spark amazon-s3 spark-streaming

apache spark - temperatura - El trabajo de transmisión de estado de Spark se bloquea en el punto de comprobación a S3 después de un tiempo de actividad prolongado



spark tarda en arrancar (1)

He estado probando recientemente nuestra aplicación Spark Streaming. La prueba de esfuerzo ingiere aproximadamente 20,000 mensajes / seg con tamaños de mensajes que varían entre 200bytes - 1K en Kafka, donde Spark Streaming lee lotes cada 4 segundos.

Nuestro clúster Spark se ejecuta en la versión 1.6.1 con el administrador de clúster independiente, y estamos usando Scala 2.10.6 para nuestro código.

Después de una ejecución de aproximadamente 15-20 horas, uno de los ejecutores que está iniciando un punto de control (hecho en un intervalo de 40 segundos) está atascado con el siguiente seguimiento de la pila y nunca completa:

java.net.SocketInputStream.socketRead0 (Método nativo) java.net.SocketInputStream.socketRead (SocketInputStream.java:116) java.net.SocketInputStream.read (SocketInputStream.java:170) java.net.SocketInputStream.read (SocketInputStream.java : 141) sun.security.ssl.InputRecord.readFully (InputRecord.java:465) sun.security.ssl.InputRecord.readV3Record (InputRecord.java:593) sun.security.ssl.InputRecord.read (InputRecord.java:532 ) sun.security.ssl.SSLSocketImpl.readRecord (SSLSocketImpl.java:973) sun.security.ssl.SSLSocketImpl.performInitialHandshake (SSLSocketImpl.java:1375) sun.security.ssl.SSLSocketImpl.startHandshake (SSLSocketImpl.java:1403) sun .security.ssl.SSLSocketImpl.startHandshake (SSLSocketImpl.java:1387) org.apache.http.conn.ssl.SSLSocketFactory.connectSocket (SSLSocketFactory.java:533) org.apache.http.conn.ssl.SSLSocketFactory.connectSocket (SSLSocketFactory .java: 401) org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection (DefaultClientConnectionOperator.java:177) org.apache .http.impl.conn.AbstractPoolEntry.open (AbstractPoolEntry.java:144) org.apache.http.impl.conn.AbstractPooledConnAdapter.open (AbstractPooledConnAdapter.java:131) org.apache.http.impl.client.DefaultRequestDirector.tryConnect (DefaultRequestDirector.java:610) org.apache.http.impl.client.DefaultRequestDirector.execute (DefaultRequestDirector.java:445) org.apache.http.impl.client.AbstractHttpClient.doExecute (AbstractHttpClient.java:863) org.apache .http.impl.client.CloseableHttpClient.execute (CloseableHttpClient.java:82) org.apache.http.impl.client.CloseableHttpClient.execute (CloseableHttpClient.java:57) org.jets3t.service.impl.rest.httpclient.RestStorageService .performRequest (RestStorageService.java:326) org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest (RestStorageService.java:277) org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead (RestStorageService.java) : 1038) org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl (RestStorageSe) rvice.java:2250) org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl (RestStorageService.java:2179) org.jets3t.service.StorageService.getObjectDetails (StorageService.java:1120) org.jets3t.service. StorageService.getObjectDetails (StorageService.java:575) org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata (Jets3tNativeFileSystemStore.java:174) sun.reflect.GeneratedMethodAccessor32.invoke (Fuente desconocida) sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl) .java: 43) java.lang.reflect.Method.invoke (Method.java:497) org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod (RetryInvocationHandler.java:187) org.apache.hadoop.io.retry .RetryInvocationHandler.invoke (RetryInvocationHandler.java:102) org.apache.hadoop.fs.s3native. $ Proxy18.retrieveMetadata (Fuente desconocida) org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus (NativeS3FileSystem.java:472) org .apache.hadoop.fs.FileSystem.exists (FileSystem.java:1424) org.apache.spark.rdd.ReliableCheckpointRDD $ .writePartitionToCheckpointFile (ReliableCheckpointRDD.scala: 168) org.apache.spark.rdd.ReliableCheckpointRDD $$ anonfun $ writeRDDToCheckpointDirectory $ 1.apply (ReliableCheckpointRDD.scala: 136) org.apache.spark.rdd .ReliableCheckpointRDD $$ anonfun $ writeRDDToCheckpointDirectory $ 1.apply (ReliableCheckpointRDD.scala: 136) org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 66) org.apache.spark.scheduler.Task.run (Task.scala : 89) org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 214) java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) java.lang.Thread.run (Thread.java:745)

Mientras está atascado, el conductor de la chispa se niega a continuar procesando los lotes entrantes, y crea una gran acumulación de lotes en cola que no se pueden procesar hasta que se libere la tarea que está "atascada".

Además, al observar el volcado del hilo del controlador en streaming-job-executor-0 se ve claramente que está esperando que se complete esta tarea:

java.lang.Object.wait (Método nativo) java.lang.Object.wait (Object.java:502) org.apache.spark.scheduler.JobWaiter.awaitResult (JobWaiter.scala: 73) org.apache.spark.scheduler .DAGScheduler.runJob (DAGScheduler.scala: 612) org.apache.spark.SparkContext.runJob (SparkContext.scala: 1832) org.apache.spark.SparkContext.runJob (SparkContext.scala: 1845) org.apache.spark.SparkContext .runJob (SparkContext.scala: 1922) org.apache.spark.rdd.ReliableCheckpointRDD $ .writeRDDToCheckpointDirectory (ReliableCheckpointRDD.scala: 135) org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint (ReliableRDDCheckpointData.scala: 58) org.apache. spark.rdd.RDDCheckpointData.checkpoint (RDDCheckpointData.scala: 74) org.apache.spark.rdd.RDD $$ anonfun $ doCheckpoint $ 1.aplicación $ mcV $ sp (RDD.scala: 1682) org.apache.spark.rdd. RDD $$ anonfun $ doCheckpoint $ 1.apply (RDD.scala: 1679) org.apache.spark.rdd.RDD $$ anonfun $ doCheckpoint $ 1.apply (RDD.scala: 1679) org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 150) org.apache.spark.r dd.RDD.doCheckpoint (RDD.scala: 1678) org.apache.spark.rdd.RDD $$ anonfun $ doCheckpoint $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (RDD.scala: 1684) org.apache. spark.rdd.RDD $$ anonfun $ doCheckpoint $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (RDD.scala: 1684) scala.collection.immutable.List.foreach (List.scala: 318)

¿Alguien ha experimentado tal problema?


El bloqueo del socket ocurre debido a un error en la biblioteca HttpClient utilizado por org.jets3t donde el protocolo de enlace SSL no utiliza el tiempo de espera especificado. Puede encontrar los detalles del problema aquí .

Este error se reproduce en las versiones de HttpClient debajo de v4.5.1, donde fue corregido. Desafortunadamente, Spark 1.6.x usa v4.3.2, que no tiene el arreglo provisto.

Hay tres posibles soluciones que he pensado hasta ahora:

  1. Utilice el mecanismo de especulación de Spark a través de la configuración de spark.speculation . Esto ayuda con las cajas de borde de la colgada, ya que se reproduce con poca frecuencia y bajo carga. Tenga en cuenta que esto puede causar algunos falsos positivos al comienzo del trabajo de transmisión, donde la chispa no tiene una buena impresión de cuánto tiempo dura su tarea mediana, pero definitivamente no es algo que cause un retraso notable.

    La documentación dice:

    Si se establece en "verdadero", realiza la ejecución especulativa de tareas. Esto significa que si una o más tareas se ejecutan lentamente en una etapa, serán relanzadas.

    Lo enciendes suministrando las banderas para enviar chispas:

    spark-submit / --conf "spark.speculation=true" / --conf "spark.speculation.multiplier=5" /

    Para más información sobre las diferentes configuraciones que puede pasar, vea la página de Configuración de Spark

  2. Pasando manualmente HttpClient v4.5.1 o superior al classpath de Sparks, para que pueda cargar este JAR antes de uno que tenga en su JAR superior. Esto puede ser un poco difícil ya que el proceso de carga de clases con Spark es un poco engorroso. Esto significa que puedes hacer algo como:

    CP=''''; for f in /path/to/httpcomponents-client-4.5.2/lib/*.jar; do CP=$CP$f:; done SPARK_CLASSPATH="$CP" sbin/start-master.sh # on your master machine SPARK_CLASSPATH="$CP" sbin/start-slave.sh ''spark://master_name:7077''

    O simplemente actualice la versión específica del JAR a SPARK_CLASSPATH en spark-env.sh .

  3. Actualizando a Spark 2.0.0 . La nueva versión de Spark usa HttpClient v4.5.2 que resuelve este problema.