simple run results beat app django celery django-celery

run - django-celery-results



La recuperación de la tarea falló más allá de max_retries (4)

Con la versión 2.3.2 de Celery, este enfoque me ha funcionado bien:

class MyTask(celery.task.Task): abstract = True def after_return(self, status, retval, task_id, args, kwargs, einfo): if self.max_retries == self.request.retries: #If max retries is equal to task retries do something @task(base=MyTask, default_retry_delay=5, max_retries=10) def request(xml): #Your stuff here

Estoy intentando consumir de forma asíncrona un servicio web porque tarda hasta 45 segundos en volver. Desafortunadamente, este servicio web tampoco es confiable y puede generar errores. He configurado django-celery y tengo mis tareas en ejecución, lo que funciona bien hasta que la tarea falla más allá de max_retries .

Aquí está lo que tengo hasta ahora:

@task(default_retry_delay=5, max_retries=10) def request(xml): try: server = Client(''https://www.whatever.net/RealTimeService.asmx?wsdl'') xml = server.service.RunRealTimeXML( username=settings.WS_USERNAME, password=settings.WS_PASSWORD, xml=xml ) except Exception, e: result = Result(celery_id=request.request.id, details=e.reason, status="i") result.save() try: return request.retry(exc=e) except MaxRetriesExceededError, e: result = Result(celery_id=request.request.id, details="Max Retries Exceeded", status="f") result.save() raise result = Result(celery_id=request.request.id, details=xml, status="s") result.save() return result

Desafortunadamente, MaxRetriesExceededError no se está MaxRetriesExceededError retry() , por lo que no estoy seguro de cómo manejar la falla de esta tarea. Django ya ha devuelto HTML al cliente, y estoy verificando el contenido de Result través de AJAX, que nunca llega a un estado de falla total.

Entonces, la pregunta es: ¿Cómo puedo actualizar mi base de datos cuando la tarea de apio ha excedido max_retries ?


El problema es que el apio está intentando volver a elevar la excepción que pasaste cuando llega al límite de reintentos. El código para hacer esta reedición está aquí: https://github.com/celery/celery/blob/v3.1.20/celery/app/task.py#L673-L681

La forma más sencilla de evitar esto es no permitir que el apio maneje sus excepciones:

@task(max_retries=10) def mytask(): try: do_the_thing() except Exception as e: try: mytask.retry() except MaxRetriesExceededError: do_something_to_handle_the_error() logger.exception(e)


Puede anular el método after_return de la clase de tarea de apio, este método se invoca después de la ejecución de la tarea, sea cual sea el estado ret (SUCCESS, FAILED, RETRY)

class MyTask(celery.task.Task) def run(self, xml, **kwargs) #Your stuffs here def after_return(self, status, retval, task_id, args, kwargs, einfo=None): if self.max_retries == int(kwargs[''task_retries'']): #If max retries are equals to task retries do something if status == "FAILURE": #You can do also something if the tasks fail instead of check the retries

http://readthedocs.org/docs/celery/en/latest/reference/celery.task.base.html#celery.task.base.BaseTask.after_return

http://celery.readthedocs.org/en/latest/reference/celery.app.task.html?highlight=after_return#celery.app.task.Task.after_return


Solo voy con esto por ahora, me ahorra el trabajo de subclasificar Tarea y es fácil de entender.

# auto-retry with delay as defined below. After that, hook is disabled. @celery.shared_task(bind=True, max_retries=5, default_retry_delay=300) def post_data(self, hook_object_id, url, event, payload): headers = {''Content-type'': ''application/json''} try: r = requests.post(url, data=payload, headers=headers) r.raise_for_status() except requests.exceptions.RequestException as e: if self.request.retries >= self.max_retries: log.warning("Auto-deactivating webhook %s for event %s", hook_object_id, event) Webhook.objects.filter(object_id=hook_object_id).update(active=False) return False raise self.retry(exc=e) return True