Examples¶
Note
For a minimal quick start, see the Getting Started section.
Key Concepts¶
Defining DAGs¶
Example: Defining a DAG
Use the airflow.sdk.dag() decorator to convert a Python function into an Airflow DAG. All nested calls to airflow.sdk.task() within the function will become tasks in the DAG. For full parameters and usage, see the API reference for airflow.sdk.dag().
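The full example below pairs the decorator with a custom operator. Stripped down to just the decorators, the pattern is roughly the following minimal sketch (the DAG and task names here are hypothetical; the DAG id defaults to the decorated function's name):

import pendulum

from airflow.sdk import dag, task


@dag(schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False)
def my_minimal_dag():
    # every @task call made inside the decorated function becomes a task of this DAG
    @task
    def hello():
        print("hello")

    hello()


# calling the decorated function registers the DAG
my_minimal_dag()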
/opt/airflow/airflow-core/src/airflow/example_dags/example_dag_decorator.py
from typing import TYPE_CHECKING, Any

import httpx
import pendulum

from airflow.models.baseoperator import BaseOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import dag, task

if TYPE_CHECKING:
    from airflow.sdk import Context


class GetRequestOperator(BaseOperator):
    """Custom operator to send GET request to provided url"""

    template_fields = ("url",)

    def __init__(self, *, url: str, **kwargs):
        super().__init__(**kwargs)
        self.url = url

    def execute(self, context: Context):
        return httpx.get(self.url).json()


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def example_dag_decorator(url: str = "http://httpbin.org/get"):
    """
    DAG to get IP address and echo it via BashOperator.

    :param url: URL to get IP address from. Defaults to "http://httpbin.org/get".
    """
    get_ip = GetRequestOperator(task_id="get_ip", url=url)

    @task(multiple_outputs=True)
    def prepare_command(raw_json: dict[str, Any]) -> dict[str, str]:
        external_ip = raw_json["origin"]
        return {
            "command": f"echo 'Seems like today your server executing Airflow is connected from IP {external_ip}'",
        }

    command_info = prepare_command(get_ip.output)

    BashOperator(task_id="echo_ip_info", bash_command=command_info["command"])


example_dag = example_dag_decorator()
Decorators¶
Example: Using Task SDK decorators
The Task SDK provides decorators to simplify DAG definitions:
airflow.sdk.task_group() groups related tasks into logical TaskGroups. airflow.sdk.setup() and airflow.sdk.teardown() define setup and teardown hooks for DAGs or TaskGroups.
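Before the full examples below, here is a minimal sketch of how the three decorators compose; the group, task, and resource names are illustrative only:

import pendulum

from airflow.sdk import dag, setup, task, task_group, teardown


@dag(schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False)
def decorator_sketch():
    @setup
    def create_resource():
        return "resource-id"

    @task
    def use_resource(resource_id):
        print(f"working with {resource_id}")

    @teardown
    def drop_resource(resource_id):
        print(f"dropping {resource_id}")

    @task_group
    def managed_section():
        # the setup's return value feeds both the work task and the teardown
        resource = create_resource()
        use_resource(resource) >> drop_resource(resource)

    managed_section()


decorator_sketch()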
/opt/airflow/airflow-core/src/airflow/example_dags/example_task_group_decorator.py
import pendulum

from airflow.sdk import DAG, task, task_group


# Creating Tasks
@task
def task_start():
    """Empty Task which is First Task of Dag"""
    return "[Task_start]"


@task
def task_1(value: int) -> str:
    """Empty Task1"""
    return f"[ Task1 {value} ]"


@task
def task_2(value: str) -> str:
    """Empty Task2"""
    return f"[ Task2 {value} ]"


@task
def task_3(value: str) -> None:
    """Empty Task3"""
    print(f"[ Task3 {value} ]")


@task
def task_end() -> None:
    """Empty Task which is Last Task of Dag"""
    print("[ Task_End ]")


# Creating TaskGroups
@task_group
def task_group_function(value: int) -> None:
    """TaskGroup for grouping related Tasks"""
    task_3(task_2(task_1(value)))


# Executing Tasks and TaskGroups
with DAG(
    dag_id="example_task_group_decorator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    start_task = task_start()
    end_task = task_end()
    for i in range(5):
        current_task_group = task_group_function(i)
        start_task >> current_task_group >> end_task
/opt/airflow/airflow-core/src/airflow/example_dags/example_setup_teardown_taskflow.py
import pendulum

from airflow.sdk import DAG, setup, task, task_group, teardown

with DAG(
    dag_id="example_setup_teardown_taskflow",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task
    def my_first_task():
        print("Hello 1")

    @task
    def my_second_task():
        print("Hello 2")

    @task
    def my_third_task():
        print("Hello 3")

    # you can set setup / teardown relationships with the `as_teardown` method.
    task_1 = my_first_task()
    task_2 = my_second_task()
    task_3 = my_third_task()
    task_1 >> task_2 >> task_3.as_teardown(setups=task_1)

    # The method `as_teardown` will mark task_3 as teardown, task_1 as setup, and
    # arrow task_1 >> task_3.

    # Now if you clear task_2, then its setup task, task_1, will be cleared in
    # addition to its teardown task, task_3

    # it's also possible to use a decorator to mark a task as setup or
    # teardown when you define it. see below.

    @setup
    def outer_setup():
        print("I am outer_setup")
        return "some cluster id"

    @teardown
    def outer_teardown(cluster_id):
        print("I am outer_teardown")
        print(f"Tearing down cluster: {cluster_id}")

    @task
    def outer_work():
        print("I am just a normal task")

    @task_group
    def section_1():
        @setup
        def inner_setup():
            print("I set up")
            return "some_cluster_id"

        @task
        def inner_work(cluster_id):
            print(f"doing some work with {cluster_id=}")

        @teardown
        def inner_teardown(cluster_id):
            print(f"tearing down {cluster_id=}")

        # this passes the return value of `inner_setup` to both `inner_work` and `inner_teardown`
        inner_setup_task = inner_setup()
        inner_work(inner_setup_task) >> inner_teardown(inner_setup_task)

    # by using the decorators, outer_setup and outer_teardown are already marked as setup / teardown
    # now we just need to make sure they are linked directly. At a low level, what we need
    # to do so is the following::
    #     s = outer_setup()
    #     t = outer_teardown()
    #     s >> t
    #     s >> outer_work() >> t
    # Thus, s and t are linked directly, and outer_work runs in between. We can take advantage of
    # the fact that we are in taskflow, along with the context manager on teardowns, as follows:
    with outer_teardown(outer_setup()):
        outer_work()

        # and let's put section 1 inside the outer setup and teardown tasks
        section_1()
Tasks and Operators¶
Example: Defining tasks and using operators
Use the airflow.sdk.task() decorator to wrap Python callables as tasks and leverage dynamic task mapping with the .expand() method. Tasks communicate via airflow.sdk.XComArg. For traditional operators and sensors, import classes like airflow.sdk.BaseOperator or airflow.sdk.Sensor.
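The examples below use the @task decorator throughout; for the class-based route, a custom operator might look like the following minimal sketch (it assumes airflow.sdk.BaseOperator follows the usual execute(context) contract shown in the DAG-definition example above; the operator name is hypothetical):

from airflow.sdk import BaseOperator


class GreetOperator(BaseOperator):
    """Toy operator whose return value is pushed to XCom."""

    def __init__(self, *, name: str, **kwargs):
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        # the return value becomes an XCom, reachable downstream via this operator's .output
        return f"Hello, {self.name}!"

Instances of such an operator can be wired into a DAG alongside decorated tasks, and downstream tasks can consume the returned value through the operator's .output attribute, which is an airflow.sdk.XComArg.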
/opt/airflow/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py
from datetime import datetime

from airflow.sdk import DAG, task

with DAG(dag_id="example_dynamic_task_mapping", schedule=None, start_date=datetime(2022, 3, 4)) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def sum_it(values):
        total = sum(values)
        print(f"Total was {total}")

    added_values = add_one.expand(x=[1, 2, 3])
    sum_it(added_values)

with DAG(
    dag_id="example_task_mapping_second_order", schedule=None, catchup=False, start_date=datetime(2022, 3, 4)
) as dag2:

    @task
    def get_nums():
        return [1, 2, 3]

    @task
    def times_2(num):
        return num * 2

    @task
    def add_10(num):
        return num + 10

    _get_nums = get_nums()
    _times_2 = times_2.expand(num=_get_nums)
    add_10.expand(num=_times_2)
/opt/airflow/airflow-core/src/airflow/example_dags/example_xcomargs.py
import logging

import pendulum

from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG, task

log = logging.getLogger(__name__)


@task
def generate_value():
    """Empty function"""
    return "Bring me a shrubbery!"


@task
def print_value(value, ts=None):
    """Empty function"""
    log.info("The knights of Ni say: %s (at %s)", value, ts)


with DAG(
    dag_id="example_xcom_args",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["example"],
) as dag:
    print_value(generate_value())

with DAG(
    "example_xcom_args_with_operators",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["example"],
) as dag2:
    bash_op1 = BashOperator(task_id="c", bash_command="echo c")
    bash_op2 = BashOperator(task_id="d", bash_command="echo c")
    xcom_args_a = print_value("first!")
    xcom_args_b = print_value("second!")

    bash_op1 >> xcom_args_a >> xcom_args_b >> bash_op2
Assets¶
Example: Defining and aliasing assets
Model data artifacts using the Task SDK’s asset API. Decorate functions with airflow.sdk.asset() and create aliases with airflow.sdk.AssetAlias. See the API reference under assets for full guidance.
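Note that the examples below construct Asset objects and AssetAlias directly rather than using the airflow.sdk.asset() decorator. As a rough sketch of the decorator form (hypothetical asset name; consult the API reference for the exact parameters it accepts):

from airflow.sdk import asset


@asset(schedule="@daily")
def processed_orders():
    # the decorated function becomes the producing task of an asset
    # whose name defaults to the function name
    ...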
/opt/airflow/airflow-core/src/airflow/example_dags/example_assets.py
import pendulum

from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG, Asset
from airflow.timetables.assets import AssetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

dag1_asset = Asset("s3://dag1/output_1.txt", extra={"hi": "bye"})
dag2_asset = Asset("s3://dag2/output_1.txt", extra={"hi": "bye"})
dag3_asset = Asset("s3://dag3/output_3.txt", extra={"hi": "bye"})

with DAG(
    dag_id="asset_produces_1",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule="@daily",
    tags=["produces", "asset-scheduled"],
) as dag1:
    # [START task_outlet]
    BashOperator(outlets=[dag1_asset], task_id="producing_task_1", bash_command="sleep 5")
    # [END task_outlet]

with DAG(
    dag_id="asset_produces_2",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    tags=["produces", "asset-scheduled"],
) as dag2:
    BashOperator(outlets=[dag2_asset], task_id="producing_task_2", bash_command="sleep 5")

# [START dag_dep]
with DAG(
    dag_id="asset_consumes_1",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[dag1_asset],
    tags=["consumes", "asset-scheduled"],
) as dag3:
    # [END dag_dep]
    BashOperator(
        outlets=[Asset("s3://consuming_1_task/asset_other.txt")],
        task_id="consuming_1",
        bash_command="sleep 5",
    )

with DAG(
    dag_id="asset_consumes_1_and_2",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[dag1_asset, dag2_asset],
    tags=["consumes", "asset-scheduled"],
) as dag4:
    BashOperator(
        outlets=[Asset("s3://consuming_2_task/asset_other_unknown.txt")],
        task_id="consuming_2",
        bash_command="sleep 5",
    )

with DAG(
    dag_id="asset_consumes_1_never_scheduled",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[
        dag1_asset,
        Asset("s3://unrelated/this-asset-doesnt-get-triggered"),
    ],
    tags=["consumes", "asset-scheduled"],
) as dag5:
    BashOperator(
        outlets=[Asset("s3://consuming_2_task/asset_other_unknown.txt")],
        task_id="consuming_3",
        bash_command="sleep 5",
    )

with DAG(
    dag_id="asset_consumes_unknown_never_scheduled",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[
        Asset("s3://unrelated/asset3.txt"),
        Asset("s3://unrelated/asset_other_unknown.txt"),
    ],
    tags=["asset-scheduled"],
) as dag6:
    BashOperator(
        task_id="unrelated_task",
        outlets=[Asset("s3://unrelated_task/asset_other_unknown.txt")],
        bash_command="sleep 5",
    )

with DAG(
    dag_id="consume_1_and_2_with_asset_expressions",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=(dag1_asset & dag2_asset),
) as dag5:
    BashOperator(
        outlets=[Asset("s3://consuming_2_task/asset_other_unknown.txt")],
        task_id="consume_1_and_2_with_asset_expressions",
        bash_command="sleep 5",
    )

with DAG(
    dag_id="consume_1_or_2_with_asset_expressions",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=(dag1_asset | dag2_asset),
) as dag6:
    BashOperator(
        outlets=[Asset("s3://consuming_2_task/asset_other_unknown.txt")],
        task_id="consume_1_or_2_with_asset_expressions",
        bash_command="sleep 5",
    )

with DAG(
    dag_id="consume_1_or_both_2_and_3_with_asset_expressions",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=(dag1_asset | (dag2_asset & dag3_asset)),
) as dag7:
    BashOperator(
        outlets=[Asset("s3://consuming_2_task/asset_other_unknown.txt")],
        task_id="consume_1_or_both_2_and_3_with_asset_expressions",
        bash_command="sleep 5",
    )

with DAG(
    dag_id="conditional_asset_and_time_based_timetable",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=AssetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), assets=(dag1_asset & dag2_asset)
    ),
    tags=["asset-time-based-timetable"],
) as dag8:
    BashOperator(
        outlets=[Asset("s3://asset_time_based/asset_other_unknown.txt")],
        task_id="conditional_asset_and_time_based_timetable",
        bash_command="sleep 5",
    )
/opt/airflow/airflow-core/src/airflow/example_dags/example_asset_alias.py
import pendulum

from airflow.sdk import DAG, Asset, AssetAlias, task

with DAG(
    dag_id="asset_s3_bucket_producer",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    tags=["producer", "asset"],
):

    @task(outlets=[Asset("s3://bucket/my-task")])
    def produce_asset_events():
        pass

    produce_asset_events()

with DAG(
    dag_id="asset_alias_example_alias_producer",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    tags=["producer", "asset-alias"],
):

    @task(outlets=[AssetAlias("example-alias")])
    def produce_asset_events_through_asset_alias(*, outlet_events=None):
        bucket_name = "bucket"
        object_path = "my-task"
        outlet_events[AssetAlias("example-alias")].add(Asset(f"s3://{bucket_name}/{object_path}"))

    produce_asset_events_through_asset_alias()

with DAG(
    dag_id="asset_s3_bucket_consumer",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[Asset("s3://bucket/my-task")],
    catchup=False,
    tags=["consumer", "asset"],
):

    @task
    def consume_asset_event():
        pass

    consume_asset_event()

with DAG(
    dag_id="asset_alias_example_alias_consumer",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[AssetAlias("example-alias")],
    catchup=False,
    tags=["consumer", "asset-alias"],
):

    @task(inlets=[AssetAlias("example-alias")])
    def consume_asset_event_from_asset_alias(*, inlet_events=None):
        for event in inlet_events[AssetAlias("example-alias")]:
            print(event)

    consume_asset_event_from_asset_alias()
TaskFlow API Tutorial¶
This section provides a concise, code-first view. For the full tutorial and context, see the core TaskFlow tutorial.
Step 1: Define the DAG¶
In this step, define your DAG by applying the airflow.sdk.dag() decorator to a Python function. This registers the DAG with its schedule and default arguments. For more details, see airflow.sdk.dag().
/opt/airflow/airflow-core/src/airflow/example_dags/tutorial_taskflow_api.py
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """
Step 2: Write your Tasks¶
/opt/airflow/airflow-core/src/airflow/example_dags/tutorial_taskflow_api.py
@task()
def extract():
    """
    #### Extract task
    A simple Extract task to get data ready for the rest of the data
    pipeline. In this case, getting data is simulated by reading from a
    hardcoded JSON string.
    """
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

    order_data_dict = json.loads(data_string)
    return order_data_dict
/opt/airflow/airflow-core/src/airflow/example_dags/tutorial_taskflow_api.py
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
    """
    #### Transform task
    A simple Transform task which takes in the collection of order data and
    computes the total order value.
    """
    total_order_value = 0

    for value in order_data_dict.values():
        total_order_value += value

    return {"total_order_value": total_order_value}
/opt/airflow/airflow-core/src/airflow/example_dags/tutorial_taskflow_api.py
@task()
def load(total_order_value: float):
    """
    #### Load task
    A simple Load task which takes in the result of the Transform task and
    instead of saving it to end user review, just prints it out.
    """
    print(f"Total order value is: {total_order_value:.2f}")
Step 3: Build the Flow¶
/opt/airflow/airflow-core/src/airflow/example_dags/tutorial_taskflow_api.py
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
Step 4: Invoke the DAG¶
/opt/airflow/airflow-core/src/airflow/example_dags/tutorial_taskflow_api.py
tutorial_taskflow_api()