bash - ruta - Parámetros de paso de flujo de aire a tarea dependiente
ruta critica pert (1)
Echa un vistazo a XComs - http://airflow.incubator.apache.org/concepts.html#xcoms . Estos se utilizan para comunicar el estado entre tareas.
¿Cuál es la manera de pasar el parámetro a las tareas dependientes en Airflow? Tengo muchos archivos bashes y estoy intentando migrar este enfoque del flujo de aire, pero no sé cómo pasar algunas propiedades entre tareas.
Este es un ejemplo real:
#sqoop bash template
sqoop_template = """
sqoop job --exec {{params.job}} -- --target-dir {{params.dir}} --outdir /src/
"""
s3_template = """
s3-dist-cp --src= {{params.dir}} "--dest={{params.s3}}
"""
#Task of extraction in EMR
t1 = BashOperator(
task_id=''extract_account'',
bash_command=sqoop_template,
params={''job'': ''job'', ''dir'': ''hdfs:///account/'' + time.now().strftime("%Y-%m-%d-%H-%M-%S")},
dag=dag)
#Task to upload in s3 backup.
t2 = BashOperator(
task_id=''s3_upload'',
bash_command=s3_template,
params={}, #here i need the dir name created in t1
depends_on_past=True
)
t2.set_upstream(t1)
En t2 necesito acceder al nombre de directorio creado en t1.
Solución
#Execute a valid job sqoop
def sqoop_import(table_name, job_name):
s3, hdfs = dirpath(table_name)
sqoop_job = job_default_config(job_name, hdfs)
#call(sqoop_job)
return {''hdfs_dir'': hdfs, ''s3_dir'': s3}
def s3_upload(**context):
hdfs = context[''task_instance''].xcom_pull(task_ids=''sqoop_import'')[''hdfs_dir'']
s3 = context[''task_instance''].xcom_pull(task_ids=''sqoop_import'')[''s3_dir'']
s3_cpdist_job = ["s3-dist-cp", "--src=%s" % (hdfs), "--dest=%s" % (s3)]
#call(s3_cpdist_job)
return {''s3_dir'': s3} #context[''task_instance''].xcom_pull(task_ids=''sqoop_import'')
def sns_notify(**context):
s3 = context[''task_instance''].xcom_pull(task_ids=''distcp_s3'')[''s3_dir'']
client = boto3.client(''sns'')
arn = ''arn:aws:sns:us-east-1:744617668409:pipeline-notification-stg''
response = client.publish(TargetArn=arn, Message=s3)
return response
Esa no es la solución definitiva, así que las mejoras son bienvenidas. Gracias.