java - ¿Cuándo lanza el cliente Apache Kafka una excepción de "Batch Expired"?
apache-kafka ibm-cloud (6)
El parámetro que controla el tiempo antes de enviarlo al agente es linger.ms
. Su valor predeterminado es 0 (sin demora).
Utilizando el cliente Java Apache Kafka (0.9), estoy tratando de enviar una larga serie de registros al agente usando la clase Kafka Producer .
El método de envío asíncrono regresa inmediatamente por un tiempo, luego comienza a bloquear cada llamada por un corto período de tiempo. Después de unos treinta segundos, el cliente comienza a lanzar excepciones ( TimeoutException ), con el mensaje "Lote caducado" .
¿Qué circunstancias hacen que se lance esta excepción?
Esta excepción indica que está poniendo en cola los registros a una velocidad mayor de la que pueden enviarse.
Cuando llame al método de envío , ProducerRecord se almacenará en un búfer interno para enviarlo al agente. El método se devuelve inmediatamente una vez que el ProducerRecord se ha almacenado en el búfer, independientemente de si se ha enviado.
Los registros se agrupan en lotes para enviarlos al intermediario, para reducir el transporte que se escucha por mensaje y aumentar el rendimiento.
Una vez que se agrega un registro a un lote, hay un límite de tiempo para enviar ese lote para garantizar que se haya enviado dentro de una duración específica. Esto se controla mediante el parámetro de configuración del productor, request.timeout.ms , que por defecto es de treinta segundos.
Si el lote se ha puesto en cola más tiempo que el límite de tiempo de espera, se lanzará la excepción. Los registros de ese lote se eliminarán de la cola de envío.
El aumento del límite de tiempo de espera, utilizando el parámetro de configuración, permitirá al cliente poner en cola los lotes durante más tiempo antes de que caduque.
Estoy usando la versión 0.11.0.0 del cliente Java de Kafka. También empecé a ver el mismo patrón al no producir mensajes grandes constantemente. Pasaba por algunos de los mensajes, y fallaba por algunos otros. (Aunque los mensajes pasados y fallidos fueron del mismo tamaño). En mi caso, cada tamaño de mensaje fue de alrededor de 60 KB, que es mucho más alto que el batch.size
de batch.size
predeterminado de Kafka de batch.size
, también mi linger.ms
se estableció en el valor predeterminado de 0. Este error se está produciendo porque el cliente Producer está agotando el tiempo de espera antes de que pueda recibir una respuesta exitosa del servidor. Básicamente, en mi código, esta llamada estaba kafkaProd.send(pr).get()
tiempo de espera: kafkaProd.send(pr).get()
. Para solucionar este problema, tuve que aumentar el request.timeout.ms
predeterminado del cliente Producer a 60000
Tenía un problema similar con Kafka ejecutando en una ventana acoplable. Mi docker-compose.yml fue configurado con
KAFKA_ADVERTISED_HOST_NAME: kafka
ports:
- 9092:9092
Pero cuando intenté enviar un mensaje con camello desde la ventana exterior
to("kafka:test?brokers=localhost:9092")
Tengo una TimeoutException. Lo resolví agregando
127.0.0.1 kafka
al archivo Windows / System32 / drivers / etc / hosts y luego cambiar mi URL de camello a
to("kafka:test?brokers=kafka:9092")
Tengo esta excepción en un contexto completamente diferente.
He configurado un mini clúster de un zookeeper vm, un broker vm y un productor / consumidor vm. Abrí todos los puertos necesarios en el servidor (9092) y en el guardián del zoológico (2181) y luego intenté publicar un mensaje del consumidor / editor vm al agente. Obtuve la excepción mencionada por el OP, pero como solo había publicado un mensaje hasta ahora (o al menos lo intenté), la solución no podía ser aumentar el tiempo de espera o el tamaño del lote. Así que busqué y encontré esta lista de correo que describe un problema similar que tuve al intentar consumir mensajes desde el consumidor / productor vm (ClosedChannelException): http://grokbase.com/t/kafka/users/152jsjekrm/having-trouble-with-the-simplest-remote-kafka-config La última publicación en esta lista de correo describe realmente cómo resolver el problema.
En pocas palabras, si se enfrenta a la excepción ChannelClosedException
y Batch Expired
, es probable que tenga que cambiar esta línea a la siguiente en el archivo server.config
y reiniciar el agente:
advertised.host.name=<broker public IP address>
Si no está configurado, host.name
a la propiedad host.name
(que probablemente tampoco esté configurada) y luego host.name
al nombre de host canónico de la clase InetAddress
Java, que finalmente no es correcto, por supuesto, y por lo tanto confundiendo los nodos remotos.
cuando crea el consumidor, establezca ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG en verdadero.