tutorial español python workflow airflow

python - español - apache airflow



Forma correcta de crear flujos de trabajo dinámicos en Airflow (7)

Problema

¿Hay alguna forma en Airflow para crear un flujo de trabajo de manera que se desconozca el número de tareas B. * hasta que se complete la Tarea A? He examinado subdags pero parece que solo puede funcionar con un conjunto estático de tareas que deben determinarse en la creación de Dag.

¿Dag disparadores funcionan? Y si es así, ¿podría dar un ejemplo?

Tengo un problema en el que es imposible saber la cantidad de tareas B que se necesitarán para calcular la Tarea C hasta que se complete la Tarea A. Cada tarea B. * tomará varias horas para computar y no se puede combinar.

|---> Task B.1 --| |---> Task B.2 --| Task A ------|---> Task B.3 --|-----> Task C | .... | |---> Task B.N --|

Idea # 1

No me gusta esta solución porque tengo que crear un ExternalTaskSensor de bloqueo y toda la Tarea B. * tardará entre 2 y 24 horas en completarse. Así que no considero que esta sea una solución viable. ¿Seguramente hay una manera más fácil? ¿O Airflow no fue diseñado para esto?

Dag 1 Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator) |-- Task B.1 --| |-- Task B.2 --| Task Dummy A --|-- Task B.3 --|-----> Task Dummy B | .... | |-- Task B.N --|

Editar 1:

A partir de ahora, esta pregunta todavía no tiene una gran respuesta . Me han contactado varias personas en busca de una solución.


Así es como lo hice con una solicitud similar sin subdags:

Primero cree un método que devuelva los valores que desee

def values_function(): return values

Luego cree un método que generará los trabajos dinámicamente:

def group(number, **kwargs): #load the values if needed in the command you plan to execute dyn_value = "{{ task_instance.xcom_pull(task_ids=''push_func'') }}" return BashOperator( task_id=''JOB_NAME_{}''.format(number), bash_command=''script.sh {} {}''.format(dyn_value, number), dag=dag)

Y luego combinarlos:

push_func = PythonOperator( task_id=''push_func'', provide_context=True, python_callable=values_function, dag=dag) complete = DummyOperator( task_id=''All_jobs_completed'', dag=dag) for i in values_function(): push_func >> group(i) >> complete


Creo que he encontrado una solución más agradable para esto en https://github.com/mastak/airflow_multi_dagrun , que utiliza la puesta en cola simple de DagRuns activando múltiples dagruns, similar a TriggerDagRuns . La mayoría de los créditos van a https://github.com/mastak , aunque tuve que parchear algunos detalles para que funcione con el flujo de aire más reciente.

La solución utiliza un operador personalizado que activa varios DagRuns :

from airflow import settings from airflow.models import DagBag from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator from airflow.utils.decorators import apply_defaults from airflow.utils.state import State from airflow.utils import timezone class TriggerMultiDagRunOperator(TriggerDagRunOperator): CREATED_DAGRUN_KEY = ''created_dagrun_key'' @apply_defaults def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs): super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs) self.op_args = op_args or [] self.op_kwargs = op_kwargs or {} def execute(self, context): context.update(self.op_kwargs) session = settings.Session() created_dr_ids = [] for dro in self.python_callable(*self.op_args, **context): if not dro: break if not isinstance(dro, DagRunOrder): dro = DagRunOrder(payload=dro) now = timezone.utcnow() if dro.run_id is None: dro.run_id = ''trig__'' + now.isoformat() dbag = DagBag(settings.DAGS_FOLDER) trigger_dag = dbag.get_dag(self.trigger_dag_id) dr = trigger_dag.create_dagrun( run_id=dro.run_id, execution_date=now, state=State.RUNNING, conf=dro.payload, external_trigger=True, ) created_dr_ids.append(dr.id) self.log.info("Created DagRun %s, %s", dr, now) if created_dr_ids: session.commit() context[''ti''].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids) else: self.log.info("No DagRun created") session.close()

Luego puede enviar varias dagruns desde la función invocable en su PythonOperator, por ejemplo:

