with tutorial rabbit nodejs exchange espaƱol rabbitmq pika

tutorial - rabbitmq with java



Manejo de tareas de larga ejecuciĆ³n en pika/RabbitMQ (4)

  1. Puede realizar una llamada periódica a connection.process_data_events() en su long_running_task(connection) , esta función enviará un latido al servidor cuando se lo llame, y evitará que el cliente pika se cierre.
  2. Establezca el valor de latido cardiaco mayor que el período de call.process_data_events connection.process_data_events() de llamada en su pika BlockingConnection .

Estamos tratando de establecer un sistema básico de cola dirigida donde un productor generará varias tareas y uno o más consumidores tomarán una tarea a la vez, la procesarán y reconocerán el mensaje.

El problema es que el procesamiento puede tomar de 10 a 20 minutos y no estamos respondiendo a los mensajes en ese momento, lo que hace que el servidor nos desconecte.

Aquí hay un pseudo código para nuestro consumidor:

#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host=''localhost'')) channel = connection.channel() channel.queue_declare(queue=''task_queue'', durable=True) print '' [*] Waiting for messages. To exit press CTRL+C'' def callback(ch, method, properties, body): long_running_task(connection) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue=''task_queue'') channel.start_consuming()

Después de que se completa la primera tarea, se lanza una excepción en el fondo de BlockingConnection, quejándose de que el socket se reinició. Además, los registros de RabbitMQ muestran que el consumidor fue desconectado por no responder a tiempo (por qué restablece la conexión en lugar de enviar un FIN es extraño, pero no nos preocuparemos por eso).

Buscamos mucho porque creíamos que este era el caso de uso normal de RabbitMQ (con muchas tareas largas que deberían dividirse entre muchos consumidores), pero parece que nadie más tenía este problema. Finalmente tropezamos con un hilo donde se recomendaba utilizar latidos del corazón y generar la long_running_task() en un hilo separado.

Entonces el código se ha convertido en:

#!/usr/bin/env python import pika import time import threading connection = pika.BlockingConnection(pika.ConnectionParameters( host=''localhost'', heartbeat_interval=20)) channel = connection.channel() channel.queue_declare(queue=''task_queue'', durable=True) print '' [*] Waiting for messages. To exit press CTRL+C'' def thread_func(ch, method, body): long_running_task(connection) ch.basic_ack(delivery_tag = method.delivery_tag) def callback(ch, method, properties, body): threading.Thread(target=thread_func, args=(ch, method, body)).start() channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue=''task_queue'') channel.start_consuming()

Y esto parece funcionar, pero es muy complicado. ¿Estamos seguros de que el objeto ch es seguro para subprocesos? Además, imagine que long_running_task() está usando ese parámetro de conexión para agregar una tarea a una nueva cola (es decir, la primera parte de este proceso largo ya está lista, vamos a enviar la tarea a la segunda parte). Entonces, el hilo está usando el objeto de connection . Es ese hilo seguro?

Más al punto, ¿cuál es la forma preferida de hacer esto? Siento que esto es muy complicado y posiblemente no esté seguro, así que tal vez no lo estamos haciendo bien. ¡Gracias!


Me encuentro con el mismo problema que tuviste.
Mi solución es:

  1. evitar los latidos del corazón en el lado del servidor
  2. evaluar el tiempo máximo que la tarea puede tomar
  3. establecer el tiempo de espera del latido del cliente al tiempo obtenido desde el paso 2

¿Por qué esto?

Como pruebo con los siguientes casos:

caso uno
  1. latido del servidor se enciende, 1800
  2. cliente desarmado

Aún recibo un error cuando la tarea se ejecuta durante mucho tiempo -> 1800

caso dos
  1. apagar el latido del corazón del servidor
  2. desactivar el latido del corazón del cliente

No hay ningún error en el lado del cliente, excepto un problema: cuando el cliente falla (mi sistema operativo se reinicia en algunas fallas), la conexión tcp aún se puede ver en el complemento de administración Rabbitmq. Y es confuso

caso tres
  1. apagar el latido del corazón del servidor
  2. activar el latido del corazón del cliente, configurarlo para el tiempo máximo de ejecución previsto

En este caso, puedo cambiar dinámicamente cada heatbeat en un cliente individual. De hecho, establecí el latido del corazón en las máquinas se estrelló con frecuencia. Además, puedo ver la máquina fuera de línea a través del plugin Rabbitmq Manangement.

Ambiente

Sistema operativo: centos x86_64
pika: 0.9.13
rabbitmq: 3.3.1



Por ahora, su mejor opción es apagar los latidos del corazón, esto evitará que RabbitMQ cierre la conexión si está bloqueando durante demasiado tiempo. Estoy experimentando con la administración de conexión central de Pika y el bucle IO ejecutándose en un hilo de fondo, pero no es lo suficientemente estable como para liberarlo.