Dynamic Task Mapping with Task SDK¶
Dynamic Task Mapping allows tasks defined with the Task SDK to generate a variable number of task instances at runtime based on upstream data. Calling the expand() method on a task creates one mapped task instance per input, providing a way to parallelize execution without knowing the number of tasks ahead of time.
Simple Mapping¶
Map over a Python list directly in the DAG:
from datetime import datetime

from airflow.sdk import DAG, task


@task
def add_one(x: int):
    return x + 1


@task
def sum_it(values: list[int]):
    print(f"Total was {sum(values)}")


with DAG(dag_id="dynamic-map-simple", start_date=datetime(2022, 1, 1)) as dag:
    summed = sum_it(values=add_one.expand(x=[1, 2, 3, 4, 5]))
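At runtime, expand() creates five mapped instances of add_one, one per list element, which can run in parallel. The downstream sum_it receives the collected results, [2, 3, 4, 5, 6], and prints Total was 20.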
Task-Generated Mapping¶
Generate the list at runtime from an upstream task:
@task
def make_list():
    # This could fetch data from an API, database, etc.
    return ["a", "b", "c"]


@task
def consume(item: str):
    print(item)


with DAG(dag_id="dynamic-map-generated", start_date=datetime(2022, 1, 1)) as dag:
    consume.expand(item=make_list())
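Here the number of consume instances is not known until the DAG runs: the return value of make_list is pushed as an XCom, and one mapped instance is created per element, three in this case.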
Details¶
- Only keyword arguments can be passed to expand().
- Mapped inputs are provided to tasks as lazy proxy objects. To force evaluation into a concrete list, wrap the proxy in list() (see the sketch after this section).
- Combine static parameters with mapped ones using partial():

@task
def add(x: int, y: int):
    return x + y


with DAG(dag_id="map-with-partial", start_date=datetime(2022, 1, 1)) as dag:
    add.partial(y=10).expand(x=[1, 2, 3])
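A minimal sketch of the lazy-proxy behavior noted above, assuming the same Task SDK imports as the earlier examples (the summarize task is hypothetical):

from datetime import datetime

from airflow.sdk import DAG, task


@task
def add_one(x: int):
    return x + 1


@task
def summarize(values):
    # The mapped input arrives as a lazy proxy rather than a plain list;
    # it can be iterated directly, but not every list operation is
    # supported, so call list() to materialize it first.
    concrete = list(values)
    print(f"Received {len(concrete)} values: {concrete}")


with DAG(dag_id="map-lazy-proxy", start_date=datetime(2022, 1, 1)) as dag:
    summarize(values=add_one.expand(x=[1, 2, 3]))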
Advanced Usage¶
For advanced patterns, such as repeated mapping, cross-product mapping, named mappings (via map_index_template), and handling large datasets, see the Dynamic Task Mapping guide in the Airflow Core documentation.
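As one illustration, here is a minimal sketch of cross-product mapping, assuming the same Task SDK imports as above (the render task and its argument values are hypothetical). Passing multiple iterables to a single expand() call creates one task instance per combination of the inputs:

from datetime import datetime

from airflow.sdk import DAG, task


@task
def render(template: str, color: str):
    print(f"{template} in {color}")


with DAG(dag_id="map-cross-product", start_date=datetime(2022, 1, 1)) as dag:
    # Two mapped iterables produce 2 x 3 = 6 mapped task instances,
    # one for each (template, color) combination.
    render.expand(template=["a4", "letter"], color=["red", "green", "blue"])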