python - beat - run celery
Los resultados del apio(Redis) no funcionan (3)
Tengo una aplicación web que utiliza Django y estoy usando Celery para el procesamiento de algunas tareas asíncronas.
Para Celery, estoy usando Rabbitmq como agente y Redis como resultado.
Rabbitmq y Redis se ejecutan en el mismo servidor Ubuntu 14.04 alojado en una máquina virtual local.
Los trabajadores de apio se ejecutan en máquinas remotas (Windows 10) (ningún trabajador se ejecuta en el servidor Django).
Tengo tres problemas (creo que están relacionados de alguna manera!).
- Las tareas permanecen en el estado ''PENDIENTE'' sin importar si las tareas se realizaron con éxito o no.
- las tareas no se reintentan cuando fallan. y me sale este error al intentar reintentar:
reject requeue = False: [WinError 10061] No se pudo establecer conexión porque la máquina de destino lo rechazó activamente
- El backend de resultados no parece funcionar.
¡También estoy confundido acerca de mi configuración, y no sé exactamente de dónde provienen estos problemas!
Así que aquí está mi configuración hasta ahora:
my_app / settings.py
# region Celery Settings
CELERY_CONCURRENCY = 1
CELERY_ACCEPT_CONTENT = [''json'']
# CELERY_RESULT_BACKEND = ''redis://:C@pV@[email protected]:6379/0''
BROKER_URL = ''amqp://soufiaane:C@pV@[email protected]:5672/cvcHost''
CELERY_RESULT_SERIALIZER = ''json''
CELERY_TASK_SERIALIZER = ''json''
CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_REDIS_HOST = ''cvc.ma''
CELERY_REDIS_PORT = 6379
CELERY_REDIS_DB = 0
CELERY_RESULT_BACKEND = ''redis''
CELERY_RESULT_PASSWORD = "C@pV@lue2016"
REDIS_CONNECT_RETRY = True
AMQP_SERVER = "cvc.ma"
AMQP_PORT = 5672
AMQP_USER = "soufiaane"
AMQP_PASSWORD = "C@pV@lue2016"
AMQP_VHOST = "/cvcHost"
CELERYD_HIJACK_ROOT_LOGGER = True
CELERY_HIJACK_ROOT_LOGGER = True
CELERYBEAT_SCHEDULER = ''djcelery.schedulers.DatabaseScheduler''
# endregion
my_app / celery_settings.py
from __future__ import absolute_import
from django.conf import settings
from celery import Celery
import django
import os
# set the default Django settings module for the ''celery'' program.
os.environ.setdefault(''DJANGO_SETTINGS_MODULE'', ''my_app.settings'')
django.setup()
app = Celery(''CapValue'', broker=''amqp://soufiaane:C@pV@[email protected]/cvcHost'', backend=''redis://:C@pV@[email protected]:6379/0'')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object(''django.conf:settings'')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print(''Request: {0!r}''.format(self.request))
my_app__init__.py
from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery_settings import app as celery_app
my_app / email / tasks.py
from __future__ import absolute_import
from my_app.celery_settings import app
# here i only define the task skeleton because i''m executing this task on remote workers !
@app.task(name=''email_task'', bind=True, max_retries=3, default_retry_delay=1)
def email_task(self, job, email):
try:
print("x")
except Exception as exc:
self.retry(exc=exc)
en el lado de los trabajadores, tengo un archivo ''tasks.py'' que tiene la implementación real de la tarea:
Trabajador / tareas.py
from __future__ import absolute_import
from celery.utils.log import get_task_logger
from celery import Celery
logger = get_task_logger(__name__)
app = Celery(''CapValue'', broker=''amqp://soufiaane:C@pV@[email protected]/cvcHost'', backend=''redis://:C@pV@[email protected]:6379/0'')
@app.task(name=''email_task'', bind=True, max_retries=3, default_retry_delay=1)
def email_task(self, job, email):
try:
"""
The actual implementation of the task
"""
except Exception as exc:
self.retry(exc=exc)
Lo que sí noté es:
- cuando cambio la configuración del agente en mis trabajadores a una contraseña incorrecta, no puedo conectar con el error del agente.
- cuando cambio la configuración del backend de resultado en mis trabajadores a una contraseña incorrecta, se ejecuta normalmente como si todo estuviera bien.
¿Qué podría estar causándome esos problemas?
EDITAR
En mi servidor Redis, ya habilité la conexión remota
/etc/redis/redis.conf
... se unen 0.0.0.0 ...
Agregar CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
a mi configuración resolvió este problema por mí.
Mi conjetura es que su problema está en la contraseña. Su contraseña tiene @
en ella, lo que podría interpretarse como un divisor entre el user:pass
y la sección de host
.
Los trabajadores permanecen en pendiente porque no pudieron conectarse al agente correctamente. De la documentación de apio http://docs.celeryproject.org/en/latest/userguide/tasks.html#pending
PENDIENTE La tarea está en espera de ejecución o se desconoce. Cualquier ID de tarea que no se conozca está implícito en estado pendiente.
Tenía una configuración donde los ''servidores de instancia única'' (los servidores dev y locahost) estaban funcionando, pero no cuando el servidor redis era otro servidor. Las tareas de apio funcionaban correctamente, pero no el resultado. Recibía el siguiente error al intentar obtener el resultado de una tarea:
Error 111 connecting to localhost:6379. Connection refused.
Lo que lo hizo funcionar fue simplemente agregar esa configuración a Django:
CELERY_RESULT_BACKEND = ''redis://10.10.10.10:6379/0''
Parece que si este parámetro no está presente, se usará de forma predeterminada en localhost para obtener el resultado de las tareas.