usar sirve settitle que para codigo java python apache-spark directed-acyclic-graphs airflow

sirve - settitle java



¿Cómo ejecutar el código Spark en Airflow? (3)

Hola gente de la tierra! Estoy usando Airflow para programar y ejecutar tareas Spark. Todo lo que encontré a estas alturas son los DAG de python que Airflow puede administrar.
Ejemplo DAG:

spark_count_lines.py import logging from airflow import DAG from airflow.operators import PythonOperator from datetime import datetime args = { ''owner'': ''airflow'' , ''start_date'': datetime(2016, 4, 17) , ''provide_context'': True } dag = DAG( ''spark_count_lines'' , start_date = datetime(2016, 4, 17) , schedule_interval = ''@hourly'' , default_args = args ) def run_spark(**kwargs): import pyspark sc = pyspark.SparkContext() df = sc.textFile(''file:///opt/spark/current/examples/src/main/resources/people.txt'') logging.info(''Number of lines in people.txt = {0}''.format(df.count())) sc.stop() t_main = PythonOperator( task_id = ''call_spark'' , dag = dag , python_callable = run_spark )

El problema es que no soy bueno en el código Python y tengo algunas tareas escritas en Java. Mi pregunta es ¿cómo ejecutar el Spark Java jar en python DAG? O tal vez hay otra manera de hacerlo? Encontré chispa enviar: http://spark.apache.org/docs/latest/submitting-applications.html
Pero no sé cómo conectar todo juntos. Tal vez alguien lo haya usado antes y tenga ejemplo de trabajo. ¡Gracias por tu tiempo!


Deberías poder utilizar BashOperator . Manteniendo el resto de su código tal como está, importe la clase requerida y los paquetes del sistema:

from airflow.operators.bash_operator import BashOperator import os import sys

establecer rutas requeridas:

os.environ[''SPARK_HOME''] = ''/path/to/spark/root'' sys.path.append(os.path.join(os.environ[''SPARK_HOME''], ''bin''))

y añadir operador:

spark_task = BashOperator( task_id=''spark_java'', bash_command=''spark-submit --class {{ params.class }} {{ params.jar }}'', params={''class'': ''MainClassName'', ''jar'': ''/path/to/your.jar''}, dag=dag )

Puede extender esto fácilmente para proporcionar argumentos adicionales utilizando plantillas Jinja.

Por supuesto, puede ajustar esto para un escenario que no sea Spark reemplazando bash_command con una plantilla adecuada para su caso, por ejemplo:

bash_command = ''java -jar {{ params.jar }}''

y ajustando params .


El flujo de aire a partir de la versión 1.8 (lanzado hoy), ha

Código SparkSQLHook: https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py

Código SparkSubmitHook - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py

Tenga en cuenta que estos dos nuevos operadores / ganchos Spark están en la rama "contrib" a partir de la versión 1.8, por lo que no están (bien) documentados.

Así que puedes usar SparkSubmitOperator para enviar tu código java para la ejecución de Spark.


Hay un ejemplo del uso de SparkSubmitOperator para Spark 2.3.1 en kubernetes (instancia de minikube):

Code that goes along with the Airflow located at: http://airflow.readthedocs.org/en/latest/tutorial.html """ from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator from airflow.models import Variable from datetime import datetime, timedelta default_args = { ''owner'': ''[email protected]'', ''depends_on_past'': False, ''start_date'': datetime(2018, 7, 27), ''email'': [''[email protected]''], ''email_on_failure'': False, ''email_on_retry'': False, ''retries'': 1, ''retry_delay'': timedelta(minutes=5), # ''queue'': ''bash_queue'', # ''pool'': ''backfill'', # ''priority_weight'': 10, ''end_date'': datetime(2018, 7, 29), } dag = DAG( ''tutorial_spark_operator'', default_args=default_args, schedule_interval=timedelta(1)) t1 = BashOperator( task_id=''print_date'', bash_command=''date'', dag=dag) print_path_env_task = BashOperator( task_id=''print_path_env'', bash_command=''echo $PATH'', dag=dag) spark_submit_task = SparkSubmitOperator( task_id=''spark_submit_job'', conn_id=''spark_default'', java_class=''com.ibm.cdopoc.DataLoaderDB2COS'', application=''local:///opt/spark/examples/jars/cppmpoc-dl-0.1.jar'', total_executor_cores=''1'', executor_cores=''1'', executor_memory=''2g'', num_executors=''2'', name=''airflowspark-DataLoaderDB2COS'', verbose=True, driver_memory=''1g'', conf={ ''spark.DB_URL'': ''jdbc:db2://dashdb-dal13.services.dal.bluemix.net:50001/BLUDB:sslConnection=true;'', ''spark.DB_USER'': Variable.get("CEDP_DB2_WoC_User"), ''spark.DB_PASSWORD'': Variable.get("CEDP_DB2_WoC_Password"), ''spark.DB_DRIVER'': ''com.ibm.db2.jcc.DB2Driver'', ''spark.DB_TABLE'': ''MKT_ATBTN.MERGE_STREAM_2000_REST_API'', ''spark.COS_API_KEY'': Variable.get("COS_API_KEY"), ''spark.COS_SERVICE_ID'': Variable.get("COS_SERVICE_ID"), ''spark.COS_ENDPOINT'': ''s3-api.us-geo.objectstorage.softlayer.net'', ''spark.COS_BUCKET'': ''data-ingestion-poc'', ''spark.COS_OUTPUT_FILENAME'': ''cedp-dummy-table-cos2'', ''spark.kubernetes.container.image'': ''ctipka/spark:spark-docker'', ''spark.kubernetes.authenticate.driver.serviceAccountName'': ''spark'' }, dag=dag, ) t1.set_upstream(print_path_env_task) spark_submit_task.set_upstream(t1)

El código que utiliza las variables almacenadas en las variables de flujo de aire:

Además, debe crear una nueva conexión de chispa o editar el ''spark_default'' existente con diccionario adicional {"queue":"root.default", "deploy-mode":"cluster", "spark-home":"", "spark-binary":"spark-submit", "namespace":"default"} :