tutorial quickstart partitions kafka applications java messages apache-kafka

java - quickstart - kafka partitions



Java, Cómo obtener el número de mensajes en un tema en apache kafka (17)

A veces, el interés está en saber la cantidad de mensajes en cada partición, por ejemplo, al probar un particionador personalizado. Los pasos siguientes se han probado para que funcionen con Kafka 0.10.2.1-2 de Confluent 3.2. Dado un tema de Kafka, kt y la siguiente línea de comando:

$ kafka-run-class kafka.tools.GetOffsetShell / --broker-list host01:9092,host02:9092,host02:9092 --topic kt

Eso imprime la salida de muestra que muestra el recuento de mensajes en las tres particiones:

kt:2:6138 kt:1:6123 kt:0:6137

El número de líneas podría ser más o menos dependiendo del número de particiones para el tema.

Estoy usando apache kafka para mensajería. He implementado el productor y el consumidor en Java. ¿Cómo podemos obtener la cantidad de mensajes en un tema?


Comando Apache Kafka para obtener mensajes no manejados en todas las particiones de un tema:

kafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_group 2>/dev/null | awk ''NR>1 {sum += $6} END {print sum}''

Huellas dactilares:

5

La columna 6 son los mensajes no manejados. Agréguelos así:

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand / --group my-group / --bootstrap-server localhost:9092 / --describe

awk lee las filas, omite la línea del encabezado y suma la sexta columna y al final imprime la suma.

Huellas dactilares

kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e


Dado que ConsumerOffsetChecker ya no es compatible, puede usar este comando para verificar todos los mensajes en el tema:

kafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_group

Donde LAG es el recuento de mensajes en la partición de tema:

También puedes intentar usar kafkacat . Este es un proyecto de código abierto que puede ayudarlo a leer mensajes de un tema y partición e imprimirlos en stdout. Aquí hay una muestra que lee los últimos 10 mensajes del tema sample-kafka-topic topic y luego sale:

Group Topic Pid Offset logSize Lag Owner test_group test 0 11051 11053 2 none test_group test 1 10810 10812 2 none test_group test 2 11027 11028 1 none


De hecho, uso esto para comparar mi POC. El elemento que desea usar ConsumerOffsetChecker. Puede ejecutarlo usando el script bash como se muestra a continuación.

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group testgroup

Y a continuación se muestra el resultado: Como puede ver en el cuadro rojo, 999 es el número de mensajes actualmente en el tema.

Actualización: ConsumerOffsetChecker está en desuso desde 0.10.0, es posible que desee comenzar a usar ConsumerGroupCommand.


Ejecute lo siguiente (suponiendo que kafka-console-consumer.sh esté en la ruta):

kafka-console-consumer.sh --from-beginning / --bootstrap-server yourbroker:9092 --property print.key=true / --property print.value=false --property print.partition / --topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"


En las versiones más recientes de Kafka Manager, hay una columna titulada Desplazamientos recientes sumados .


La única forma de pensar en esto desde el punto de vista del consumidor es consumir los mensajes y contarlos en ese momento.

El agente de Kafka expone los contadores JMX para la cantidad de mensajes recibidos desde el inicio, pero no puede saber cuántos de ellos ya se han purgado.

En los escenarios más comunes, los mensajes en Kafka se ven mejor como una secuencia infinita y no es relevante obtener un valor discreto de cuántos se mantienen actualmente en el disco. Además, las cosas se vuelven más complicadas cuando se trata de un grupo de corredores que tienen un subconjunto de mensajes en un tema.


La forma más sencilla que he encontrado es utilizar la API /topic/topicName topicName de REST de /topic/topicName y especificar la clave: "Accept" / valor: encabezado "application/json" para obtener una respuesta JSON.

Esto está documentado aquí .


No es Java, pero puede ser útil.

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker>: <port> --topic <topic-name> --time -1 --offsets 1 | awk -F ":" ''{sum += $3} END {print sum}''


