tutorial kafka applications apache-kafka apache-zookeeper

apache kafka - applications - Zookeeper & Kafka error KeeperErrorCode=NodeExists



use case apache kafka (2)

En mi caso, parece afectar la funcionalidad ya que no puedo consumir mensajes. Ver codigo abajo

Instancia de Vertx = VertxConfig.getInstance ();

Properties consumerConfig = new Properties(); consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

// consumerConfig.put (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "firstest"); // consumerConfig.put (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

Properties producerConfig = new Properties(); producerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); producerConfig.put("acks", "1"); String topic = "dstv-queue-3"; consumer = KafkaConsumer.create(instance, consumerConfig); producer = KafkaProducer.create(instance, producerConfig, String.class, String.class); consumer.subscribe(topic); instance.setPeriodic(2000, worker -> { KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(topic, "message"); producer.write(record, writeHandler -> { RecordMetadata metadata = writeHandler.result(); //if meta data returned.. if (metadata != null) { long offset = metadata.getOffset(); int partition = metadata.getPartition(); System.out.println("completed write: " + (writeHandler.succeeded() ? "successful" : "failed") + " offset:" + offset + " partition: " + partition); } }); }); AtomicLong counter = new AtomicLong(); consumer.handler(readHandler -> System.out.println(counter.getAndAdd(1) + ". " + readHandler.value() + " was received"));

He escrito un consumer y producer kafka que funciona bien hasta hoy. Esta mañana, cuando comencé zooekeeper y kafka , mi consumidor no pudo leer los mensajes y en el Zookeeper log leí este error.

INFO Got user-level KeeperException when processing sessionid:0x151c41e62e10000 type:create cxid:0x2a zxid:0x1e txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)

¿Usted me podría ayudar? ¿Qué podría haber cambiado en pocos días? No entiendo. Muchas gracias.


Tuve este error en mi Kafka 2.11 que se ejecuta en Windows 7. Creo que esta excepción no es un problema, ya que solo es el nivel de información. Solo asegúrate de que el corredor todavía esté corriendo. Incluso con este error, todavía podría:

  1. Crea y enumera un tema kafka-topics.bat .
  2. Consumir un tema kafka-console-consumer.bat .
  3. Envíe programáticamente un mensaje producer.send(new ProducerRecord<String, String>("topic", "hello")) .