amazon-web-services - programa - sqs chile
Encontrando ciertos mensajes en SQS (6)
A pesar de que al solicitar atributos específicos, el valor solo se establecerá en nulo para los mensajes que no contienen el atributo, aún se puede usar para filtrar de una manera. Aquellos que no tienen el atributo establecido de la manera que usted desea pueden tener su visibilidad establecida en 1 y luego liberada, por lo que permanecerán en la cola. Brindaría una forma burda de hacer una cola de prioridad, aunque también podría hacer lo mismo en función del contenido del mensaje.
Sé que SQS no está hecho para eso, pero tengo curiosidad, ¿es posible encontrar mensajes en una cola que cumplan con algunos criterios?
Puedo extraer mensajes en un bucle, buscar en los cuerpos de los mensajes algún patrón (sin siquiera deserializarlos) y filtrar los mensajes que necesitaba. Pero luego es posible terminar con un bucle infinito: los primeros mensajes que leo volverán a la cola cuando llegue al final de la cola ...
Ampliación de la visibilidad de los mensajes posible, pero ¿cómo sé exactamente cuánto tardará en escanear toda la cola y durante cuánto tiempo debería ampliar la visibilidad? ¿Qué pasa si tengo literalmente diez mil mensajes allí?
¿Hay alguna solución aquí? Necesito escanear la cola en busca de algunos mensajes y eliminarlos ...
Entendamos esto a través de algunos ejemplos, así que cree 10 mensajes y envíelos
// Send a message
for (int i = 0; i < 10; i++) {
System.out.println("Sending a message to MyQueue./n");
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
// extra code
String sdate;
Format formatter;
Date date = new Date();
// 2012-12-01
formatter = new SimpleDateFormat("yyyy-MM-dd''T''HH:mm:ss.SSSZ");
sdate = formatter.format(date);
System.out.println(sdate);
messageAttributes.put("Datestamp"+i, new MessageAttributeValue().withDataType("String").withStringValue(sdate));
Map<String, MessageAttributeValue> messageAttributes1 = new HashMap<>();
messageAttributes1.put("attributeName", new MessageAttributeValue().withDataType("String").withStringValue(sdate));
SendMessageRequest request = new SendMessageRequest();
request.withMessageBody("A test message body."+sdate);
request.withQueueUrl(myQueueUrl);
request.withMessageAttributes(messageAttributes);
sqs.sendMessage(request);
}
Ahora incluso tienes 10 mensajes con datetimestamp1 a datetimestamp10
El filtrado con atributo no funcionará
vamos a tratar de filtrar con algún atributo myTag
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
//ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest(queueUrl);
receiveMessageRequest.withMaxNumberOfMessages(10);
receiveMessageRequest.withMessageAttributeNames("myTag");
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
Da 10 mensajes y el valor de myTag es nulo.
message.getMessageAttributes (). get ("Datestamp") es nulo message.getMessageAttributes (). get ("myTag") es nulo
Por lo tanto, no podemos filtrar con el atributo de mensaje como si no se encontrara esa clave. ningún atributo de mensaje o con Todos los atributos de mensaje es igual.
Tan larga respuesta es NOOOOO.
No creo que las respuestas cortas o largas sean "No".
Aquí hay dos soluciones de contrapunto que son "Sí".
- Atravesando la cola, manteniendo una lista visitada
- Uso de patrones de integración empresarial (enrutamiento de mensajes) para dividir sus mensajes en flujos descendentes según los criterios
1. Atravesando la cola, manteniendo una lista visitada
Considere el caso de una cola con N
mensajes, sin que se agreguen o eliminen mensajes. Sin información adicional (por ejemplo, si sabía cuántos mensajes deberían coincidir con sus criterios), debe atravesar todos los N
mensajes.
El punto clave aquí es saber cuándo has atravesado todos los N
mensajes. Hay algunos problemas aquí.
- Para saber exactamente, tendrías que rastrear los mensajes a medida que se agregan a la cola.
- Para saber aproximadamente, puede obtener el atributo
ApproximateNumberOfMessages
de la cola - O puede recibir mensajes en un bucle, mantener una lista visitada, y asumir que eventualmente muestreará y agotará los mensajes de cada servidor en el que se divide la cola.
Para mantener la lista visitada, a medida que recibe mensajes y evalúa sus criterios de coincidencia, puede almacenar el message_id
de todos los mensajes visitados.
Los identificadores de mensajes son casi únicos. Ver este hilo
https://forums.aws.amazon.com/message.jspa?messageID=76119
Si eligiera (3), no estaría seguro de cuántas iteraciones se necesitarían para agotar la cola. Sin embargo, si realiza esto de forma indefinida, se le garantiza que se enviará rápidamente la cola siempre que la distribución aleatoria ponderada sobre los servidores de fragmentos SQS les dé a todos una probabilidad distinta de cero.
2. Uso de patrones de integración empresarial (enrutamiento de mensajes) para dividir sus mensajes en flujos descendentes según los criterios
Si tiene control sobre su arquitectura de mensajería, podría usar un enrutador de mensajes como un procesador de mensajes "front-end" que envía mensajes a varios destinatarios según los criterios.
Y específicamente usarías un enrutador basado en contenido:
http://www.enterpriseintegrationpatterns.com/patterns/messaging/ContentBasedRouter.html
Probado con diferentes casos. No funciona. La respuesta es no
Datos de prueba
public void fillQueueWithMessages(){
MessageAttributeValue value1 = new MessageAttributeValue();
value1.setDataType("String");
value1.setStringValue("1");
SendMessageRequest send_msg_request = new SendMessageRequest()
.withQueueUrl(env.getProperty("cloud.aws.sqs.readyForTranslation.url"))
.withMessageBody("test1").addMessageAttributesEntry(value1.getStringValue(), value1);
amazonSqs.sendMessage(send_msg_request);
MessageAttributeValue value2 = new MessageAttributeValue();
value2.setDataType("String");
value2.setStringValue("2");
SendMessageRequest send_msg_request2 = new SendMessageRequest()
.withQueueUrl(env.getProperty("cloud.aws.sqs.readyForTranslation.url"))
.withMessageBody("test2").addMessageAttributesEntry(value2.getStringValue(), value2);
amazonSqs.sendMessage(send_msg_request2);
SendMessageRequest send_msg_request3 = new SendMessageRequest()
.withQueueUrl(env.getProperty("cloud.aws.sqs.readyForTranslation.url"))
.withMessageBody("test3").addMessageAttributesEntry(value1.getStringValue(), value1);
amazonSqs.sendMessage(send_msg_request3);
}
Prueba
public void shouldPollMessagesBasedOnMessageAttribute() throws InterruptedException {
ReceiveMessageRequest request =
new ReceiveMessageRequest(env.getProperty("cloud.aws.sqs.readyForTranslation.url"));
request.setMaxNumberOfMessages(3);
request.setWaitTimeSeconds(20);
request.withMessageAttributeNames("1");
List<Message> messages = new ArrayList<Message>();
messages = amazonSqs.receiveMessage(request).getMessages();
assertEquals(2, messages.size());
}
Respuesta corta: no.
Las colas están diseñadas para cosas como tareas. Una máquina toma una nueva tarea (es decir, un mensaje) de la cola, ejecuta la tarea y luego elimina la tarea.
Si está intentando buscar los mensajes para filtrarlos, no puedo evitar preguntarme si está usando la herramienta incorrecta para el trabajo ...
esto en realidad no es del todo cierto,
de hecho, puede "un poco" filtrar los mensajes en una cola usando los atributos de los mensajes.
cada mensaje puede contener atributos que puede agregar al crear el mensaje (deberá proporcionar 3 elementos para cada atributo: nombre, tipo, valor).
más adelante, cuando crea un nuevo objeto ReceiveMessageRequest, puede usar "withMessageAttributeNames" para especificar un atributo, y lo que realmente sucede es que su cola se filtra para el mensaje que contiene ese atributo específico.
por ejemplo:
String queueUrl = sqs.getQueueUrl("myQueue").getQueueUrl();
ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest(queueUrl);
receiveRequest.withMaxNumberOfMessages(10);
receiveRequest.withMessageAttributeNames("myTag");
Si su cola contenía 5 mensajes pero solo 1 tenía el atributo "myTag", solo se devolverá ese específico.
Esto fue abrumador para mí, ya que no se menciona en la API ReceiveMessageRequest
básicamente, todo lo que tiene que hacer es dar a cada mensaje un atributo único (preste atención a los límites de los atributos: The message attribute name can contain the following characters: AZ, az, 0-9, underscore (_), hyphen (-), and period (.). The name must not start or end with a period, and it should not have successive periods. The name is case sensitive and must be unique among all attribute names for the message. The name can be up to 256 characters long. The name cannot start with "AWS." or "Amazon." (or any variations in casing)