Examples

Note

For a minimal quick start, see the Getting Started section.

Key Concepts

Defining DAGs

Example: Defining a DAG

Use the airflow.sdk.dag() decorator to convert a Python function into an Airflow DAG. All nested calls to airflow.sdk.task() within the function will become tasks in the DAG. For full parameters and usage, see the API reference for airflow.sdk.dag().
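As a minimal sketch of the pattern (the DAG name, schedule, and task below are illustrative, not taken from the example file):

import pendulum

from airflow.sdk import dag, task


@dag(schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False)
def my_minimal_dag():
    @task
    def say_hello():
        # Nested @task calls become tasks in the DAG.
        print("hello")

    say_hello()


# Calling the decorated function registers the DAG under the function's name.
my_minimal_dag()

The full example below additionally uses a custom operator and a BashOperator inside the decorated function.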

/opt/airflow/airflow-core/src/airflow/example_dags/example_dag_decorator.py

from typing import TYPE_CHECKING, Any

import httpx
import pendulum

from airflow.models.baseoperator import BaseOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import dag, task

if TYPE_CHECKING:
    from airflow.sdk import Context


class GetRequestOperator(BaseOperator):
    """Custom operator to send GET request to provided url"""

    template_fields = ("url",)

    def __init__(self, *, url: str, **kwargs):
        super().__init__(**kwargs)
        self.url = url

    def execute(self, context: Context):
        return httpx.get(self.url).json()


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def example_dag_decorator(url: str = "http://httpbin.org/get"):
    """
    DAG to get IP address and echo it via BashOperator.

    :param url: URL to get IP address from. Defaults to "http://httpbin.org/get".
    """
    get_ip = GetRequestOperator(task_id="get_ip", url=url)

    @task(multiple_outputs=True)
    def prepare_command(raw_json: dict[str, Any]) -> dict[str, str]:
        external_ip = raw_json["origin"]
        return {
            "command": f"echo 'Seems like today your server executing Airflow is connected from IP {external_ip}'",
        }

    command_info = prepare_command(get_ip.output)

    BashOperator(task_id="echo_ip_info", bash_command=command_info["command"])


example_dag = example_dag_decorator()

Decorators

Example: Using Task SDK decorators

The Task SDK provides decorators such as task_group, setup, and teardown to simplify DAG definitions. The first example below groups tasks with @task_group (a short sketch of decorator arguments follows it); the second shows setup and teardown tasks:

/opt/airflow/airflow-core/src/airflow/example_dags/example_task_group_decorator.py

import pendulum

from airflow.sdk import DAG, task, task_group


# Creating Tasks
@task
def task_start():
    """Empty Task which is First Task of Dag"""
    return "[Task_start]"


@task
def task_1(value: int) -> str:
    """Empty Task1"""
    return f"[ Task1 {value} ]"


@task
def task_2(value: str) -> str:
    """Empty Task2"""
    return f"[ Task2 {value} ]"


@task
def task_3(value: str) -> None:
    """Empty Task3"""
    print(f"[ Task3 {value} ]")


@task
def task_end() -> None:
    """Empty Task which is Last Task of Dag"""
    print("[ Task_End  ]")


# Creating TaskGroups
@task_group
def task_group_function(value: int) -> None:
    """TaskGroup for grouping related Tasks"""
    task_3(task_2(task_1(value)))


# Executing Tasks and TaskGroups
with DAG(
    dag_id="example_task_group_decorator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    start_task = task_start()
    end_task = task_end()
    for i in range(5):
        current_task_group = task_group_function(i)
        start_task >> current_task_group >> end_task
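In the loop above, each call to task_group_function() creates a separate group, and Airflow suffixes the group IDs automatically so they stay unique. The decorator also accepts arguments; as a hedged sketch (the group_id and tooltip values here are illustrative, not from the example file):

from airflow.sdk import task, task_group


@task_group(group_id="processing", tooltip="tasks that belong together")
def processing_group(value: int) -> None:
    @task
    def double(v: int) -> int:
        return v * 2

    double(value)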

/opt/airflow/airflow-core/src/airflow/example_dags/example_setup_teardown_taskflow.py

import pendulum

from airflow.sdk import DAG, setup, task, task_group, teardown

with DAG(
    dag_id="example_setup_teardown_taskflow",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task
    def my_first_task():
        print("Hello 1")

    @task
    def my_second_task():
        print("Hello 2")

    @task
    def my_third_task():
        print("Hello 3")

    # you can set setup / teardown relationships with the `as_teardown` method.
    task_1 = my_first_task()
    task_2 = my_second_task()
    task_3 = my_third_task()
    task_1 >> task_2 >> task_3.as_teardown(setups=task_1)

    # The method `as_teardown` will mark task_3 as teardown, task_1 as setup, and
    # arrow task_1 >> task_3.
    # Now if you clear task_2, then its setup task, task_1, will be cleared in
    # addition to its teardown task, task_3

    # it's also possible to use a decorator to mark a task as setup or
    # teardown when you define it. see below.

    @setup
    def outer_setup():
        print("I am outer_setup")
        return "some cluster id"

    @teardown
    def outer_teardown(cluster_id):
        print("I am outer_teardown")
        print(f"Tearing down cluster: {cluster_id}")

    @task
    def outer_work():
        print("I am just a normal task")

    @task_group
    def section_1():
        @setup
        def inner_setup():
            print("I set up")
            return "some_cluster_id"

        @task
        def inner_work(cluster_id):
            print(f"doing some work with {cluster_id=}")

        @teardown
        def inner_teardown(cluster_id):
            print(f"tearing down {cluster_id=}")

        # this passes the return value of `inner_setup` to both `inner_work` and `inner_teardown`
        inner_setup_task = inner_setup()
        inner_work(inner_setup_task) >> inner_teardown(inner_setup_task)

    # by using the decorators, outer_setup and outer_teardown are already marked as setup / teardown
    # now we just need to make sure they are linked directly.  At a low level, what we need
    # to do so is the following::
    #     s = outer_setup()
    #     t = outer_teardown()
    #     s >> t
    #     s >> outer_work() >> t
    # Thus, s and t are linked directly, and outer_work runs in between.  We can take advantage of
    # the fact that we are in taskflow, along with the context manager on teardowns, as follows:
    with outer_teardown(outer_setup()):
        outer_work()

        # and let's put section 1 inside the outer setup and teardown tasks
        section_1()

Tasks and Operators

Example: Defining tasks and using operators

Use the airflow.sdk.task() decorator to wrap Python callables as tasks and leverage dynamic task mapping with the .expand() method. Tasks communicate via airflow.sdk.XComArg. For traditional operators and sensors, import classes like airflow.sdk.BaseOperator or airflow.sdk.Sensor.
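As a hedged sketch of mapping (the DAG and task names below are illustrative), .partial() pins arguments that stay constant while .expand() maps over the rest, creating one task instance per element:

from datetime import datetime

from airflow.sdk import DAG, task

with DAG(dag_id="example_partial_expand_sketch", schedule=None, start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add(x: int, y: int) -> int:
        return x + y

    # y is fixed for every mapped task instance; x produces three instances.
    add.partial(y=10).expand(x=[1, 2, 3])

The repository examples below show simple and second-order mapping, and XComArg-based wiring with traditional operators.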

/opt/airflow/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py

from datetime import datetime

from airflow.sdk import DAG, task

with DAG(dag_id="example_dynamic_task_mapping", schedule=None, start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def sum_it(values):
        total = sum(values)
        print(f"Total was {total}")

    added_values = add_one.expand(x=[1, 2, 3])
    sum_it(added_values)

with DAG(
    dag_id="example_task_mapping_second_order", schedule=None, catchup=False, start_date=datetime(2022, 3, 4)
) as dag2:

    @task
    def get_nums():
        return [1, 2, 3]

    @task
    def times_2(num):
        return num * 2

    @task
    def add_10(num):
        return num + 10

    _get_nums = get_nums()
    _times_2 = times_2.expand(num=_get_nums)
    add_10.expand(num=_times_2)

/opt/airflow/airflow-core/src/airflow/example_dags/example_xcomargs.py

import logging

import pendulum

from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG, task

log = logging.getLogger(__name__)


@task
def generate_value():
    """Empty function"""
    return "Bring me a shrubbery!"


@task
def print_value(value, ts=None):
    """Empty function"""
    log.info("The knights of Ni say: %s (at %s)", value, ts)


with DAG(
    dag_id="example_xcom_args",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["example"],
) as dag:
    print_value(generate_value())

with DAG(
    "example_xcom_args_with_operators",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["example"],
) as dag2:
    bash_op1 = BashOperator(task_id="c", bash_command="echo c")
    bash_op2 = BashOperator(task_id="d", bash_command="echo c")
    xcom_args_a = print_value("first!")
    xcom_args_b = print_value("second!")

    bash_op1 >> xcom_args_a >> xcom_args_b >> bash_op2

Assets

Example: Defining and aliasing assets

Model data artifacts using the Task SDK’s asset API. Decorate functions with airflow.sdk.asset() and create aliases with airflow.sdk.AssetAlias. See the API reference under assets for full guidance.
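The repository examples below construct Asset objects directly and combine them into schedules and outlets; the airflow.sdk.asset() decorator itself is not shown in them. As a rough sketch (the schedule and function name are illustrative, and the exact decorator parameters should be checked against the API reference):

from airflow.sdk import asset


# Hypothetical sketch: the decorated function becomes an asset-producing task,
# and the asset name defaults to the function name.
@asset(schedule="@daily")
def example_asset_sketch():
    print("materializing example_asset_sketch")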

/opt/airflow/airflow-core/src/airflow/example_dags/example_assets.py

import pendulum

from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG, Asset
from airflow.timetables.assets import AssetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

dag1_asset = Asset("s3://dag1/output_1.txt", extra={"hi": "bye"})
dag2_asset = Asset("s3://dag2/output_1.txt", extra={"hi": "bye"})
dag3_asset = Asset("s3://dag3/output_3.txt", extra={"hi": "bye"})

with DAG(
    dag_id="asset_produces_1",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule="@daily",
    tags=["produces", "asset-scheduled"],
) as dag1:
    # [START task_outlet]
    BashOperator(outlets=[dag1_asset], task_id="producing_task_1", bash_command="sleep 5")
    # [END task_outlet]

with DAG(
    dag_id="asset_produces_2",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    tags=["produces", "asset-scheduled"],
) as dag2:
    BashOperator(outlets=[dag2_asset], task_id="producing_task_2", bash_command="sleep 5")

# [START dag_dep]
with DAG(
    dag_id="asset_consumes_1",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[dag1_asset],
    tags=["consumes", "asset-scheduled"],
) as dag3:
    # [END dag_dep]
    BashOperator(
        outlets=[Asset("s3://consuming_1_task/asset_other.txt")],
        task_id="consuming_1",
        bash_command="sleep 5",
    )

with DAG(
    dag_id="asset_consumes_1_and_2",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[dag1_asset, dag2_asset],
    tags=["consumes", "asset-scheduled"],
) as dag4:
    BashOperator(
        outlets=[Asset("s3://consuming_2_task/asset_other_unknown.txt")],
        task_id="consuming_2",
        bash_command="sleep 5",
    )

with DAG(
    dag_id="asset_consumes_1_never_scheduled",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[
        dag1_asset,
        Asset("s3://unrelated/this-asset-doesnt-get-triggered"),
    ],
    tags=["consumes", "asset-scheduled"],
) as dag5:
    BashOperator(
        outlets=[Asset("s3://consuming_2_task/asset_other_unknown.txt")],
        task_id="consuming_3",
        bash_command="sleep 5",
    )

with DAG(
    dag_id="asset_consumes_unknown_never_scheduled",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[
        Asset("s3://unrelated/asset3.txt"),
        Asset("s3://unrelated/asset_other_unknown.txt"),
    ],
    tags=["asset-scheduled"],
) as dag6:
    BashOperator(
        task_id="unrelated_task",
        outlets=[Asset("s3://unrelated_task/asset_other_unknown.txt")],
        bash_command="sleep 5",
    )

with DAG(
    dag_id="consume_1_and_2_with_asset_expressions",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=(dag1_asset & dag2_asset),
) as dag5:
    BashOperator(
        outlets=[Asset("s3://consuming_2_task/asset_other_unknown.txt")],
        task_id="consume_1_and_2_with_asset_expressions",
        bash_command="sleep 5",
    )
with DAG(
    dag_id="consume_1_or_2_with_asset_expressions",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=(dag1_asset | dag2_asset),
) as dag6:
    BashOperator(
        outlets=[Asset("s3://consuming_2_task/asset_other_unknown.txt")],
        task_id="consume_1_or_2_with_asset_expressions",
        bash_command="sleep 5",
    )
with DAG(
    dag_id="consume_1_or_both_2_and_3_with_asset_expressions",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=(dag1_asset | (dag2_asset & dag3_asset)),
) as dag7:
    BashOperator(
        outlets=[Asset("s3://consuming_2_task/asset_other_unknown.txt")],
        task_id="consume_1_or_both_2_and_3_with_asset_expressions",
        bash_command="sleep 5",
    )
with DAG(
    dag_id="conditional_asset_and_time_based_timetable",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=AssetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), assets=(dag1_asset & dag2_asset)
    ),
    tags=["asset-time-based-timetable"],
) as dag8:
    BashOperator(
        outlets=[Asset("s3://asset_time_based/asset_other_unknown.txt")],
        task_id="conditional_asset_and_time_based_timetable",
        bash_command="sleep 5",
    )

/opt/airflow/airflow-core/src/airflow/example_dags/example_asset_alias.py

import pendulum

from airflow.sdk import DAG, Asset, AssetAlias, task

with DAG(
    dag_id="asset_s3_bucket_producer",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    tags=["producer", "asset"],
):

    @task(outlets=[Asset("s3://bucket/my-task")])
    def produce_asset_events():
        pass

    produce_asset_events()

with DAG(
    dag_id="asset_alias_example_alias_producer",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    tags=["producer", "asset-alias"],
):

    @task(outlets=[AssetAlias("example-alias")])
    def produce_asset_events_through_asset_alias(*, outlet_events=None):
        bucket_name = "bucket"
        object_path = "my-task"
        outlet_events[AssetAlias("example-alias")].add(Asset(f"s3://{bucket_name}/{object_path}"))

    produce_asset_events_through_asset_alias()

with DAG(
    dag_id="asset_s3_bucket_consumer",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[Asset("s3://bucket/my-task")],
    catchup=False,
    tags=["consumer", "asset"],
):

    @task
    def consume_asset_event():
        pass

    consume_asset_event()

with DAG(
    dag_id="asset_alias_example_alias_consumer",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[AssetAlias("example-alias")],
    catchup=False,
    tags=["consumer", "asset-alias"],
):

    @task(inlets=[AssetAlias("example-alias")])
    def consume_asset_event_from_asset_alias(*, inlet_events=None):
        for event in inlet_events[AssetAlias("example-alias")]:
            print(event)

    consume_asset_event_from_asset_alias()

TaskFlow API Tutorial

This section provides a concise, code-first view. For the full tutorial and context, see the core TaskFlow tutorial.

Step 1: Define the DAG

In this step, define your DAG by applying the airflow.sdk.dag() decorator to a Python function. The decorator captures the schedule and default arguments; the DAG itself is created when the function is invoked in Step 4. For more details, see airflow.sdk.dag().
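The excerpts below are taken from the middle of the file and omit its module-level imports; they assume roughly the following (a sketch matching the names used in the snippets):

import json

import pendulum

from airflow.sdk import dag, task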

/opt/airflow/airflow-core/src/airflow/example_dags/tutorial_taskflow_api.py

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """

Step 2: Write your Tasks

/opt/airflow/airflow-core/src/airflow/example_dags/tutorial_taskflow_api.py

@task()
def extract():
    """
    #### Extract task
    A simple Extract task to get data ready for the rest of the data
    pipeline. In this case, getting data is simulated by reading from a
    hardcoded JSON string.
    """
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

    order_data_dict = json.loads(data_string)
    return order_data_dict

/opt/airflow/airflow-core/src/airflow/example_dags/tutorial_taskflow_api.py

@task(multiple_outputs=True)
def transform(order_data_dict: dict):
    """
    #### Transform task
    A simple Transform task which takes in the collection of order data and
    computes the total order value.
    """
    total_order_value = 0

    for value in order_data_dict.values():
        total_order_value += value

    return {"total_order_value": total_order_value}

/opt/airflow/airflow-core/src/airflow/example_dags/tutorial_taskflow_api.py

@task()
def load(total_order_value: float):
    """
    #### Load task
    A simple Load task which takes in the result of the Transform task and
    instead of saving it to end user review, just prints it out.
    """

    print(f"Total order value is: {total_order_value:.2f}")

Step 3: Build the Flow

/opt/airflow/airflow-core/src/airflow/example_dags/tutorial_taskflow_api.py

order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
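Because transform is declared with multiple_outputs=True, its returned dictionary is split into separate XCom entries, which is why load can subscript order_summary["total_order_value"] here.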

Step 4: Invoke the DAG

/opt/airflow/airflow-core/src/airflow/example_dags/tutorial_taskflow_api.py

tutorial_taskflow_api()
