airflow - sesion - bitrix24 netitrix
Eliminar registros de tareas de flujo de aire (4)
Estoy ejecutando 5 DAG que han generado un total de aproximadamente 6 GB de datos de registro en la base_log_folder
durante un período de meses. Acabo de agregar una remote_base_log_folder
pero parece que no excluye el registro en la base_log_folder
.
¿Hay alguna forma de eliminar automáticamente los archivos de registro antiguos, rotarlos o forzar el flujo de aire para que no inicie sesión en el disco (base_log_folder) solo en almacenamiento remoto?
Consulte https://github.com/teamclairvoyant/airflow-maintenance-dags
Este complemento tiene DAG que pueden eliminar tareas detenidas y limpieza de registros. Puede captar los conceptos y crear un nuevo DAG que se pueda limpiar según sus necesidades.
Los encargados del flujo de aire no piensan que truncar los registros es parte de la lógica del núcleo del flujo de aire. Para ver this , y luego, en este problema, los encargados del mantenimiento sugieren cambiar LOG_LEVEL para evitar demasiados datos de registro.
Y en this PR, podemos aprender cómo cambiar el nivel de registro en airflow.cfg
.
buena suerte.
No creo que exista un mecanismo de rotación, pero puede almacenarlos en S3 o en el almacenamiento en la nube de Google como se describe aquí: https://airflow.incubator.apache.org/configuration.html#logs
FileTaskHandler
los registros de tareas implementando nuestro propio FileTaskHandler
, y luego lo airflow.cfg
en airflow.cfg
. Por lo tanto, sobrescribimos el LogHandler predeterminado para mantener solo N registros de tareas, sin programar DAG adicionales.
Estamos utilizando Airflow==1.10.1
.
[core]
logging_config_class = log_config.LOGGING_CONFIG
log_config.LOGGING_CONFIG
BASE_LOG_FOLDER = conf.get(''core'', ''BASE_LOG_FOLDER'')
FOLDER_TASK_TEMPLATE = ''{{ ti.dag_id }}/{{ ti.task_id }}''
FILENAME_TEMPLATE = ''{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log''
LOGGING_CONFIG = {
''formatters'': {},
''handlers'': {
''...'': {},
''task'': {
''class'': ''file_task_handler.FileTaskRotationHandler'',
''formatter'': ''airflow.job'',
''base_log_folder'': os.path.expanduser(BASE_LOG_FOLDER),
''filename_template'': FILENAME_TEMPLATE,
''folder_task_template'': FOLDER_TASK_TEMPLATE,
''retention'': 20
},
''...'': {}
},
''loggers'': {
''airflow.task'': {
''handlers'': [''task''],
''level'': JOB_LOG_LEVEL,
''propagate'': False,
},
''airflow.task_runner'': {
''handlers'': [''task''],
''level'': LOG_LEVEL,
''propagate'': True,
},
''...'': {}
}
}
file_task_handler.FileTaskRotationHandler
import os
import shutil
from airflow.utils.helpers import parse_template_string
from airflow.utils.log.file_task_handler import FileTaskHandler
class FileTaskRotationHandler(FileTaskHandler):
def __init__(self, base_log_folder, filename_template, folder_task_template, retention):
"""
:param base_log_folder: Base log folder to place logs.
:param filename_template: template filename string.
:param folder_task_template: template folder task path.
:param retention: Number of folder logs to keep
"""
super(FileTaskRotationHandler, self).__init__(base_log_folder, filename_template)
self.retention = retention
self.folder_task_template, self.folder_task_template_jinja_template = /
parse_template_string(folder_task_template)
@staticmethod
def _get_directories(path=''.''):
return next(os.walk(path))[1]
def _render_folder_task_path(self, ti):
if self.folder_task_template_jinja_template:
jinja_context = ti.get_template_context()
return self.folder_task_template_jinja_template.render(**jinja_context)
return self.folder_task_template.format(dag_id=ti.dag_id, task_id=ti.task_id)
def _init_file(self, ti):
relative_path = self._render_folder_task_path(ti)
folder_task_path = os.path.join(self.local_base, relative_path)
subfolders = self._get_directories(folder_task_path)
to_remove = set(subfolders) - set(subfolders[-self.retention:])
for dir_to_remove in to_remove:
full_dir_to_remove = os.path.join(folder_task_path, dir_to_remove)
print(''Removing'', full_dir_to_remove)
shutil.rmtree(full_dir_to_remove)
return FileTaskHandler._init_file(self, ti)