python amazon-s3 airflow

python - configurar s3 para registros en flujo de aire



amazon-s3 airflow (6)

Estoy usando docker-compose para configurar un clúster de flujo de aire escalable. Basé mi enfoque en este Dockerfile https://hub.docker.com/r/puckel/docker-airflow/

Mi problema es configurar los registros para escribir / leer desde s3. Cuando se completa un dag, aparece un error como este

*** Log file isn''t local. *** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00 *** Failed to fetch log file from worker. *** Reading remote logs... Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06- 26T11:00:00

Configuré una nueva sección en el archivo airflow.cfg como esta

[MyS3Conn] aws_access_key_id = xxxxxxx aws_secret_access_key = xxxxxxx aws_default_region = xxxxxxx

Y luego especificó la ruta s3 en la sección de registros remotos en airflow.cfg

remote_base_log_folder = s3://buckets/xxxx/airflow/logs remote_log_conn_id = MyS3Conn

¿Configuré esto correctamente y hay un error? ¿Hay una receta para el éxito que me estoy perdiendo?

- Actualización

Intenté exportar en formatos URI y JSON y ninguno parecía funcionar. Luego exporté aws_access_key_id y aws_secret_access_key y luego el flujo de aire comenzó a recogerlo. Ahora recibo su error en los registros de los trabajadores

6/30/2017 6:05:59 PMINFO:root:Using connection to: s3 6/30/2017 6:06:00 PMERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00 6/30/2017 6:06:00 PMERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00 6/30/2017 6:06:00 PMLogging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00

- Actualización

También encontré este enlace https://www.mail-archive.com/[email protected]/msg00462.html

Luego ingresé a una de mis máquinas de trabajo (aparte del servidor web y el programador) y ejecuté este bit de código en python

import airflow s3 = airflow.hooks.S3Hook(''s3_conn'') s3.load_string(''test'', airflow.conf.get(''core'', ''remote_base_log_folder''))

Recibo este error

boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden

Intenté exportar varios tipos diferentes de AIRFLOW_CONN_ AIRFLOW_CONN_ como se explica aquí en la sección de conexiones https://airflow.incubator.apache.org/concepts.html y por otras respuestas a esta pregunta.

s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3 {"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"} {"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"}

También exporté AWS_ACCESS_KEY_ID y AWS_SECRET_ACCESS_KEY sin éxito.

Estas credenciales se almacenan en una base de datos, por lo que una vez que las agregue a la interfaz de usuario, los trabajadores deben recogerlas, pero por alguna razón no pueden escribir / leer registros.


Debe configurar la conexión s3 a través de la interfaz de usuario de flujo de aire. Para esto, debe ir a la pestaña Admin -> Conexiones en la interfaz de usuario del flujo de aire y crear una nueva fila para su conexión S3.

Un ejemplo de configuración sería:

Id. De conexión: my_conn_S3

Tipo de conexión: S3

Extra: {"aws_access_key_id": "your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key"}


Haz que funcione con Airflow 10 en kube. Tengo los siguientes conjuntos de variables de entorno:

AIRFLOW_CONN_LOGS_S3=s3://id:secret_uri_encoded@S3 AIRFLOW__CORE__REMOTE_LOGGING=True AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://xxxx/logs AIRFLOW__CORE__REMOTE_LOG_CONN_ID=logs_s3


Para completar la respuesta de Arne con las actualizaciones recientes de Airflow, no necesita establecer task_log_reader en otro valor que no sea el predeterminado: task

