
Alibaba Cloud AnalyticDB Spark Operators

Overview

The Airflow integration with Alibaba Cloud AnalyticDB Spark provides several operators for developing Spark batch and SQL applications.

Develop Spark batch applications

Purpose

This example DAG uses AnalyticDBSparkBatchOperator to submit Spark Pi and Spark Logistic Regression applications.

Defining tasks

In the following code, we submit Spark Pi and Spark Logistic Regression applications.

tests/system/alibaba/example_adb_spark_batch.py

from datetime import datetime

from airflow import DAG
from airflow.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator

DAG_ID = "example_adb_spark_batch"  # illustrative DAG id, matching the example file name

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2021, 1, 1),
    schedule=None,
    default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"},
    max_active_runs=1,
    catchup=False,
) as dag:
    # Submit the Spark Pi example application as a batch job.
    spark_pi = AnalyticDBSparkBatchOperator(
        task_id="task1",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkPi",
        cluster_id="<your cluster id>",
        rg_name="<your resource group name>",
    )

    # Submit the Spark Logistic Regression example application as a batch job.
    spark_lr = AnalyticDBSparkBatchOperator(
        task_id="task2",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkLR",
        cluster_id="<your cluster id>",
        rg_name="<your resource group name>",
    )

    # Replace the cluster_id and rg_name placeholders above with your own values.

    # Run Spark Pi before Spark Logistic Regression.
    spark_pi >> spark_lr

    from tests_common.test_utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()
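
The overview also mentions SQL applications, which are handled by AnalyticDBSparkSQLOperator from the same provider package. The following is a minimal sketch, not a definitive example: the dag_id and the SQL statement are illustrative, and cluster_id and rg_name are placeholders to replace with your own values.

from datetime import datetime

from airflow import DAG
from airflow.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator

# A minimal sketch of a SQL application; dag_id and the SQL text are illustrative.
with DAG(
    dag_id="example_adb_spark_sql",
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    show_databases = AnalyticDBSparkSQLOperator(
        task_id="show_databases",
        sql="SHOW databases;",
        cluster_id="<your cluster id>",        # placeholder: your AnalyticDB cluster id
        rg_name="<your resource group name>",  # placeholder: your Spark resource group name
    )

As the batch example above shows, cluster_id, rg_name, and region can also be supplied once through default_args so that individual tasks need not repeat them.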
