python - español - apache airflow
Forma correcta de crear flujos de trabajo dinámicos en Airflow (7)
Problema
¿Hay alguna forma en Airflow para crear un flujo de trabajo de manera que se desconozca el número de tareas B. * hasta que se complete la Tarea A? He examinado subdags pero parece que solo puede funcionar con un conjunto estático de tareas que deben determinarse en la creación de Dag.
¿Dag disparadores funcionan? Y si es así, ¿podría dar un ejemplo?
Tengo un problema en el que es imposible saber la cantidad de tareas B que se necesitarán para calcular la Tarea C hasta que se complete la Tarea A. Cada tarea B. * tomará varias horas para computar y no se puede combinar.
|---> Task B.1 --|
|---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
| .... |
|---> Task B.N --|
Idea # 1
No me gusta esta solución porque tengo que crear un ExternalTaskSensor de bloqueo y toda la Tarea B. * tardará entre 2 y 24 horas en completarse. Así que no considero que esta sea una solución viable. ¿Seguramente hay una manera más fácil? ¿O Airflow no fue diseñado para esto?
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C
Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
|-- Task B.1 --|
|-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
| .... |
|-- Task B.N --|
Editar 1:
A partir de ahora, esta pregunta todavía no tiene una gran respuesta . Me han contactado varias personas en busca de una solución.
Así es como lo hice con una solicitud similar sin subdags:
Primero cree un método que devuelva los valores que desee
def values_function():
return values
Luego cree un método que generará los trabajos dinámicamente:
def group(number, **kwargs):
#load the values if needed in the command you plan to execute
dyn_value = "{{ task_instance.xcom_pull(task_ids=''push_func'') }}"
return BashOperator(
task_id=''JOB_NAME_{}''.format(number),
bash_command=''script.sh {} {}''.format(dyn_value, number),
dag=dag)
Y luego combinarlos:
push_func = PythonOperator(
task_id=''push_func'',
provide_context=True,
python_callable=values_function,
dag=dag)
complete = DummyOperator(
task_id=''All_jobs_completed'',
dag=dag)
for i in values_function():
push_func >> group(i) >> complete
Creo que he encontrado una solución más agradable para esto en https://github.com/mastak/airflow_multi_dagrun , que utiliza la puesta en cola simple de DagRuns activando múltiples dagruns, similar a TriggerDagRuns . La mayoría de los créditos van a https://github.com/mastak , aunque tuve que parchear algunos detalles para que funcione con el flujo de aire más reciente.
La solución utiliza un operador personalizado que activa varios DagRuns :
from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.utils import timezone
class TriggerMultiDagRunOperator(TriggerDagRunOperator):
CREATED_DAGRUN_KEY = ''created_dagrun_key''
@apply_defaults
def __init__(self, op_args=None, op_kwargs=None,
*args, **kwargs):
super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
self.op_args = op_args or []
self.op_kwargs = op_kwargs or {}
def execute(self, context):
context.update(self.op_kwargs)
session = settings.Session()
created_dr_ids = []
for dro in self.python_callable(*self.op_args, **context):
if not dro:
break
if not isinstance(dro, DagRunOrder):
dro = DagRunOrder(payload=dro)
now = timezone.utcnow()
if dro.run_id is None:
dro.run_id = ''trig__'' + now.isoformat()
dbag = DagBag(settings.DAGS_FOLDER)
trigger_dag = dbag.get_dag(self.trigger_dag_id)
dr = trigger_dag.create_dagrun(
run_id=dro.run_id,
execution_date=now,
state=State.RUNNING,
conf=dro.payload,
external_trigger=True,
)
created_dr_ids.append(dr.id)
self.log.info("Created DagRun %s, %s", dr, now)
if created_dr_ids:
session.commit()
context[''ti''].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids)
else:
self.log.info("No DagRun created")
session.close()
Luego puede enviar varias dagruns desde la función invocable en su PythonOperator, por ejemplo:
from airflow.operators.dagrun_operator import DagRunOrder
from airflow.models import DAG
from airflow.operators import TriggerMultiDagRunOperator
from airflow.utils.dates import days_ago
def generate_dag_run(**kwargs):
for i in range(10):
order = DagRunOrder(payload={''my_variable'': i})
yield order
args = {
''start_date'': days_ago(1),
''owner'': ''airflow'',
}
dag = DAG(
dag_id=''simple_trigger'',
max_active_runs=1,
schedule_interval=''@hourly'',
default_args=args,
)
gen_target_dag_run = TriggerMultiDagRunOperator(
task_id=''gen_target_dag_run'',
dag=dag,
trigger_dag_id=''common_target'',
python_callable=generate_dag_run
)
Creé un tenedor con el código en https://github.com/flinz/airflow_multi_dagrun
El gráfico de trabajos no se genera en tiempo de ejecución. Más bien, el gráfico se construye cuando Airflow lo recoge de su carpeta dags. Por lo tanto, no será posible tener un gráfico diferente para el trabajo cada vez que se ejecute. Puede configurar un trabajo para construir un gráfico basado en una consulta en el momento de la carga . Ese gráfico seguirá siendo el mismo para cada ejecución después de eso, lo que probablemente no sea muy útil.
Puede diseñar un gráfico que ejecute diferentes tareas en cada ejecución en función de los resultados de la consulta utilizando un Operador de sucursal.
Lo que he hecho es preconfigurar un conjunto de tareas y luego tomar los resultados de la consulta y distribuirlos entre las tareas. De todos modos, esto probablemente sea mejor porque si su consulta devuelve muchos resultados, probablemente no quiera inundar el planificador con muchas tareas concurrentes de todos modos. Para estar aún más seguro, también utilicé un grupo para garantizar que mi concurrencia no se salga de control con una consulta inesperadamente grande.
"""
- This is an idea for how to invoke multiple tasks based on the query results
"""
import logging
from datetime import datetime
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from include.run_celery_task import runCeleryTask
########################################################################
default_args = {
''owner'': ''airflow'',
''catchup'': False,
''depends_on_past'': False,
''start_date'': datetime(2019, 7, 2, 19, 50, 00),
''email'': [''rotten@''],
''email_on_failure'': True,
''email_on_retry'': False,
''retries'': 0,
''max_active_runs'': 1
}
dag = DAG(''dynamic_tasks_example'', default_args=default_args, schedule_interval=None)
totalBuckets = 5
get_orders_query = """
select
o.id,
o.customer
from
orders o
where
o.created_at >= current_timestamp at time zone ''UTC'' - ''2 days''::interval
and
o.is_test = false
and
o.is_processed = false
"""
###########################################################################################################
# Generate a set of tasks so we can parallelize the results
def createOrderProcessingTask(bucket_number):
return PythonOperator(
task_id=f''order_processing_task_{bucket_number}'',
python_callable=runOrderProcessing,
pool=''order_processing_pool'',
op_kwargs={''task_bucket'': f''order_processing_task_{bucket_number}''},
provide_context=True,
dag=dag
)
# Fetch the order arguments from xcom and doStuff() to them
def runOrderProcessing(task_bucket, **context):
orderList = context[''ti''].xcom_pull(task_ids=''get_open_orders'', key=task_bucket)
if orderList is not None:
for order in orderList:
logging.info(f"Processing Order with Order ID {order[order_id]}, customer ID {order[customer_id]}")
doStuff(**op_kwargs)
# Discover the orders we need to run and group them into buckets for processing
def getOpenOrders(**context):
myDatabaseHook = PostgresHook(postgres_conn_id=''my_database_conn_id'')
# initialize the task list buckets
tasks = {}
for task_number in range(0, totalBuckets):
tasks[f''order_processing_task_{task_number}''] = []
# populate the task list buckets
# distribute them evenly across the set of buckets
resultCounter = 0
for record in myDatabaseHook.get_records(get_orders_query):
resultCounter += 1
bucket = (resultCounter % totalBuckets)
tasks[f''order_processing_task_{bucket}''].append({''order_id'': str(record[0]), ''customer_id'': str(record[1])})
# push the order lists into xcom
for task in tasks:
if len(tasks[task]) > 0:
logging.info(f''Task {task} has {len(tasks[task])} orders.'')
context[''ti''].xcom_push(key=task, value=tasks[task])
else:
# if we didn''t have enough tasks for every bucket
# don''t bother running that task - remove it from the list
logging.info(f"Task {task} doesn''t have any orders.")
del(tasks[task])
return list(tasks.keys())
###################################################################################################
# this just makes sure that there aren''t any dangling xcom values in the database from a crashed dag
clean_xcoms = MySqlOperator(
task_id=''clean_xcoms'',
mysql_conn_id=''airflow_db'',
sql="delete from xcom where dag_id=''{{ dag.dag_id }}''",
dag=dag)
# Ideally we''d use BranchPythonOperator() here instead of PythonOperator so that if our
# query returns fewer results than we have buckets, we don''t try to run them all.
# Unfortunately I couldn''t get BranchPythonOperator to take a list of results like the
# documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now.
get_orders_task = PythonOperator(
task_id=''get_orders'',
python_callable=getOpenOrders,
provide_context=True,
dag=dag
)
open_order_task.set_upstream(clean_xcoms)
# set up the parallel tasks -- these are configured at compile time, not at run time:
for bucketNumber in range(0, totalBuckets):
taskBucket = createOrderProcessingTask(bucketNumber)
taskBucket.set_upstream(get_orders_task)
###################################################################################################
Encontré esta publicación de Medium que es muy similar a esta pregunta. Sin embargo, está lleno de errores tipográficos y no funciona cuando intenté implementarlo.
Mi respuesta a lo anterior es la siguiente:
Si está creando tareas dinámicamente, debe hacerlo iterando sobre algo que no ha sido creado por una tarea ascendente o que puede definirse independientemente de esa tarea. Aprendí que no puede pasar fechas de ejecución u otras variables de flujo de aire a algo fuera de una plantilla (por ejemplo, una tarea) como muchos otros han señalado anteriormente. Ver también esta publicación .
He descubierto una forma de crear flujos de trabajo basados en el resultado de tareas anteriores.
Básicamente, lo que quieres hacer es tener dos subdags con lo siguiente:
-
Xcom inserta una lista (o lo que necesite para crear el flujo de trabajo dinámico más adelante) en el subdag que se ejecuta primero (consulte test1.py
def return_list()
return_listdef return_list()
) - Pase el objeto dag principal como parámetro a su segundo subdag
-
Ahora, si tiene el objeto dag principal, puede usarlo para obtener una lista de sus instancias de tareas.
De esa lista de instancias de tareas, puede filtrar una tarea de la ejecución actual utilizando
parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]
), probablemente podría agregar más Filtros aquí. -
Con esa instancia de tarea, puede usar xcom pull para obtener el valor que necesita especificando el dag_id al del primer subdag:
dag_id=''%s.%s'' % (parent_dag_name, ''test1'')
- Use la lista / valor para crear sus tareas dinámicamente
Ahora lo he probado en mi instalación de flujo de aire local y funciona bien. No sé si la parte de extracción de xcom tendrá algún problema si hay más de una instancia del dag ejecutándose al mismo tiempo, pero entonces probablemente usaría una clave única o algo así para identificar de forma única el xcom valor que quieres. Probablemente se podría optimizar el paso 3. para estar 100% seguro de obtener una tarea específica del dag principal actual, pero para mi uso esto funciona lo suficientemente bien, creo que solo se necesita un objeto task_instance para usar xcom_pull.
También limpio las xcoms para el primer subdag antes de cada ejecución, solo para asegurarme de que no obtengo ningún valor incorrecto accidentalmente.
Soy bastante malo para explicar, así que espero que el siguiente código aclare todo:
test1.py
from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
log = logging.getLogger(__name__)
def test1(parent_dag_name, start_date, schedule_interval):
dag = DAG(
''%s.test1'' % parent_dag_name,
schedule_interval=schedule_interval,
start_date=start_date,
)
def return_list():
return [''test1'', ''test2'']
list_extract_folder = PythonOperator(
task_id=''list'',
dag=dag,
python_callable=return_list
)
clean_xcoms = PostgresOperator(
task_id=''clean_xcoms'',
postgres_conn_id=''airflow_db'',
sql="delete from xcom where dag_id=''{{ dag.dag_id }}''",
dag=dag)
clean_xcoms >> list_extract_folder
return dag
test2.py
from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator
log = logging.getLogger(__name__)
def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
dag = DAG(
''%s.test2'' % parent_dag_name,
schedule_interval=schedule_interval,
start_date=start_date
)
if len(parent_dag.get_active_runs()) > 0:
test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
dag_id=''%s.%s'' % (parent_dag_name, ''test1''),
task_ids=''list'')
if test_list:
for i in test_list:
test = DummyOperator(
task_id=i,
dag=dag
)
return dag
y el flujo de trabajo principal:
prueba.py
from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2
DAG_NAME = ''test-dag''
dag = DAG(DAG_NAME,
description=''Test workflow'',
catchup=False,
schedule_interval=''0 0 * * *'',
start_date=datetime(2018, 8, 24))
test1 = SubDagOperator(
subdag=test1(DAG_NAME,
dag.start_date,
dag.schedule_interval),
task_id=''test1'',
dag=dag
)
test2 = SubDagOperator(
subdag=test2(DAG_NAME,
dag.start_date,
dag.schedule_interval,
parent_dag=dag),
task_id=''test2'',
dag=dag
)
test1 >> test2
OA: "¿Hay alguna forma en Airflow para crear un flujo de trabajo tal que el número de tareas B. * sea desconocido hasta la finalización de la Tarea A?"
La respuesta corta es no. Airflow construirá el flujo DAG antes de comenzar a ejecutarlo.
Dicho esto, llegamos a una conclusión simple, es decir, no tenemos tanta necesidad. Cuando desee hacer un trabajo en paralelo, debe evaluar los recursos que tiene disponibles y no la cantidad de elementos para procesar.
Lo hicimos así: generamos dinámicamente un número fijo de tareas, digamos 10, que dividirán el trabajo. Por ejemplo, si necesitamos procesar 100 archivos, cada tarea procesará 10 de ellos. Publicaré el código más tarde hoy.
Actualizar
Aquí está el código, perdón por el retraso.
from datetime import datetime, timedelta
import airflow
from airflow.operators.dummy_operator import DummyOperator
args = {
''owner'': ''airflow'',
''depends_on_past'': False,
''start_date'': datetime(2018, 1, 8),
''email'': [''[email protected]''],
''email_on_failure'': True,
''email_on_retry'': True,
''retries'': 1,
''retry_delay'': timedelta(seconds=5)
}
dag = airflow.DAG(
''parallel_tasks_v1'',
schedule_interval="@daily",
catchup=False,
default_args=args)
# You can read this from variables
parallel_tasks_total_number = 10
start_task = DummyOperator(
task_id=''start_task'',
dag=dag
)
# Creates the tasks dynamically.
# Each one will elaborate one chunk of data.
def create_dynamic_task(current_task_number):
return DummyOperator(
provide_context=True,
task_id=''parallel_task_'' + str(current_task_number),
python_callable=parallelTask,
# your task will take as input the total number and the current number to elaborate a chunk of total elements
op_args=[current_task_number, int(parallel_tasks_total_number)],
dag=dag)
end = DummyOperator(
task_id=''end'',
dag=dag)
for page in range(int(parallel_tasks_total_number)):
created_task = create_dynamic_task(page)
start_task >> created_task
created_task >> end
Explicación del código:
Aquí tenemos una única tarea inicial y una única tarea final (ambas ficticias).
Luego, desde la tarea de inicio con el bucle for, creamos 10 tareas con el mismo python invocable. Las tareas se crean en la función create_dynamic_task.
A cada python invocable pasamos como argumentos el número total de tareas paralelas y el índice de tareas actual.
Supongamos que tiene 1000 elementos para elaborar: la primera tarea recibirá en la entrada que debe elaborar el primer fragmento de 10 fragmentos. Dividirá los 1000 artículos en 10 trozos y elaborará el primero.
Sí, esto es posible. He creado un ejemplo de DAG que lo demuestra.
import airflow
from airflow.operators.python_operator import PythonOperator
import os
from airflow.models import Variable
import logging
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator
main_dag_id = ''DynamicWorkflow2''
args = {
''owner'': ''airflow'',
''start_date'': airflow.utils.dates.days_ago(2),
''provide_context'': True
}
dag = DAG(
main_dag_id,
schedule_interval="@once",
default_args=args)
def start(*args, **kwargs):
value = Variable.get("DynamicWorkflow_Group1")
logging.info("Current DynamicWorkflow_Group1 value is " + str(value))
def resetTasksStatus(task_id, execution_date):
logging.info("Resetting: " + task_id + " " + execution_date)
dag_folder = conf.get(''core'', ''DAGS_FOLDER'')
dagbag = DagBag(dag_folder)
check_dag = dagbag.dags[main_dag_id]
session = settings.Session()
my_task = check_dag.get_task(task_id)
ti = TaskInstance(my_task, execution_date)
state = ti.current_state()
logging.info("Current state of " + task_id + " is " + str(state))
ti.set_state(None, session)
state = ti.current_state()
logging.info("Updated state of " + task_id + " is " + str(state))
def bridge1(*args, **kwargs):
# You can set this value dynamically e.g., from a database or a calculation
dynamicValue = 2
variableValue = Variable.get("DynamicWorkflow_Group2")
logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))
logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
os.system(''airflow variables --set DynamicWorkflow_Group2 '' + str(dynamicValue))
variableValue = Variable.get("DynamicWorkflow_Group2")
logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))
# Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
for i in range(dynamicValue):
resetTasksStatus(''secondGroup_'' + str(i), str(kwargs[''execution_date'']))
def bridge2(*args, **kwargs):
# You can set this value dynamically e.g., from a database or a calculation
dynamicValue = 3
variableValue = Variable.get("DynamicWorkflow_Group3")
logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))
logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
os.system(''airflow variables --set DynamicWorkflow_Group3 '' + str(dynamicValue))
variableValue = Variable.get("DynamicWorkflow_Group3")
logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))
# Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
for i in range(dynamicValue):
resetTasksStatus(''thirdGroup_'' + str(i), str(kwargs[''execution_date'']))
def end(*args, **kwargs):
logging.info("Ending")
def doSomeWork(name, index, *args, **kwargs):
# Do whatever work you need to do
# Here I will just create a new file
os.system(''touch /home/ec2-user/airflow/'' + str(name) + str(index) + ''.txt'')
starting_task = PythonOperator(
task_id=''start'',
dag=dag,
provide_context=True,
python_callable=start,
op_args=[])
# Used to connect the stream in the event that the range is zero
bridge1_task = PythonOperator(
task_id=''bridge1'',
dag=dag,
provide_context=True,
python_callable=bridge1,
op_args=[])
DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))
for index in range(int(DynamicWorkflow_Group1)):
dynamicTask = PythonOperator(
task_id=''firstGroup_'' + str(index),
dag=dag,
provide_context=True,
python_callable=doSomeWork,
op_args=[''firstGroup'', index])
starting_task.set_downstream(dynamicTask)
dynamicTask.set_downstream(bridge1_task)
# Used to connect the stream in the event that the range is zero
bridge2_task = PythonOperator(
task_id=''bridge2'',
dag=dag,
provide_context=True,
python_callable=bridge2,
op_args=[])
DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2))
for index in range(int(DynamicWorkflow_Group2)):
dynamicTask = PythonOperator(
task_id=''secondGroup_'' + str(index),
dag=dag,
provide_context=True,
python_callable=doSomeWork,
op_args=[''secondGroup'', index])
bridge1_task.set_downstream(dynamicTask)
dynamicTask.set_downstream(bridge2_task)
ending_task = PythonOperator(
task_id=''end'',
dag=dag,
provide_context=True,
python_callable=end,
op_args=[])
DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3))
for index in range(int(DynamicWorkflow_Group3)):
# You can make this logic anything you''d like
# I chose to use the PythonOperator for all tasks
# except the last task will use the BashOperator
if index < (int(DynamicWorkflow_Group3) - 1):
dynamicTask = PythonOperator(
task_id=''thirdGroup_'' + str(index),
dag=dag,
provide_context=True,
python_callable=doSomeWork,
op_args=[''thirdGroup'', index])
else:
dynamicTask = BashOperator(
task_id=''thirdGroup_'' + str(index),
bash_command=''touch /home/ec2-user/airflow/thirdGroup_'' + str(index) + ''.txt'',
dag=dag)
bridge2_task.set_downstream(dynamicTask)
dynamicTask.set_downstream(ending_task)
# If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream
# and your tasks will run simultaneously instead of in your desired stream order.
starting_task.set_downstream(bridge1_task)
bridge1_task.set_downstream(bridge2_task)
bridge2_task.set_downstream(ending_task)
Antes de ejecutar el DAG, cree estas tres variables de flujo de aire
airflow variables --set DynamicWorkflow_Group1 1
airflow variables --set DynamicWorkflow_Group2 0
airflow variables --set DynamicWorkflow_Group3 0
Verás que el DAG va de esto
A esto después de que se ejecutó
Puede ver más información sobre este DAG en mi artículo sobre cómo crear linkedin.com/pulse/dynamic-workflows-airflow-kyle-bridenstine .