apache zookeeper - Kafka: Consigue un agente de bolsa de ZooKeeper
apache-zookeeper apache-kafka (5)
Esa es la forma en que lo hizo uno de mis colegas para obtener una lista de corredores Kafka. Creo que es una forma correcta cuando desea obtener una lista de intermediarios de forma dinámica.
Aquí hay un código de ejemplo que muestra cómo obtener la lista.
public class KafkaBrokerInfoFetcher {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
List<String> ids = zk.getChildren("/brokers/ids", false);
for (String id : ids) {
String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
System.out.println(id + ": " + brokerInfo);
}
}
}
Ejecutar el código en el cluster que consta de tres resultados de corredores en
1: {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093}
2: {"jmx_port":-1,"timestamp":"1428512955512","host":"192.168.0.11","version":1,"port":9094}
3: {"jmx_port":-1,"timestamp":"1428512961043","host":"192.168.0.11","version":1,"port":9095}
Por razones particulares, necesito usar tanto ConsumerGroup
(también conocido como ConsumerGroup
alto nivel) como SimpleConsumer
(también conocido como consumidor de bajo nivel) para leer desde Kafka. Para ConsumerGroup
, utilizo la configuración basada en ZooKeeper y estoy completamente satisfecho con ella, pero SimpleConsumer
requiere que los agentes de semillas sean instanciados.
No quiero mantener la lista de ambos: ZooKeeper y los anfitriones del corredor. Por lo tanto, estoy buscando una manera de descubrir automáticamente agentes de un tema en particular de ZooKeeper .
Debido a cierta información indirecta, creo que estos datos se almacenan en ZooKeeper en una de las siguientes rutas:
-
/brokers/topics/<topic>/partitions/<partition-id>/state
- / brokers / ids /
Sin embargo, cuando intento leer los datos de estos nodos, recibo un error de serialización (estoy usando com.101tec.zkclient
para esto):
org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: encabezado de flujo no válido: 7B226A6D en la publicación org.I0Itec.paricacion de la embarcación de la embarcación. (ZkClient.java:740) en org.I0Itec.zkclient.ZkClient.readData (ZkClient.java:773) en org.I0Itec.zkclient.ZkClientkreadkread. readData (ZkClient.java:750) en org.I0Itec.zkclient.ZkClient.readData (ZkClient.java:744) ... 64 elided Cause por: java.io.StreamCorruptedException: encabezado de flujo no válido: 7B226A6D en java.io.ObjectInputput .readStreamHeader (ObjectInputStream.java:804) en java.io.ObjectInputStream. (ObjectInputStream.java:299) en org.I0Itec.zkclient.serializeSt .cc. SerializableSerializer.deserialize (SerializableSerializer.java:31) ... 69 más
Puedo escribir y leer objetos Java personalizados (por ejemplo, cadenas) sin ningún problema, por lo que creo que no es un problema de un cliente, sino una codificación complicada. Por eso, quiero saber:
- Si este es el camino correcto, ¿cómo leer estos nodos correctamente ?
- Si todo el enfoque es incorrecto, ¿cuál es el correcto ?
Para hacer esto usando el shell:
zookeeper-shell myzookeeper.example.com:2181
ls /brokers/ids
=> [2, 1, 0]
get /brokers/ids/2
get /brokers/ids/1
get /brokers/ids/0
Resulta que Kafka usa ZKStringSerializer
para leer y escribir datos en znodes. Entonces, para corregir el error solo tuve que agregarlo como último parámetro en el constructor ZkClient
:
val zkClient = new ZkClient(zkQuorum, Integer.MAX_VALUE, 10000, ZKStringSerializer)
Usándolo, escribí varias funciones útiles para descubrir los ID de los corredores, sus direcciones y otras cosas:
import kafka.utils.Json
import kafka.utils.ZKStringSerializer
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.KafkaException
def listBrokers(): List[Int] = {
zkClient.getChildren("/brokers/ids").toList.map(_.toInt)
}
def listTopics(): List[String] = {
zkClient.getChildren("/brokers/topics").toList
}
def listPartitions(topic: String): List[Int] = {
val path = "/brokers/topics/" + topic + "/partitions"
if (zkClient.exists(path)) {
zkClient.getChildren(path).toList.map(_.toInt)
} else {
throw new KafkaException(s"Topic ${topic} doesn''t exist")
}
}
def getBrokerAddress(brokerId: Int): (String, Int) = {
val path = s"/brokers/ids/${brokerId}"
if (zkClient.exists(path)) {
val brokerInfo = readZkData(path)
(brokerInfo.get("host").get.asInstanceOf[String], brokerInfo.get("port").get.asInstanceOf[Int])
} else {
throw new KafkaException("Broker with ID ${brokerId} doesn''t exist")
}
}
def getLeaderAddress(topic: String, partitionId: Int): (String, Int) = {
val path = s"/brokers/topics/${topic}/partitions/${partitionId}/state"
if (zkClient.exists(path)) {
val leaderStr = zkClient.readData[String](path)
val leaderId = Json.parseFull(leaderStr).get.asInstanceOf[Map[String, Any]].get("leader").get.asInstanceOf[Int]
getBrokerAddress(leaderId)
} else {
throw new KafkaException(s"Topic (${topic}) or partition (${partitionId}) doesn''t exist")
}
}
en realidad, hay ZkUtils
desde Kafka (al menos para la línea 0.8.x), que puede usar con una pequeña advertencia: tendrá que volver a implementar ZkStringSerializer que convertiría las cadenas como matrices de bytes codificados en UTF-8. Si desea utilizar las API de transmisión de Java8, puede iterar sobre las colecciones de Scala a través de scala.collection.JavaConversions
. Esto es lo que ayudó a mi caso.
public KafkaProducer(String zookeeperAddress, String topic) throws IOException,
KeeperException, InterruptedException {
this.zookeeperAddress = zookeeperAddress;
this.topic = topic;
ZooKeeper zk = new ZooKeeper(zookeeperAddress, 10000, null);
List<String> brokerList = new ArrayList<String>();
List<String> ids = zk.getChildren("/brokers/ids", false);
for (String id : ids) {
String brokerInfoString = new String(zk.getData("/brokers/ids/" + id, false, null));
Broker broker = Broker.createBroker(Integer.valueOf(id), brokerInfoString);
if (broker != null) {
brokerList.add(broker.connectionString());
}
}
props.put("serializer.class", KAFKA_STRING_ENCODER);
props.put("metadata.broker.list", String.join(",", brokerList));
producer = new Producer<String, String>(new ProducerConfig(props));
}