poll partitions kafka interval example auto_offset_reset java message-queue apache-kafka

java - partitions - max poll interval ms



Estrategia efectiva para evitar mensajes duplicados en apache kafka consumer (3)

Esto es lo que Kafka FAQ tiene que decir sobre el tema de exactamente una vez:

¿Cómo obtengo exactamente un mensaje de Kafka?

Exactamente una vez, la semántica tiene dos partes: evitar la duplicación durante la producción de datos y evitar duplicados durante el consumo de datos.

Hay dos enfoques para obtener exactamente una semántica durante la producción de datos:

  • Use un solo escritor por partición y cada vez que reciba un error de red, verifique el último mensaje en esa partición para ver si su última escritura tuvo éxito.
  • Incluya una clave principal (UUID o algo así) en el mensaje y deduplique en el consumidor.

Si haces una de estas cosas, el registro que hospeda Kafka será libre de duplicados. Sin embargo, la lectura sin duplicados también depende de la cooperación del consumidor. Si el consumidor está periódicamente controlando su posición, entonces si falla y se reinicia, se reiniciará desde la posición de control. Por lo tanto, si la salida de datos y el punto de control no se escriben atómicamente, también será posible obtener duplicados aquí. Este problema es particular a su sistema de almacenamiento. Por ejemplo, si está utilizando una base de datos, podría cometerlos juntos en una transacción. El cargador HDFS Camus que escribió LinkedIn hace algo como esto para las cargas de Hadoop. La otra alternativa que no requiere una transacción es almacenar el desplazamiento con los datos cargados y deduplicados usando la combinación tema / partición / desplazamiento.

Creo que hay dos mejoras que harían esto mucho más fácil:

  • La identificación del productor se puede hacer de forma automática y mucho más barata si se integra opcionalmente el soporte para esto en el servidor.
  • El consumidor de alto nivel existente no expone gran parte del control más detallado de las compensaciones (por ejemplo, para restablecer su posición). Estaremos trabajando en eso pronto.

Llevo un mes estudiando apache kafka. Sin embargo, estoy atascado en un punto ahora. Mi caso de uso es que tengo dos o más procesos de consumo que se ejecutan en máquinas diferentes. Realicé algunas pruebas en las que publiqué 10,000 mensajes en el servidor kafka. Luego, mientras procesaba estos mensajes, eliminé uno de los procesos del consumidor y lo reinicié. Los consumidores escribían mensajes procesados ​​en un archivo. Así que después de que el consumo terminó, el archivo mostraba más de 10k mensajes. Así que algunos mensajes fueron duplicados.

En el proceso de consumo, he deshabilitado la confirmación automática. Los consumidores cometen manualmente compensaciones por lotes. Así, por ejemplo, si se escriben 100 mensajes en un archivo, el consumidor realiza compensaciones. Cuando el proceso de un solo consumidor se está ejecutando y se bloquea y recupera la duplicación, se evita de esta manera. Pero cuando más de un consumidor se está ejecutando y uno de ellos se bloquea y se recupera, escribe mensajes duplicados en el archivo.

¿Hay alguna estrategia efectiva para evitar estos mensajes duplicados?


Estoy de acuerdo con el deduplicado de RaGe en el lado del consumidor. Y usamos Redis para deduplicar el mensaje Kafka.

Supongamos que la clase de mensaje tiene un miembro llamado ''uniqId'', que es llenado por el lado del productor y se garantiza que es único. Utilizamos una cadena aleatoria de 12 longitudes. (la ''^[A-Za-z0-9]{12}$'' es ''^[A-Za-z0-9]{12}$'' )

El lado del consumidor usa el SETNX de Redis para deduplicar y EXPIRE para purgar las claves caducadas automáticamente. Código de muestra:

Message msg = ... // eg. ConsumerIterator.next().message().fromJson(); Jedis jedis = ... // eg. JedisPool.getResource(); String key = "SPOUT:" + msg.uniqId; // prefix name at will String val = Long.toString(System.currentTimeMillis()); long rsps = jedis.setnx(key, val); if (rsps <= 0) { log.warn("kafka dup: {}", msg.toJson()); // and other logic } else { jedis.expire(key, 7200); // 2 hours is ok for production environment; }

El código anterior detectó mensajes duplicados varias veces cuando Kafka (versión 0.8.x) tuvo situaciones. Con nuestro registro de auditoría de balance de entrada / salida, no se perdió ningún mensaje o se produjo un dup.


La respuesta corta es no.

Lo que estás buscando es exactamente una vez el procesamiento. Si bien a menudo puede parecer factible, nunca se debe confiar en él porque siempre hay advertencias.

Incluso para intentar evitar duplicados, debe utilizar el consumidor simple. La forma en que funciona este enfoque es para cada consumidor, cuando se consume un mensaje de alguna partición, escriba la partición y el desplazamiento del mensaje consumido en el disco. Cuando el consumidor se reinicie después de una falla, lea la última compensación consumida para cada partición del disco.

Pero incluso con este patrón, el consumidor no puede garantizar que no reprocesará un mensaje después de una falla. ¿Qué sucede si el consumidor consume un mensaje y luego falla antes de que la compensación se descargue al disco? Si escribe en el disco antes de procesar el mensaje, ¿qué sucede si escribe el desplazamiento y luego falla antes de procesar realmente el mensaje? Este mismo problema existiría incluso si tuviera que enviar compensaciones a ZooKeeper después de cada mensaje.

Sin embargo, hay algunos casos en los que el procesamiento de una sola vez es más asequible, pero solo para ciertos casos de uso. Esto simplemente requiere que su desplazamiento se almacene en la misma ubicación que la salida de la aplicación de la unidad. Por ejemplo, si escribe un consumidor que cuenta los mensajes, al almacenar el último desplazamiento contado con cada recuento, puede garantizar que el desplazamiento se almacene al mismo tiempo que el estado del consumidor. Por supuesto, para garantizar exactamente el procesamiento de una sola vez, esto requerirá que consuma exactamente un mensaje y actualice el estado exactamente una vez para cada mensaje, y eso es completamente impráctico para la mayoría de las aplicaciones de consumo de Kafka. Por su naturaleza, Kafka consume mensajes en lotes por razones de rendimiento.

Por lo general, su tiempo se gastará más y su aplicación será mucho más confiable si simplemente lo diseña para que sea idempotente.