una ruta pert online hacer gantt empresa ejemplo diagrama critica como bash airflow

bash - ruta - Parámetros de paso de flujo de aire a tarea dependiente



ruta critica pert (1)

Echa un vistazo a XComs - http://airflow.incubator.apache.org/concepts.html#xcoms . Estos se utilizan para comunicar el estado entre tareas.

¿Cuál es la manera de pasar el parámetro a las tareas dependientes en Airflow? Tengo muchos archivos bashes y estoy intentando migrar este enfoque del flujo de aire, pero no sé cómo pasar algunas propiedades entre tareas.

Este es un ejemplo real:

#sqoop bash template sqoop_template = """ sqoop job --exec {{params.job}} -- --target-dir {{params.dir}} --outdir /src/ """ s3_template = """ s3-dist-cp --src= {{params.dir}} "--dest={{params.s3}} """ #Task of extraction in EMR t1 = BashOperator( task_id=''extract_account'', bash_command=sqoop_template, params={''job'': ''job'', ''dir'': ''hdfs:///account/'' + time.now().strftime("%Y-%m-%d-%H-%M-%S")}, dag=dag) #Task to upload in s3 backup. t2 = BashOperator( task_id=''s3_upload'', bash_command=s3_template, params={}, #here i need the dir name created in t1 depends_on_past=True ) t2.set_upstream(t1)

En t2 necesito acceder al nombre de directorio creado en t1.

Solución

#Execute a valid job sqoop def sqoop_import(table_name, job_name): s3, hdfs = dirpath(table_name) sqoop_job = job_default_config(job_name, hdfs) #call(sqoop_job) return {''hdfs_dir'': hdfs, ''s3_dir'': s3} def s3_upload(**context): hdfs = context[''task_instance''].xcom_pull(task_ids=''sqoop_import'')[''hdfs_dir''] s3 = context[''task_instance''].xcom_pull(task_ids=''sqoop_import'')[''s3_dir''] s3_cpdist_job = ["s3-dist-cp", "--src=%s" % (hdfs), "--dest=%s" % (s3)] #call(s3_cpdist_job) return {''s3_dir'': s3} #context[''task_instance''].xcom_pull(task_ids=''sqoop_import'') def sns_notify(**context): s3 = context[''task_instance''].xcom_pull(task_ids=''distcp_s3'')[''s3_dir''] client = boto3.client(''sns'') arn = ''arn:aws:sns:us-east-1:744617668409:pipeline-notification-stg'' response = client.publish(TargetArn=arn, Message=s3) return response

Esa no es la solución definitiva, así que las mejoras son bienvenidas. Gracias.