python - tasks - django celery rabbitmq
¿Cómo puedo configurar Celery para que llame a una función de inicialización personalizada antes de ejecutar mis tareas? (1)
Tengo un proyecto Django y estoy tratando de usar Celery para enviar tareas para el procesamiento en segundo plano ( http://ask.github.com/celery/introduction.html ). Celery se integra bien con Django y he podido enviar mis tareas personalizadas y obtener resultados.
El único problema es que no puedo encontrar una forma sensata de realizar la inicialización personalizada en el proceso del daemon. Necesito llamar a una función costosa que carga mucha memoria antes de comenzar a procesar las tareas, y no puedo permitirme llamar a esa función todas las veces.
¿Alguien tuvo este problema antes? ¿Alguna idea de cómo evitarlo sin modificar el código fuente de Celery?
Gracias
Puede escribir un cargador personalizado o usar las señales.
Los cargadores tienen el método on_task_init
, que se llama cuando una tarea está a punto de ejecutarse, y on_worker_init
que es llamado por el proceso principal de apio + celerybeat.
El uso de señales es probablemente lo más fácil, las señales disponibles son:
0.8.x:
task_prerun(task_id, task, args, kwargs)
Se distribuye cuando una tarea está a punto de ser ejecutada por el trabajador (o localmente si está utilizando
apply
/ o si se ha establecidoCELERY_ALWAYS_EAGER
).task_postrun(task_id, task, args, kwargs, retval)
distribuye después de que una tarea se haya ejecutado en las mismas condiciones que anteriormente.task_sent(task_id, task, args, kwargs, eta, taskset)
Se llama cuando se aplica una tarea (no es bueno para operaciones de larga ejecución)
Señales adicionales disponibles en 0.9.x (rama maestra actual en github):
worker_init()
Se invoca cuando se ha iniciado celeryd (antes de que se inicialice la tarea, por lo que si se encuentra en un
fork
soporte del sistema, todos los cambios de memoria se copiarían a los procesos de los trabajadores secundarios).worker_ready()
Llamado cuando apio es capaz de recibir tareas.
worker_shutdown()
Llamado cuando apio se apaga.
Aquí hay un ejemplo que precalcula algo la primera vez que se ejecuta una tarea en el proceso:
from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun
_precalc_table = {}
class PowersOfTwo(Task):
def run(self, x):
if x in _precalc_table:
return _precalc_table[x]
else:
return x ** 2
tasks.register(PowersOfTwo)
def _precalc_numbers(**kwargs):
if not _precalc_table: # it''s empty, so haven''t been generated yet
for i in range(1024):
_precalc_table[i] = i ** 2
# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
Si desea que la función se ejecute para todas las tareas, simplemente omita el argumento del sender
.