python - run - django-celery-results
Ejecutar tareas "Ășnicas" con apio (5)
Utilizo apio para actualizar los canales RSS en mi sitio de agregación de noticias. Uso una @task para cada feed, y las cosas parecen funcionar muy bien.
Sin embargo, hay un detalle que no estoy seguro de manejar bien: todos los feeds se actualizan una vez por minuto con @periodic_task, pero ¿qué sucede si un feed aún se está actualizando desde la última tarea periódica cuando se inicia uno nuevo? (por ejemplo, si el feed es muy lento o no está conectado y la tarea se mantiene en un ciclo de reintento)
Actualmente almaceno resultados de tareas y compruebo su estado de esta manera:
import socket
from datetime import timedelta
from celery.decorators import task, periodic_task
from aggregator.models import Feed
_results = {}
@periodic_task(run_every=timedelta(minutes=1))
def fetch_articles():
for feed in Feed.objects.all():
if feed.pk in _results:
if not _results[feed.pk].ready():
# The task is not finished yet
continue
_results[feed.pk] = update_feed.delay(feed)
@task()
def update_feed(feed):
try:
feed.fetch_articles()
except socket.error, exc:
update_feed.retry(args=[feed], exc=exc)
¿Tal vez hay una manera más sofisticada / robusta de lograr el mismo resultado utilizando algún mecanismo de apio que eché de menos?
De la documentación oficial: Asegurar que una tarea se ejecute una a la vez .
El uso de https://pypi.python.org/pypi/celery_once parece hacer muy bien el trabajo, incluidos los errores de informe y las pruebas en relación con algunos parámetros de exclusividad.
Puedes hacer cosas como:
from celery_once import QueueOnce
from myapp.celery import app
from time import sleep
@app.task(base=QueueOnce, once=dict(keys=(''customer_id'',)))
def start_billing(customer_id, year, month):
sleep(30)
return "Done!"
que solo necesita la siguiente configuración en tu proyecto:
ONCE_REDIS_URL = ''redis://localhost:6379/0''
ONCE_DEFAULT_TIMEOUT = 60 * 60 # remove lock after 1 hour in case it was stale
Esta solución para el trabajo de apio en un solo host con concurencia mayor 1. Otros tipos (sin dependencias como redis) de bloqueos diferencia basada en archivos no funcionan con concurrencia mayor 1.
class Lock(object):
def __init__(self, filename):
self.f = open(filename, ''w'')
def __enter__(self):
try:
flock(self.f.fileno(), LOCK_EX | LOCK_NB)
return True
except IOError:
pass
return False
def __exit__(self, *args):
self.f.close()
class SinglePeriodicTask(PeriodicTask):
abstract = True
run_every = timedelta(seconds=1)
def __call__(self, *args, **kwargs):
lock_filename = join(''/tmp'',
md5(self.name).hexdigest())
with Lock(lock_filename) as is_locked:
if is_locked:
super(SinglePeriodicTask, self).__call__(*args, **kwargs)
else:
print ''already working''
class SearchTask(SinglePeriodicTask):
restart_delay = timedelta(seconds=60)
def run(self, *args, **kwargs):
print self.name, ''start'', datetime.now()
sleep(5)
print self.name, ''end'', datetime.now()
Según la respuesta de MattH, podrías usar un decorador como este:
def single_instance_task(timeout):
def task_exc(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
lock_id = "celery-single-instance-" + func.__name__
acquire_lock = lambda: cache.add(lock_id, "true", timeout)
release_lock = lambda: cache.delete(lock_id)
if acquire_lock():
try:
func(*args, **kwargs)
finally:
release_lock()
return wrapper
return task_exc
luego, úsalo así ...
@periodic_task(run_every=timedelta(minutes=1))
@single_instance_task(60*10)
def fetch_articles()
yada yada...
Si está buscando un ejemplo que no use Django, loose-bits.com/2010/10/distributed-task-locking-in-celery.html (advertencia: usa Redis, que ya estaba usando).
El código del decorador es el siguiente (crédito completo para el autor del artículo, ve a leerlo)
import redis
REDIS_CLIENT = redis.Redis()
def only_one(function=None, key="", timeout=None):
"""Enforce only one celery task at a time."""
def _dec(run_func):
"""Decorator."""
def _caller(*args, **kwargs):
"""Caller."""
ret_value = None
have_lock = False
lock = REDIS_CLIENT.lock(key, timeout=timeout)
try:
have_lock = lock.acquire(blocking=False)
if have_lock:
ret_value = run_func(*args, **kwargs)
finally:
if have_lock:
lock.release()
return ret_value
return _caller
return _dec(function) if function is not None else _dec