
Dynamic Task Mapping with Task SDK

Dynamic Task Mapping lets a task defined with the Task SDK run as a variable number of task instances, where the number is decided at runtime (typically from upstream data) rather than when the DAG is written. You enable it by calling expand() on a task, which parallelizes execution without knowing ahead of time how many instances are needed.
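To make the idea concrete, here is a minimal pure-Python model of what expand() means for the data flow when a single keyword is mapped: each element of the input list becomes one "task instance", identified by its position (Airflow calls this position the map_index). The helper model_expand is hypothetical and only illustrates the semantics; it does not reflect how Airflow schedules or stores anything.

```python
from typing import Any, Callable


def model_expand(func: Callable[..., Any], **mapped: list[Any]) -> dict[int, Any]:
    """Hypothetical model of expand() over exactly one keyword argument.

    Returns {map_index: result}, one entry per element of the mapped list.
    """
    if len(mapped) != 1:
        raise ValueError("this sketch models mapping over exactly one keyword")
    name, values = next(iter(mapped.items()))
    # One call per element, keyed by the element's position in the input list.
    return {index: func(**{name: value}) for index, value in enumerate(values)}


def add_one(x: int) -> int:
    return x + 1


results = model_expand(add_one, x=[1, 2, 3])
print(results)  # {0: 2, 1: 3, 2: 4}
```

Passing the argument by keyword in the model mirrors the real API, where expand() accepts keyword arguments only.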

Simple Mapping

Map over a Python list directly in the DAG:

from datetime import datetime

from airflow.sdk import DAG, task


@task
def add_one(x: int):
    return x + 1


@task
def sum_it(values: list[int]):
    print(f"Total was {sum(values)}")


with DAG(dag_id="dynamic-map-simple", start_date=datetime(2022, 1, 1)) as dag:
    summed = sum_it(values=add_one.expand(x=[1, 2, 3, 4, 5]))
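In this DAG, sum_it receives the return values of all five add_one instances. Because the number of instances is unknown until runtime, Airflow hands these over as a lazy sequence rather than a plain list (see Details below). The class LazyResults and the function fetch_result below are hypothetical stand-ins that sketch that behaviour: values are fetched one at a time on first access and then cached, and list() forces every value to load. This is not Airflow's actual class.

```python
from collections.abc import Callable, Sequence
from typing import Any


class LazyResults(Sequence):
    """Hypothetical stand-in for the lazy sequence a downstream task receives."""

    def __init__(self, length: int, fetch: Callable[[int], Any]) -> None:
        self._length = length
        self._fetch = fetch
        self._cache: dict[int, Any] = {}

    def __len__(self) -> int:
        return self._length

    def __getitem__(self, index: int) -> Any:
        if isinstance(index, slice):
            raise TypeError("slicing is not supported in this sketch")
        if index < 0:
            index += self._length
        if not 0 <= index < self._length:
            raise IndexError(index)
        # Fetch on first access only, then serve from the cache.
        if index not in self._cache:
            self._cache[index] = self._fetch(index)
        return self._cache[index]


fetch_log: list[int] = []


def fetch_result(map_index: int) -> int:
    # Pretend this reads the return value of one add_one instance.
    fetch_log.append(map_index)
    return [2, 3, 4, 5, 6][map_index]


values = LazyResults(5, fetch_result)
first = values[0]                   # fetches a single value
fetches_after_first = len(fetch_log)
everything = list(values)           # forces the remaining values to load
print(first, everything, sum(everything))  # 2 [2, 3, 4, 5, 6] 20
```

Indexing touches only the requested element, while list() or a full iteration (as sum() does) loads all of them, which is why forcing a list can be costly when the mapping is large.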

Task-Generated Mapping

Generate the list at runtime from an upstream task:

from datetime import datetime

from airflow.sdk import DAG, task


@task
def make_list():
    # This could fetch data from an API, database, etc.
    return ["a", "b", "c"]


@task
def consume(item: str):
    print(item)


with DAG(dag_id="dynamic-map-generated", start_date=datetime(2022, 1, 1)) as dag:
    consume.expand(item=make_list())
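The key difference from the previous example is that the number of consume instances is only known once make_list has actually run. The sketch below models that step in plain Python: plan_mapped_instances is a hypothetical helper that turns an upstream result into one (map_index, kwargs) pair per element. It also shows the empty case; per the Airflow Core docs, when the mapped input is empty no instances are created and the mapped task is marked as skipped.

```python
from typing import Any


def make_list() -> list[str]:
    # Stand-in for the upstream task; the real one might query an API.
    return ["a", "b", "c"]


def plan_mapped_instances(
    upstream_output: list[Any], arg_name: str
) -> list[tuple[int, dict[str, Any]]]:
    """Hypothetical model: one (map_index, kwargs) pair per upstream element."""
    return [(index, {arg_name: item}) for index, item in enumerate(upstream_output)]


planned = plan_mapped_instances(make_list(), "item")
print(planned)  # [(0, {'item': 'a'}), (1, {'item': 'b'}), (2, {'item': 'c'})]

# An empty upstream result plans zero instances (Airflow skips the mapped task).
empty_plan = plan_mapped_instances([], "item")
print(empty_plan)  # []
```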

Details

  • Only keyword arguments can be passed to expand().

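  • A downstream task that consumes the results of a mapped task (such as sum_it above) receives them as a lazy sequence rather than a plain list, so each value is loaded only when accessed. The sequence supports indexing and iteration; wrap it in list() to force a concrete list, keeping in mind that this loads every mapped result at once.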

  • Combine static parameters with mapped ones using partial():

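    from datetime import datetime

    from airflow.sdk import DAG, task


    @task
    def add(x: int, y: int):
        return x + y


    with DAG(dag_id="map-with-partial", start_date=datetime(2022, 1, 1)) as dag:
        # y=10 is shared by every instance; x is mapped over [1, 2, 3]
        add.partial(y=10).expand(x=[1, 2, 3])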
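The split between partial() and expand() resembles Python's own functools.partial: fix the arguments every instance shares, then vary the rest. The snippet below is only an analogy in plain Python (Airflow's partial() returns a partially configured task, not a functools.partial object), showing the three results the DAG above would produce.

```python
from functools import partial


def add(x: int, y: int) -> int:
    return x + y


# partial() fixes the argument shared by every instance (y=10);
# the list comprehension plays the role of expand() over x.
shared = partial(add, y=10)
results = [shared(x=value) for value in [1, 2, 3]]
print(results)  # [11, 12, 13]
```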
    

Advanced Usage

For advanced patterns, such as repeated mapping, cross-product mapping, named mappings (via map_index_template), and handling large datasets, see Dynamic Task Mapping in the Airflow Core documentation.
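As a small taste of cross-product mapping: the Airflow Core docs describe that passing more than one keyword to expand(), for example add.expand(x=[2, 4, 8], y=[5, 10]), creates one instance per combination of values. The plain-Python sketch below models that with itertools.product; the ordering of results here is simply itertools.product's and is not a claim about how Airflow assigns map_index values.

```python
from itertools import product


def add(x: int, y: int) -> int:
    return x + y


xs = [2, 4, 8]
ys = [5, 10]

# One modelled "instance" per (x, y) combination: 3 * 2 = 6 instances.
combinations = list(product(xs, ys))
results = [add(x=x, y=y) for x, y in combinations]
print(len(results), results)  # 6 [7, 12, 9, 14, 13, 18]
```

The instance count grows multiplicatively with each mapped keyword, which is worth keeping in mind for large inputs.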
