python - significa - Cómo enviar tareas periódicas a una cola específica en Apio
qué significa en español (3)
Encontré la solución para este problema:
1) En primer lugar, cambié la forma de configurar las tareas periódicas. Usé el decorador @periodic_task de esta manera:
@periodic_task(run_every=crontab(minute=''5''),
queue=''celery_periodic'',
options={''queue'': ''celery_periodic''})
def recalc_last_hour():
dt = datetime.utcnow()
prev_hour = datetime(dt.year, dt.month, dt.day, dt.hour) /
- timedelta(hours=1)
log.debug(''Generating task for hour %s'', str(prev_hour))
recalc_hour.delay(prev_hour)
2) Escribí celery_periodic dos veces en params en @periodic_task :
La opción queue = ''celery_periodic'' se usa cuando invocas una tarea desde el código (.delay o .apply_async)
options = {''queue'': la opción ''celery_periodic''} se usa cuando Celery Beat lo invoca.
Estoy seguro de que lo mismo es posible si configurara tareas periódicas con la variable CELERYBEAT_SCHEDULE.
UPD. Esta solución es correcta tanto para el almacenamiento basado en DB como en el basado en archivos para CELERYBEAT_SCHEDULER .
De forma predeterminada, Celery envía todas las tareas a la cola ''apio'', pero puede cambiar este comportamiento agregando un parámetro extra:
@task(queue=''celery_periodic'')
def recalc_last_hour():
log.debug(''sending new task'')
recalc_hour.delay(datetime(2013, 1, 1, 2)) # for example
Configuración del programador:
CELERYBEAT_SCHEDULE = {
''installer_recalc_hour'': {
''task'': ''stats.installer.tasks.recalc_last_hour'',
''schedule'': 15 # every 15 sec for test
},
}
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
Ejecutar trabajador:
python manage.py celery worker -c 1 -Q celery_periodic -B -E
Este esquema no funciona como se esperaba: estos trabajadores envían tareas periódicas a la cola de "apio", no a "apio_periodo". ¿Cómo puedo arreglar eso?
PS apio == 3.0.16
Periódico se envían a las colas por celerybeat. Puedes hacer todo lo que hacemos con Celery api. Aquí está la lista de configuraciones que viene con celerybeat.
http://celery.readthedocs.org/en/latest/userguide/periodic-tasks.html#available-fields
En tu caso
CELERYBEAT_SCHEDULE = {
''installer_recalc_hour'': {
''task'': ''stats.installer.tasks.recalc_last_hour'',
''schedule'': 15 # every 15 sec for test,
''options'': {''queue'' : ''celery_periodic''} ##options are mapped to apply_async options
},
}
Y si está utilizando el planificador de base de datos djcelery, puede especificar la cola en el campo Opciones de ejecución -> cola