from airflow.operators.dagrun_operator import DagRunOrder from airflow.models import DAG from airflow.operators import TriggerMultiDagRunOperator from airflow.utils.dates import days_ago def generate_dag_run(**kwargs): for i in range(10): order = DagRunOrder(payload={''my_variable'': i}) yield order args = { ''start_date'': days_ago(1), ''owner'': ''airflow'', } dag = DAG( dag_id=''simple_trigger'', max_active_runs=1, schedule_interval=''@hourly'', default_args=args, ) gen_target_dag_run = TriggerMultiDagRunOperator( task_id=''gen_target_dag_run'', dag=dag, trigger_dag_id=''common_target'', python_callable=generate_dag_run )

Creé un tenedor con el código en https://github.com/flinz/airflow_multi_dagrun


El gráfico de trabajos no se genera en tiempo de ejecución. Más bien, el gráfico se construye cuando Airflow lo recoge de su carpeta dags. Por lo tanto, no será posible tener un gráfico diferente para el trabajo cada vez que se ejecute. Puede configurar un trabajo para construir un gráfico basado en una consulta en el momento de la carga . Ese gráfico seguirá siendo el mismo para cada ejecución después de eso, lo que probablemente no sea muy útil.

Puede diseñar un gráfico que ejecute diferentes tareas en cada ejecución en función de los resultados de la consulta utilizando un Operador de sucursal.

Lo que he hecho es preconfigurar un conjunto de tareas y luego tomar los resultados de la consulta y distribuirlos entre las tareas. De todos modos, esto probablemente sea mejor porque si su consulta devuelve muchos resultados, probablemente no quiera inundar el planificador con muchas tareas concurrentes de todos modos. Para estar aún más seguro, también utilicé un grupo para garantizar que mi concurrencia no se salga de control con una consulta inesperadamente grande.

""" - This is an idea for how to invoke multiple tasks based on the query results """ import logging from datetime import datetime from airflow import DAG from airflow.hooks.postgres_hook import PostgresHook from airflow.operators.mysql_operator import MySqlOperator from airflow.operators.python_operator import PythonOperator, BranchPythonOperator from include.run_celery_task import runCeleryTask ######################################################################## default_args = { ''owner'': ''airflow'', ''catchup'': False, ''depends_on_past'': False, ''start_date'': datetime(2019, 7, 2, 19, 50, 00), ''email'': [''rotten@''], ''email_on_failure'': True, ''email_on_retry'': False, ''retries'': 0, ''max_active_runs'': 1 } dag = DAG(''dynamic_tasks_example'', default_args=default_args, schedule_interval=None) totalBuckets = 5 get_orders_query = """ select o.id, o.customer from orders o where o.created_at >= current_timestamp at time zone ''UTC'' - ''2 days''::interval and o.is_test = false and o.is_processed = false """ ########################################################################################################### # Generate a set of tasks so we can parallelize the results def createOrderProcessingTask(bucket_number): return PythonOperator( task_id=f''order_processing_task_{bucket_number}'', python_callable=runOrderProcessing, pool=''order_processing_pool'', op_kwargs={''task_bucket'': f''order_processing_task_{bucket_number}''}, provide_context=True, dag=dag ) # Fetch the order arguments from xcom and doStuff() to them def runOrderProcessing(task_bucket, **context): orderList = context[''ti''].xcom_pull(task_ids=''get_open_orders'', key=task_bucket) if orderList is not None: for order in orderList: logging.info(f"Processing Order with Order ID {order[order_id]}, customer ID {order[customer_id]}") doStuff(**op_kwargs) # Discover the orders we need to run and group them into buckets for processing def getOpenOrders(**context): myDatabaseHook = PostgresHook(postgres_conn_id=''my_database_conn_id'') # initialize the task list buckets tasks = {} for task_number in range(0, totalBuckets): tasks[f''order_processing_task_{task_number}''] = [] # populate the task list buckets # distribute them evenly across the set of buckets resultCounter = 0 for record in myDatabaseHook.get_records(get_orders_query): resultCounter += 1 bucket = (resultCounter % totalBuckets) tasks[f''order_processing_task_{bucket}''].append({''order_id'': str(record[0]), ''customer_id'': str(record[1])}) # push the order lists into xcom for task in tasks: if len(tasks[task]) > 0: logging.info(f''Task {task} has {len(tasks[task])} orders.'') context[''ti''].xcom_push(key=task, value=tasks[task]) else: # if we didn''t have enough tasks for every bucket # don''t bother running that task - remove it from the list logging.info(f"Task {task} doesn''t have any orders.") del(tasks[task]) return list(tasks.keys()) ################################################################################################### # this just makes sure that there aren''t any dangling xcom values in the database from a crashed dag clean_xcoms = MySqlOperator( task_id=''clean_xcoms'', mysql_conn_id=''airflow_db'', sql="delete from xcom where dag_id=''{{ dag.dag_id }}''", dag=dag) # Ideally we''d use BranchPythonOperator() here instead of PythonOperator so that if our # query returns fewer results than we have buckets, we don''t try to run them all. # Unfortunately I couldn''t get BranchPythonOperator to take a list of results like the # documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now. get_orders_task = PythonOperator( task_id=''get_orders'', python_callable=getOpenOrders, provide_context=True, dag=dag ) open_order_task.set_upstream(clean_xcoms) # set up the parallel tasks -- these are configured at compile time, not at run time: for bucketNumber in range(0, totalBuckets): taskBucket = createOrderProcessingTask(bucketNumber) taskBucket.set_upstream(get_orders_task) ###################################################################################################


