apache-kafka apache-kafka-streams

apache kafka - ¿Cómo comprometerse manualmente con Kafka Stream?



apache-kafka apache-kafka-streams (1)

Los commits son manejados por Streams de forma interna y totalmente automática, y por lo tanto, generalmente no hay razón para comprometerse manualmente. Tenga en cuenta que Streams maneja esto de manera diferente a la confirmación automática del consumidor; de hecho, la confirmación automática está deshabilitada para el consumidor utilizado internamente y Streams administra las confirmaciones "manualmente". La razón es que los commits solo pueden ocurrir en ciertos puntos durante el procesamiento para garantizar que no se pierdan datos (existen muchas dependencias internas con respecto a la actualización del estado y los resultados de vaciado).

Para confirmaciones más frecuentes, puede reducir el intervalo de confirmación a través del parámetro commit.interval.ms .

Sin embargo, las confirmaciones manuales son posibles indirectamente, a través de la API del procesador de bajo nivel. Puede usar el objeto de context que se proporciona mediante el método init() para llamar al context#commit() . Tenga en cuenta que esto es solo una "solicitud a Streams" para confirmar lo antes posible, no está emitiendo un commit directamente.

¿Hay alguna manera de comprometerse manualmente con Kafka Stream?

Por lo general, con el uso de KafkaConsumer , hago algo como a continuación:

while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ // process records } consumer.commitAsync(); }

Donde llamo commit manualmente. No veo una API similar para KStream .