español - apache airflow wikipedia
DAG dinámico de flujo de aire e ID de tareas (2)
Desde ¿Cómo puedo crear DAGs dinámicamente? :
Airflow busca en usted [sic] DAGS_FOLDER los módulos que contienen objetos DAG en su espacio de nombres global, y agrega los objetos que encuentra en el DagBag. Sabiendo esto, todo lo que necesitamos es una forma de asignar dinámicamente una variable en el espacio de nombres global, que se realiza fácilmente en python utilizando la función globals () para la biblioteca estándar que se comporta como un diccionario simple.
for i in range(10): dag_id = ''foo_{}''.format(i) globals()[dag_id] = DAG(dag_id) # or better, call a function that returns a DAG object!
Principalmente veo que Airflow se utiliza para trabajos relacionados con ETL / Bid. Estoy intentando usarlo para flujos de trabajo empresariales en los que una acción de usuario desencadena un conjunto de tareas dependientes en el futuro. Es posible que algunas de estas tareas deban borrarse (eliminarse) en función de otras acciones de otros usuarios. Pensé que la mejor manera de manejar esto sería a través de ID de tareas dinámicas. Leí que Airflow es compatible con ID de dag dinámicos. Por lo tanto, creé una secuencia de comandos de Python simple que toma el ID de DAG y el ID de tarea como parámetros de línea de comandos. Sin embargo, estoy teniendo problemas para que funcione. Da dag_id error no encontrado. ¿Alguien ha probado esto? Aquí está el código para el script (llámelo tmp.py) que ejecuto en la línea de comandos como python (python tmp.py 820 2016-08-24T22: 50: 00):
from __future__ import print_function
import os
import sys
import shutil
from datetime import date, datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
execution = ''2016-08-24T22:20:00''
if len(sys.argv) > 2 :
dagid = sys.argv[1]
taskid = ''Activate'' + sys.argv[1]
execution = sys.argv[2]
else:
dagid = ''DAGObjectId''
taskid = ''Activate''
default_args = {''owner'' : ''airflow'', ''depends_on_past'': False, ''start_date'':date.today(), ''email'': [''[email protected]''], ''email_on_failure'': False, ''email_on_retry'': False, ''retries'': 1}
dag = DAG(dag_id = dagid,
default_args=default_args,
schedule_interval=''@once'',
)
globals()[dagid] = dag
task1 = BashOperator(
task_id = taskid,
bash_command=''ls -l'',
dag=dag)
fakeTask = BashOperator(
task_id = ''fakeTask'',
bash_command=''sleep 5'',
retries = 3,
dag=dag)
task1.set_upstream(fakeTask)
airflowcmd = "airflow run " + dagid + " " + taskid + " " + execution
print("airflowcmd = " + airflowcmd)
os.system(airflowcmd)
Después de numerosas pruebas y errores, pude resolver esto. Con suerte, ayudará a alguien. Así es como funciona: debe tener un iterador o una fuente externa (tabla de archivo / base de datos) para generar dags / tarea dinámicamente a través de una plantilla. Puede mantener los nombres de tareas y dag estáticos, simplemente asigne los ID dinámicamente para diferenciar un dag de otro. Pones este script de python en la carpeta dags. Cuando inicia el programador de flujo de aire, se ejecuta a través de este script en cada latido del corazón y escribe los DAG en la tabla dag en la base de datos. Si ya se ha escrito un dag (id de dag único), simplemente lo omitirá. El programador también mira el programa de los DAG individuales para determinar cuál está listo para la ejecución. Si un DAG está listo para su ejecución, lo ejecuta y actualiza su estado. Aquí hay un código de ejemplo:
from airflow.operators import PythonOperator
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta
import sys
import time
dagid = ''DA'' + str(int(time.time()))
taskid = ''TA'' + str(int(time.time()))
input_file = ''/home/directory/airflow/textfile_for_dagids_and_schedule''
def my_sleeping_function(random_base):
''''''This is a function that will run within the DAG execution''''''
time.sleep(random_base)
def_args = {
''owner'': ''airflow'',
''depends_on_past'': False,
''start_date'': datetime.now(), ''email_on_failure'': False,
''retries'': 1, ''retry_delay'': timedelta(minutes=2)
}
with open(input_file,''r'') as f:
for line in f:
args = line.strip().split('','')
if len(args) < 6:
continue
dagid = ''DAA'' + args[0]
taskid = ''TAA'' + args[0]
yyyy = int(args[1])
mm = int(args[2])
dd = int(args[3])
hh = int(args[4])
mins = int(args[5])
ss = int(args[6])
dag = DAG(
dag_id=dagid, default_args=def_args,
schedule_interval=''@once'', start_date=datetime(yyyy,mm,dd,hh,mins,ss)
)
myBashTask = BashOperator(
task_id=taskid,
bash_command=''python /home/directory/airflow/sendemail.py'',
dag=dag)
task2id = taskid + ''-X''
task_sleep = PythonOperator(
task_id=task2id,
python_callable=my_sleeping_function,
op_kwargs={''random_base'': 10},
dag=dag)
task_sleep.set_upstream(myBashTask)
f.close()