Encontré esta publicación de Medium que es muy similar a esta pregunta. Sin embargo, está lleno de errores tipográficos y no funciona cuando intenté implementarlo.

Mi respuesta a lo anterior es la siguiente:

Si está creando tareas dinámicamente, debe hacerlo iterando sobre algo que no ha sido creado por una tarea ascendente o que puede definirse independientemente de esa tarea. Aprendí que no puede pasar fechas de ejecución u otras variables de flujo de aire a algo fuera de una plantilla (por ejemplo, una tarea) como muchos otros han señalado anteriormente. Ver también esta publicación .


He descubierto una forma de crear flujos de trabajo basados ​​en el resultado de tareas anteriores.
Básicamente, lo que quieres hacer es tener dos subdags con lo siguiente:

  1. Xcom inserta una lista (o lo que necesite para crear el flujo de trabajo dinámico más adelante) en el subdag que se ejecuta primero (consulte test1.py def return_list() return_list def return_list() )
  2. Pase el objeto dag principal como parámetro a su segundo subdag
  3. Ahora, si tiene el objeto dag principal, puede usarlo para obtener una lista de sus instancias de tareas. De esa lista de instancias de tareas, puede filtrar una tarea de la ejecución actual utilizando parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1] ), probablemente podría agregar más Filtros aquí.
  4. Con esa instancia de tarea, puede usar xcom pull para obtener el valor que necesita especificando el dag_id al del primer subdag: dag_id=''%s.%s'' % (parent_dag_name, ''test1'')
  5. Use la lista / valor para crear sus tareas dinámicamente

Ahora lo he probado en mi instalación de flujo de aire local y funciona bien. No sé si la parte de extracción de xcom tendrá algún problema si hay más de una instancia del dag ejecutándose al mismo tiempo, pero entonces probablemente usaría una clave única o algo así para identificar de forma única el xcom valor que quieres. Probablemente se podría optimizar el paso 3. para estar 100% seguro de obtener una tarea específica del dag principal actual, pero para mi uso esto funciona lo suficientemente bien, creo que solo se necesita un objeto task_instance para usar xcom_pull.

También limpio las xcoms para el primer subdag antes de cada ejecución, solo para asegurarme de que no obtengo ningún valor incorrecto accidentalmente.

Soy bastante malo para explicar, así que espero que el siguiente código aclare todo:

test1.py

from airflow.models import DAG import logging from airflow.operators.python_operator import PythonOperator from airflow.operators.postgres_operator import PostgresOperator log = logging.getLogger(__name__) def test1(parent_dag_name, start_date, schedule_interval): dag = DAG( ''%s.test1'' % parent_dag_name, schedule_interval=schedule_interval, start_date=start_date, ) def return_list(): return [''test1'', ''test2''] list_extract_folder = PythonOperator( task_id=''list'', dag=dag, python_callable=return_list ) clean_xcoms = PostgresOperator( task_id=''clean_xcoms'', postgres_conn_id=''airflow_db'', sql="delete from xcom where dag_id=''{{ dag.dag_id }}''", dag=dag) clean_xcoms >> list_extract_folder return dag

test2.py

from airflow.models import DAG, settings import logging from airflow.operators.dummy_operator import DummyOperator log = logging.getLogger(__name__) def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None): dag = DAG( ''%s.test2'' % parent_dag_name, schedule_interval=schedule_interval, start_date=start_date ) if len(parent_dag.get_active_runs()) > 0: test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull( dag_id=''%s.%s'' % (parent_dag_name, ''test1''), task_ids=''list'') if test_list: for i in test_list: test = DummyOperator( task_id=i, dag=dag ) return dag

