apache kafka - ¿Cómo comprometerse manualmente con Kafka Stream?
apache-kafka apache-kafka-streams (1)
Los commits son manejados por Streams de forma interna y totalmente automática, y por lo tanto, generalmente no hay razón para comprometerse manualmente. Tenga en cuenta que Streams maneja esto de manera diferente a la confirmación automática del consumidor; de hecho, la confirmación automática está deshabilitada para el consumidor utilizado internamente y Streams administra las confirmaciones "manualmente". La razón es que los commits solo pueden ocurrir en ciertos puntos durante el procesamiento para garantizar que no se pierdan datos (existen muchas dependencias internas con respecto a la actualización del estado y los resultados de vaciado).
Para confirmaciones más frecuentes, puede reducir el intervalo de confirmación a través del parámetro
commit.interval.ms
.
Sin embargo, las confirmaciones manuales son posibles indirectamente, a través de la API del procesador de bajo nivel.
Puede usar el objeto de
context
que se proporciona mediante el método
init()
para llamar al
context#commit()
.
Tenga en cuenta que esto es solo una "solicitud a Streams" para confirmar lo antes posible, no está emitiendo un commit directamente.
¿Hay alguna manera de comprometerse manualmente con Kafka Stream?
Por lo general, con el uso de
KafkaConsumer
, hago algo como a continuación:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
// process records
}
consumer.commitAsync();
}
Donde llamo commit manualmente.
No veo una API similar para
KStream
.