Como si siguiera la plantilla de registro predeterminada airflow/config_templates/airflow_local_settings.py se puede ver desde esta confirmación (tenga en cuenta que el nombre del controlador cambió a ''s3'': {''task''... lugar de s3.task ) ese es el valor en el carpeta remota ( REMOTE_BASE_LOG_FOLDER ) reemplazará el controlador con el correcto:

REMOTE_LOGGING = conf.get(''core'', ''remote_logging'') if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith(''s3://''): DEFAULT_LOGGING_CONFIG[''handlers''].update(REMOTE_HANDLERS[''s3'']) elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith(''gs://''): DEFAULT_LOGGING_CONFIG[''handlers''].update(REMOTE_HANDLERS[''gcs'']) elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith(''wasb''): DEFAULT_LOGGING_CONFIG[''handlers''].update(REMOTE_HANDLERS[''wasb'']) elif REMOTE_LOGGING and ELASTICSEARCH_HOST: DEFAULT_LOGGING_CONFIG[''handlers''].update(REMOTE_HANDLERS[''elasticsearch''])

Más detalles sobre cómo iniciar sesión / leer desde S3: https://github.com/apache/incubator-airflow/blob/master/docs/howto/write-logs.rst#writing-logs-to-amazon-s3


Solo una nota al margen para cualquiera que siga las instrucciones muy útiles en share : Si tropieza con este problema: "ModuleNotFoundError: Ningún módulo llamado ''airflow.utils.log.logging_mixin.RedirectStdHandler''" como se hace referencia aquí (lo que sucede cuando se usa el flujo de aire 1.9), la solución es simple: use más bien esta plantilla base: https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/config_templates/airflow_local_settings.py (y siga todas las demás instrucciones en share )

La plantilla actual airflow/config_templates/airflow_local_settings.py presente en la rama maestra contiene una referencia a la clase "airflow.utils.log.s3_task_handler.S3TaskHandler", que no está presente en apache-airflow == 1.9.0 python paquete. ¡Espero que esto ayude!


(Actualizado a partir de Airflow 1.10.2)

Aquí hay una solución si no usa la interfaz de usuario de administrador.

Mi Airflow no se ejecuta en un servidor persistente ... (Se inicia de nuevo todos los días en un contenedor Docker, en Heroku). Sé que me estoy perdiendo muchas funciones excelentes, pero en mi configuración mínima, yo nunca toque la interfaz de usuario del administrador o el archivo cfg. En cambio, tengo que establecer variables de entorno específicas de Airflow en un script bash, que anula el archivo .cfg.

flujo de aire apache [s3]

En primer lugar, necesita instalar el subpaquete s3 para escribir sus registros de Airflow en S3. ( boto3 funciona bien para los trabajos de Python dentro de sus DAG, pero el S3Hook depende del subpaquete s3).

Una nota adicional más: conda install aún no maneja esto , así que tengo que hacer pip install apache-airflow[s3] .

Variables de entorno

En un script bash, configuro estas variables core . A partir de estas instrucciones, pero usando la convención de nomenclatura AIRFLOW__{SECTION}__{KEY} para las variables de entorno, hago:

export AIRFLOW__CORE__REMOTE_LOGGING=True export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False

ID de conexión S3

El s3_uri anterior es una ID de conexión que s3_uri . En Airflow, corresponde a otra variable de entorno, AIRFLOW_CONN_S3_URI . El valor de eso es su ruta S3, que tiene que estar en forma de URI. Eso es

s3://access_key:secret_key@bucket/key

Almacene esto sin embargo maneja otras variables de entorno sensibles.

Con esta configuración, Airflow podrá escribir sus registros en S3. s3://bucket/key/dag/task_id/timestamp/1.log la ruta de s3://bucket/key/dag/task_id/timestamp/1.log .

Apéndice sobre la actualización de Airflow 1.8 a Airflow 1.10

Recientemente actualicé mi tubería de producción de Airflow 1.8 a 1.9, y luego a 1.10. La buena noticia es que los cambios son bastante pequeños; El resto del trabajo consistía en descubrir matices con las instalaciones del paquete (sin relación con la pregunta original sobre los registros S3).

(1) En primer lugar, necesitaba actualizar a Python 3.6 con Airflow 1.9.

(2) El nombre del paquete cambió de airflow de airflow a airflow de airflow apache-airflow con 1.9. También puede encontrarse con this en su pip install .

(3) El paquete psutil debe estar en un rango de versión específico para Airflow. Puede encontrar esto cuando esté haciendo pip install apache-airflow .

(4) se necesitan encabezados python3-dev con Airflow 1.9+.

(5) Aquí están los cambios sustanciales: export AIRFLOW__CORE__REMOTE_LOGGING=True ahora se requiere. Y

(6) Los registros tienen una ruta ligeramente diferente en S3, que actualicé en la respuesta: s3://bucket/key/dag/task_id/timestamp/1.log .

¡Pero eso es todo! Los registros no funcionaron en 1.9, por lo que recomiendo ir directamente a 1.10, ahora que está disponible.


ACTUALIZAR Airflow 1.10 hace que el registro sea mucho más fácil.

Para el registro s3, configure el enlace de conexión según la respuesta anterior

y luego simplemente agregue lo siguiente a airflow.cfg

[core] # Airflow can store logs remotely in AWS S3. Users must supply a remote # location URL (starting with either ''s3://...'') and an Airflow connection # id that provides access to the storage location. remote_base_log_folder = s3://my-bucket/path/to/logs remote_log_conn_id = MyS3Conn # Use server-side encryption for logs stored in S3 encrypt_s3_logs = False

Para el registro de gcs,

  1. Instale el paquete gcp_api primero, así: pip install apache-airflow [gcp_api].

  2. Configure el enlace de conexión según la respuesta anterior

  3. Agregue lo siguiente a airflow.cfg

    [core] # Airflow can store logs remotely in AWS S3. Users must supply a remote # location URL (starting with either ''s3://...'') and an Airflow connection # id that provides access to the storage location. remote_logging = True remote_base_log_folder = gs://my-bucket/path/to/logs remote_log_conn_id = MyGCSConn

NOTA: A partir de Airflow 1.9, el registro remoto se ha modificado significativamente . Si está utilizando 1.9, siga leyendo.

Referencia here

Instrucciones completas

  1. Cree un directorio para almacenar configuraciones y colóquelo para que pueda encontrarse en PYTHONPATH. Un ejemplo es $ AIRFLOW_HOME / config

  2. Cree archivos vacíos llamados $ AIRFLOW_HOME / config / log_config.py y $ AIRFLOW_HOME / config / __ init__.py

  3. Copie el contenido de airflow/config_templates/airflow_local_settings.py en el archivo log_config.py que acaba de crear en el paso anterior.

  4. Personalice las siguientes partes de la plantilla:

    #Add this variable to the top of the file. Note the trailing slash. S3_LOG_FOLDER = ''s3://<bucket where logs should be persisted>/'' Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG LOGGING_CONFIG = ... Add a S3TaskHandler to the ''handlers'' block of the LOGGING_CONFIG variable ''s3.task'': { ''class'': ''airflow.utils.log.s3_task_handler.S3TaskHandler'', ''formatter'': ''airflow.task'', ''base_log_folder'': os.path.expanduser(BASE_LOG_FOLDER), ''s3_log_folder'': S3_LOG_FOLDER, ''filename_template'': FILENAME_TEMPLATE, }, Update the airflow.task and airflow.task_runner blocks to be ''s3.task'' instead >of ''file.task''. ''loggers'': { ''airflow.task'': { ''handlers'': [''s3.task''], ... }, ''airflow.task_runner'': { ''handlers'': [''s3.task''], ... }, ''airflow'': { ''handlers'': [''console''], ... }, }

  5. Asegúrese de que se haya definido un enlace de conexión s3 en Airflow, según la respuesta anterior . El enlace debe tener acceso de lectura y escritura al depósito s3 definido anteriormente en S3_LOG_FOLDER.

  6. Actualice $ AIRFLOW_HOME / airflow.cfg para que contenga:

    task_log_reader = s3.task logging_config_class = log_config.LOGGING_CONFIG remote_log_conn_id = <name of the s3 platform hook>

  7. Reinicie el servidor web y el planificador Airflow, y active (o espere) una nueva ejecución de tarea.

  8. Verifique que los registros se muestran para las tareas recién ejecutadas en el depósito que ha definido.

  9. Verifique que el visor de almacenamiento s3 funcione en la interfaz de usuario. Abra una tarea recién ejecutada y verifique que vea algo como:

    *** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log. [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532 [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: [''bash'', ''-c'', u''airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py''] [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py