airflow - mochilas - No se pueden importar complementos de flujo de aire
airflow scheduler (6)
Siguiendo el tutorial de Airflow here .
Problema : el servidor web devuelve el siguiente error
Broken DAG: [/usr/local/airflow/dags/test_operator.py] cannot import name
MyFirstOperator
Notas: La estructura del directorio se ve así:
airflow_home
├── airflow.cfg
├── airflow.db
├── dags
│ └── test_operators.py
├── plugins
│ └── my_operators.py
└── unittests.cfg
Estoy intentando importar el complemento en ''test_operators.py'' como este:
from airflow.operators import MyFirstOperator
El código es el mismo que se encuentra en el tutorial.
En el artículo le gusta esto:
class MyFirstPlugin(AirflowPlugin):
name = "my_first_plugin"
operators = [MyFirstOperator]
En su lugar, utilice:
class MyFirstPlugin(AirflowPlugin):
name = "my_first_plugin"
operators = [MyFirstOperator]
# A list of class(es) derived from BaseHook
hooks = []
# A list of class(es) derived from BaseExecutor
executors = []
# A list of references to inject into the macros namespace
macros = []
# A list of objects created from a class derived
# from flask_admin.BaseView
admin_views = []
# A list of Blueprint object created from flask.Blueprint
flask_blueprints = []
# A list of menu links (flask_admin.base.MenuLink)
menu_links = []
Tampoco uses:
from airflow.operators import MyFirstOperator
De acuerdo con el artículo de flujo de aire en los complementos, debe ser:
from airflow.operators.my_first_plugin import MyFirstOperator
Si eso no funciona, intente:
from airflow.operators.my_operators import MyFirstOperator
Si eso no funciona, revise el inicio de sesión de su servidor web para obtener más información.
Encontré el mismo error al seguir here .
Sin embargo, mi culpa fue que había usado el carácter de espacio '' ''
en task_id
, que no es compatible con Airflow
.
Claramente el error no apuntaba hacia el problema real. Al reiniciar el scheduler
flujo de aire y el webserver
se mostró el mensaje de error correcto en WebUI .
Reinicié el servidor web, y ahora todo funciona bien.
Esto es lo que creo que podría haber sucedido:
- Antes de comenzar con el ejemplo del tutorial, intenté ejecutar mi propio plugin y dag. Hubo un pequeño error de sintaxis en la primera ejecución que reparé, sin embargo, después de la corrección empecé a recibir el error "no se puede importar el nombre".
- Eliminé el plugin y el dag, y traté de usar el del tutorial para ver qué estaba pasando.
Mi conjetura es que el error del paso 1 de alguna manera afectó el paso 2.
Según los documentos
Los módulos de Python en la carpeta de complementos se importan, y los enlaces, operadores, sensores, macros, ejecutores y vistas web se integran a las principales colecciones de Airflow y están disponibles para su uso.
y funciona bien en la versión 1.10.1
Tuve que actualizar la ruta del complemento en el archivo airflow.cfg
para solucionar el problema.
Donde se almacenan tus plugins de Airflow:
plugins_folder = /airflow/plugins
Yo uso el flujo de aire 1.10 . Si es un operador personalizado que desea importar, puede cargarlo en la carpeta de complementos de flujo de aire, y luego en el DAG especifique la importación como:
de [nombre de archivo] import [nombre de clase]
donde: nombre_archivo es el nombre de su archivo de complemento nombre de clase es el nombre de su clase.
Por ejemplo: si el nombre de su archivo es my_first_plugin y el nombre de la clase es MyFirstOperator entonces, la importación sería:
desde my_first_plugin import MyFirstOperator
Funcionó para mí ya que estoy usando el flujo de aire 1.10
Gracias ! Espero que esto ayude !!