poll partitions kafka interval cluster apache-kafka kafka-consumer-api

apache kafka - partitions - ¿Cómo leer datos utilizando la API de Kafka Consumer desde el principio?



max poll interval ms (6)

Por favor, ¿alguien puede decirme cómo leer los mensajes usando la API de Kafka Consumer desde el principio cada vez que ejecuto el archivo de información del consumidor?



Si está utilizando la api del consumidor de Java más específicamente org.apache.kafka.clients.consumer.Consumer, puede probar los métodos de buscar *.

consumer.seekToBeginning(consumer.assignment())

Aquí, consumer.assignment () devuelve todas las particiones asignadas a un consumidor determinado y seekToBeginning comenzará desde el primer desplazamiento para la colección de particiones dada.


Una opción para hacer esto sería tener una identificación de grupo única cada vez que comiences, lo que significará que Kafka te enviará los mensajes en el tema desde el principio. Haga algo como esto cuando configure sus propiedades para KafkaConsumer :

properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

La otra opción es usar consumer.seekToBeginning(consumer.assignment()) pero esto no funcionará a menos que Kafka primero reciba un latido de su consumidor al hacer que el consumidor llame al método de encuesta. Entonces llame a poll() , luego haga un seekToBeginning() y luego vuelva a llamar a poll() si desea todos los registros desde el principio. Es un poco hackey pero esta parece ser la forma más confiable de hacerlo desde la versión 0.9.

// At this point, there is no heartbeat from consumer so seekToBeinning() wont work // So call poll() consumer.poll(0); // Now there is heartbeat and consumer is "alive" consumer.seekToBeginning(consumer.assignment()); // Now consume ConsumerRecords<String, String> records = consumer.poll(0);


Una posible solución es utilizar una implementación de ConsumerRebalanceListener mientras se suscribe a uno o más temas. ConsumerRebalanceListener contiene métodos de devolución de llamada cuando se asignan o eliminan nuevas particiones de un consumidor. El siguiente ejemplo de código ilustra esto:

public class SkillsConsumer { private String topic; private KafkaConsumer<String, String> consumer; private static final int POLL_TIMEOUT = 5000; public SkillsConsumer(String topic) { this.topic = topic; Properties properties = ConsumerUtil.getConsumerProperties(); properties.put("group.id", "consumer-skills"); this.consumer = new KafkaConsumer<>(properties); this.consumer.subscribe(Collections.singletonList(this.topic), new PartitionOffsetAssignerListener(this.consumer)); }

}

public class PartitionOffsetAssignerListener implements ConsumerRebalanceListener { private KafkaConsumer consumer; public PartitionOffsetAssignerListener(KafkaConsumer kafkaConsumer) { this.consumer = kafkaConsumer; } @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { //reading all partitions from the beginning for(TopicPartition partition : partitions) consumer.seekToBeginning(partition); }

}

Ahora, cuando las particiones se asignan al consumidor, cada partición se leerá desde el principio.


mientras usa el conjunto de consumidores de alto nivel props.put("auto.offset.reset", "smallest"); en tiempos de crear el ConsumerConfig


Esto funciona con el consumidor 0.9.x. Básicamente, cuando crea un consumidor, debe asignar un ID de grupo de consumidores a este consumidor mediante la propiedad ConsumerConfig.GROUP_ID_CONFIG . Genere el ID del grupo de consumidores aleatoriamente cada vez que inicie el consumidor haciendo algo como esto properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); (Las propiedades son una instancia de java.util.Properties que pasará al constructor new KafkaConsumer(properties) ).

Generar el cliente de forma aleatoria significa que el nuevo grupo de consumidores no tiene ningún desplazamiento asociado a él en kafka. Entonces, lo que tenemos que hacer después de esto es establecer una política para este escenario. Como dice la documentación de la propiedad auto.offset.reset :

Qué hacer cuando no hay un desplazamiento inicial en Kafka o si el desplazamiento actual ya no existe en el servidor (por ejemplo, porque esos datos se han eliminado):

  • primera: restablece automáticamente el desplazamiento a la compensación más antigua
  • último: restablece automáticamente el desplazamiento al último desplazamiento
  • ninguno: lanza una excepción al consumidor si no se encuentra un desplazamiento anterior o el grupo del consumidor
  • Cualquier otra cosa: tirar excepción al consumidor.

Por lo tanto, a partir de las opciones mencionadas anteriormente, debemos elegir la política earliest para que el nuevo grupo de consumidores comience desde el principio cada vez.

Tu código en java, se verá algo así:

properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "your_client_id"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumer = new KafkaConsumer(properties);

Lo único que necesita resolverlo ahora, es cuando tiene varios consumidores que pertenecen al mismo grupo de consumidores pero se distribuyen cómo generar un ID aleatorio y distribuirlo entre esas instancias para que todos pertenezcan al mismo grupo de consumidores.

¡Espero eso ayude!