java - quickstart - kafka partitions
Java, Cómo obtener el número de mensajes en un tema en apache kafka (17)
A veces, el interés está en saber la cantidad de mensajes en cada partición, por ejemplo, al probar un particionador personalizado. Los pasos siguientes se han probado para que funcionen con Kafka 0.10.2.1-2 de Confluent 3.2.
Dado un tema de Kafka,
kt
y la siguiente línea de comando:
$ kafka-run-class kafka.tools.GetOffsetShell /
--broker-list host01:9092,host02:9092,host02:9092 --topic kt
Eso imprime la salida de muestra que muestra el recuento de mensajes en las tres particiones:
kt:2:6138
kt:1:6123
kt:0:6137
El número de líneas podría ser más o menos dependiendo del número de particiones para el tema.
Estoy usando apache kafka para mensajería. He implementado el productor y el consumidor en Java. ¿Cómo podemos obtener la cantidad de mensajes en un tema?
Comando Apache Kafka para obtener mensajes no manejados en todas las particiones de un tema:
kafka-run-class kafka.tools.ConsumerOffsetChecker
--topic test --zookeeper localhost:2181
--group test_group 2>/dev/null | awk ''NR>1 {sum += $6}
END {print sum}''
Huellas dactilares:
5
La columna 6 son los mensajes no manejados. Agréguelos así:
bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand /
--group my-group /
--bootstrap-server localhost:9092 /
--describe
awk lee las filas, omite la línea del encabezado y suma la sexta columna y al final imprime la suma.
Huellas dactilares
kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e
Dado que
ConsumerOffsetChecker
ya no es compatible, puede usar este comando para verificar todos los mensajes en el tema:
kafka-run-class kafka.tools.ConsumerOffsetChecker
--topic test --zookeeper localhost:2181
--group test_group
Donde
LAG
es el recuento de mensajes en la partición de tema:
También puedes intentar usar
kafkacat
.
Este es un proyecto de código abierto que puede ayudarlo a leer mensajes de un tema y partición e imprimirlos en stdout.
Aquí hay una muestra que lee los últimos 10 mensajes del tema
sample-kafka-topic
topic y luego sale:
Group Topic Pid Offset logSize Lag Owner
test_group test 0 11051 11053 2 none
test_group test 1 10810 10812 2 none
test_group test 2 11027 11028 1 none
De hecho, uso esto para comparar mi POC. El elemento que desea usar ConsumerOffsetChecker. Puede ejecutarlo usando el script bash como se muestra a continuación.
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group testgroup
Y a continuación se muestra el resultado: Como puede ver en el cuadro rojo, 999 es el número de mensajes actualmente en el tema.
Actualización: ConsumerOffsetChecker está en desuso desde 0.10.0, es posible que desee comenzar a usar ConsumerGroupCommand.
Ejecute lo siguiente (suponiendo que
kafka-console-consumer.sh
esté en la ruta):
kafka-console-consumer.sh --from-beginning /
--bootstrap-server yourbroker:9092 --property print.key=true /
--property print.value=false --property print.partition /
--topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
La única forma de pensar en esto desde el punto de vista del consumidor es consumir los mensajes y contarlos en ese momento.
El agente de Kafka expone los contadores JMX para la cantidad de mensajes recibidos desde el inicio, pero no puede saber cuántos de ellos ya se han purgado.
En los escenarios más comunes, los mensajes en Kafka se ven mejor como una secuencia infinita y no es relevante obtener un valor discreto de cuántos se mantienen actualmente en el disco. Además, las cosas se vuelven más complicadas cuando se trata de un grupo de corredores que tienen un subconjunto de mensajes en un tema.
La forma más sencilla que he encontrado es utilizar la API
/topic/topicName
topicName de REST de
/topic/topicName
y especificar la clave:
"Accept"
/ valor: encabezado
"application/json"
para obtener una respuesta JSON.
No es Java, pero puede ser útil.
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list <broker>: <port>
--topic <topic-name> --time -1 --offsets 1
| awk -F ":" ''{sum += $3} END {print sum}''
No lo he intentado yo mismo, pero parece tener sentido.
También puede usar
kafka.tools.ConsumerOffsetChecker
(
source
).
Para obtener todos los mensajes almacenados para el tema, puede buscar al consumidor al principio y al final de la secuencia para cada partición y sumar los resultados
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
.map(p -> new TopicPartition(topic, p.partition()))
.collect(Collectors.toList());
consumer.assign(partitions);
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
.collect(Collectors.toMap(Function.identity(), consumer::position));
consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
Podemos usar Java simple a continuación para obtener el recuento de mensajes sobre el tema
Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9091"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); List<PartitionInfo> parts = consumer.partitionsFor("topic"); List<TopicPartition> partitions= new ArrayList<>(); for (PartitionInfo p : parts) { partitions.add(new TopicPartition(topic, p.partition())); } consumer.assign(partitions); Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment); Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(assignment); long totalMessaheCnt=0; for (TopicPartition tp : offsets.keySet()) { totalMessaheCnt += endOffsets.get(tp)-beginningOffsets.get(tp) }
Puedes usar kafkatool . Por favor revise este enlace -> http://www.kafkatool.com/download.html
Kafka Tool es una aplicación GUI para administrar y usar clústeres de Apache Kafka. Proporciona una interfaz de usuario intuitiva que permite ver rápidamente objetos dentro de un clúster de Kafka, así como los mensajes almacenados en los temas del clúster.
Si tiene acceso a la interfaz JMX del servidor, las compensaciones de inicio y fin están presentes en:
kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
(debe reemplazar
TOPICNAME
&
PARTITIONNUMBER
).
Tenga en cuenta que debe verificar cada una de las réplicas de una partición determinada, o debe averiguar cuál de los corredores es el líder de una partición
determinada
(y esto puede cambiar con el tiempo).
Alternativamente, puede usar los métodos de
Kafka Consumer
endOffsets
y
endOffsets
.
Usando el cliente Java de Kafka 2.11-1.0.0, puede hacer lo siguiente:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
// after each message, query the number of messages of the topic
Set<TopicPartition> partitions = consumer.assignment();
Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
for(TopicPartition partition : offsets.keySet()) {
System.out.printf("partition %s is at %d/n", partition.topic(), offsets.get(partition));
}
}
}
La salida es algo como esto:
offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13
Use https://prestodb.io/docs/current/connector/kafka-tutorial.html
Un motor súper SQL, proporcionado por Facebook, que se conecta en varias fuentes de datos (Cassandra, Kafka, JMX, Redis ...).
PrestoDB se ejecuta como un servidor con trabajadores opcionales (hay un modo independiente sin trabajadores adicionales), luego utiliza un pequeño JAR ejecutable (llamado presto CLI) para realizar consultas.
Una vez que haya configurado bien el servidor Presto, puede usar SQL tradicional:
SELECT count(*) FROM TOPIC_NAME;
Extractos de los documentos de Kafka
Depreciaciones en 0.9.0.0
El kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) ha quedado en desuso. En el futuro, utilice kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) para esta funcionalidad.
Estoy ejecutando Kafka Broker con SSL habilitado tanto para el servidor como para el cliente. Debajo del comando que uso
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x
donde / tmp / ssl_config es el siguiente
security.protocol=SSL
ssl.truststore.location=truststore_file_path.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=keystore_file_path.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password