Pythonic DAGs with the TaskFlow API

In the first tutorial, you built your first Airflow DAG using traditional Operators like PythonOperator. Now let’s look at a more modern and Pythonic way to write workflows using the TaskFlow API — introduced in Airflow 2.0.

The TaskFlow API is designed to make your code simpler, cleaner, and easier to maintain. You write plain Python functions, decorate them, and Airflow handles the rest — including task creation, dependency wiring, and passing data between tasks.

In this tutorial, we’ll create a simple ETL pipeline (Extract → Transform → Load) using the TaskFlow API. Let’s dive in!

The Big Picture: A TaskFlow Pipeline

Here’s what the full pipeline looks like using TaskFlow. Don’t worry if some of it looks unfamiliar — we’ll break it down step-by-step.

src/airflow/example_dags/tutorial_taskflow_api.py


import json

import pendulum

from airflow.sdk import dag, task


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """

    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """

        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


tutorial_taskflow_api()

Step 1: Define the DAG

Just like before, your DAG is a Python script that Airflow loads and parses. But this time, we’re using the @dag decorator to define it.

src/airflow/example_dags/tutorial_taskflow_api.py

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """

To make this DAG discoverable by Airflow, we can call the Python function that was decorated with @dag:

src/airflow/example_dags/tutorial_taskflow_api.py

tutorial_taskflow_api()

Changed in version 2.4: If you’re using the @dag decorator or defining your DAG in a with block, you no longer need to assign it to a global variable. Airflow will find it automatically.

You can visualize your DAG in the Airflow UI! Once your DAG is loaded, navigate to the Graph View to see how tasks are connected.

Step 2: Write Your Tasks with @task

With TaskFlow, each task is just a regular Python function. You can use the @task decorator to turn it into a task that Airflow can schedule and run. Here’s the extract task:

src/airflow/example_dags/tutorial_taskflow_api.py

@task()
def extract():
    """
    #### Extract task
    A simple Extract task to get data ready for the rest of the data
    pipeline. In this case, getting data is simulated by reading from a
    hardcoded JSON string.
    """
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

    order_data_dict = json.loads(data_string)
    return order_data_dict


The function’s return value is passed to the next task, with no manual use of XComs required. Under the hood, TaskFlow still uses XComs to move the data, but it pushes and pulls them for you, so you do not write the xcom_pull calls that the operator-based approach needs. The transform and load tasks follow the same pattern.

Notice the @task(multiple_outputs=True) decorator on the transform task in the full pipeline above. It tells Airflow that the function returns a dictionary whose values should be split into individual XComs. Each key in the returned dictionary becomes its own XCom entry, which makes it easy to reference specific values in downstream tasks, as load does with order_summary["total_order_value"]. If you omit multiple_outputs=True, the entire dictionary is stored as a single XCom instead, and must be accessed as a whole.
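As a rough mental model of that difference, the sketch below mimics XCom storage with a plain dictionary keyed by (task_id, key). The store_result helper and the dictionary layout are hypothetical names invented for this illustration. Only the multiple_outputs flag and the default return_value key are meant to mirror Airflow, and this is not how Airflow is implemented internally.

```python
def store_result(xcom_store, task_id, result, multiple_outputs=False):
    """Toy model: record a task's return value in a dict standing in for XCom."""
    if multiple_outputs:
        # One entry per dictionary key, so a downstream task can fetch one value.
        for key, value in result.items():
            xcom_store[(task_id, key)] = value
    else:
        # The whole return value lives under a single key.
        xcom_store[(task_id, "return_value")] = result
    return xcom_store


split = store_result({}, "transform", {"total_order_value": 1236.7}, multiple_outputs=True)
whole = store_result({}, "transform", {"total_order_value": 1236.7})

print(split)
print(whole)
```

In the first case a downstream task can ask for total_order_value directly; in the second it has to fetch the whole dictionary and index into it.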

Step 3: Build the Flow

Once the tasks are defined, you can build the pipeline by simply calling them like Python functions. Airflow uses this functional invocation to set task dependencies and manage data passing.

src/airflow/example_dags/tutorial_taskflow_api.py

order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])

That’s it! Airflow knows how to schedule and orchestrate your pipeline from this code alone.
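If declaring dependencies by calling functions feels opaque, a toy model may help. The sketch below is plain Python, not Airflow code: the task decorator, the Ref class, and the edges list are all hypothetical stand-ins written for this illustration. It shows one way a decorator can record "who feeds whom" at call time instead of running the function body.

```python
import functools

edges = []  # (upstream, downstream) pairs recorded while the "DAG" is built


class Ref:
    """Placeholder for a task's future result; remembers which task produces it."""

    def __init__(self, task_name):
        self.task_name = task_name

    def __getitem__(self, key):
        # Indexing (e.g. summary["total"]) still points back at the same producer.
        return Ref(self.task_name)


def task(func):
    """Toy decorator: calling the function records dependencies, it does not run it."""

    @functools.wraps(func)
    def wrapper(*args):
        for arg in args:
            if isinstance(arg, Ref):
                edges.append((arg.task_name, func.__name__))
        return Ref(func.__name__)

    return wrapper


@task
def extract(): ...


@task
def transform(order_data): ...


@task
def load(total): ...


order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])

print(edges)
```

The three calls at the bottom have the same shape as the pipeline code above, and the recorded edges form the extract, transform, load chain.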

Running Your DAG

To enable and trigger your DAG:

  1. Navigate to the Airflow UI.

  2. Find your DAG in the list and click the toggle to enable it.

  3. Trigger it manually by clicking the “Trigger DAG” button. This example sets schedule=None, so it runs only when triggered; a DAG with a schedule would also run at its scheduled times.

What’s Happening Behind the Scenes?

If you’ve used Airflow 1.x, this probably feels like magic. Let’s compare what’s happening under the hood.

The “Old Way”: Manual Wiring and XComs

Before the TaskFlow API, you had to use Operators like PythonOperator and pass data manually between tasks using XComs.

Here’s what the same DAG might have looked like using the traditional approach:

import json
import pendulum
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import DAG


def extract():
    # Old way: simulate extracting data from a JSON string
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)


def transform(ti):
    # Old way: manually pull from XCom
    order_data_dict = ti.xcom_pull(task_ids="extract")
    total_order_value = sum(order_data_dict.values())
    return {"total_order_value": total_order_value}


def load(ti):
    # Old way: manually pull from XCom
    total = ti.xcom_pull(task_ids="transform")["total_order_value"]
    print(f"Total order value is: {total:.2f}")


with DAG(
    dag_id="legacy_etl_pipeline",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    transform_task = PythonOperator(task_id="transform", python_callable=transform)
    load_task = PythonOperator(task_id="load", python_callable=load)

    extract_task >> transform_task >> load_task

Note

This version produces the same result as the TaskFlow API example, but requires explicit management of XComs and task dependencies.

The TaskFlow Way

Using TaskFlow, all of this is handled automatically.

src/airflow/example_dags/tutorial_taskflow_api.py


import json

import pendulum

from airflow.sdk import dag, task


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """

    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """

        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


tutorial_taskflow_api()


Airflow still uses XComs and builds a dependency graph — it’s just abstracted away so you can focus on your business logic.

How XComs Work

TaskFlow return values are stored as XComs automatically. These values can be inspected in the UI under the “XCom” tab. Manual xcom_pull() is still possible for traditional operators.

Error Handling and Retries

You can easily configure retries for your tasks using decorators. For example, you can set a maximum number of retries directly in the task decorator:

@task(retries=3)
def my_task(): ...

With retries=3, Airflow reruns a failed task up to three more times before marking it as failed, so a transient error does not immediately fail the task.
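To make the counting concrete, here is a small plain-Python sketch of "one initial attempt plus up to N retries". The run_with_retries helper and the flaky function are hypothetical and exist only for this illustration; Airflow's scheduler handles retries itself, and this is not its implementation.

```python
def run_with_retries(func, retries):
    """Call func until it succeeds, allowing `retries` extra attempts after the first."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return func(), attempts
        except Exception:
            if attempts > retries:
                raise  # out of retries: the failure is final


calls = {"count": 0}


def flaky():
    # Fails twice, then succeeds, to imitate a transient error.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary outage")
    return "ok"


result, attempts = run_with_retries(flaky, retries=3)
print(result, attempts)
```

Here the function succeeds on its third attempt, well within the four attempts that retries=3 allows in this model.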

Task Parameterization

You can reuse decorated tasks in multiple DAGs and override parameters like task_id or retries.

start = add_task.override(task_id="start")(1, 2)

You can even import decorated tasks from a shared module.

What to Explore Next

Nice work! You’ve now written your first pipeline using the TaskFlow API. Curious where to go from here?

  • Add a new task to the DAG – maybe a filter or validation step

  • Modify return values and pass multiple outputs

  • Explore retries and overrides with .override(task_id="...")

  • Open the Airflow UI and inspect how the data flows between tasks, including task logs and dependencies

Advanced TaskFlow Patterns

Once you’re comfortable with the basics, here are a few powerful techniques you can try.

Reusing Decorated Tasks

You can reuse decorated tasks across multiple DAGs or DAG runs. This is especially useful for common logic like reusable utilities or shared business rules. Use .override() to customize task metadata like task_id or retries.

start = add_task.override(task_id="start")(1, 2)

You can even import decorated tasks from a shared module.
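One way to picture .override() is as "give me a copy of this task with some metadata changed". The sketch below models that idea in plain Python. ToyTask, add, and the returned tuple are hypothetical constructs for this illustration only, so treat it as a mental model, not a description of Airflow's classes.

```python
from dataclasses import dataclass, replace
from typing import Callable


@dataclass(frozen=True)
class ToyTask:
    """Hypothetical stand-in for a decorated task: a function plus its metadata."""

    func: Callable
    task_id: str
    retries: int = 0

    def override(self, **changes):
        # Return a modified copy; the original definition is left untouched.
        return replace(self, **changes)

    def __call__(self, *args):
        return self.task_id, self.func(*args)


def add(x, y):
    return x + y


add_task = ToyTask(func=add, task_id="add_task")
start = add_task.override(task_id="start", retries=2)(1, 2)

print(start)
print(add_task.task_id)
```

Because the override produces a copy, the shared definition can be reused under different task IDs without the uses interfering with each other.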

Handling Conflicting Dependencies

Sometimes tasks require different Python dependencies than the rest of your DAG — for example, specialized libraries or system-level packages. TaskFlow supports multiple execution environments to isolate those dependencies.

Dynamically Created Virtualenv

Creates a temporary virtualenv at task runtime. Great for experimental or dynamic tasks, but may have cold start overhead.

src/airflow/example_dags/example_python_decorator.py

@task.virtualenv(
    task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
)
def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing inside the function, rather than at the module level, ensures that it
    will not attempt to import the library before it is installed.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(4):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(1)
    print("Finished")

virtualenv_task = callable_virtualenv()

External Python Environment

Executes the task using a pre-installed Python interpreter — ideal for consistent environments or shared virtualenvs.

src/airflow/example_dags/example_python_decorator.py

@task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
def callable_external_python():
    """
    Example function that will be run by an external Python interpreter.

    Importing inside the function, rather than at the module level, ensures that
    the imports are resolved by that interpreter when the task runs.
    """
    import sys
    from time import sleep

    print(f"Running task via {sys.executable}")
    print("Sleeping")
    for _ in range(4):
        print("Please wait...", flush=True)
        sleep(1)
    print("Finished")

external_python_task = callable_external_python()

Docker Environment

Runs your task in a Docker container. Useful for packaging everything the task needs — but requires Docker to be available on your worker.

docker/tests/system/docker/example_taskflow_api_docker_virtualenv.py

@task.docker(image="python:3.9-slim-bookworm", multiple_outputs=True)
def transform(order_data_dict: dict):
    """
    #### Transform task
    A simple Transform task which takes in the collection of order data and
    computes the total order value.
    """
    total_order_value = 0

    for value in order_data_dict.values():
        total_order_value += value

    return {"total_order_value": total_order_value}


Note

Requires Airflow 2.2 and the Docker provider.

KubernetesPodOperator

Runs your task inside a Kubernetes pod, fully isolated from the main Airflow environment. Ideal for large tasks or tasks requiring custom runtimes.

cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_decorator.py

@task.kubernetes(
    image="python:3.9-slim-buster",
    name="k8s_test",
    namespace="default",
    in_cluster=False,
    config_file="/path/to/.kube/config",
)
def execute_in_k8s_pod():
    import time

    print("Hello from k8s pod")
    time.sleep(2)

@task.kubernetes(image="python:3.9-slim-buster", namespace="default", in_cluster=False)
def print_pattern():
    n = 5
    for i in range(n):
        # inner loop to handle number of columns
        # values changing acc. to outer loop
        for _ in range(i + 1):
            # printing stars
            print("* ", end="")

        # ending line after each row
        print("\r")

execute_in_k8s_pod_instance = execute_in_k8s_pod()
print_pattern_instance = print_pattern()

execute_in_k8s_pod_instance >> print_pattern_instance

Note

Requires Airflow 2.4 and the Kubernetes provider.

Using Sensors

Use @task.sensor to build lightweight, reusable sensors using Python functions. These support both poke and reschedule modes.

src/airflow/example_dags/example_sensor_decorator.py


import pendulum

from airflow.sdk import PokeReturnValue, dag, task


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def example_sensor_decorator():
    # Using a sensor operator to wait for the upstream data to be ready.
    @task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
    def wait_for_upstream() -> PokeReturnValue:
        return PokeReturnValue(is_done=True, xcom_value="xcom_value")

    @task
    def dummy_operator() -> None:
        pass

    wait_for_upstream() >> dummy_operator()


tutorial_etl_dag = example_sensor_decorator()
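The sensor in that example reports done on its first check, so it may help to see the general shape of poking: call the function, wait poke_interval, and repeat until it reports done or the timeout passes. The following is a simplified plain-Python sketch with a simulated clock. PokeResult and run_sensor are hypothetical names for this illustration and do not reproduce Airflow's sensor machinery.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class PokeResult:
    """Hypothetical stand-in for PokeReturnValue: a done flag plus an optional value."""

    is_done: bool
    xcom_value: Any = None


def run_sensor(poke: Callable[[], PokeResult], poke_interval: float, timeout: float):
    """Call poke() every poke_interval (simulated) seconds until done or timed out."""
    elapsed = 0.0
    while True:
        result = poke()
        if result.is_done:
            return result.xcom_value
        elapsed += poke_interval  # simulated wait; a real sensor would actually pause
        if elapsed > timeout:
            raise TimeoutError(f"sensor gave up after {elapsed:.0f}s")


checks = iter([False, False, True])  # upstream data shows up on the third check


def wait_for_upstream() -> PokeResult:
    ready = next(checks)
    return PokeResult(is_done=ready, xcom_value="ready" if ready else None)


value = run_sensor(wait_for_upstream, poke_interval=60, timeout=3600)
print(value)
```

The value returned alongside the done flag is what a downstream task would receive.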

Mixing with Traditional Tasks

You can combine decorated tasks with classic Operators. This is helpful when using community providers or when migrating incrementally to TaskFlow.

You can chain TaskFlow and traditional tasks using >> or pass data using the .output attribute.

Templating in TaskFlow

Like traditional tasks, decorated TaskFlow functions support templated arguments — including loading content from files or using runtime parameters.

When running your callable, Airflow passes a set of keyword arguments that you can use in your function. This set of kwargs corresponds exactly to what you can use in your Jinja templates. To receive a context value, add its key to the function signature as a keyword argument.

For example, the callable in the code block below will get values of the ti and next_ds context variables:

@task
def my_python_callable(*, ti, next_ds):
    pass

You can also choose to receive the entire context with **kwargs. Note that this can incur a slight performance penalty, since Airflow has to expand the entire context, which likely contains many things you don’t actually need. Explicit arguments, as in the previous example, are therefore the recommended approach.

@task
def my_python_callable(**kwargs):
    ti = kwargs["ti"]
    next_ds = kwargs["next_ds"]

Sometimes you may also want to access the context deep in the call stack without passing context variables down from the task callable. In that case, you can retrieve the execution context with the get_current_context function.

from airflow.providers.standard.operators.python import get_current_context


def some_function_in_your_library():
    context = get_current_context()
    ti = context["ti"]

Arguments passed to decorated functions are automatically templated. You can also template files using templates_exts:

@task(templates_exts=[".sql"])
def read_sql(sql): ...

Conditional Execution

Use @task.run_if() or @task.skip_if() to control whether a task runs based on dynamic conditions at runtime — without altering your DAG structure.

@task.run_if(lambda ctx: ctx["task_instance"].task_id == "run")
@task.bash()
def echo():
    return "echo 'run'"
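Conceptually, a run_if condition is a predicate on the runtime context that is checked just before the task body runs. The toy version below illustrates that flow in plain Python. The run_if decorator, the Skipped exception, and the execute helper here are hypothetical re-creations for this illustration, and the context is a bare dictionary, not an Airflow context object.

```python
import functools


class Skipped(Exception):
    """Hypothetical signal meaning 'do not run this task'."""


def run_if(condition):
    """Toy decorator: run the function only when condition(context) is true."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(context):
            if not condition(context):
                raise Skipped(f"{func.__name__} skipped")
            return func()

        return wrapper

    return decorator


@run_if(lambda ctx: ctx["task_id"] == "run")
def echo():
    return "echo 'run'"


def execute(task_fn, context):
    # The task always exists in the graph; only its outcome changes at runtime.
    try:
        return task_fn(context)
    except Skipped:
        return "skipped"


ran = execute(echo, {"task_id": "run"})
skipped = execute(echo, {"task_id": "other"})

print(ran)
print(skipped)
```

The task definition is identical in both calls; only the context decides whether the body runs.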

What’s Next

Now that you’ve seen how to build clean, maintainable DAGs using the TaskFlow API, here are some good next steps:
