error-handling apache-kafka apache-kafka-streams

error handling - Manejando mensajes malos usando la API de Streams de Kafka



error-handling apache-kafka (2)

Tengo un flujo de procesamiento de flujo básico que parece

master topic -> my processing in a mapper/filter -> output topics

y me pregunto cuál es la mejor manera de manejar los "mensajes erróneos". Esto podría ser cosas como mensajes que no puedo deserializar adecuadamente, o tal vez la lógica de procesamiento / filtrado falla de alguna manera inesperada (no tengo dependencias externas, por lo que no debería haber errores transitorios de ese tipo).

Estaba considerando envolver todo mi código de procesamiento / filtrado en una captura de prueba y, si se generaba una excepción, se enrutaba a un "tema de error". Luego puedo estudiar el mensaje y modificarlo o corregir mi código según corresponda y luego reproducirlo en master. Si dejo que las excepciones se propaguen, la secuencia parece estar atascada y no se recogen más mensajes.

  • ¿Este enfoque se considera la mejor práctica?
  • ¿Hay alguna forma conveniente de Kafka para manejar esto? No creo que exista un concepto de DLQ ...
  • ¿Cuáles son las formas alternativas de detener el atasco de Kafka en un "mal mensaje"?
  • ¿Qué métodos alternativos de manejo de errores existen?

Para completar aquí está mi código (pseudo-ish):

class Document { // Fields } class AnalysedDocument { Document document; String rawValue; Exception exception; Analysis analysis; // All being well AnalysedDocument(Document document, Analysis analysis) {...} // Analysis failed AnalysedDocument(Document document, Exception exception) {...} // Deserialisation failed AnalysedDocument(String rawValue, Exception exception) {...} } KStreamBuilder builder = new KStreamBuilder(); KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder .stream(Serdes.String(), Serdes.String(), "master") .mapValues(new ValueMapper<String, AnalysedDocument>() { @Override public AnalysedDocument apply(String rawValue) { Document document; try { // Deserialise document = ... } catch (Exception e) { return new AnalysedDocument(rawValue, exception); } try { // Perform analysis Analysis analysis = ... return new AnalysedDocument(document, analysis); } catch (Exception e) { return new AnalysedDocument(document, exception); } } }); // Branch based on whether analysis mapping failed to produce errorStream and successStream errorStream.to(Serdes.String(), customPojoSerde(), "error"); successStream.to(Serdes.String(), customPojoSerde(), "analysed"); KafkaStreams streams = new KafkaStreams(builder, config); streams.start();

Cualquier ayuda muy apreciada.


En este momento, Kafka Streams ofrece solo capacidades limitadas de manejo de errores. Hay trabajo en progreso para simplificar esto. Por ahora, su enfoque general parece ser un buen camino a seguir.

Un comentario sobre el manejo de los errores de serialización / serialización: el manejo de esos errores manualmente, requiere que usted realice la serialización / serialización "manualmente". Esto significa que debe configurar ByteArraySerde s para la clave y el valor para el tema de entrada / salida de su aplicación Streams y agregar un map() que KStream<byte[],byte[]> -> map() -> KStream<keyType,valueType> la de / serialización (es decir, KStream<byte[],byte[]> -> map() -> KStream<keyType,valueType> - o al revés si también desea detectar excepciones de serialización). De lo contrario, no puede try-catch excepciones de deserialización.

Con su enfoque actual, usted "solo" valida que la cadena dada representa un documento válido, pero podría ser el caso, que el mensaje en sí mismo está dañado y no se puede convertir en una String en el operador de origen en primer lugar. Por lo tanto, no cubre realmente la excepción de deserialización con su código. Sin embargo, si está seguro de que una excepción de deserialización nunca puede ocurrir, su enfoque también sería suficiente.

Actualizar

Estos problemas se abordan a través de KIP-161 y se incluirán en la próxima versión 1.0.0. Le permite registrar una devolución de llamada a través del parámetro default.deserialization.exception.handler . El controlador se invocará cada vez que se produzca una excepción durante la deserialización y le permita devolver una DeserializationResponse ( CONTINUE -> soltar el registro en un movimiento, o FAIL es el valor predeterminado).

Actualización 2

Con KIP-210 (será parte de Kafka 1.1) también es posible manejar errores en el lado del productor, similar a la parte del consumidor, registrando un ProductionExceptionHandler través de config default.production.exception.handler que puede devolver CONTINUE .


Actualización del 23 de marzo de 2018: Kafka 1.0 proporciona un manejo mucho mejor y más fácil para los mensajes de error erróneos ("pastillas venenosas") a través de KIP-161 que lo que describí a continuación. Consulte default.deserialization.exception.handler en los documentos de Kafka 1.0.

Esto podría potencialmente ser cosas como mensajes que no puedo deserializar adecuadamente [...]

Ok, mi respuesta aquí se centra en los (des) problemas de serialización, ya que este podría ser el escenario más difícil de manejar para la mayoría de los usuarios.

[...] o quizás la lógica de procesamiento / filtrado falla de alguna manera inesperada (no tengo dependencias externas, por lo que no debería haber errores transitorios de ese tipo).

El mismo pensamiento (para la deserialización) también se puede aplicar a las fallas en la lógica de procesamiento. Aquí, la mayoría de las personas tienden a gravitar hacia la opción 2 a continuación (menos la parte de deserialización), pero YMMV.

Estaba considerando envolver todo mi código de procesamiento / filtrado en una captura de prueba y, si se generaba una excepción, se enrutaba a un "tema de error". Luego puedo estudiar el mensaje y modificarlo o corregir mi código según corresponda y luego reproducirlo en master. Si dejo que las excepciones se propaguen, la secuencia parece estar atascada y no se recogen más mensajes.

  • ¿Este enfoque se considera la mejor práctica?

Sí, en este momento este es el camino a seguir. Esencialmente, los dos patrones más comunes son (1) omitir mensajes dañados o (2) enviar registros dañados a un tema de cuarentena, también conocido como una cola de mensajes no entregados.

  • ¿Hay alguna forma conveniente de Kafka para manejar esto? No creo que exista un concepto de DLQ ...

Sí, hay una manera de manejar esto, incluido el uso de una cola de mensajes no entregados. Sin embargo, es (al menos IMHO) no tan conveniente todavía. Si tiene algún comentario sobre cómo la API debería permitirle manejar esto, por ejemplo, a través de un método nuevo o actualizado, una configuración ("si la serialización / deserialización falla, envíe el registro problemático a ESTE tema de cuarentena"). saber. :-)

  • ¿Cuáles son las formas alternativas de detener el atasco de Kafka en un "mal mensaje"?
  • ¿Qué métodos alternativos de manejo de errores existen?

Vea mis ejemplos a continuación.

FWIW, la comunidad Kafka también está discutiendo la adición de una nueva herramienta CLI que le permite omitir los mensajes dañados. Sin embargo, como usuario de la API de Kafka Streams, creo que lo ideal es que usted maneje estos escenarios directamente en su código, y recurra a las utilidades CLI solo como último recurso.

Aquí hay algunos patrones para que el DSL de Kafka Streams maneje registros / mensajes corruptos, también conocidos como "píldoras venenosas". Esto se toma de http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages

Opción 1: Omitir registros corruptos con flatMap

Esto es posiblemente lo que la mayoría de los usuarios les gustaría hacer.

  • Utilizamos flatMap porque le permite generar cero, uno o más registros de salida por registro de entrada. En el caso de un registro dañado, no generamos nada (cero registros), ignorando / omitiendo el registro dañado.
  • Benefíciese de este enfoque en comparación con los otros enumerados aquí: ¡Necesitamos deserializar manualmente un registro solo una vez!
  • El inconveniente de este enfoque: flatMap "marca" el flujo de entrada para la posible partición de datos, es decir, si realiza una operación basada en claves, como agrupaciones ( groupBy / groupByKey ) o uniones después, sus datos se volverán a particionar entre bastidores . Dado que este podría ser un paso costoso, no queremos que eso suceda innecesariamente. Si SABES que las claves de registro siempre son válidas O que no necesitas operar con las claves (por lo tanto, manteniéndolas como claves "en bruto" en formato byte[] ), puedes cambiar de flatMap a flatMapValues , que no resultará en la re partición de datos, incluso si se une / agrupa / agrega la secuencia más tarde.

