apache kafka streams - quickstart - Kafka Streams API: KStream to KTable
kafka tutorial (1)
Tengo un tema de Kafka en el que envío eventos de ubicación (clave = id_usuario, valor = ubicación_usuario). Soy capaz de leerlo y procesarlo como KStream
:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Location> locations = builder
.stream("location_topic")
.map((k, v) -> {
// some processing here, omitted form clarity
Location location = new Location(lat, lon);
return new KeyValue<>(k, location);
});
Eso funciona bien, pero me gustaría tener una KTable
con la última posición conocida de cada usuario. ¿Cómo podría hacerlo?
Puedo hacerlo escribiendo y leyendo un tema intermedio:
// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");
// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");
¿Hay una forma sencilla de obtener una KTable
de un KStream
? Esta es mi primera aplicación que usa Kafka Streams, así que probablemente me esté perdiendo algo obvio.
No hay una manera directa en este momento para hacer esto. Su enfoque es absolutamente válido como se analiza en las preguntas frecuentes de Confluent: http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step
Este es el enfoque más simple con respecto al código. Sin embargo, tiene las desventajas de que (a) necesita gestionar un tema adicional y que (b) da como resultado un tráfico de red adicional porque los datos se escriben y se leen desde Kafka.
Hay una alternativa, usando un "dummy-reduce":
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream
KTable<String, Long> table = stream.groupByKey().reduce(
new Reducer<Long>() {
@Override
public Long apply(Long aggValue, Long newValue) {
return newValue;
}
},
"dummy-aggregation-store");
Este enfoque es algo más complejo en relación con el código en comparación con la opción 1, pero tiene la ventaja de que (a) no se requiere la administración manual de temas y (b) no es necesario volver a leer los datos de Kafka.
En general, debes decidir por ti mismo, qué enfoque te gusta más:
En la opción 2, Kafka Streams creará un tema de registro de cambios interno para hacer una copia de seguridad de KTable para la tolerancia a fallas. Por lo tanto, ambos enfoques requieren algún almacenamiento adicional en Kafka y dan como resultado tráfico de red adicional. En general, es una compensación entre el código un poco más complejo en la opción 2 frente a la administración manual de temas en la opción 1.