tutorial mochilas full español colchones airflow

airflow - mochilas - No se pueden importar complementos de flujo de aire



airflow scheduler (6)

Siguiendo el tutorial de Airflow here .

Problema : el servidor web devuelve el siguiente error

Broken DAG: [/usr/local/airflow/dags/test_operator.py] cannot import name MyFirstOperator

Notas: La estructura del directorio se ve así:

airflow_home ├── airflow.cfg ├── airflow.db ├── dags │ └── test_operators.py ├── plugins │ └── my_operators.py └── unittests.cfg

Estoy intentando importar el complemento en ''test_operators.py'' como este:

from airflow.operators import MyFirstOperator

El código es el mismo que se encuentra en el tutorial.


En el artículo le gusta esto:

class MyFirstPlugin(AirflowPlugin): name = "my_first_plugin" operators = [MyFirstOperator]

En su lugar, utilice:

class MyFirstPlugin(AirflowPlugin): name = "my_first_plugin" operators = [MyFirstOperator] # A list of class(es) derived from BaseHook hooks = [] # A list of class(es) derived from BaseExecutor executors = [] # A list of references to inject into the macros namespace macros = [] # A list of objects created from a class derived # from flask_admin.BaseView admin_views = [] # A list of Blueprint object created from flask.Blueprint flask_blueprints = [] # A list of menu links (flask_admin.base.MenuLink) menu_links = []

Tampoco uses:

from airflow.operators import MyFirstOperator

De acuerdo con el artículo de flujo de aire en los complementos, debe ser:

from airflow.operators.my_first_plugin import MyFirstOperator

Si eso no funciona, intente:

from airflow.operators.my_operators import MyFirstOperator

Si eso no funciona, revise el inicio de sesión de su servidor web para obtener más información.


Encontré el mismo error al seguir here .

Sin embargo, mi culpa fue que había usado el carácter de espacio '' '' en task_id , que no es compatible con Airflow .

Claramente el error no apuntaba hacia el problema real. Al reiniciar el scheduler flujo de aire y el webserver se mostró el mensaje de error correcto en WebUI .


Reinicié el servidor web, y ahora todo funciona bien.

Esto es lo que creo que podría haber sucedido:

  1. Antes de comenzar con el ejemplo del tutorial, intenté ejecutar mi propio plugin y dag. Hubo un pequeño error de sintaxis en la primera ejecución que reparé, sin embargo, después de la corrección empecé a recibir el error "no se puede importar el nombre".
  2. Eliminé el plugin y el dag, y traté de usar el del tutorial para ver qué estaba pasando.

Mi conjetura es que el error del paso 1 de alguna manera afectó el paso 2.


Según los documentos

Los módulos de Python en la carpeta de complementos se importan, y los enlaces, operadores, sensores, macros, ejecutores y vistas web se integran a las principales colecciones de Airflow y están disponibles para su uso.

y funciona bien en la versión 1.10.1


Tuve que actualizar la ruta del complemento en el archivo airflow.cfg para solucionar el problema.

Donde se almacenan tus plugins de Airflow:

plugins_folder = /airflow/plugins


Yo uso el flujo de aire 1.10 . Si es un operador personalizado que desea importar, puede cargarlo en la carpeta de complementos de flujo de aire, y luego en el DAG especifique la importación como:

de [nombre de archivo] import [nombre de clase]

donde: nombre_archivo es el nombre de su archivo de complemento nombre de clase es el nombre de su clase.

Por ejemplo: si el nombre de su archivo es my_first_plugin y el nombre de la clase es MyFirstOperator entonces, la importación sería:

desde my_first_plugin import MyFirstOperator

Funcionó para mí ya que estoy usando el flujo de aire 1.10

Gracias ! Espero que esto ayude !!