apache-kafka kafka-consumer-api

apache kafka - Kafka cómo leer del tema__consumer_offsets



apache-kafka kafka-consumer-api (5)

Estoy tratando de averiguar qué compensaciones están trabajando mis consumidores actuales de alto nivel. Uso Kafka 0.8.2.1, sin "offset.storage" establecido en el server.properties de Kafka, lo que, creo, significa que las compensaciones se almacenan en Kafka. (También verifiqué que no se almacenan compensaciones en Zookeeper marcando esta ruta en el shell Zk: /consumers/consumer_group_name/offsets/topic_name/partition_number )

Traté de escuchar el tema __consumer_offsets para ver qué consumidor guarda qué valor de las compensaciones, pero no funcionó ...

Intenté lo siguiente:

creó un archivo de configuración para el consumidor de la consola de la siguiente manera:

=> more kafka_offset_consumer.config exclude.internal.topics=false

e intenté dos versiones de los scripts de consumidor de la consola:

#1: bin/kafka-console-consumer.sh --consumer.config kafka_offset_consumer.config --topic __consumer_offsets --zookeeper localhost:2181 #2 ./bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 --broker-list localhost:9092 --formatter "kafka.server.OffsetManager/$OffsetsMessageFormatter" --consumer.config kafka_offset_consumer.config

Ninguno de los dos funcionó: simplemente se queda allí, pero no imprime nada, a pesar de que los consumidores están consumiendo / guardando compensaciones activamente.

¿Me estoy perdiendo alguna otra configuración / propiedades?

¡Gracias!

Centro de deportes acuáticos


A partir de Kafka 0.11, el código fuente (Scala) se puede encontrar aquí

Para aquellos que necesitan una traducción de Java, de cualquier proceso del consumidor, supongamos que obtiene un ConsumerRecord<byte[], byte[]> consumerRecord , y puede usar

  1. Obtenga la clave (compruebe si la clave no es nula primero) y use GroupMetadataManager.readMessageKey(consumerRecord.key) . Eso puede devolver diferentes tipos, así que verifique if ( ... instanceof OffsetKey) , luego lo emite y puede obtener varios valores de eso.

  2. Para obtener el valor de registro de Kafka de las compensaciones, puede usar String.valueOf(GroupMetadataManager.readOffsetMessageValue(consumerRecord.value))

Un ejemplo mínimo de Java traducido del código Scala ...

ls /consumers/consumer_group_name/offsets/topic_name/partition_number

Tenga en cuenta que también es posible usar las API de AdminClient para describir grupos en lugar de consumir estos mensajes sin procesar

Extracto de código fuente de Scala

get /consumers/consumer_group_name/offsets/topic_name/partition_number 185530404 cZxid = 0x70789ad05 ctime = Mon Nov 23 17:49:46 GMT 2015 mZxid = 0x7216cdc5c mtime = Thu Dec 03 20:18:57 GMT 2015 pZxid = 0x70789ad05 cversion = 0 dataVersion = 3537384 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 9 numChildren = 0


Encontré esta pregunta cuando trataba de consumir también del tema __consumer_offsets . Logré resolverlo para diferentes versiones de Kafka y pensé en compartir lo que había encontrado

Para Kafka 0.8.2.x

Nota : Esto usa la conexión Zookeeper

#Create consumer config echo "exclude.internal.topics=false" > /tmp/consumer.config #Consume all offsets ./kafka-console-consumer.sh --consumer.config /tmp/consumer.config / --formatter "kafka.server.OffsetManager/$OffsetsMessageFormatter" / --zookeeper localhost:2181 --topic __consumer_offsets --from-beginning

Para Kafka 0.9.xx y 0.10.xx

#Create consumer config echo "exclude.internal.topics=false" > /tmp/consumer.config #Consume all offsets ./kafka-console-consumer.sh --new-consumer --consumer.config /tmp/consumer.config / --formatter "kafka.coordinator.GroupMetadataManager/$OffsetsMessageFormatter" / --bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning

Para 0.11.xx - 2.x

#Create consumer config echo "exclude.internal.topics=false" > /tmp/consumer.config #Consume all offsets ./kafka-console-consumer.sh --consumer.config /tmp/consumer.config / --formatter "kafka.coordinator.group.GroupMetadataManager/$OffsetsMessageFormatter" / --bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning


Ok, he descubierto cuál era el problema. Mi Kafka en realidad estaba usando Zookeeper como almacenamiento de compensación, no Kafka ... La razón por la que no lo detecté de inmediato fue porque estaba verificando incorrectamente el contenido de ZK:

estaba haciendo

byte[] key = consumerRecord.key; if (key != null) { Object o = GroupMetadataManager.readMessageKey(key); if (o != null && o instanceOf OffsetKey) { OffsetKey offsetKey = (OffsetKey) o; Object groupTopicPartition = offsetKey.key; byte[] value = consumerRecord.value; String formattedValue = String.valueOf(GroupMetadataManager.readOffsetMessageValue(value); // TODO: Print, store, or compute results with the new key and value } }

y no viendo nada allí. En cambio, tenía que ''obtener'' contenido, que mostraba compensaciones correctas para mis consumidores, como el siguiente:

def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) { Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach { // Only print if the message is an offset record. // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp. case offsetKey: OffsetKey => val groupTopicPartition = offsetKey.key val value = consumerRecord.value val formattedValue = if (value == null) "NULL" else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8)) output.write("::".getBytes(StandardCharsets.UTF_8)) output.write(formattedValue.getBytes(StandardCharsets.UTF_8)) output.write("/n".getBytes(StandardCharsets.UTF_8)) case _ => // no-op }


Para Kafka-2.X usa el siguiente comando

kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"


Si agrega --from-beginning lo más probable es que le dé algunos resultados, al menos lo hizo cuando lo intenté yo mismo. Y o si no proporciona ese argumento pero lee más mensajes (y activa las confirmaciones de compensación) mientras escucha al consumidor, eso también debería mostrar mensajes allí.