tutorial streams kafka example apache-kafka apache-kafka-streams

apache-kafka - example - kafka streams tutorial



¿Cómo enviar el resultado final de la agregación de kafka-streams de una KTable con ventana de tiempo? (2)

Desde Kafka Streams versión 2.1, puede lograr esto using suppress .

Hay un ejemplo de la documentación mencionada de Apache Kafka Streams que envía una alerta cuando un usuario tiene menos de tres eventos en una hora:

KGroupedStream<UserId, Event> grouped = ...; grouped .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10))) .count() .suppress(Suppressed.untilWindowCloses(unbounded())) .filter((windowedUserId, count) -> count < 3) .toStream() .foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));

Como se menciona en la actualización de this respuesta, debe tener en cuenta la compensación. Además, note que suprimir () se basa en tiempo de evento.

Lo que me gustaría hacer es esto:

  1. Consumir registros de un tema de números (Long)
  2. Agregue (cuente) los valores para cada ventana de 5 segundos
  3. Enviar el resultado de agregación FINAL a otro tema

Mi código se ve así:

KStream<String, Long> longs = builder.stream( Serdes.String(), Serdes.Long(), "longs"); // In one ktable, count by key, on a five second tumbling window. KTable<Windowed<String>, Long> longCounts = longs.countByKey(TimeWindows.of("longCounts", 5000L)); // Finally, sink to the long-avgs topic. longCounts.toStream((wk, v) -> wk.key()) .to("long-counts");

Parece que todo funciona como se esperaba, pero las agregaciones se envían al tema de destino para cada registro entrante. Mi pregunta es ¿cómo puedo enviar solo el resultado final de agregación de cada ventana?


En Kafka Streams no existe una "agregación final". Las ventanas se mantienen abiertas todo el tiempo para manejar registros fuera de servicio que llegan después de que finaliza el tiempo de finalización de la ventana. Sin embargo, las ventanas no se guardan para siempre. Se descartan una vez que expira su tiempo de retención. No hay una acción especial en cuanto a cuando una ventana se descarta.

Consulte la documentación de Confluent para obtener más detalles: http://docs.confluent.io/current/streams/

Por lo tanto, para cada actualización de una agregación, se genera un registro de resultados (porque Kafka Streams también actualiza el resultado de la agregación en registros fuera de servicio). Su "resultado final" sería el último registro de resultados (antes de que se descarte una ventana). Dependiendo de su caso de uso, la desduplicación manual sería una forma de resolver el problema (utilizando API de palanca inferior, transform() o process() )

Esta publicación de blog también podría ayudar: https://timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not-looking-at-facebook.html

Otra publicación de blog que aborda este problema sin usar signos de puntuación: http://blog.inovatrend.com/2018/03/making-of-message-gateway-with-kafka.html

Actualizar

Con KIP-328 , se KTable#suppress() un KTable#suppress() , que permitirá suprimir actualizaciones consecutivas de manera estricta y emitir un único registro de resultados por ventana; La compensación es un aumento de la latencia.