python-3.x airflow

python-3.x - airflow vs luigi



Conexión de Airflow s3 utilizando UI (7)

Otra opción que me funcionó fue poner la clave de acceso como "inicio de sesión" y la clave secreta como "contraseña":

Conn Id: <arbitrary_conn_id> Conn Type: S3 Login: <aws_access_key> Password: <aws_secret_key>

Deje todos los demás campos en blanco.

He estado tratando de usar Airflow para programar un DAG. Uno de los DAG incluye una tarea que carga datos desde el compartimiento s3.

Para el propósito anterior necesito configurar la conexión s3. Pero la IU proporcionada por el flujo de aire no es tan intuitiva ( http://pythonhosted.org/airflow/configuration.html?highlight=connection#connections ). ¿Alguien logró configurar la conexión s3? En caso afirmativo, ¿hay alguna práctica recomendada que sigáis?

Gracias.


Para aws en China, no funciona en el flujo de aire == 1.8.0 necesita actualizarse a 1.9.0 pero el flujo de aire 1.9.0 cambia el nombre a apache-airflow == 1.9.0


Para la nueva versión, cambie el código de python en el ejemplo anterior.

s3_conn_id=''my_conn_S3''

a

aws_conn_id=''my_conn_s3''


Si le preocupa exponer las credenciales en la interfaz de usuario, otra forma es pasar la ubicación del archivo de credenciales en el parámetro Extra en la interfaz de usuario. Solo el usuario funcional tiene privilegios de lectura para el archivo. Parece algo como abajo

Extra: { "profile": "<profile_name>", "s3_config_file": "/home/<functional_user>/creds/s3_credentials", "s3_config_format": "aws" }

el archivo " /home/<functional_user>/creds/s3_credentials " tiene las siguientes entradas

[<profile_name>] aws_access_key_id = <access_key_id> aws_secret_access_key = <secret_key>


Suponiendo que el flujo de aire está alojado en un servidor EC2.

simplemente cree la conexión según las otras respuestas, pero deje todo en blanco en la configuración, aparte del tipo de conexión que debería permanecer como S3

El S3hook se establecerá de forma predeterminada en boto y esto se establecerá de manera predeterminada en la función del servidor EC2 en el que está ejecutando el flujo de aire. asumiendo que este rol tiene derechos para S3, su tarea podrá acceder al depósito.

esta es una forma mucho más segura que usar y almacenar credenciales.


Es difícil encontrar referencias, pero después de cavar un poco pude hacerlo funcionar.

TLDR

Crea una nueva conexión con los siguientes atributos:

Id de conexión : my_conn_S3

Tipo de conexión: S3

Extra:

{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}

Versión larga, configurando la conexión UI:

  • En la interfaz de usuario de Airflow, vaya a Admin> Conexiones
  • Cree una nueva conexión con los siguientes atributos: Id. De conexión: my_conn_S3, Tipo de conexión: S3, Extra: {"aws_access_key_id": "_ your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
  • Deje en blanco todos los demás campos (Host, Esquema, Inicio de sesión).

Para utilizar esta conexión, a continuación puede encontrar una prueba de sensor S3 simple. La idea de esta prueba es configurar un sensor que mire los archivos en S3 (tarea T1) y una vez que se cumpla la condición, se activará un comando bash (tarea T2).

Pruebas

  • Antes de ejecutar el DAG, asegúrese de tener un cucharón S3 llamado ''S3-Bucket-To-Watch''.
  • Agregue debajo de s3_dag_test.py a la carpeta de dags de flujo de aire (~ / airflow / dags)
  • Iniciar el airflow webserver .
  • Vaya a la interfaz de usuario de Airflow ( http://localhost:8383/ )
  • Iniciar el airflow scheduler .
  • Active el DAG ''s3_dag_test'' en la vista principal de DAG.
  • Seleccione ''s3_dag_test'' para mostrar los detalles del dag.
  • En la vista de gráfico debería poder ver su estado actual.
  • La tarea ''check_s3_for_file_in_s3'' debe estar activa y en ejecución.
  • Ahora, agregue un archivo llamado ''file-to-watch-1'' a su ''S3-Bucket-To-Watch''.
  • Las primeras tareas deben haberse completado, la segunda debe iniciarse y finalizar.

El schedule_interval en la definición de dag se establece en ''@once'', para facilitar la depuración.

Para ejecutarlo de nuevo, deje todo como está, elimine los archivos en el depósito e intente nuevamente seleccionando la primera tarea (en la vista gráfica) y seleccionando ''Borrar'' todo ''Pasado'', ''Futuro'', ''Upstream'', ''Downstream'' .... actividad. Esto debería dar inicio al DAG de nuevo.

Déjame saber cómo te fue.

s3_dag_test.py;

""" S3 Sensor Connection Test """ from airflow import DAG from airflow.operators import SimpleHttpOperator, HttpSensor, BashOperator, EmailOperator, S3KeySensor from datetime import datetime, timedelta default_args = { ''owner'': ''airflow'', ''depends_on_past'': False, ''start_date'': datetime(2016, 11, 1), ''email'': [''[email protected]''], ''email_on_failure'': False, ''email_on_retry'': False, ''retries'': 5, ''retry_delay'': timedelta(minutes=5) } dag = DAG(''s3_dag_test'', default_args=default_args, schedule_interval= ''@once'') t1 = BashOperator( task_id=''bash_test'', bash_command=''echo "hello, it should work" > s3_conn_test.txt'', dag=dag) sensor = S3KeySensor( task_id=''check_s3_for_file_in_s3'', bucket_key=''file-to-watch-*'', wildcard_match=True, bucket_name=''S3-Bucket-To-Watch'', s3_conn_id=''my_conn_S3'', timeout=18*60*60, poke_interval=120, dag=dag) t1.set_upstream(sensor) Referencias principales:


Conn Id: example_s3_connnection Conn Type: S3 Extra:{"aws_access_key_id":"xxxxxxxxxx", "aws_secret_access_key": "yyyyyyyyyyy"}

Nota: Los campos de inicio de sesión y contraseña se dejan vacíos.