www sesion netitrix net iniciar bitrix24 bitrix airflow

airflow - sesion - bitrix24 netitrix



Eliminar registros de tareas de flujo de aire (4)

Estoy ejecutando 5 DAG que han generado un total de aproximadamente 6 GB de datos de registro en la base_log_folder durante un período de meses. Acabo de agregar una remote_base_log_folder pero parece que no excluye el registro en la base_log_folder .

¿Hay alguna forma de eliminar automáticamente los archivos de registro antiguos, rotarlos o forzar el flujo de aire para que no inicie sesión en el disco (base_log_folder) solo en almacenamiento remoto?



Los encargados del flujo de aire no piensan que truncar los registros es parte de la lógica del núcleo del flujo de aire. Para ver this , y luego, en este problema, los encargados del mantenimiento sugieren cambiar LOG_LEVEL para evitar demasiados datos de registro.

Y en this PR, podemos aprender cómo cambiar el nivel de registro en airflow.cfg .

buena suerte.



FileTaskHandler los registros de tareas implementando nuestro propio FileTaskHandler , y luego lo airflow.cfg en airflow.cfg . Por lo tanto, sobrescribimos el LogHandler predeterminado para mantener solo N registros de tareas, sin programar DAG adicionales.

Estamos utilizando Airflow==1.10.1 .

[core] logging_config_class = log_config.LOGGING_CONFIG

log_config.LOGGING_CONFIG

BASE_LOG_FOLDER = conf.get(''core'', ''BASE_LOG_FOLDER'') FOLDER_TASK_TEMPLATE = ''{{ ti.dag_id }}/{{ ti.task_id }}'' FILENAME_TEMPLATE = ''{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'' LOGGING_CONFIG = { ''formatters'': {}, ''handlers'': { ''...'': {}, ''task'': { ''class'': ''file_task_handler.FileTaskRotationHandler'', ''formatter'': ''airflow.job'', ''base_log_folder'': os.path.expanduser(BASE_LOG_FOLDER), ''filename_template'': FILENAME_TEMPLATE, ''folder_task_template'': FOLDER_TASK_TEMPLATE, ''retention'': 20 }, ''...'': {} }, ''loggers'': { ''airflow.task'': { ''handlers'': [''task''], ''level'': JOB_LOG_LEVEL, ''propagate'': False, }, ''airflow.task_runner'': { ''handlers'': [''task''], ''level'': LOG_LEVEL, ''propagate'': True, }, ''...'': {} } }

file_task_handler.FileTaskRotationHandler

import os import shutil from airflow.utils.helpers import parse_template_string from airflow.utils.log.file_task_handler import FileTaskHandler class FileTaskRotationHandler(FileTaskHandler): def __init__(self, base_log_folder, filename_template, folder_task_template, retention): """ :param base_log_folder: Base log folder to place logs. :param filename_template: template filename string. :param folder_task_template: template folder task path. :param retention: Number of folder logs to keep """ super(FileTaskRotationHandler, self).__init__(base_log_folder, filename_template) self.retention = retention self.folder_task_template, self.folder_task_template_jinja_template = / parse_template_string(folder_task_template) @staticmethod def _get_directories(path=''.''): return next(os.walk(path))[1] def _render_folder_task_path(self, ti): if self.folder_task_template_jinja_template: jinja_context = ti.get_template_context() return self.folder_task_template_jinja_template.render(**jinja_context) return self.folder_task_template.format(dag_id=ti.dag_id, task_id=ti.task_id) def _init_file(self, ti): relative_path = self._render_folder_task_path(ti) folder_task_path = os.path.join(self.local_base, relative_path) subfolders = self._get_directories(folder_task_path) to_remove = set(subfolders) - set(subfolders[-self.retention:]) for dir_to_remove in to_remove: full_dir_to_remove = os.path.join(folder_task_path, dir_to_remove) print(''Removing'', full_dir_to_remove) shutil.rmtree(full_dir_to_remove) return FileTaskHandler._init_file(self, ti)