Ejemplo de código:

Serde<byte[]> bytesSerde = Serdes.ByteArray(); Serde<String> stringSerde = Serdes.String(); Serde<Long> longSerde = Serdes.Long(); // Input topic, which might contain corrupted messages KStream<byte[], byte[]> input = builder.stream(bytesSerde, bytesSerde, inputTopic); // Note how the returned stream is of type KStream<String, Long>, // rather than KStream<byte[], byte[]>. KStream<String, Long> doubled = input.flatMap( (k, v) -> { try { // Attempt deserialization String key = stringSerde.deserializer().deserialize(inputTopic, k); long value = longSerde.deserializer().deserialize(inputTopic, v); // Ok, the record is valid (not corrupted). Let''s take the // opportunity to also process the record in some way so that // we haven''t paid the deserialization cost just for "poison pill" // checking. return Collections.singletonList(KeyValue.pair(key, 2 * value)); } catch (SerializationException e) { // log + ignore/skip the corrupted message System.err.println("Could not deserialize record: " + e.getMessage()); } return Collections.emptyList(); } );

Opción 2: cola de mensajes muertos con branch

En comparación con la opción 1 (que ignora los registros dañados), la opción 2 retiene los mensajes dañados al filtrarlos del flujo de entrada "principal" y escribirlos en un tema de cuarentena (piense: cola de letra muerta). El inconveniente es que, para registros válidos, debemos pagar el costo de deserialización manual dos veces.

KStream<byte[], byte[]> input = ...; KStream<byte[], byte[]>[] partitioned = input.branch( (k, v) -> { boolean isValidRecord = false; try { stringSerde.deserializer().deserialize(inputTopic, k); longSerde.deserializer().deserialize(inputTopic, v); isValidRecord = true; } catch (SerializationException ignored) {} return isValidRecord; }, (k, v) -> true ); // partitioned[0] is the KStream<byte[], byte[]> that contains // only valid records. partitioned[1] contains only corrupted // records and thus acts as a "dead letter queue". KStream<String, Long> doubled = partitioned[0].map( (key, value) -> KeyValue.pair( // Must deserialize a second time unfortunately. stringSerde.deserializer().deserialize(inputTopic, key), 2 * longSerde.deserializer().deserialize(inputTopic, value))); // Don''t forget to actually write the dead letter queue back to Kafka! partitioned[1].to(Serdes.ByteArray(), Serdes.ByteArray(), "quarantine-topic");

Opción 3: Omitir registros corruptos con filter

Solo menciono esto por completo. Esta opción parece una combinación de las opciones 1 y 2, pero es peor que cualquiera de ellas. En comparación con la opción 1, debe pagar el costo de deserialización manual para los registros válidos dos veces (¡mal!). En comparación con la opción 2, pierde la capacidad de retener registros dañados en una cola de mensajes no entregados.

KStream<byte[], byte[]> validRecordsOnly = input.filter( (k, v) -> { boolean isValidRecord = false; try { bytesSerde.deserializer().deserialize(inputTopic, k); longSerde.deserializer().deserialize(inputTopic, v); isValidRecord = true; } catch (SerializationException e) { // log + ignore/skip the corrupted message System.err.println("Could not deserialize record: " + e.getMessage()); } return isValidRecord; } ); KStream<String, Long> doubled = validRecordsOnly.map( (key, value) -> KeyValue.pair( // Must deserialize a second time unfortunately. stringSerde.deserializer().deserialize(inputTopic, key), 2 * longSerde.deserializer().deserialize(inputTopic, value)));

Cualquier ayuda muy apreciada.

Espero poder ayudar. Si es así, agradecería sus comentarios sobre cómo podríamos mejorar la API de Kafka Streams para manejar fallas / excepciones de una manera mejor / más conveniente que en la actualidad. :-)