standalone spark example cluster tcp hdfs rpc hortonworks-data-platform namenode

tcp - example - Restablecimiento de la conexión por pares mientras se ejecuta Apache Spark Job



spark yarn example (0)

Tenemos dos configuraciones de clúster HDP llamémoslas A y B.

CLUSTER A NODES :

  • Contiene un total de 20 máquinas de productos básicos.
  • Hay 20 nodos de datos.
  • Como Namenode HA está configurado, hay un namenode activo y otro en espera.

CLUSTER B NODES :

  • Contiene un total de 5 máquinas de productos básicos.
  • Hay 5 nodos de datos.
  • No hay HA configurada y este clúster tiene un namenode primario y uno secundario.

Tenemos tres componentes principales en nuestra aplicación que realizan una operación ETL (Extraer, Transformar y Cargar) en los archivos entrantes. Me referiré a estos componentes como E, T y L respectivamente.

COMPONENTE E CARACTERÍSTICAS :

  • Este componente es un Apache Spark Job y se ejecuta únicamente en el clúster B.
  • Su trabajo consiste en recoger archivos de un almacenamiento NAS y colocarlos en HDFS en el clúster B.

COMPONENTE T CARACTERÍSTICAS :

  • Este componente también es un Apache Spark Job y se ejecuta en el clúster B.
  • Su trabajo es recoger los archivos en HDFS escritos por el componente E, transformarlos y luego escribir los archivos transformados en HDFS en el clúster A.

COMPONENTE L CARACTERÍSTICAS :

  • Este componente también es un trabajo Apache Spark y se ejecuta únicamente en el Grupo A.
  • Su trabajo consiste en recoger los archivos escritos por el Componente T y cargar los datos en las tablas Hive presentes en el Grupo A.

El componente L es la gema entre los tres componentes y no hemos tenido problemas técnicos. Hubo fallas técnicas menores inexplicadas en el componente E, pero el componente T es el más problemático.

Los componentes E y T hacen uso del cliente DFS para comunicarse con el namenode.

Lo siguiente es un extracto de la excepción que hemos observado intermitentemente mientras ejecutamos el componente T:

clusterA.namenode.com/10.141.160.141:8020. Trying to fail over immediately. java.io.IOException: Failed on local exception: java.io.IOException: Connection reset by peer; Host Details : local host is: "clusterB.datanode.com"; destination host is: "clusterA.namenode.com":8020; at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:782) at org.apache.hadoop.ipc.Client.call(Client.java:1459) at org.apache.hadoop.ipc.Client.call(Client.java:1392) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229) at com.sun.proxy.$Proxy15.complete(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:464) at sun.reflect.GeneratedMethodAccessor1240.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104) at com.sun.proxy.$Proxy16.complete(Unknown Source) at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2361) at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2338) at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2303) at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) at org.apache.hadoop.io.compress.CompressorStream.close(CompressorStream.java:109) at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320) at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149) at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233) at com.abc.xyz.io.CounterWriter.close(CounterWriter.java:34) at com.abc.xyz.common.io.PathDataSink.close(PathDataSink.java:47) at com.abc.xyz.diamond.parse.map.node.AbstractOutputNode.finalise(AbstractOutputNode.java:142) at com.abc.xyz.diamond.parse.map.application.spark.node.SparkOutputNode.finalise(SparkOutputNode.java:239) at com.abc.xyz.diamond.parse.map.DiamondMapper.onParseComplete(DiamondMapper.java:1072) at com.abc.xyz.diamond.parse.decode.decoder.DiamondDecoder.parse(DiamondDecoder.java:956) at com.abc.xyz.parsing.functions.ProcessorWrapper.process(ProcessorWrapper.java:96) at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:131) at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:45) at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129) at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29) at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:123) at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:82) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57) at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142) at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161) at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131) at java.io.FilterInputStream.read(FilterInputStream.java:133) at java.io.FilterInputStream.read(FilterInputStream.java:133) at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:554) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read(BufferedInputStream.java:265) at java.io.DataInputStream.readInt(DataInputStream.java:387) at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1116) at org.apache.hadoop.ipc.Client$Connection.run(Client.java:1011)

Como mencionamos, nos enfrentamos a esta excepción de forma muy intermitente y, cuando ocurre, nuestra aplicación se bloquea y nos obliga a reiniciarla.

SOLUCIONES QUE INTENTAMOS:

  • Nuestro primer sospechoso fue que estamos sobrecargando el namenode activo en el clúster A dado que el componente T abre una gran cantidad de clientes DFS en paralelo y realiza operaciones de archivos en diferentes archivos (no hay problemas de contención en los mismos archivos). En nuestro esfuerzo por abordar este problema, analizamos dos parámetros clave para el namenode dfs.namenode.handler.count y ipc.server.listen.queue.size y logramos que este último pase de 128 (valor predeterminado) a 1024.

  • Desafortunadamente, el problema persistía en el componente T. Empezamos a tomar un enfoque diferente sobre el problema. Nos centramos únicamente en encontrar el motivo de la ocurrencia de Connection Reset By Peer. De acuerdo con una gran cantidad de artículos y discusiones de intercambio de pila, el problema se describe de la siguiente manera, el indicador RST ha sido establecido por el par que da como resultado una terminación inmediata de la conexión . En nuestro caso, identificamos que el par era el namenode del cluster A.

  • Manteniendo la bandera RST en mente, profundicé en la comprensión de los aspectos internos de la comunicación TCP solo por la razón de la bandera RST.

  • Cada socket en distribuciones de Linux (no BSD) tiene dos colas asociadas, es decir, la cola accept y la cola de espera.
  • Durante el proceso de enlace TCP, todas las solicitudes se mantienen en la cola de espera hasta que se reciben los paquetes ACK del nodo que comenzó a establecer la conexión. Una vez recibida, la solicitud se transfiere a la cola de aceptación y la aplicación que abrió el socket puede comenzar a recibir paquetes del cliente remoto.
  • El tamaño de la cola de trabajos pendientes está controlado por dos parámetros de nivel kernel llamados net.ipv4.tcp_max_syn_backlog y net.core.somaxconn mientras que la aplicación (namenode en nuestro caso) puede solicitar al kernel el tamaño de cola que desee limitado por un límite superior ( creemos que el tamaño de la cola de aceptación es el tamaño de cola definido por ipc.server.listen.queue.size ).
  • Además, otra cosa interesante a tener en cuenta aquí es que si el tamaño de net.ipv4.tcp_max_syn_backlog es mayor que net.core.somaxconn , entonces el valor del primero se trunca al de este último. Esta afirmación se basa en la documentación de Linux y se puede encontrar en https://linux.die.net/man/2/listen .
  • Volviendo al punto, cuando la acumulación se completa, TCP se comporta de dos maneras y este comportamiento también puede controlarse mediante un parámetro de kernel llamado net.ipv4.tcp_abort_on_overflow . Esto se establece de manera predeterminada en 0 y hace que kernel suelte cualquier nuevo paquete SYN cuando la acumulación está llena, lo que a su vez permite al remitente reenviar paquetes SYN. Cuando se establece en 1, el kernel marcará el indicador RST en un paquete y lo enviará al remitente, lo que terminará abruptamente la conexión.

  • Comprobamos el valor de los parámetros del kernel mencionados anteriormente y descubrimos que net.core.somaxconn está establecido en 1024, net.ipv4.tcp_abort_on_overflow está establecido en 0 y net.ipv4.tcp_max_syn_backlog está establecido en 4096 en todas las máquinas, tanto en el racimos.

  • El único sospechoso que nos queda ahora son los conmutadores que conectan el Grupo A con el Grupo B porque ninguna de las máquinas en el clúster establecerá el indicador RST ya que el parámetro net.ipv4.tcp_abort_on_overflow está establecido en 0.

MIS PREGUNTAS

  • Es evidente a partir de la documentación de HDFS que el cliente DFS utiliza RPC para comunicarse con el namenode para realizar operaciones de archivos. ¿Cada llamada RPC implica el establecimiento de una conexión TCP a namenode?
  • ¿El parámetro ipc.server.listen.queue.size define la longitud de accept queue del socket en el que namenode acepta solicitudes RPC?
  • ¿Puede el namenode cerrar conexiones implícitamente con el cliente DFS cuando está bajo carga pesada, haciendo que el kernel envíe un paquete con el indicador RST establecido, incluso si el parámetro del kernel net.ipv4.tcp_abort_on_overflow está establecido en 0?
  • ¿Los interruptores L2 o L3 (utilizados para conectar las máquinas en nuestros dos clusters) son capaces de establecer la bandera RST porque no pueden manejar los tráficos en ráfagas?

Nuestro próximo acercamiento a este problema es identificar qué máquina o interruptor (no hay un enrutador involucrado) está configurando el indicador RST analizando los paquetes usando tcpdump o wireshark. También toparemos con el tamaño de todas las colas mencionadas anteriormente en 4096 para manejar con eficacia el tráfico de ráfagas.

Los registros de namenode no muestran ningún signo de excepción, excepto que la carga de conexión de Namenode, tal como se ve en Ambari, se asomó a ciertos puntos en el tiempo y no necesariamente cuando se produjo la excepción Restablecer por conexión de la conexión.

Para concluir, quería saber si estamos yendo por el buen camino para resolver este problema o si vamos a llegar a un callejón sin salida.

PD: me disculpo por la longitud del contenido en mi pregunta. Quería presentar todo el contexto a los lectores antes de pedir ayuda o sugerencias. Gracias por su paciencia.