python - Cómo crear una tarea condicional en Airflow
condition (2)
Me gustaría crear una tarea condicional en Airflow como se describe en el esquema a continuación. El escenario esperado es el siguiente:
- La tarea 1 se ejecuta
- Si la Tarea 1 tiene éxito, entonces ejecute la Tarea 2a
- De lo contrario, si la tarea 1 falla, ejecute la tarea 2b
- Finalmente ejecutar la tarea 3
Todas las tareas anteriores son SSHExecuteOperator. Supongo que debería usar ShortCircuitOperator y / o XCom para manejar la condición, pero no tengo claro cómo implementar eso. ¿Podría por favor describir la solución?
Airflow tiene un BranchPythonOperator que se puede usar para expresar la dependencia de ramificación de manera más directa.
Los BranchPythonOperator describen su uso:
El BranchPythonOperator es muy parecido al PythonOperator, excepto que espera un python_callable que devuelve un task_id. Se sigue el task_id devuelto, y todas las otras rutas se omiten. El task_id devuelto por la función Python debe estar haciendo referencia a una tarea directamente en sentido descendente desde la tarea BranchPythonOperator.
...
Si desea omitir algunas tareas, tenga en cuenta que no puede tener una ruta vacía, si es así realice una tarea ficticia.
Tienes que usar reglas de disparo de flujo de aire.
Todos los operadores tienen un argumento trigger_rule que define la regla por la cual se activa la tarea generada.
Las posibilidades de la regla de activación:
ALL_SUCCESS = ''all_success''
ALL_FAILED = ''all_failed''
ALL_DONE = ''all_done''
ONE_SUCCESS = ''one_success''
ONE_FAILED = ''one_failed''
DUMMY = ''dummy''
Aquí está la idea para resolver su problema:
from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook
sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)
task_1 = SSHExecuteOperator(
task_id=''task_1'',
bash_command=<YOUR COMMAND>,
ssh_hook=sshHook,
dag=dag)
task_2 = SSHExecuteOperator(
task_id=''conditional_task'',
bash_command=<YOUR COMMAND>,
ssh_hook=sshHook,
dag=dag)
task_2a = SSHExecuteOperator(
task_id=''task_2a'',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ALL_SUCCESS,
ssh_hook=sshHook,
dag=dag)
task_2b = SSHExecuteOperator(
task_id=''task_2b'',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ALL_FAILED,
ssh_hook=sshHook,
dag=dag)
task_3 = SSHExecuteOperator(
task_id=''task_3'',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ONE_SUCCESS,
ssh_hook=sshHook,
dag=dag)
task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)