y el flujo de trabajo principal:

prueba.py

from datetime import datetime from airflow import DAG from airflow.operators.subdag_operator import SubDagOperator from subdags.test1 import test1 from subdags.test2 import test2 DAG_NAME = ''test-dag'' dag = DAG(DAG_NAME, description=''Test workflow'', catchup=False, schedule_interval=''0 0 * * *'', start_date=datetime(2018, 8, 24)) test1 = SubDagOperator( subdag=test1(DAG_NAME, dag.start_date, dag.schedule_interval), task_id=''test1'', dag=dag ) test2 = SubDagOperator( subdag=test2(DAG_NAME, dag.start_date, dag.schedule_interval, parent_dag=dag), task_id=''test2'', dag=dag ) test1 >> test2


OA: "¿Hay alguna forma en Airflow para crear un flujo de trabajo tal que el número de tareas B. * sea desconocido hasta la finalización de la Tarea A?"

La respuesta corta es no. Airflow construirá el flujo DAG antes de comenzar a ejecutarlo.

Dicho esto, llegamos a una conclusión simple, es decir, no tenemos tanta necesidad. Cuando desee hacer un trabajo en paralelo, debe evaluar los recursos que tiene disponibles y no la cantidad de elementos para procesar.

Lo hicimos así: generamos dinámicamente un número fijo de tareas, digamos 10, que dividirán el trabajo. Por ejemplo, si necesitamos procesar 100 archivos, cada tarea procesará 10 de ellos. Publicaré el código más tarde hoy.

Actualizar

Aquí está el código, perdón por el retraso.

from datetime import datetime, timedelta import airflow from airflow.operators.dummy_operator import DummyOperator args = { ''owner'': ''airflow'', ''depends_on_past'': False, ''start_date'': datetime(2018, 1, 8), ''email'': [''[email protected]''], ''email_on_failure'': True, ''email_on_retry'': True, ''retries'': 1, ''retry_delay'': timedelta(seconds=5) } dag = airflow.DAG( ''parallel_tasks_v1'', schedule_interval="@daily", catchup=False, default_args=args) # You can read this from variables parallel_tasks_total_number = 10 start_task = DummyOperator( task_id=''start_task'', dag=dag ) # Creates the tasks dynamically. # Each one will elaborate one chunk of data. def create_dynamic_task(current_task_number): return DummyOperator( provide_context=True, task_id=''parallel_task_'' + str(current_task_number), python_callable=parallelTask, # your task will take as input the total number and the current number to elaborate a chunk of total elements op_args=[current_task_number, int(parallel_tasks_total_number)], dag=dag) end = DummyOperator( task_id=''end'', dag=dag) for page in range(int(parallel_tasks_total_number)): created_task = create_dynamic_task(page) start_task >> created_task created_task >> end

Explicación del código:

Aquí tenemos una única tarea inicial y una única tarea final (ambas ficticias).

Luego, desde la tarea de inicio con el bucle for, creamos 10 tareas con el mismo python invocable. Las tareas se crean en la función create_dynamic_task.

A cada python invocable pasamos como argumentos el número total de tareas paralelas y el índice de tareas actual.

Supongamos que tiene 1000 elementos para elaborar: la primera tarea recibirá en la entrada que debe elaborar el primer fragmento de 10 fragmentos. Dividirá los 1000 artículos en 10 trozos y elaborará el primero.


Sí, esto es posible. He creado un ejemplo de DAG que lo demuestra.

