library awesome python rabbitmq pika

library - awesome python



Consumir colas mĂșltiples en python/pika (2)

Estoy tratando de crear un consumidor que se suscriba a varias colas y luego procese los mensajes a medida que llegan.

El problema es que cuando hay algunos datos ya presentes en la primera cola, consume la primera cola y nunca va a consumir la segunda cola. Sin embargo, cuando la primera cola está vacía, va a la siguiente cola y, a continuación, consume ambas colas simultáneamente.

Primero había implementado el subprocesamiento pero quería evitarlo, cuando pika library lo hace por mí sin mucha complejidad. A continuación se muestra mi código:

import pika mq_connection = pika.BlockingConnection(pika.ConnectionParameters(''x.x.x.x'')) mq_channel = mq_connection.channel() mq_channel.basic_qos(prefetch_count=1) def callback(ch, method, properties, body): print body mq_channel.basic_ack(delivery_tag=method.delivery_tag) mq_channel.basic_consume(callback, queue=''queue1'', consumer_tag="ctag1.0") mq_channel.basic_consume(callback, queue=''queue2'', consumer_tag="ctag2.0") mq_channel.start_consuming()


El problema es más probable que la primera llamada haya emitido un Basic.Consume y ya haya recibido mensajes de una cola previamente completada antes de que se emita la segunda llamada. Es posible que desee intentar establecer el recuento de captación previa de QoS en 1, lo que evitará que RabbitMQ le envíe más de un mensaje a la vez.


Una posible solución es utilizar la conexión sin bloqueo y consumir mensajes.

import pika def callback(channel, method, properties, body): print(body) channel.basic_ack(delivery_tag=method.delivery_tag) def on_open(connection): connection.channel(on_channel_open) def on_channel_open(channel): channel.basic_consume(callback, queue=''queue1'') channel.basic_consume(callback, queue=''queue2'') parameters = pika.URLParameters(''amqp://guest:guest@localhost:5672/%2F'') connection = pika.SelectConnection(parameters=parameters, on_open_callback=on_open) try: connection.ioloop.start() except KeyboardInterrupt: connection.close()

Esto se conectará a múltiples colas y consumirá mensajes en consecuencia.