python - configurar s3 para registros en flujo de aire
amazon-s3 airflow (6)
Estoy usando docker-compose para configurar un clúster de flujo de aire escalable. Basé mi enfoque en este Dockerfile https://hub.docker.com/r/puckel/docker-airflow/
Mi problema es configurar los registros para escribir / leer desde s3. Cuando se completa un dag, aparece un error como este
*** Log file isn''t local.
*** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00
*** Failed to fetch log file from worker.
*** Reading remote logs...
Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06-
26T11:00:00
Configuré una nueva sección en el archivo
airflow.cfg
como esta
[MyS3Conn]
aws_access_key_id = xxxxxxx
aws_secret_access_key = xxxxxxx
aws_default_region = xxxxxxx
Y luego especificó la ruta s3 en la sección de registros remotos en
airflow.cfg
remote_base_log_folder = s3://buckets/xxxx/airflow/logs
remote_log_conn_id = MyS3Conn
¿Configuré esto correctamente y hay un error? ¿Hay una receta para el éxito que me estoy perdiendo?
- Actualización
Intenté exportar en formatos URI y JSON y ninguno parecía funcionar. Luego exporté aws_access_key_id y aws_secret_access_key y luego el flujo de aire comenzó a recogerlo. Ahora recibo su error en los registros de los trabajadores
6/30/2017 6:05:59 PMINFO:root:Using connection to: s3
6/30/2017 6:06:00 PMERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMLogging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00
- Actualización
También encontré este enlace https://www.mail-archive.com/[email protected]/msg00462.html
Luego ingresé a una de mis máquinas de trabajo (aparte del servidor web y el programador) y ejecuté este bit de código en python
import airflow
s3 = airflow.hooks.S3Hook(''s3_conn'')
s3.load_string(''test'', airflow.conf.get(''core'', ''remote_base_log_folder''))
Recibo este error
boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden
Intenté exportar varios tipos diferentes de
AIRFLOW_CONN_
AIRFLOW_CONN_ como se explica aquí en la sección de conexiones
https://airflow.incubator.apache.org/concepts.html
y por otras respuestas a esta pregunta.
s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3
{"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"}
{"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"}
También exporté AWS_ACCESS_KEY_ID y AWS_SECRET_ACCESS_KEY sin éxito.
Estas credenciales se almacenan en una base de datos, por lo que una vez que las agregue a la interfaz de usuario, los trabajadores deben recogerlas, pero por alguna razón no pueden escribir / leer registros.
Debe configurar la conexión s3 a través de la interfaz de usuario de flujo de aire. Para esto, debe ir a la pestaña Admin -> Conexiones en la interfaz de usuario del flujo de aire y crear una nueva fila para su conexión S3.
Un ejemplo de configuración sería:
Id. De conexión: my_conn_S3
Tipo de conexión: S3
Extra: {"aws_access_key_id": "your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key"}
Haz que funcione con Airflow 10 en kube. Tengo los siguientes conjuntos de variables de entorno:
AIRFLOW_CONN_LOGS_S3=s3://id:secret_uri_encoded@S3
AIRFLOW__CORE__REMOTE_LOGGING=True
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://xxxx/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=logs_s3
Para completar la respuesta de Arne con las actualizaciones recientes de Airflow, no necesita establecer
task_log_reader
en otro valor que no sea el predeterminado:
task
Como si siguiera la plantilla de registro predeterminada
airflow/config_templates/airflow_local_settings.py
se puede ver
desde esta confirmación
(tenga en cuenta que el nombre del controlador cambió a
''s3'': {''task''...
lugar de
s3.task
) ese es el valor en el carpeta remota (
REMOTE_BASE_LOG_FOLDER
) reemplazará el controlador con el correcto:
REMOTE_LOGGING = conf.get(''core'', ''remote_logging'')
if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith(''s3://''):
DEFAULT_LOGGING_CONFIG[''handlers''].update(REMOTE_HANDLERS[''s3''])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith(''gs://''):
DEFAULT_LOGGING_CONFIG[''handlers''].update(REMOTE_HANDLERS[''gcs''])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith(''wasb''):
DEFAULT_LOGGING_CONFIG[''handlers''].update(REMOTE_HANDLERS[''wasb''])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
DEFAULT_LOGGING_CONFIG[''handlers''].update(REMOTE_HANDLERS[''elasticsearch''])
Más detalles sobre cómo iniciar sesión / leer desde S3: https://github.com/apache/incubator-airflow/blob/master/docs/howto/write-logs.rst#writing-logs-to-amazon-s3
Solo una nota al margen para cualquiera que siga las instrucciones muy útiles en share : Si tropieza con este problema: "ModuleNotFoundError: Ningún módulo llamado ''airflow.utils.log.logging_mixin.RedirectStdHandler''" como se hace referencia aquí (lo que sucede cuando se usa el flujo de aire 1.9), la solución es simple: use más bien esta plantilla base: https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/config_templates/airflow_local_settings.py (y siga todas las demás instrucciones en share )
La plantilla actual airflow/config_templates/airflow_local_settings.py presente en la rama maestra contiene una referencia a la clase "airflow.utils.log.s3_task_handler.S3TaskHandler", que no está presente en apache-airflow == 1.9.0 python paquete. ¡Espero que esto ayude!
(Actualizado a partir de Airflow 1.10.2)
Aquí hay una solución si no usa la interfaz de usuario de administrador.
Mi Airflow no se ejecuta en un servidor persistente ... (Se inicia de nuevo todos los días en un contenedor Docker, en Heroku). Sé que me estoy perdiendo muchas funciones excelentes, pero en mi configuración mínima, yo nunca toque la interfaz de usuario del administrador o el archivo cfg. En cambio, tengo que establecer variables de entorno específicas de Airflow en un script bash, que anula el archivo .cfg.
flujo de aire apache [s3]
En primer lugar, necesita instalar el subpaquete
s3
para escribir sus registros de Airflow en S3.
(
boto3
funciona bien para los trabajos de Python dentro de sus DAG, pero el
S3Hook
depende del subpaquete s3).
Una nota adicional más: conda install
aún no maneja esto
, así que tengo que hacer
pip install apache-airflow[s3]
.
Variables de entorno
En un script bash, configuro estas variables
core
.
A partir de
estas instrucciones,
pero usando la convención de nomenclatura
AIRFLOW__{SECTION}__{KEY}
para las variables de entorno, hago:
export AIRFLOW__CORE__REMOTE_LOGGING=True
export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key
export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri
export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False
ID de conexión S3
El
s3_uri
anterior es una ID de conexión que
s3_uri
.
En Airflow, corresponde a otra variable de entorno,
AIRFLOW_CONN_S3_URI
.
El valor de eso es su ruta S3, que tiene que estar en forma de URI.
Eso es
s3://access_key:secret_key@bucket/key
Almacene esto sin embargo maneja otras variables de entorno sensibles.
Con esta configuración, Airflow podrá escribir sus registros en S3.
s3://bucket/key/dag/task_id/timestamp/1.log
la ruta de
s3://bucket/key/dag/task_id/timestamp/1.log
.
Apéndice sobre la actualización de Airflow 1.8 a Airflow 1.10
Recientemente actualicé mi tubería de producción de Airflow 1.8 a 1.9, y luego a 1.10. La buena noticia es que los cambios son bastante pequeños; El resto del trabajo consistía en descubrir matices con las instalaciones del paquete (sin relación con la pregunta original sobre los registros S3).
(1) En primer lugar, necesitaba actualizar a Python 3.6 con Airflow 1.9.
(2) El nombre del paquete cambió de
airflow
de
airflow
a
airflow
de
airflow
apache-airflow
con 1.9.
También puede encontrarse con
this
en su
pip install
.
(3) El paquete
psutil
debe estar en un rango de versión específico para Airflow.
Puede encontrar esto cuando esté haciendo
pip install apache-airflow
.
(4) se necesitan encabezados python3-dev con Airflow 1.9+.
(5) Aquí están los cambios sustanciales:
export AIRFLOW__CORE__REMOTE_LOGGING=True
ahora se requiere.
Y
(6) Los registros tienen una ruta ligeramente diferente en S3, que actualicé en la respuesta:
s3://bucket/key/dag/task_id/timestamp/1.log
.
¡Pero eso es todo! Los registros no funcionaron en 1.9, por lo que recomiendo ir directamente a 1.10, ahora que está disponible.
ACTUALIZAR Airflow 1.10 hace que el registro sea mucho más fácil.
Para el registro s3, configure el enlace de conexión según la respuesta anterior
y luego simplemente agregue lo siguiente a airflow.cfg
[core]
# Airflow can store logs remotely in AWS S3. Users must supply a remote
# location URL (starting with either ''s3://...'') and an Airflow connection
# id that provides access to the storage location.
remote_base_log_folder = s3://my-bucket/path/to/logs
remote_log_conn_id = MyS3Conn
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
Para el registro de gcs,
-
Instale el paquete gcp_api primero, así: pip install apache-airflow [gcp_api].
-
Configure el enlace de conexión según la respuesta anterior
-
Agregue lo siguiente a airflow.cfg
[core] # Airflow can store logs remotely in AWS S3. Users must supply a remote # location URL (starting with either ''s3://...'') and an Airflow connection # id that provides access to the storage location. remote_logging = True remote_base_log_folder = gs://my-bucket/path/to/logs remote_log_conn_id = MyGCSConn
NOTA: A partir de Airflow 1.9, el registro remoto se ha modificado significativamente . Si está utilizando 1.9, siga leyendo.
Referencia here
Instrucciones completas
-
Cree un directorio para almacenar configuraciones y colóquelo para que pueda encontrarse en PYTHONPATH. Un ejemplo es $ AIRFLOW_HOME / config
-
Cree archivos vacíos llamados $ AIRFLOW_HOME / config / log_config.py y $ AIRFLOW_HOME / config / __ init__.py
-
Copie el contenido de airflow/config_templates/airflow_local_settings.py en el archivo log_config.py que acaba de crear en el paso anterior.
-
Personalice las siguientes partes de la plantilla:
#Add this variable to the top of the file. Note the trailing slash. S3_LOG_FOLDER = ''s3://<bucket where logs should be persisted>/'' Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG LOGGING_CONFIG = ... Add a S3TaskHandler to the ''handlers'' block of the LOGGING_CONFIG variable ''s3.task'': { ''class'': ''airflow.utils.log.s3_task_handler.S3TaskHandler'', ''formatter'': ''airflow.task'', ''base_log_folder'': os.path.expanduser(BASE_LOG_FOLDER), ''s3_log_folder'': S3_LOG_FOLDER, ''filename_template'': FILENAME_TEMPLATE, }, Update the airflow.task and airflow.task_runner blocks to be ''s3.task'' instead >of ''file.task''. ''loggers'': { ''airflow.task'': { ''handlers'': [''s3.task''], ... }, ''airflow.task_runner'': { ''handlers'': [''s3.task''], ... }, ''airflow'': { ''handlers'': [''console''], ... }, }
-
Asegúrese de que se haya definido un enlace de conexión s3 en Airflow, según la respuesta anterior . El enlace debe tener acceso de lectura y escritura al depósito s3 definido anteriormente en S3_LOG_FOLDER.
-
Actualice $ AIRFLOW_HOME / airflow.cfg para que contenga:
task_log_reader = s3.task logging_config_class = log_config.LOGGING_CONFIG remote_log_conn_id = <name of the s3 platform hook>
-
Reinicie el servidor web y el planificador Airflow, y active (o espere) una nueva ejecución de tarea.
-
Verifique que los registros se muestran para las tareas recién ejecutadas en el depósito que ha definido.
-
Verifique que el visor de almacenamiento s3 funcione en la interfaz de usuario. Abra una tarea recién ejecutada y verifique que vea algo como:
*** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log. [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532 [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: [''bash'', ''-c'', u''airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py''] [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py