apache kafka - applications - Zookeeper & Kafka error KeeperErrorCode=NodeExists
use case apache kafka (2)
En mi caso, parece afectar la funcionalidad ya que no puedo consumir mensajes. Ver codigo abajo
Instancia de Vertx = VertxConfig.getInstance ();
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// consumerConfig.put (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "firstest"); // consumerConfig.put (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
Properties producerConfig = new Properties();
producerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerConfig.put("acks", "1");
String topic = "dstv-queue-3";
consumer = KafkaConsumer.create(instance, consumerConfig);
producer = KafkaProducer.create(instance, producerConfig, String.class, String.class);
consumer.subscribe(topic);
instance.setPeriodic(2000, worker -> {
KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(topic, "message");
producer.write(record, writeHandler -> {
RecordMetadata metadata = writeHandler.result();
//if meta data returned..
if (metadata != null) {
long offset = metadata.getOffset();
int partition = metadata.getPartition();
System.out.println("completed write: " + (writeHandler.succeeded() ? "successful" : "failed") + " offset:" + offset + " partition: " + partition);
}
});
});
AtomicLong counter = new AtomicLong();
consumer.handler(readHandler -> System.out.println(counter.getAndAdd(1) + ". " + readHandler.value() + " was received"));
He escrito un consumer
y producer
kafka
que funciona bien hasta hoy. Esta mañana, cuando comencé zooekeeper
y kafka
, mi consumidor no pudo leer los mensajes y en el Zookeeper log
leí este error.
INFO Got user-level KeeperException when processing sessionid:0x151c41e62e10000 type:create cxid:0x2a zxid:0x1e txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)
¿Usted me podría ayudar? ¿Qué podría haber cambiado en pocos días? No entiendo. Muchas gracias.
Tuve este error en mi Kafka 2.11 que se ejecuta en Windows 7. Creo que esta excepción no es un problema, ya que solo es el nivel de información. Solo asegúrate de que el corredor todavía esté corriendo. Incluso con este error, todavía podría:
- Crea y enumera un tema
kafka-topics.bat
. - Consumir un tema
kafka-console-consumer.bat
. - Envíe programáticamente un mensaje
producer.send(new ProducerRecord<String, String>("topic", "hello"))
.