aprender - ¿Cómo caduca una compensación para un grupo de consumidores de Apache Kafka?
kafka server (1)
Estaba haciendo algunas pruebas sobre un tema antiguo cuando noté algunos comportamientos extraños. Al leer el registro de Kafka, noté el mensaje "se eliminaron 8 compensaciones caducadas":
[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 37 (kafka.coordinator.GroupCoordinator)
Deleting segment 0 from log __consumer_offsets-31. (kafka.log.Log)
Deleting segment 0 from log __consumer_offsets-45. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-45/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-31/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-13. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-13/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-11. (kafka.log.Log)
Deleting segment 4885 from log __consumer_offsets-11. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000004885.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-26. (kafka.log.Log)
Deleting segment 12406 from log __consumer_offsets-26. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000012406.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-22. (kafka.log.Log)
Deleting segment 8643 from log __consumer_offsets-22. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000008643.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-6. (kafka.log.Log)
Deleting segment 9757 from log __consumer_offsets-6. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000009757.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-14. (kafka.log.Log)
Deleting segment 1 from log __consumer_offsets-14. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000001.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
[GroupCoordinator 1001]: Preparing to restabilize group GROUP_NAME with old generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 38 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 38 (kafka.coordinator.GroupCoordinator)
[Group Metadata Manager on Broker 1001]: Removed 8 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
De hecho, tengo 2 preguntas:
¿Cómo funciona esta compensación de vencimiento para un grupo de consumidores?
¿Puede este desplazamiento caducado explicar este comportamiento en el que mi consumidor no realizaría ninguna encuesta cuando tenía
auto.offset.reset = latest
, pero realizó una encuesta desde el último desplazamiento confirmado cuando teníaauto.offset.reset = earliest
?
Kafka, de forma predeterminada, elimina las compensaciones confirmadas después de un período de tiempo configurable. Consulte el parámetro offsets.retention.minutes
. Es decir, si un grupo de consumidores está inactivo (es decir, no realiza ninguna compensación) durante este tiempo, las compensaciones se eliminan. Por lo tanto, incluso si el consumidor se está ejecutando, si no ejecuta las compensaciones para algunas particiones, esas compensaciones están sujetas a offset.retention.minutes
.
Si inicia un consumidor, sucede lo siguiente:
- buscar una compensación (válida) comprometida (para el grupo de consumidores)
- Si se encuentra un desplazamiento válido, reanude desde allí
- si no se encuentra un desplazamiento válido, reinicie el desplazamiento de acuerdo con el parámetro
auto.offset.reset
Por lo tanto, si sus compensaciones se eliminaron y auto.offset.reset = latest
, su consumidor no encuestará nada hasta que se agreguen nuevos datos al tema. Si auto.offset.reset = earliest
debe consumir todo el tema.
Vea este JIRA para una discusión sobre este https://issues.apache.org/jira/browse/KAFKA-3806 y https://issues.apache.org/jira/browse/KAFKA-4682