mochilas - ¿Cómo funciona exactamente el subDAG en Airflow? ¿Qué significa que un subDAG esté habilitado?
airflow scheduler (1)
Usé tu código localmente y funciona bien.
Las únicas cosas que cambié, fueron la configuración del dag externo y del dag secundario para que tengan schedule_interval = None y las activaran manualmente.
Tener una fecha de inicio de datetime (2016, 04, 20) y schedule_interval de 5 minutos inundará el programador de flujo de aire con muchas solicitudes de reposición .
Es posible que deba cambiar del uso de LocalExecutor a CeleryExecutor. LocalExecutor es bastante limitado.
Aquí está la salida del último paso en la subetiqueta:
[2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask: {
[2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask: echo "226"
[2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask: echo "airflow_home = /root/airflow/"
[2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask: }
He mirado la sección SubDAG de Flujo de aire y he intentado encontrar cualquier otra cosa en línea que pudiera ser útil, sin embargo, no he encontrado nada que explique en detalle cómo hacer funcionar un subDAG. Uno de los requisitos para que se ejecute un subDAG es que debe estar habilitado. ¿Cómo habilitar / deshabilitar una subetiqueta?
Escribí un código de muestra que no muestra ningún error en el flujo de aire, sin embargo, cuando intento ejecutarlo, ninguno de los operadores en el subDAG se ejecuta.
Este es mi código dag principal:
import os
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from airflow.operators.subdag_operator import SubDagOperator
from linecount_subdag import sub_dag
parent_dag_name = ''example_linecount_dag''
child_dag_name = ''example_linecount_subdag''
args = {
''owner'': ''airflow'',
''start_date'': datetime(2016, 04, 20),
''retries'': 0,
}
main_dag = DAG(
dag_id=parent_dag_name,
default_args=args,
schedule_interval=timedelta(minutes=5),
start_date=datetime(2016, 04, 20),
max_active_runs=1
)
subdag = SubDagOperator(
subdag=sub_dag(parent_dag_name, child_dag_name, args, main_dag.schedule_interval),
task_id=child_dag_name,
default_args=args,
dag=main_dag)
t = BashOperator(
task_id=''start'',
bash_command=''echo "waiting for subdag..."'',
default_args=args,
dag=main_dag)
t.set_downstream(subdag)
En este código, la tarea ''inicio'' tiene éxito, sin embargo, la tarea de subdivisión no hace nada y ni falla ni tiene éxito.
Aquí está mi código subDAG:
from airflow.models import DAG
from airflow.operators import BashOperator
# Dag is returned by a factory method
def sub_dag(parent_dag_name, child_dag_name, args, schedule_interval):
dag = DAG(
''%s.%s'' % (parent_dag_name, child_dag_name),
default_args=args,
start_date=args[''start_date''],
max_active_runs=1,
)
t1 = BashOperator(
task_id=''count_lines'',
bash_command=''cat /root/airflow/airflow.cfg | wc -l'',
default_args=args,
xcom_push=True,
dag=dag)
t2 = BashOperator(
task_id=''retrieve_val'',
bash_command=''grep "airflow_home" /root/airflow/airflow.cfg'',
default_args=args,
xcom_push=True,
dag=dag)
templated_command = """
{
echo "{{ ti.xcom_pull(task_ids=''count_lines'') }}"
echo "{{ ti.xcom_pull(task_ids=''retrieve_val'') }}"
}"""
t3 = BashOperator(
task_id=''print_values'',
bash_command=templated_command,
default_args=args,
dag=dag)
t3.set_upstream(t1)
t3.set_upstream(t2)
return dag
Los 3 operadores en este código obtienen el número de líneas del archivo "airflow.cfg", encuentran el valor de "airflow_home" en ese archivo y devuelven ambos valores para imprimir. Este código funciona por sí solo, así que no creo que sea el problema.
¿Qué tengo que cambiar para que el subDAG ejecute sus operadores?