tasks schedulers periodic beat_schedule beat app add_periodic_task python celery django-celery celery-task celerybeat

python - schedulers - django beat_schedule



Apio detener la ejecuciĆ³n de una cadena. (3)

Tengo una tarea check_orders que se ejecuta periódicamente. Hace un grupo de tareas para que pueda medir el tiempo de ejecución de las tareas, y realizar algo cuando están terminadas (este es el propósito de res.join [1] y grouped_subs) Las tareas que se agrupan son pares de Tareas encadenadas.

Lo que quiero es para cuando la primera tarea no cumpla con una condición (falla) no ejecute la segunda tarea en la cadena. No puedo entender esto por mi vida y siento que esta es una funcionalidad bastante básica para un administrador de colas de trabajo. Cuando intento las cosas que he comentado después de [2] (generar excepciones, eliminar las devoluciones de llamada) ... nos quedamos atascados en la unión () en check_orders por alguna razón (rompe el grupo). He intentado establecer ignore_result en False también para todas estas tareas, pero aún no funciona.

@task(ignore_result=True) def check_orders(): # check all the orders and send out appropriate notifications grouped_subs = [] for thingy in things: ... grouped_subs.append(chain(is_room_open.subtask((args_sub_1, )), notify.subtask((args_sub_2, ), immutable=True))) res = group(grouped_subs).apply_async() res.join() #[1] logger.info(''Done checking orders at %s'' % current_task.request.id)) @task(ignore_result=True) def is_room_open(args_sub_1): #something time consuming if http_req_and_parse(args_sub_1): # go on and do the notify task return True else: # [2] # STOP THE CHAIN SOMEHOW! Don''t execute the rest of the chain, how? # None of the following things work: # is_room_open.update_state(state=''FAILURE'') # raise celery.exceptions.Ignore() # raise Exception(''spam'', ''eggs'') # current_task.request.callbacks[:] = [] @task(ignore_result=True) def notify(args_sub_2): # something else time consuming, only do this if the first part of the chain # passed a test (the chained tasks before this were ''successful'' notify_user(args_sub_2)


En mi opinión, este es un caso de uso común que no recibe suficiente cariño en la documentación.

Suponiendo que desea abortar una cadena a mitad de camino mientras sigue reportando SUCCESS como estado de las tareas completadas, y no envía ningún registro de errores o lo que sea (de lo contrario, solo puede generar una excepción), una forma de lograrlo es:

@app.task(bind=True) # Note that we need bind=True for self to work def task1(self, other_args): #do_stuff if end_chain: self.request.callbacks = None return #Other stuff to do if end_chain is False

Así que en tu ejemplo:

@app.task(ignore_result=True, bind=True) def is_room_open(self, args_sub_1): #something time consuming if http_req_and_parse(args_sub_1): # go on and do the notify task return True else: self.request.callbacks = None

Trabajará. Tenga en cuenta que en lugar de ignore_result=True y subtask() puede usar el acceso directo .si() como se indica en @ abbasov-alexander

Editado para trabajar con el modo EAGER, como lo sugiere @PhilipGarnero en los comentarios.


En primer lugar, parece que si en la función existe la excepción ignore_result no te ayuda.

En segundo lugar, utiliza immutable = True. Significa que la siguiente función (en nuestro caso es notificar ) no tiene argumentos adicionales. notify.subtask((args_sub_2, ), immutable=False) debe usar notify.subtask((args_sub_2, ), immutable=False) si es adecuado para su decisión.

Tercero, puedes usar atajos:

notify.si(args_sub_2) lugar de notify.subtask((args_sub_2, ), immutable=True)

y

is_room_open.s(args_sub_1) en is_room_open.subtask((args_sub_1, )) lugar is_room_open.subtask((args_sub_1, ))

Intenta usarlo codigo:

@task def check_orders(): # check all the orders and send out appropriate notifications grouped_subs = [] for thingy in things: ... grouped_subs.append(chain(is_room_open.s(args_sub_1), notify.s(args_sub_2))) res = group(grouped_subs).apply_async() res.join() #[1] logger.info(''Done checking orders at %s'' % current_task.request.id)) @task def is_room_open(args_sub_1): #something time consuming if http_req_and_parse(args_sub_1): # go on and do the notify task return True else: # [2] # STOP THE CHAIN SOMEHOW! Don''t execute the rest of the chain, how? # None of the following things work: # is_room_open.update_state(state=''FAILURE'') # raise celery.exceptions.Ignore() # raise Exception(''spam'', ''eggs'') # current_task.request.callbacks[:] = [] return False @task def notify(result, args_sub_2): if result: # something else time consuming, only do this if the first part of the chain # passed a test (the chained tasks before this were ''successful'' notify_user(args_sub_2) return True return False

Si desea excepciones de captura, debe utilizar la devolución de llamada como tal

is_room_open.s(args_sub_1, link_error=log_error.s())

from proj.celery import celery @celery.task def log_error(task_id): result = celery.AsyncResult(task_id) result.get(propagate=False) # make sure result written. with open(os.path.join(''/var/errors'', task_id), ''a'') as fh: fh.write(''--/n/n%s %s %s'' % ( task_id, result.result, result.traceback))


Es increíble que un caso tan común no se trate en ninguna documentación oficial. Tuve que hacer frente al mismo problema (pero al usar shared_tasks con la opción de bind , por lo que tenemos visibilidad del objeto self ), escribí un decorador personalizado que maneja automáticamente la revocación:

def revoke_chain_authority(a_shared_task): """ @see: https://gist.github.com/bloudermilk/2173940 @param a_shared_task: a @shared_task(bind=True) celery function. @return: """ @wraps(a_shared_task) def inner(self, *args, **kwargs): try: return a_shared_task(self, *args, **kwargs) except RevokeChainRequested, e: # Drop subsequent tasks in chain (if not EAGER mode) if self.request.callbacks: self.request.callbacks[:] = [] return e.return_value return inner

Puedes usarlo de la siguiente manera:

@shared_task(bind=True) @revoke_chain_authority def apply_fetching_decision(self, latitude, longitude): #... if condition: raise RevokeChainRequested(False)

Vea la explicación completa here . ¡Espero eso ayude!