Alibaba Cloud AnalyticDB Spark Operators
Overview
The Airflow integration with Alibaba Cloud AnalyticDB Spark provides operators for developing Spark batch and Spark SQL applications.
Develop Spark batch applications
Purpose
This example DAG uses AnalyticDBSparkBatchOperator to submit the Spark Pi and Spark logistic regression example applications.
Defining tasks
The following code submits the Spark Pi and Spark logistic regression applications as two tasks, with the logistic regression task running after the Spark Pi task.
with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2021, 1, 1),
    schedule=None,
    default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"},
    max_active_runs=1,
    catchup=False,
) as dag:
    spark_pi = AnalyticDBSparkBatchOperator(
        task_id="task1",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkPi",
        cluster_id="<your cluster id>",
        rg_name="<your resource group name>",
    )

    spark_lr = AnalyticDBSparkBatchOperator(
        task_id="task2",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkLR",
        cluster_id="<your cluster id>",
        rg_name="<your resource group name>",
    )
    # Replace the above cluster_id and rg_name with your own values.

    spark_pi >> spark_lr

    from tests_common.test_utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()
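The example sets cluster_id and rg_name both in default_args and on each operator. In Airflow, an argument passed explicitly to an operator takes precedence over the same key in default_args, so the task-level values are the ones that apply, while region is still taken from default_args. The sketch below models that precedence in plain Python. It is a simplified illustration, not Airflow's implementation: the helper resolve_operator_args is hypothetical, and it does not reproduce details such as Airflow only applying default_args keys that the operator actually accepts.

```python
from typing import Any


def resolve_operator_args(default_args: dict[str, Any], task_args: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: model which value a task ends up with for each argument."""
    resolved = dict(default_args)  # start from the DAG-level defaults
    resolved.update(task_args)     # explicitly passed task arguments win
    return resolved


# Values copied from the example DAG above.
default_args = {"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"}
spark_pi_args = {
    "task_id": "task1",
    "file": "local:///tmp/spark-examples.jar",
    "class_name": "org.apache.spark.examples.SparkPi",
    "cluster_id": "<your cluster id>",
    "rg_name": "<your resource group name>",
}

resolved = resolve_operator_args(default_args, spark_pi_args)
print(resolved["cluster_id"])  # <your cluster id>  (task-level value)
print(resolved["region"])      # your region        (falls back to default_args)
```

In practice this means the shared values can live in default_args alone, with per-task arguments reserved for tasks that need a different cluster or resource group.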