Deferred vs Async Operators

Added in version 3.2.0.

Airflow 3.2 introduces Python-native async support for tasks, allowing concurrent I/O within a single worker slot. This page explains how async operators differ from deferred operators and when to use each.

Deferred Operators

A deferred operator is an operator that can pause its execution until an external trigger event occurs, without holding a worker slot. For more details see Deferrable Operators & Triggers. Examples include the HttpOperator in deferrable mode, sensors or operators integrated with triggers.

Key characteristics:

  • Execution is paused while waiting for external events or resources.

  • Worker slots are freed during the wait, improving resource efficiency.

  • Ideal for scenarios where a single external event or a small number of events dictate task completion.

  • Typically simpler to use, as the deferred operator handles all async logic.

Async Python Operators

Python native async operators allow you to write tasks that leverage Python’s asyncio:

  • Tasks can perform many concurrent I/O operations efficiently within a single worker slot sharing the same event loop.

  • Task code uses async/await syntax with async-compatible hooks, such as HttpAsyncHook or the SFTPHookAsync.

Ideal when you need to perform high-throughput operations (e.g., many HTTP requests, database calls, or API interactions) within a single task instance, or when there is no deferred operator available but there is an async hook available.

When to Use Deferred Operators

Prefer a deferred operator when:

  • There is an existing deferrable operator that covers your use case (e.g., HttpOperator deferrable mode).

  • The task waits for a single or limited external events.

  • You want to free worker resources while waiting for triggers.

  • You don’t need to loop over the same operator multiple times (e.g. multiplexing).

from airflow.sdk import dag
from airflow.providers.http.operators.http import HttpOperator


@dag(schedule=None)
def deferred_http_operator_dag():

    get_op_task = HttpOperator(
        http_conn_id="http_conn_id",
        task_id="get_op",
        method="GET",
        endpoint="get",
        data={"param1": "value1", "param2": "value2"},
        deferrable=True,
    )


deferred_http_operator_dag()

When to Use Async Python Operators

Use async Python operators when:

  • The task needs to perform many concurrent requests or operations within a single task.

  • You want to take advantage of the shared event loop to improve throughput.

  • There is simply no deferred operator available.

  • The logic depends on custom Python code (e.g. callables or lambdas) that cannot easily be implemented in a trigger, since triggers must be serializable and do not have access to DAG code at runtime.

Note

The HttpAsyncHook depends on aiohttp, which is installed automatically with the apache-airflow-providers-http provider.

Simple Async Example

The following example demonstrates the basic syntax for writing an async task using async/await.

Note

For a single request like this, a deferrable operator (such as HttpOperator with deferrable=True) is usually preferred. Deferred operators release the worker slot while waiting for the external request to complete.

This example is provided mainly to illustrate the structure of an async task. Async operators become most useful when performing many concurrent operations within the same task (see the multiplexing example below), or when implementing logic such as pagination where multiple requests need to be executed sequentially or concurrently within a single task instance.

from aiohttp import ClientSession
from airflow.providers.http.hooks.http import HttpAsyncHook
from airflow.sdk import dag, task


@dag(schedule=None)
def async_http_operator_dag():

    @task
    async def get_op():
        hook = HttpAsyncHook(http_conn_id="http_conn_id", method="GET")

        async with ClientSession() as session:
            response = await hook.run(
                session=session,
                endpoint="get",
                data={"param1": "value1", "param2": "value2"},
            )
            return await response.json()

    get_op()


async_http_operator_dag()

Async Multiplexing Example

Async operators become particularly useful when making many concurrent requests within a single task. The following example executes multiple HTTP requests concurrently using asyncio.gather while limiting concurrency with a semaphore.

import asyncio
from aiohttp import ClientSession
from airflow.providers.http.hooks.http import HttpAsyncHook
from airflow.sdk import dag, task

parameters = [
    {"param1": "value1", "param2": "value2"},
    {"param1": "value3", "param2": "value4"},
    {"param1": "value5", "param2": "value6"},
    {"param1": "value7", "param2": "value8"},
]


@dag(schedule=None)
def async_http_multiplex_dag():

    @task
    async def get_op(parameters: list[dict[str, str]]):
        hook = HttpAsyncHook(http_conn_id="http_conn_id", method="GET")

        # Limit concurrent requests to avoid overwhelming downstream services
        semaphore = asyncio.Semaphore(5)

        async def fetch(session, params):
            async with semaphore:
                response = await hook.run(
                    session=session,
                    endpoint="get",
                    data=params,
                )
                return await response.json()

        async with ClientSession() as session:
            tasks = [fetch(session, params) for params in parameters]

            # Run requests concurrently in the shared event loop
            return await asyncio.gather(*tasks)

    get_op(parameters)


async_http_multiplex_dag()

Note

The upcoming Dynamic Task Iteration feature will simplify patterns like this. Instead of manually managing concurrency with constructs such as asyncio.gather and asyncio.Semaphore, authors will be able to iterate over asynchronous results directly in downstream tasks while still benefiting from a shared event loop. This will make high-throughput patterns such as pagination or request multiplexing easier to implement.

MS Graph Async Example

Another example using the KiotaRequestAdapterHook, which is async-only, to fetch all users from the Microsoft Graph API.

In this example, multiple paginated requests are expected in order to retrieve all users. Using an async Python task is appropriate here because the KiotaRequestAdapterHook handles pagination internally and performs the requests asynchronously. This allows multiple paginated requests to be performed efficiently within a single task instance and worker slot.

from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook
from airflow.sdk import dag, task


@dag(schedule=None)
def async_msgraph_dag():

    @task
    async def get_users():
        hook = KiotaRequestAdapterHook.get_hook(conn_id="msgraph_default")

        return await hook.paginated_run(url="users")

    get_users()


async_msgraph_dag()

When not to use Deferred vs Async Operators

While the previous sections explain when to prefer deferred or async operators, it is equally important to understand scenarios where one may not be appropriate:

  • Avoid async operators for long waits Async operators keep the task process alive while waiting. If your task involves long-running operations, such as slow APIs or external triggers, this can waste worker resources. In such cases, prefer a deferred operator, which releases the worker slot while waiting.

  • Avoid deferred operators for many short or repeated waits Deferred operators pause and resume tasks each time they defer. If deferrals occur frequently or last only a short time, the overhead of stopping and restarting the task can reduce efficiency. For high-frequency short waits, an async operator may be more suitable.

  • Operator availability should not dictate choice Having a built-in deferrable operator can simplify implementation, but the decision should be driven by the use case, not just what Airflow provides. If your workflow is better suited for deferring but no operator exists yet, consider implementing a custom deferred operator rather than defaulting to async.

Comparison with Dynamic Task Mapping

Async operators and Dynamic Task Mapping solve different problems and have different trade-offs.

Aspect

Async @task

Dynamic Task Mapping (deferrable)

Worker slots

1 worker slot (shared event loop)

N worker slots (one per mapped task instance)

Concurrency model

Async I/O inside a single task

Parallel task instances scheduled by the scheduler

Retry behavior

Whole task retries

Individual mapped tasks can retry independently

UI visibility

Appears as a single task

Each mapped task is visible separately

Scheduler overhead

Minimal

Scheduler must manage N task instances

For more details about Dynamic Task Mapping, see the dynamic task mapping page.

Was this entry helpful?