apache kafka - Kafka cómo leer del tema__consumer_offsets
apache-kafka kafka-consumer-api (5)
Estoy tratando de averiguar qué compensaciones están trabajando mis consumidores actuales de alto nivel.
Uso Kafka 0.8.2.1,
sin
"offset.storage" establecido en el server.properties de Kafka, lo que, creo, significa que las compensaciones se almacenan en Kafka.
(También verifiqué que no se almacenan compensaciones en Zookeeper marcando esta ruta en el shell Zk:
/consumers/consumer_group_name/offsets/topic_name/partition_number
)
Traté de escuchar el tema
__consumer_offsets
para ver qué consumidor guarda qué valor de las compensaciones, pero no funcionó ...
Intenté lo siguiente:
creó un archivo de configuración para el consumidor de la consola de la siguiente manera:
=> more kafka_offset_consumer.config
exclude.internal.topics=false
e intenté dos versiones de los scripts de consumidor de la consola:
#1:
bin/kafka-console-consumer.sh --consumer.config kafka_offset_consumer.config --topic __consumer_offsets --zookeeper localhost:2181
#2
./bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 --broker-list localhost:9092 --formatter "kafka.server.OffsetManager/$OffsetsMessageFormatter" --consumer.config kafka_offset_consumer.config
Ninguno de los dos funcionó: simplemente se queda allí, pero no imprime nada, a pesar de que los consumidores están consumiendo / guardando compensaciones activamente.
¿Me estoy perdiendo alguna otra configuración / propiedades?
¡Gracias!
Centro de deportes acuáticos
A partir de Kafka 0.11, el código fuente (Scala) se puede encontrar aquí
Para aquellos que necesitan una traducción de Java, de cualquier proceso del consumidor, supongamos que obtiene un
ConsumerRecord<byte[], byte[]> consumerRecord
, y puede usar
-
Obtenga la clave (compruebe si la clave no es nula primero) y use
GroupMetadataManager.readMessageKey(consumerRecord.key)
. Eso puede devolver diferentes tipos, así que verifiqueif ( ... instanceof OffsetKey)
, luego lo emite y puede obtener varios valores de eso. -
Para obtener el valor de registro de Kafka de las compensaciones, puede usar
String.valueOf(GroupMetadataManager.readOffsetMessageValue(consumerRecord.value))
Un ejemplo mínimo de Java traducido del código Scala ...
ls /consumers/consumer_group_name/offsets/topic_name/partition_number
Tenga en cuenta que también es posible usar las API de AdminClient para describir grupos en lugar de consumir estos mensajes sin procesar
- listConsumerGroupOffsets() : para encontrar todas las compensaciones para un grupo específico
- describeConsumerGroups() : para encontrar detalles sobre los miembros de un grupo
Extracto de código fuente de Scala
get /consumers/consumer_group_name/offsets/topic_name/partition_number
185530404
cZxid = 0x70789ad05
ctime = Mon Nov 23 17:49:46 GMT 2015
mZxid = 0x7216cdc5c
mtime = Thu Dec 03 20:18:57 GMT 2015
pZxid = 0x70789ad05
cversion = 0
dataVersion = 3537384
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0
Encontré esta pregunta cuando trataba de consumir también del tema __consumer_offsets . Logré resolverlo para diferentes versiones de Kafka y pensé en compartir lo que había encontrado
Para Kafka 0.8.2.x
Nota : Esto usa la conexión Zookeeper
#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config /
--formatter "kafka.server.OffsetManager/$OffsetsMessageFormatter" /
--zookeeper localhost:2181 --topic __consumer_offsets --from-beginning
Para Kafka 0.9.xx y 0.10.xx
#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --new-consumer --consumer.config /tmp/consumer.config /
--formatter "kafka.coordinator.GroupMetadataManager/$OffsetsMessageFormatter" /
--bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning
Para 0.11.xx - 2.x
#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config /
--formatter "kafka.coordinator.group.GroupMetadataManager/$OffsetsMessageFormatter" /
--bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning
Ok, he descubierto cuál era el problema. Mi Kafka en realidad estaba usando Zookeeper como almacenamiento de compensación, no Kafka ... La razón por la que no lo detecté de inmediato fue porque estaba verificando incorrectamente el contenido de ZK:
estaba haciendo
byte[] key = consumerRecord.key;
if (key != null) {
Object o = GroupMetadataManager.readMessageKey(key);
if (o != null && o instanceOf OffsetKey) {
OffsetKey offsetKey = (OffsetKey) o;
Object groupTopicPartition = offsetKey.key;
byte[] value = consumerRecord.value;
String formattedValue = String.valueOf(GroupMetadataManager.readOffsetMessageValue(value);
// TODO: Print, store, or compute results with the new key and value
}
}
y no viendo nada allí. En cambio, tenía que ''obtener'' contenido, que mostraba compensaciones correctas para mis consumidores, como el siguiente:
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
// Only print if the message is an offset record.
// We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
case offsetKey: OffsetKey =>
val groupTopicPartition = offsetKey.key
val value = consumerRecord.value
val formattedValue =
if (value == null) "NULL"
else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
output.write("::".getBytes(StandardCharsets.UTF_8))
output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
output.write("/n".getBytes(StandardCharsets.UTF_8))
case _ => // no-op
}
Para Kafka-2.X usa el siguiente comando
kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"
Si agrega
--from-beginning
lo más probable es que le dé algunos resultados, al menos lo hizo cuando lo intenté yo mismo.
Y o si no proporciona ese argumento pero lee más mensajes (y activa las confirmaciones de compensación) mientras escucha al consumidor, eso también debería mostrar mensajes allí.