queue message-queue amazon-sqs

queue - Uso de muchos consumidores en la cola SQS



message-queue amazon-sqs (4)

Sé que es posible consumir una cola SQS usando múltiples hilos. Me gustaría garantizar que cada mensaje se consumirá una vez. Sé que es posible cambiar el tiempo de espera de visibilidad de un mensaje, por ejemplo, igual a mi tiempo de procesamiento. Si mi proceso pasa más tiempo que el tiempo de espera de visibilidad (por ejemplo, una conexión lenta), otro hilo puede consumir el mismo mensaje.

¿Cuál es el mejor enfoque para garantizar que un mensaje se procesará una vez?


¿Cuál es el mejor enfoque para garantizar que un mensaje se procesará una vez?

Estás pidiendo una garantía , no la obtendrás . Puede reducir la probabilidad de que un mensaje se procese más de una vez a una cantidad muy pequeña , pero no obtendrá una garantía .

Explicaré por qué, junto con estrategias para reducir la duplicación.

¿De dónde viene la duplicación?

  1. Cuando coloca un mensaje en SQS, SQS podría recibir ese mensaje más de una vez
    • Por ejemplo: un pequeño inconveniente de la red al enviar el mensaje provocó un error transitorio que se reintentó automáticamente: desde la perspectiva del remitente del mensaje, falló una vez y se envió con éxito una vez, pero SQS recibió ambos mensajes.
  2. SQS puede generar duplicados internamente
    • De manera similar al primer ejemplo: hay muchas computadoras que manejan mensajes debajo de las cubiertas, y SQS necesita asegurarse de que nada se pierda: los mensajes se almacenan en varios servidores, y esto puede resultar en duplicación.

En su mayor parte, al aprovechar el tiempo de espera de visibilidad de mensajes SQS , las posibilidades de duplicación de estas fuentes ya son bastante pequeñas, como una fracción de un porcentaje pequeño.

Si el procesamiento de duplicados realmente no es tan malo esfuércese por hacer que su consumo de mensajes sea idempotente !), Lo consideraría lo suficientemente bueno: reducir aún más las posibilidades de duplicación es complicado y potencialmente costoso ...

¿Qué puede hacer su aplicación para reducir aún más la duplicación?

Ok, aquí vamos por la madriguera del conejo ... a un alto nivel, querrás asignar identificadores únicos a tus mensajes, y compararlos con un caché atómico de identificadores que están en progreso o completados antes de comenzar el procesamiento:

  1. Asegúrese de que sus mensajes tengan identificadores únicos proporcionados al momento de la inserción
    • Sin esto, no tendrás forma de distinguir los duplicados.
  2. Maneje la duplicación al ''final de la línea'' para los mensajes.
    • Si su receptor de mensajes necesita enviar mensajes fuera de la caja para su posterior procesamiento, entonces puede ser otra fuente de duplicación (por razones similares a las anteriores)
  3. Necesitará un lugar para almacenar atómicamente y verificar estos identificadores únicos (y eliminarlos después de un tiempo de espera). Hay dos estados importantes: "InProgress" y "Completed"
    • Las entradas de InProgress deben tener un tiempo de espera basado en la rapidez con la que necesita recuperarse en caso de falla de procesamiento.
    • Las entradas completadas deben tener un tiempo de espera basado en el tiempo que desea su ventana de deduplicación
    • El más simple es probablemente un caché de guayaba , pero solo sería bueno para una sola aplicación de procesamiento. Si tiene muchos mensajes o consumo distribuido, considere una base de datos para este trabajo (con un proceso de fondo para barrer las entradas caducadas)
  4. Antes de procesar el mensaje, intente almacenar el messageId en "InProgress". Si ya está allí, deténgase: acaba de manejar un duplicado.
  5. Compruebe si el mensaje está "Completado" (y deténgase si está allí)
  6. Su hilo ahora tiene un bloqueo exclusivo en ese ID de mensaje: procese su mensaje
  7. Marque el Id. De mensaje como "Completado": mientras este Id. De mensaje permanezca aquí, no procesará ningún duplicado para ese Id . De mensaje .
    • Sin embargo, es probable que no pueda permitirse el almacenamiento infinito.
  8. Elimine el messageId de "InProgress" (o simplemente deje que caduque desde aquí)

