simple run results beat app python django celery

python - run - django-celery-results



Ejecutar tareas "Ășnicas" con apio (5)

Utilizo apio para actualizar los canales RSS en mi sitio de agregación de noticias. Uso una @task para cada feed, y las cosas parecen funcionar muy bien.

Sin embargo, hay un detalle que no estoy seguro de manejar bien: todos los feeds se actualizan una vez por minuto con @periodic_task, pero ¿qué sucede si un feed aún se está actualizando desde la última tarea periódica cuando se inicia uno nuevo? (por ejemplo, si el feed es muy lento o no está conectado y la tarea se mantiene en un ciclo de reintento)

Actualmente almaceno resultados de tareas y compruebo su estado de esta manera:

import socket from datetime import timedelta from celery.decorators import task, periodic_task from aggregator.models import Feed _results = {} @periodic_task(run_every=timedelta(minutes=1)) def fetch_articles(): for feed in Feed.objects.all(): if feed.pk in _results: if not _results[feed.pk].ready(): # The task is not finished yet continue _results[feed.pk] = update_feed.delay(feed) @task() def update_feed(feed): try: feed.fetch_articles() except socket.error, exc: update_feed.retry(args=[feed], exc=exc)

¿Tal vez hay una manera más sofisticada / robusta de lograr el mismo resultado utilizando algún mecanismo de apio que eché de menos?



El uso de https://pypi.python.org/pypi/celery_once parece hacer muy bien el trabajo, incluidos los errores de informe y las pruebas en relación con algunos parámetros de exclusividad.

Puedes hacer cosas como:

from celery_once import QueueOnce from myapp.celery import app from time import sleep @app.task(base=QueueOnce, once=dict(keys=(''customer_id'',))) def start_billing(customer_id, year, month): sleep(30) return "Done!"

que solo necesita la siguiente configuración en tu proyecto:

ONCE_REDIS_URL = ''redis://localhost:6379/0'' ONCE_DEFAULT_TIMEOUT = 60 * 60 # remove lock after 1 hour in case it was stale


Esta solución para el trabajo de apio en un solo host con concurencia mayor 1. Otros tipos (sin dependencias como redis) de bloqueos diferencia basada en archivos no funcionan con concurrencia mayor 1.

class Lock(object): def __init__(self, filename): self.f = open(filename, ''w'') def __enter__(self): try: flock(self.f.fileno(), LOCK_EX | LOCK_NB) return True except IOError: pass return False def __exit__(self, *args): self.f.close() class SinglePeriodicTask(PeriodicTask): abstract = True run_every = timedelta(seconds=1) def __call__(self, *args, **kwargs): lock_filename = join(''/tmp'', md5(self.name).hexdigest()) with Lock(lock_filename) as is_locked: if is_locked: super(SinglePeriodicTask, self).__call__(*args, **kwargs) else: print ''already working'' class SearchTask(SinglePeriodicTask): restart_delay = timedelta(seconds=60) def run(self, *args, **kwargs): print self.name, ''start'', datetime.now() sleep(5) print self.name, ''end'', datetime.now()


Según la respuesta de MattH, podrías usar un decorador como este:

def single_instance_task(timeout): def task_exc(func): @functools.wraps(func) def wrapper(*args, **kwargs): lock_id = "celery-single-instance-" + func.__name__ acquire_lock = lambda: cache.add(lock_id, "true", timeout) release_lock = lambda: cache.delete(lock_id) if acquire_lock(): try: func(*args, **kwargs) finally: release_lock() return wrapper return task_exc

luego, úsalo así ...

@periodic_task(run_every=timedelta(minutes=1)) @single_instance_task(60*10) def fetch_articles() yada yada...


Si está buscando un ejemplo que no use Django, loose-bits.com/2010/10/distributed-task-locking-in-celery.html (advertencia: usa Redis, que ya estaba usando).

El código del decorador es el siguiente (crédito completo para el autor del artículo, ve a leerlo)

import redis REDIS_CLIENT = redis.Redis() def only_one(function=None, key="", timeout=None): """Enforce only one celery task at a time.""" def _dec(run_func): """Decorator.""" def _caller(*args, **kwargs): """Caller.""" ret_value = None have_lock = False lock = REDIS_CLIENT.lock(key, timeout=timeout) try: have_lock = lock.acquire(blocking=False) if have_lock: ret_value = run_func(*args, **kwargs) finally: if have_lock: lock.release() return ret_value return _caller return _dec(function) if function is not None else _dec