mega license full crack airflow

license - airflow pc



fecha de ejecuciĆ³n en el flujo de aire: es necesario acceder como variable (5)

Realmente soy un novato en este foro. Pero he estado jugando con el flujo de aire, por algún tiempo, para nuestra compañía. Lo siento si esta pregunta suena realmente tonta.

Estoy escribiendo una tubería usando un montón de BashOperators. Básicamente, para cada tarea, quiero simplemente llamar a una API REST usando ''curl''

Así es como se ve mi tubería (versión muy simplificada):

from airflow import DAG from airflow.operators import BashOperator, PythonOperator from dateutil import tz import datetime datetime_obj = datetime.datetime default_args = { ''owner'': ''airflow'', ''depends_on_past'': False, ''start_date'': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()), ''email'': [''[email protected]''], ''email_on_failure'': True, ''email_on_retry'': False, ''retries'': 2, ''retry_delay'': datetime.timedelta(minutes=5), } current_datetime = datetime_obj.now(tz=tz.tzlocal()) dag = DAG( ''test_run'', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60)) curl_cmd=''curl -XPOST "''+hostname+'':8000/run?st=''+current_datetime +''"'' t1 = BashOperator( task_id=''rest-api-1'', bash_command=curl_cmd, dag=dag)

Si nota que estoy haciendo current_datetime= datetime_obj.now(tz=tz.tzlocal()) En lugar de eso, lo que quiero aquí es ''operation_date''

¿Cómo uso directamente''ecution_date '' y lo asigno a una variable en mi archivo python?

Tengo este problema general de acceder a args. Cualquier ayuda será realmente apreciada.

Gracias


Creo que no puede asignar variables con valores del contexto de flujo de aire fuera de una instancia de tarea, solo están disponibles en tiempo de ejecución. Básicamente, hay 2 pasos diferentes cuando se carga y ejecuta un dag en el flujo de aire:

  • Primero se interpreta y analiza su archivo dag. Tiene que funcionar y compilarse y las definiciones de tareas deben ser correctas (sin error de sintaxis ni nada). Durante este paso, si realiza llamadas a funciones para completar algunos valores, estas funciones no podrán acceder al contexto del flujo de aire (la fecha de ejecución, por ejemplo, aún más si está realizando un proceso de relleno).

  • El segundo paso es la ejecución del dag. Es solo durante este segundo paso que las variables proporcionadas por el flujo de aire (fecha de execution_date, ds, etc... ) están disponibles, ya que están relacionadas con una ejecución del dag.

Por lo tanto, no puede inicializar variables globales utilizando el contexto de Flujo de aire, sin embargo, Flujo de aire le brinda múltiples mecanismos para lograr el mismo efecto:

  1. Usando la plantilla jinja en su comando (puede estar en una cadena en el código o en un archivo, ambos serán procesados). Usted tiene la lista de plantillas disponibles aquí: https://airflow.apache.org/code.html#default-variables . Tenga en cuenta que algunas funciones también están disponibles, en particular para los días de cómputo del delta y de fecha.

  2. Utilizando un PythonOperator en el que se pasa el contexto (con el argumento proporcionando_contexto). Esto le permitirá acceder a la misma plantilla con la sintaxis kwargs[''<variable_name''] . Si lo necesita, puede devolver un valor desde un PythonOperator, este se almacenará en una variable XCOM que puede usar más adelante en cualquier plantilla. El acceso a las variables XCOM usa esta sintaxis: https://airflow.apache.org/concepts.html#xcoms

  3. Si escribe su propio operador, puede acceder a las variables de flujo de aire con el context dict.


El argumento bash_command es una plantilla . Puede acceder execution_date en cualquier plantilla como un objeto de datetime y datetime utilizando la variableecution_date. En la plantilla, puedes usar cualquier método jinja2 para manipularlo.

Usando lo siguiente como su cadena BashOperator bash_command :

# pass in the first of the current month some_command.sh {{ execution_date.replace(day=1) }} # last day of previous month some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}

Si solo desea la cadena equivalente a la fecha de ejecución, ds devolverá una marca de fecha (YYYY-MM-DD), ds_nodash devuelve lo mismo sin guiones (YYYYMMDD), etc. Más información sobre macros está disponible en Api Docs .

Su operador final se vería así:

command = """curl -XPOST ''%(hostname)s:8000/run?st={{ ds }}''""" % locals() t1 = BashOperator( task_id=''rest-api-1'', bash_command=command, dag=dag)


El constructor de PythonOperator toma un parámetro ''proporcionar_contexto'' (vea https://pythonhosted.org/airflow/code.html ). Si es Verdadero, entonces pasa una serie de parámetros al python_callable a través de kwargs. Kwargs [''ecution_date ''] es lo que quieres, creo.

Algo como esto:

def python_method(ds, **kwargs): Variable.set(''execution_date'', kwargs[''execution_date'']) return doit = PythonOperator( task_id=''doit'', provide_context=True, python_callable=python_method, dag=dag)

No estoy seguro de cómo hacerlo con BashOperator, pero puede comenzar con este problema: https://github.com/airbnb/airflow/issues/775


La fecha de ejecución, (datetime.datetime)

{{ execution_date }}


def execute(self, context): execution_date = context.get("execution_date")

Esto debería estar dentro del método execute () de Operator.