Algunas notas

  • Tenga en cuenta que las posibilidades de duplicar sin todo eso ya son bastante bajas. Dependiendo de cuánto tiempo le valga la deduplicación de los mensajes, no dude en omitir o modificar cualquiera de los pasos.
    • Por ejemplo, podría omitir "InProgress", pero eso abre la pequeña posibilidad de que dos hilos trabajen en un mensaje duplicado al mismo tiempo (el segundo que comienza antes que el primero lo ha "Completado")
  • Su ventana de deduplicación es siempre que pueda mantener los ID de mensaje en "Completado". Como es probable que no pueda permitirse un almacenamiento infinito, haga que dure al menos el doble del tiempo de espera de visibilidad del mensaje SQS; hay posibilidades reducidas de duplicación después de eso (además de las posibilidades ya muy bajas, pero aún no está garantizado).
  • Incluso con todo esto, todavía existe la posibilidad de duplicación : todas las precauciones y los tiempos de espera de visibilidad de mensajes SQS ayudan a reducir esta posibilidad a muy pequeña, pero la posibilidad sigue ahí:
    • Su aplicación puede bloquearse / bloquearse / hacer un GC muy largo justo después de procesar el mensaje, pero antes de que el ID de mensaje esté "Completado" (tal vez esté utilizando una base de datos para este almacenamiento y la conexión a la misma esté inactiva)
    • En este caso, "Procesando" finalmente caducará, y otro hilo podría procesar este mensaje (ya sea después de que el tiempo de espera de visibilidad de SQS también caduque o porque SQS tenía un duplicado).

AWS SQS API no "consume" automáticamente el mensaje cuando lo lee con API, etc. El desarrollador debe realizar la llamada para eliminar el mensaje ellos mismos.

SQS tiene una función llamada "política de redrive" como parte de la "Configuración de cola de letra muerta". Simplemente establezca la solicitud de lectura en 1. Si el proceso de consumo falla, la lectura posterior en el mismo mensaje pondrá el mensaje en la cola de mensajes no entregados.

El tiempo de espera de visibilidad de la cola SQS se puede configurar hasta 12 horas. A menos que tenga una necesidad especial, debe implementar un proceso para almacenar el controlador de mensajes en la base de datos para permitir su inspección.


Almacene el mensaje, o una referencia al mensaje, en una base de datos con una restricción única en la ID del mensaje, cuando lo reciba. Si el ID existe en la tabla, ya lo ha recibido, y la base de datos no le permitirá insertarlo nuevamente, debido a la restricción única.


Puede usar setVisibilityTimeout () para mensajes y lotes, a fin de extender el tiempo de visibilidad hasta que el hilo haya completado el procesamiento del mensaje.

Esto podría hacerse mediante el uso de un ServiceExecutorService y programar un evento ejecutable después de la mitad del tiempo de visibilidad inicial. El siguiente fragmento de código crea y ejecuta el VisibilityTimeExtender cada mitad del tiempo de visibilidad con un período de la mitad del tiempo de visibilidad. (El tiempo debe garantizar el procesamiento del mensaje, extendido con visibilidadTiempo / 2)

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); ScheduledFuture<?> futureEvent = scheduler.scheduleAtFixedRate(new VisibilityTimeExtender(..), visibilityTime/2, visibilityTime/2, TimeUnit.SECONDS);

VisibilityTimeExtender debe implementar Runnable, y es donde actualiza el nuevo tiempo de visibilidad.

Cuando el hilo termina de procesar el mensaje, puede eliminarlo de la cola y llamar a futureEvent.cancel (true) para detener el evento programado.