name how java apache-kafka kafka-consumer-api

java - how - set name to jframe



¿Cómo puedo obtener el último desplazamiento de un tema kafka? (4)

Estoy escribiendo un consumidor kafka utilizando Java. Quiero mantener el tiempo real del mensaje, por lo que si hay demasiados mensajes en espera de consumir, como 1000 o más, debería abandonar los mensajes no consumidos y comenzar a consumir desde el último desplazamiento.

Para este problema, trato de comparar el último desplazamiento confirmado y el último desplazamiento de un tema (solo 1 partición), si la diferencia entre estos dos desplazamientos es mayor que una cierta cantidad, estableceré el último desplazamiento del tema como siguiente compensar para que pueda abandonar esos mensajes redundantes.

Ahora mi problema es cómo obtener la última compensación de un tema, algunas personas dicen que puedo usar el antiguo consumidor, pero es demasiado complicado, ¿el nuevo consumidor tiene esta función?


El nuevo consumidor también es complicado.

//assign the topic consumer.assign();

//seek to end of the topic consumer.seekToEnd();

//the position is the latest offset consumer.position();


Para la versión Kafka: 0.10.1.1

// Get the diff of current position and latest offset Set<TopicPartition> partitions = new HashSet<TopicPartition>(); TopicPartition actualTopicPartition = new TopicPartition(record.topic(), record.partition()); partitions.add(actualTopicPartition); Long actualEndOffset = this.consumer.endOffsets(partitions).get(actualTopicPartition); long actualPosition = consumer.position(actualTopicPartition); System.out.println(String.format("diff: %s (actualEndOffset:%s; actualPosition=%s)", actualEndOffset -actualPosition ,actualEndOffset, actualPosition));


También puede utilizar las herramientas de línea de comandos del servidor kafka:

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topic-name


KafkaConsumer<String, String> consumer = ... consumer.subscribe(Collections.singletonList(topic)); TopicPartition topicPartition = new TopicPartition(topic, partition); consumer.poll(0); consumer.seekToEnd(Collections.singletonList(topicPartition)); long currentOffset = consumer.position(topicPartition) -1;

El fragmento anterior devuelve el desplazamiento actual del mensaje confirmado para el tema y el número de partición dados.