import airflow from airflow.operators.python_operator import PythonOperator import os from airflow.models import Variable import logging from airflow import configuration as conf from airflow.models import DagBag, TaskInstance from airflow import DAG, settings from airflow.operators.bash_operator import BashOperator main_dag_id = ''DynamicWorkflow2'' args = { ''owner'': ''airflow'', ''start_date'': airflow.utils.dates.days_ago(2), ''provide_context'': True } dag = DAG( main_dag_id, schedule_interval="@once", default_args=args) def start(*args, **kwargs): value = Variable.get("DynamicWorkflow_Group1") logging.info("Current DynamicWorkflow_Group1 value is " + str(value)) def resetTasksStatus(task_id, execution_date): logging.info("Resetting: " + task_id + " " + execution_date) dag_folder = conf.get(''core'', ''DAGS_FOLDER'') dagbag = DagBag(dag_folder) check_dag = dagbag.dags[main_dag_id] session = settings.Session() my_task = check_dag.get_task(task_id) ti = TaskInstance(my_task, execution_date) state = ti.current_state() logging.info("Current state of " + task_id + " is " + str(state)) ti.set_state(None, session) state = ti.current_state() logging.info("Updated state of " + task_id + " is " + str(state)) def bridge1(*args, **kwargs): # You can set this value dynamically e.g., from a database or a calculation dynamicValue = 2 variableValue = Variable.get("DynamicWorkflow_Group2") logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue)) logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue)) os.system(''airflow variables --set DynamicWorkflow_Group2 '' + str(dynamicValue)) variableValue = Variable.get("DynamicWorkflow_Group2") logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue)) # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460 for i in range(dynamicValue): resetTasksStatus(''secondGroup_'' + str(i), str(kwargs[''execution_date''])) def bridge2(*args, **kwargs): # You can set this value dynamically e.g., from a database or a calculation dynamicValue = 3 variableValue = Variable.get("DynamicWorkflow_Group3") logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue)) logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue)) os.system(''airflow variables --set DynamicWorkflow_Group3 '' + str(dynamicValue)) variableValue = Variable.get("DynamicWorkflow_Group3") logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue)) # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460 for i in range(dynamicValue): resetTasksStatus(''thirdGroup_'' + str(i), str(kwargs[''execution_date''])) def end(*args, **kwargs): logging.info("Ending") def doSomeWork(name, index, *args, **kwargs): # Do whatever work you need to do # Here I will just create a new file os.system(''touch /home/ec2-user/airflow/'' + str(name) + str(index) + ''.txt'') starting_task = PythonOperator( task_id=''start'', dag=dag, provide_context=True, python_callable=start, op_args=[]) # Used to connect the stream in the event that the range is zero bridge1_task = PythonOperator( task_id=''bridge1'', dag=dag, provide_context=True, python_callable=bridge1, op_args=[]) DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1") logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1)) for index in range(int(DynamicWorkflow_Group1)): dynamicTask = PythonOperator( task_id=''firstGroup_'' + str(index), dag=dag, provide_context=True, python_callable=doSomeWork, op_args=[''firstGroup'', index]) starting_task.set_downstream(dynamicTask) dynamicTask.set_downstream(bridge1_task) # Used to connect the stream in the event that the range is zero bridge2_task = PythonOperator( task_id=''bridge2'', dag=dag, provide_context=True, python_callable=bridge2, op_args=[]) DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2") logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2)) for index in range(int(DynamicWorkflow_Group2)): dynamicTask = PythonOperator( task_id=''secondGroup_'' + str(index), dag=dag, provide_context=True, python_callable=doSomeWork, op_args=[''secondGroup'', index]) bridge1_task.set_downstream(dynamicTask) dynamicTask.set_downstream(bridge2_task) ending_task = PythonOperator( task_id=''end'', dag=dag, provide_context=True, python_callable=end, op_args=[]) DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3") logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3)) for index in range(int(DynamicWorkflow_Group3)): # You can make this logic anything you''d like # I chose to use the PythonOperator for all tasks # except the last task will use the BashOperator if index < (int(DynamicWorkflow_Group3) - 1): dynamicTask = PythonOperator( task_id=''thirdGroup_'' + str(index), dag=dag, provide_context=True, python_callable=doSomeWork, op_args=[''thirdGroup'', index]) else: dynamicTask = BashOperator( task_id=''thirdGroup_'' + str(index), bash_command=''touch /home/ec2-user/airflow/thirdGroup_'' + str(index) + ''.txt'', dag=dag) bridge2_task.set_downstream(dynamicTask) dynamicTask.set_downstream(ending_task) # If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream # and your tasks will run simultaneously instead of in your desired stream order. starting_task.set_downstream(bridge1_task) bridge1_task.set_downstream(bridge2_task) bridge2_task.set_downstream(ending_task)

Antes de ejecutar el DAG, cree estas tres variables de flujo de aire

airflow variables --set DynamicWorkflow_Group1 1 airflow variables --set DynamicWorkflow_Group2 0 airflow variables --set DynamicWorkflow_Group3 0

Verás que el DAG va de esto

A esto después de que se ejecutó

Puede ver más información sobre este DAG en mi artículo sobre cómo crear linkedin.com/pulse/dynamic-workflows-airflow-kyle-bridenstine .