sirve - settitle java
¿Cómo ejecutar el código Spark en Airflow? (3)
Hola gente de la tierra! Estoy usando Airflow para programar y ejecutar tareas Spark. Todo lo que encontré a estas alturas son los DAG de python que Airflow puede administrar.
Ejemplo DAG:
spark_count_lines.py
import logging
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime
args = {
''owner'': ''airflow''
, ''start_date'': datetime(2016, 4, 17)
, ''provide_context'': True
}
dag = DAG(
''spark_count_lines''
, start_date = datetime(2016, 4, 17)
, schedule_interval = ''@hourly''
, default_args = args
)
def run_spark(**kwargs):
import pyspark
sc = pyspark.SparkContext()
df = sc.textFile(''file:///opt/spark/current/examples/src/main/resources/people.txt'')
logging.info(''Number of lines in people.txt = {0}''.format(df.count()))
sc.stop()
t_main = PythonOperator(
task_id = ''call_spark''
, dag = dag
, python_callable = run_spark
)
El problema es que no soy bueno en el código Python y tengo algunas tareas escritas en Java. Mi pregunta es ¿cómo ejecutar el Spark Java jar en python DAG? O tal vez hay otra manera de hacerlo? Encontré chispa enviar: http://spark.apache.org/docs/latest/submitting-applications.html
Pero no sé cómo conectar todo juntos. Tal vez alguien lo haya usado antes y tenga ejemplo de trabajo. ¡Gracias por tu tiempo!
Deberías poder utilizar BashOperator
. Manteniendo el resto de su código tal como está, importe la clase requerida y los paquetes del sistema:
from airflow.operators.bash_operator import BashOperator
import os
import sys
establecer rutas requeridas:
os.environ[''SPARK_HOME''] = ''/path/to/spark/root''
sys.path.append(os.path.join(os.environ[''SPARK_HOME''], ''bin''))
y añadir operador:
spark_task = BashOperator(
task_id=''spark_java'',
bash_command=''spark-submit --class {{ params.class }} {{ params.jar }}'',
params={''class'': ''MainClassName'', ''jar'': ''/path/to/your.jar''},
dag=dag
)
Puede extender esto fácilmente para proporcionar argumentos adicionales utilizando plantillas Jinja.
Por supuesto, puede ajustar esto para un escenario que no sea Spark reemplazando bash_command
con una plantilla adecuada para su caso, por ejemplo:
bash_command = ''java -jar {{ params.jar }}''
y ajustando params
.
El flujo de aire a partir de la versión 1.8 (lanzado hoy), ha
- SparkSqlOperator - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_sql_operator.py ;
Código SparkSQLHook: https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py
- SparkSubmitOperator - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_submit_operator.py
Código SparkSubmitHook - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py
Tenga en cuenta que estos dos nuevos operadores / ganchos Spark están en la rama "contrib" a partir de la versión 1.8, por lo que no están (bien) documentados.
Así que puedes usar SparkSubmitOperator para enviar tu código java para la ejecución de Spark.
Hay un ejemplo del uso de SparkSubmitOperator
para Spark 2.3.1 en kubernetes (instancia de minikube):
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable
from datetime import datetime, timedelta
default_args = {
''owner'': ''[email protected]'',
''depends_on_past'': False,
''start_date'': datetime(2018, 7, 27),
''email'': [''[email protected]''],
''email_on_failure'': False,
''email_on_retry'': False,
''retries'': 1,
''retry_delay'': timedelta(minutes=5),
# ''queue'': ''bash_queue'',
# ''pool'': ''backfill'',
# ''priority_weight'': 10,
''end_date'': datetime(2018, 7, 29),
}
dag = DAG(
''tutorial_spark_operator'', default_args=default_args, schedule_interval=timedelta(1))
t1 = BashOperator(
task_id=''print_date'',
bash_command=''date'',
dag=dag)
print_path_env_task = BashOperator(
task_id=''print_path_env'',
bash_command=''echo $PATH'',
dag=dag)
spark_submit_task = SparkSubmitOperator(
task_id=''spark_submit_job'',
conn_id=''spark_default'',
java_class=''com.ibm.cdopoc.DataLoaderDB2COS'',
application=''local:///opt/spark/examples/jars/cppmpoc-dl-0.1.jar'',
total_executor_cores=''1'',
executor_cores=''1'',
executor_memory=''2g'',
num_executors=''2'',
name=''airflowspark-DataLoaderDB2COS'',
verbose=True,
driver_memory=''1g'',
conf={
''spark.DB_URL'': ''jdbc:db2://dashdb-dal13.services.dal.bluemix.net:50001/BLUDB:sslConnection=true;'',
''spark.DB_USER'': Variable.get("CEDP_DB2_WoC_User"),
''spark.DB_PASSWORD'': Variable.get("CEDP_DB2_WoC_Password"),
''spark.DB_DRIVER'': ''com.ibm.db2.jcc.DB2Driver'',
''spark.DB_TABLE'': ''MKT_ATBTN.MERGE_STREAM_2000_REST_API'',
''spark.COS_API_KEY'': Variable.get("COS_API_KEY"),
''spark.COS_SERVICE_ID'': Variable.get("COS_SERVICE_ID"),
''spark.COS_ENDPOINT'': ''s3-api.us-geo.objectstorage.softlayer.net'',
''spark.COS_BUCKET'': ''data-ingestion-poc'',
''spark.COS_OUTPUT_FILENAME'': ''cedp-dummy-table-cos2'',
''spark.kubernetes.container.image'': ''ctipka/spark:spark-docker'',
''spark.kubernetes.authenticate.driver.serviceAccountName'': ''spark''
},
dag=dag,
)
t1.set_upstream(print_path_env_task)
spark_submit_task.set_upstream(t1)
El código que utiliza las variables almacenadas en las variables de flujo de aire:
Además, debe crear una nueva conexión de chispa o editar el ''spark_default'' existente con diccionario adicional {"queue":"root.default", "deploy-mode":"cluster", "spark-home":"", "spark-binary":"spark-submit", "namespace":"default"}
: