tasks run beat python celery django-celery

python - run - pip install celery



Apio-encadenamiento a grupos y subtareas.-> ejecuciĆ³n fuera de orden (2)

Cuando tengo algo como lo siguiente

group1 = group(task1.si(), task1.si(), task1.si()) group2 = group(task2.si(), task2.si(), task2.si()) workflow = chain(group1, group2, task3.si())

La interpretación intuitiva es que task3 solo debe ejecutarse después de que todas las tareas del grupo 2 hayan finalizado.

En realidad, la tarea 3 se ejecuta mientras que el grupo 1 ha comenzado pero aún no se ha completado.

¿Qué estoy haciendo mal?


Así que resulta que, en el apio no se pueden encadenar dos grupos.
Sospecho que esto se debe a que los grupos encadenados con tareas se convierten automáticamente en un acorde
-> Documentos de apio: http://docs.celeryproject.org/en/latest/userguide/canvas.html

Al encadenar un grupo con otra tarea, se actualizará automáticamente para que sea un acorde:

Los grupos devuelven una tarea principal. Al encadenar dos grupos, sospecho que cuando se completa el primer grupo, el acorde inicia la "tarea" de devolución de llamada. Sospecho que esta "tarea" es en realidad la "tarea principal" del segundo grupo. También sospecho que esta tarea principal se completa tan pronto como termina de iniciar todas las subtareas dentro del grupo y, como resultado, el siguiente elemento después de que se ejecuta el segundo grupo.

Para demostrar esto aquí hay un código de ejemplo. Tendrá que tener ya una instancia de apio en ejecución.

# celery_experiment.py from celery import task, group, chain, chord from celery.signals import task_sent, task_postrun, task_prerun import time import logging import random random.seed() logging.basicConfig(level=logging.DEBUG) ### HANDLERS ### @task_prerun.connect() def task_starting_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds): try: logging.info(''[%s] starting'' % kwargs[''id'']) except KeyError: pass @task_postrun.connect() def task_finished_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds): try: logging.info(''[%s] finished'' % kwargs[''id'']) except KeyError: pass def random_sleep(id): slp = random.randint(1, 3) logging.info(''[%s] sleep for %ssecs'' % (id, slp)) time.sleep(slp) @task() def thing(id): logging.info(''[%s] begin'' % id) random_sleep(id) logging.info(''[%s] end'' % id) def exec_exp(): st = thing.si(id=''st'') st_arr = [thing.si(id=''st_arr1_a''), thing.si(id=''st_arr1_b''), thing.si(id=''st_arr1_c''),] st_arr2 = [thing.si(id=''st_arr2_a''), thing.si(id=''st_arr2_b''),] st2 = thing.si(id=''st2'') st3 = thing.si(id=''st3'') st4 = thing.si(id=''st4'') grp1 = group(st_arr) grp2 = group(st_arr2) # chn can chain two groups together because they are seperated by a single subtask chn = (st | grp1 | st2 | grp2 | st3 | st4) # in chn2 you can''t chain two groups together. what will happen is st3 will start before grp2 finishes #chn2 = (st | st2 | grp1 | grp2 | st3 | st4) r = chn() #r2 = chn2()


Tengo el mismo problema con el apio, tratando de tener un flujo de trabajo donde el primer paso es "generar un millón de tareas". Intenté grupos de grupos, subtareas y, finalmente, mi step2 se inicia antes de que el step1 termine.

En pocas palabras, podría haber encontrado una solución con el uso de acordes y un terminador tonto:

@celery.task def chordfinisher( *args, **kwargs ): return "OK"

No hacer mucho, pero me permite hacer esto:

tasks = [] for id in ids: tasks.append( mytask.si( id ) ) step1 = chord( group( tasks ), chordfinisher.si() ) step2 = ... workflow = chain( step1, step2 )

Originalmente quería tener el paso 1 en una subtarea, pero por el mismo motivo que se sospechaba, la acción de llamar a un grupo finaliza, la tarea se considera finalizada y mi flujo de trabajo continúa ...

Si alguien tiene algo mejor, me interesa!