schedulers beat_schedule beat autodiscover_tasks add_periodic_task python celery celerybeat

python - beat_schedule - Cómo agregar/eliminar dinámicamente tareas periódicas a apio(apio de cebo)



django beat_schedule (4)

Si tengo una función definida de la siguiente manera:

def add(x,y): return x+y

¿Hay alguna manera de agregar dinámicamente esta función como una PeriodicTask de apio y ponerla en marcha en tiempo de ejecución? Me gustaría poder hacer algo como (pseudocódigo):

some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30")) celery.beat.start(some_unique_task_id)

También me gustaría detener o eliminar esa tarea de forma dinámica con algo así como (pseudocódigo):

celery.beat.remove_task(some_unique_task_id)

o

celery.beat.stop(some_unique_task_id)

FYI No estoy usando djcelery, que te permite administrar tareas periódicas a través del administrador de django.


Esta pregunta fue respondida en grupos de google .

NO SOY EL AUTOR, todo el mérito es para Jean Mark

Aquí hay una solución adecuada para esto. Trabajo confirmado. En mi caso, clasifiqué la tarea periódica y creé un modelo, ya que puedo agregar otros campos al modelo que necesito y también para poder agregar el método "terminar". Debe establecer la propiedad habilitada de la tarea periódica en False y guardarla antes de eliminarla. Toda la subclasificación no es obligatoria, el método schedule_every es el que realmente hace el trabajo. Cuando esté listo para finalizar su tarea (si no la subclasificó), puede usar PeriodicTask.objects.filter (name = ...) para buscar su tarea, deshabilitarla y luego eliminarla.

¡Espero que esto ayude!

from djcelery.models import PeriodicTask, IntervalSchedule from datetime import datetime class TaskScheduler(models.Model): periodic_task = models.ForeignKey(PeriodicTask) @staticmethod def schedule_every(task_name, period, every, args=None, kwargs=None): """ schedules a task by name every "every" "period". So an example call would be: TaskScheduler(''mycustomtask'', ''seconds'', 30, [1,2,3]) that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. """ permissible_periods = [''days'', ''hours'', ''minutes'', ''seconds''] if period not in permissible_periods: raise Exception(''Invalid period specified'') # create the periodic task and the interval ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task interval_schedules = IntervalSchedule.objects.filter(period=period, every=every) if interval_schedules: # just check if interval schedules exist like that already and reuse em interval_schedule = interval_schedules[0] else: # create a brand new interval schedule interval_schedule = IntervalSchedule() interval_schedule.every = every # should check to make sure this is a positive int interval_schedule.period = period interval_schedule.save() ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule) if args: ptask.args = args if kwargs: ptask.kwargs = kwargs ptask.save() return TaskScheduler.objects.create(periodic_task=ptask) def stop(self): """pauses the task""" ptask = self.periodic_task ptask.enabled = False ptask.save() def start(self): """starts the task""" ptask = self.periodic_task ptask.enabled = True ptask.save() def terminate(self): self.stop() ptask = self.periodic_task self.delete() ptask.delete()


Hay una biblioteca llamada django-apio-beat que proporciona los modelos que uno necesita. Para hacer que cargue dinámicamente nuevas tareas periódicas, uno debe crear su propio Programador.

from django_celery_beat.schedulers import DatabaseScheduler class AutoUpdateScheduler(DatabaseScheduler): def tick(self, *args, **kwargs): if self.schedule_changed(): print(''resetting heap'') self.sync() self._heap = None new_schedule = self.all_as_schedule() if new_schedule: to_add = new_schedule.keys() - self.schedule.keys() to_remove = self.schedule.keys() - new_schedule.keys() for key in to_add: self.schedule[key] = new_schedule[key] for key in to_remove: del self.schedule[key] super(AutoUpdateScheduler, self).tick(*args, **kwargs) @property def schedule(self): if not self._initial_read and not self._schedule: self._initial_read = True self._schedule = self.all_as_schedule() return self._schedule


No, lo siento, esto no es posible con el celerybeat regular.

Pero es fácilmente extensible para hacer lo que quiera, por ejemplo, el planificador django-apio es solo una subclase leyendo y escribiendo el cronograma en la base de datos (con algunas optimizaciones en la parte superior).

También puedes usar el planificador django-apio incluso para proyectos que no son de Django.

Algo como esto:

  • Instala django + django-apio:

    $ pip install -U django django-apio

  • Agregue las siguientes configuraciones a su apioconfig:

    DATABASES = { ''default'': { ''NAME'': ''celerybeat.db'', ''ENGINE'': ''django.db.backends.sqlite3'', }, } INSTALLED_APPS = (''djcelery'', )

  • Crea las tablas de la base de datos:

    $ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig

  • Comience apio de cebo con el programador de base de datos:

    $ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig / -S djcelery.schedulers.DatabaseScheduler

También está el comando djcelerymon que se puede usar para proyectos que no son de Django para iniciar apiocam y un servidor web Admin de Django en el mismo proceso, puede usarlo para editar también sus tareas periódicas en una bonita interfaz web:

$ djcelerymon

(Tenga en cuenta que por alguna razón, djcelerymon no se puede detener usando Ctrl + C, tiene que usar Ctrl + Z + kill% 1)


Puede ver este flask-djcelery que configura matraz y djcelery y también proporciona api de búsqueda navegable