airflow - mega - Flujo de aire: ¿cómo eliminar un DAG?
airflow pc (14)
Inicié el servidor web Airflow y programé algunos dags. Puedo ver los dags en la GUI web.
¿Cómo puedo eliminar un DAG en particular para que no se ejecute y se muestre en la GUI web? ¿Hay un comando de Airflow CLI para hacer eso?
Miré a mi alrededor pero no pude encontrar una respuesta para una forma simple de eliminar un DAG una vez que se ha cargado y programado.
Acabo de escribir un script que elimina todo lo relacionado con un dag en particular, pero esto es solo para MySQL. Puede escribir un método de conector diferente si está utilizando PostgreSQL. Originalmente, los comandos fueron publicados por Lance en https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0 Lo acabo de poner en el script. Espero que esto ayude. Formato: python script.py dag_id
import sys
import MySQLdb
dag_input = sys.argv[1]
query = {''delete from xcom where dag_id = "'' + dag_input + ''"'',
''delete from task_instance where dag_id = "'' + dag_input + ''"'',
''delete from sla_miss where dag_id = "'' + dag_input + ''"'',
''delete from log where dag_id = "'' + dag_input + ''"'',
''delete from job where dag_id = "'' + dag_input + ''"'',
''delete from dag_run where dag_id = "'' + dag_input + ''"'',
''delete from dag where dag_id = "'' + dag_input + ''"'' }
def connect(query):
db = MySQLdb.connect(host="hostname", user="username", passwd="password", db="database")
cur = db.cursor()
cur.execute(query)
db.commit()
db.close()
return
for value in query:
print value
connect(value)
Airflow 1.10.1 ha sido lanzado. Esta versión agrega la capacidad de eliminar un DAG de la interfaz de usuario web después de que haya eliminado el DAG correspondiente del sistema de archivos.
Vea este boleto para más detalles:
[AIRFLOW-2657] Agregue la capacidad de eliminar DAG de la interfaz de usuario web
Tenga en cuenta que esto en realidad no elimina el DAG del sistema de archivos, primero deberá hacerlo manualmente; de lo contrario, el DAG se volverá a cargar.
Elimine el dag (que desea eliminar) de la carpeta dags y ejecute
airflow resetdb
.
Alternativamente, puede ir a airflow_db y eliminar manualmente esas entradas de las tablas dag (task_fail, xcom, task_instance, sla_miss, log, job, dag_run, dag, dag_stats).
Este es mi código adaptado usando PostgresHook con el valor predeterminado de connection_id.
import sys
from airflow.hooks.postgres_hook import PostgresHook
dag_input = sys.argv[1]
hook=PostgresHook( postgres_conn_id= "airflow_db")
for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
sql="delete from {} where dag_id=''{}''".format(t, dag_input)
hook.run(sql, True)
He escrito un script que elimina todos los metadatos relacionados con un dag específico para la base de datos SQLite predeterminada.
Esto se basa en la respuesta de Jesús anterior, pero se adaptó de Postgres a SQLite.
Los usuarios deben configurar
../airflow.db
en donde script.py esté almacenado en relación con el archivo predeterminado airflow.db (generalmente
~/airflow
airflow).
Para ejecutar, use
python script.py dag_id
.
import sqlite3
import sys
conn = sqlite3.connect(''../airflow.db'')
c = conn.cursor()
dag_input = sys.argv[1]
for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
query = "delete from {} where dag_id=''{}''".format(t, dag_input)
c.execute(query)
conn.commit()
conn.close()
Los DAG-s se pueden eliminar en Airflow 1.10, pero el proceso y la secuencia de acciones deben ser correctos. Hay un "problema de huevo y gallina": si elimina el DAG de la interfaz mientras el archivo todavía está allí, el DAG se vuelve a cargar (porque el archivo no se elimina). Si primero elimina el archivo y actualiza la página, DAG ya no se puede eliminar de la interfaz gráfica de usuario web. Entonces, la secuencia de acciones que me permitió eliminar un DAG de la interfaz fue:
- elimine el archivo DAG (en mi caso, elimínelo del repositorio de canalización e impleméntelo en servidores de flujo de aire, especialmente el programador)
- NO actualice la GUI web.
- En la GUI web en la vista de DAG (portada normal), haga clic en "Eliminar dag" -> el ícono rojo en el extremo derecho.
- Limpia todos los restos de este DAG de la base de datos.
No estoy seguro de por qué Apache Airflow no tiene una forma fácil y obvia de eliminar un DAG
AIRFLOW-1002 archivado
No hay nada incorporado en Airflow que lo haga por usted. Para eliminar el DAG, elimínelo del repositorio y elimine las entradas de la base de datos en la tabla metastore de Airflow - dag.
Para aquellos que todavía están encontrando respuestas. En Airflow versión 1.8, es muy difícil eliminar un DAG, puede consultar las respuestas anteriores. Pero desde que se lanzó 1.9, solo tienes que
elimine el dag en la carpeta dags y reinicie el servidor web
Puede borrar un conjunto de instancias de tareas, como si nunca se hubieran ejecutado con:
airflow clear dag_id -s 2017-1-23 -e 2017-8-31
Y luego elimine el archivo dag de la carpeta dags
Según la respuesta de @OlegYamin, estoy haciendo lo siguiente para eliminar un dag respaldado por postgres, donde airflow usa el esquema
public
.
delete from public.dag_pickle where id = (
select pickle_id from public.dag where dag_id = ''my_dag_id''
);
delete from public.dag_run where dag_id = ''my_dag_id'';
delete from public.dag_stats where dag_id = ''my_dag_id'';
delete from public.log where dag_id = ''my_dag_id'';
delete from public.sla_miss where dag_id = ''my_dag_id'';
delete from public.task_fail where dag_id = ''my_dag_id'';
delete from public.task_instance where dag_id = ''my_dag_id'';
delete from public.xcom where dag_id = ''my_dag_id'';
delete from public.dag where dag_id = ''my_dag_id'';
ADVERTENCIA : El efecto / corrección de la primera consulta de eliminación es desconocido para mí. Es solo una suposición de que se necesita.
simplemente elimínelo de mysql, funciona bien para mí. eliminarlos de las tablas a continuación:
-
trozo de cuero
-
dag_constructor
- dag_group_ship
- dag_pickle
- dag_run
- dag_stats
(puede haber más tablas en futuras versiones) luego reinicie el servidor web y el trabajador.
versiones> = 1.10.0:
airflow delete_dag <dag_id>
versiones <= 1.9.0:
No hay un comando para eliminar un dag, por lo que primero debe eliminar el archivo dag y luego eliminar todas las referencias al dag_id de la base de datos de metadatos de flujo de aire.
ADVERTENCIA
Puede restablecer la meta base de datos de flujo de aire, borrará todo, incluidos los dags, pero recuerde que también borrará el historial, los grupos, las variables, etc.
airflow resetdb
y luego
airflow initdb
Editar 8/27/18 - ¡Airflow 1.10 ahora se lanza en PyPI!
https://pypi.org/project/apache-airflow/1.10.0/
Cómo eliminar un DAG por completo
¡Tenemos esta característica ahora en Airflow ≥ 1.10!
El PR #2199 (Jira: AIRFLOW-1002 ) que agrega la eliminación de DAG a Airflow ahora se ha fusionado, lo que permite eliminar completamente las entradas de un DAG de todas las tablas relacionadas.
El delete_dag(...) core delete_dag(...) ahora es parte de la API experimental, y hay puntos de entrada disponibles a través de la CLI y también a través de la API REST .
CLI:
airflow delete_dag my_dag_id
API REST (ejecutando servidor web localmente):
curl -X "DELETE" http://127.0.0.1:8080/api/experimental/dags/my_dag_id
Advertencia sobre la API REST : asegúrese de que su clúster Airflow use autenticación en producción.
Instalación / actualización a Airflow 1.10 (actual)
Para actualizar, ejecute:
export SLUGIFY_USES_TEXT_UNIDECODE=yes
o:
export AIRFLOW_GPL_UNIDECODE=yes
Entonces:
pip install -U apache-airflow
¡Recuerde consultar UPDATING.md primero para conocer todos los detalles!