tutorial topic partitions kafka español create cases java apache-kafka kafka-producer-api

java - topic - ¿Cómo comprobar si Kafka Server se está ejecutando?



kafka partitions (4)

A todos los corredores de Kafka se les debe asignar un broker.id . En el inicio, un agente creará un nodo efímero en Zookeeper con una ruta de /broker/ids/$id . Como el nodo es efímero, se eliminará tan pronto como el intermediario se desconecte, por ejemplo, al apagarlo.

Puede ver la lista de los nodos intermediarios efímeros así:

echo dump | nc localhost 2181 | grep brokers

La interfaz del cliente ZooKeeper expone una serie de comandos; dump muestra una lista de todas las sesiones y nodos efímeros para el clúster.

Tenga en cuenta, lo anterior asume:

  • Está ejecutando ZooKeeper en el puerto predeterminado ( 2181 ) en localhost , y ese localhost es el líder del clúster
  • Su configuración de zookeeper.connect Kafka no especifica un entorno chroot para su clúster Kafka, es decir, es solo host:port y no host:port/path

Quiero asegurarme de si el servidor kafka se está ejecutando o no antes de comenzar los trabajos de producción y consumo. Está en el entorno de Windows y aquí está el código de mi servidor kafka en eclipse ...

Properties kafka = new Properties(); kafka.setProperty("broker.id", "1"); kafka.setProperty("port", "9092"); kafka.setProperty("log.dirs", "D://workspace//"); kafka.setProperty("zookeeper.connect", "localhost:2181"); Option<String> option = Option.empty(); KafkaConfig config = new KafkaConfig(kafka); KafkaServer server = new KafkaServer(config, new CurrentTime(), option); server.startup();

En este caso if (server != null) no es suficiente porque siempre es verdadero. Entonces, ¿hay alguna manera de saber que mi servidor kafka está funcionando y listo para el productor? Es necesario que compruebe esto porque provoca la pérdida de algunos paquetes de datos de inicio.

Gracias.


La buena opción es usar AdminClient como se muestra a continuación antes de comenzar a producir o consumir los mensajes

private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000; try (AdminClient client = AdminClient.create(properties)) { client.listTopics(new ListTopicsOptions().timeoutMs(ADMIN_CLIENT_TIMEOUT_MS)).listings().get(); } catch (ExecutionException ex) { LOG.error("Kafka is not available, timed out after {} ms", ADMIN_CLIENT_TIMEOUT_MS); return; }


La respuesta de Paul es muy buena y en realidad es cómo Kafka y Zk trabajan juntos desde el punto de vista de un agente.

Yo diría que otra opción fácil para verificar si un servidor Kafka se está ejecutando es crear un KafkaConsumer sencillo que apunte al cluste y pruebe alguna acción, por ejemplo, listTopics () . Si el servidor kafka no se está ejecutando, obtendrá una TimeoutException y luego podrá usar una sentencia try-catch .

def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = { val props = new Properties() props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString) props.put("group.id", kafkaParams.get("group.id").get.toString) props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") val simpleConsumer = new KafkaConsumer[String, String](props) simpleConsumer.listTopics() }


Utilicé la api AdminClient .

Properties properties = new Properties(); properties.put("bootstrap.servers", "localhost:9092"); properties.put("connections.max.idle.ms", 10000); properties.put("request.timeout.ms", 5000); try (AdminClient client = KafkaAdminClient.create(properties)) { ListTopicsResult topics = client.listTopics(); Set<String> names = topics.names().get(); if (names.isEmpty()) { // case: if no topic found. } return true; } catch (InterruptedException | ExecutionException e) { // Kafka is not available }