No lo he intentado yo mismo, pero parece tener sentido.

También puede usar kafka.tools.ConsumerOffsetChecker ( source ).


Para obtener todos los mensajes almacenados para el tema, puede buscar al consumidor al principio y al final de la secuencia para cada partición y sumar los resultados

List<TopicPartition> partitions = consumer.partitionsFor(topic).stream() .map(p -> new TopicPartition(topic, p.partition())) .collect(Collectors.toList()); consumer.assign(partitions); consumer.seekToEnd(Collections.emptySet()); Map<TopicPartition, Long> endPartitions = partitions.stream() .collect(Collectors.toMap(Function.identity(), consumer::position)); consumer.seekToBeginning(Collections.emptySet()); System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());


Podemos usar Java simple a continuación para obtener el recuento de mensajes sobre el tema

Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9091"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); List<PartitionInfo> parts = consumer.partitionsFor("topic"); List<TopicPartition> partitions= new ArrayList<>(); for (PartitionInfo p : parts) { partitions.add(new TopicPartition(topic, p.partition())); } consumer.assign(partitions); Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment); Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(assignment); long totalMessaheCnt=0; for (TopicPartition tp : offsets.keySet()) { totalMessaheCnt += endOffsets.get(tp)-beginningOffsets.get(tp) }


Puedes usar kafkatool . Por favor revise este enlace -> http://www.kafkatool.com/download.html

Kafka Tool es una aplicación GUI para administrar y usar clústeres de Apache Kafka. Proporciona una interfaz de usuario intuitiva que permite ver rápidamente objetos dentro de un clúster de Kafka, así como los mensajes almacenados en los temas del clúster.


Si tiene acceso a la interfaz JMX del servidor, las compensaciones de inicio y fin están presentes en:

kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER

(debe reemplazar TOPICNAME & PARTITIONNUMBER ). Tenga en cuenta que debe verificar cada una de las réplicas de una partición determinada, o debe averiguar cuál de los corredores es el líder de una partición determinada (y esto puede cambiar con el tiempo).

Alternativamente, puede usar los métodos de Kafka Consumer endOffsets y endOffsets .


Usando el cliente Java de Kafka 2.11-1.0.0, puede hacer lo siguiente:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test")); while(true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); // after each message, query the number of messages of the topic Set<TopicPartition> partitions = consumer.assignment(); Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions); for(TopicPartition partition : offsets.keySet()) { System.out.printf("partition %s is at %d/n", partition.topic(), offsets.get(partition)); } } }

La salida es algo como esto:

offset = 10, key = null, value = un partition test is at 13 offset = 11, key = null, value = deux partition test is at 13 offset = 12, key = null, value = trois partition test is at 13


Use https://prestodb.io/docs/current/connector/kafka-tutorial.html

Un motor súper SQL, proporcionado por Facebook, que se conecta en varias fuentes de datos (Cassandra, Kafka, JMX, Redis ...).

PrestoDB se ejecuta como un servidor con trabajadores opcionales (hay un modo independiente sin trabajadores adicionales), luego utiliza un pequeño JAR ejecutable (llamado presto CLI) para realizar consultas.

Una vez que haya configurado bien el servidor Presto, puede usar SQL tradicional:

SELECT count(*) FROM TOPIC_NAME;


Extractos de los documentos de Kafka

Depreciaciones en 0.9.0.0

El kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) ha quedado en desuso. En el futuro, utilice kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) para esta funcionalidad.

Estoy ejecutando Kafka Broker con SSL habilitado tanto para el servidor como para el cliente. Debajo del comando que uso

kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x

donde / tmp / ssl_config es el siguiente

security.protocol=SSL ssl.truststore.location=truststore_file_path.jks ssl.truststore.password=truststore_password ssl.keystore.location=keystore_file_path.jks ssl.keystore.password=keystore_password ssl.key.password=key_password