Working with TaskFlow

This tutorial builds on the regular Airflow Tutorial and focuses specifically on writing data pipelines using the TaskFlow API paradigm, which was introduced in Airflow 2.0, and contrasts this with DAGs written using the traditional paradigm.

The data pipeline chosen here is a simple pattern with three separate Extract, Transform, and Load tasks.

Example “TaskFlow API” Pipeline

Here is a very simple pipeline using the TaskFlow API paradigm. A more detailed explanation is given below.

airflow/example_dags/tutorial_taskflow_api.py[source]


import json

import pendulum

from airflow.decorators import dag, task
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """
    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict
    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}
    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """

        print(f"Total order value is: {total_order_value:.2f}")
    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])
tutorial_taskflow_api()

It’s a DAG definition file

If this is the first DAG file you are looking at, please note that this Python script is interpreted by Airflow and is a configuration file for your data pipeline. For a complete introduction to DAG files, please look at the core fundamentals tutorial which covers DAG structure and definitions extensively.

Instantiate a DAG

We are creating a DAG which is the collection of our tasks with dependencies between the tasks. This is a very simple definition, since we just want the DAG to be run when we set this up with Airflow, without any retries or complex scheduling. In this example, please notice that we are creating this DAG using the @dag decorator as shown below, with the Python function name acting as the DAG identifier.

airflow/example_dags/tutorial_taskflow_api.py[source]

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """

Now to actually enable this to be run as a DAG, we invoke the Python function tutorial_taskflow_api set up using the @dag decorator earlier, as shown below.

airflow/example_dags/tutorial_taskflow_api.py[source]

tutorial_taskflow_api()

Changed in version 2.4: It’s no longer required to “register” the DAG in a global variable for Airflow to be able to detect it, if that DAG is used inside a with block, or if it is the result of a @dag decorated function.
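
For example, a DAG defined inside a with block is now picked up without assigning it to a module-level variable. A minimal sketch (the DAG name is illustrative):

import pendulum

from airflow.models.dag import DAG

with DAG(
    dag_id="my_with_block_dag",  # illustrative name
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
):
    ...  # define tasks here; no module-level assignment of the DAG object is needed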

Tasks

In this data pipeline, tasks are created based on Python functions using the @task decorator as shown below. The function name acts as a unique identifier for the task.

airflow/example_dags/tutorial_taskflow_api.py[source]

@task()
def extract():
    """
    #### Extract task
    A simple Extract task to get data ready for the rest of the data
    pipeline. In this case, getting data is simulated by reading from a
    hardcoded JSON string.
    """
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

    order_data_dict = json.loads(data_string)
    return order_data_dict

The returned value, which in this case is a dictionary, will be made available for use in later tasks.

The Transform and Load tasks are created in the same manner as the Extract task shown above.

Main flow of the DAG

Now that we have the Extract, Transform, and Load tasks defined based on the Python functions, we can move to the main part of the DAG.

airflow/example_dags/tutorial_taskflow_api.py[source]

order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])

That’s it, we are done! We have invoked the Extract task, obtained the order data from there, sent it over to the Transform task for summarization, and then invoked the Load task with the summarized data. The dependencies between the tasks, and the passing of data between these tasks (which could be running on different workers on different nodes of the network), are all handled by Airflow.

Now to actually enable this to be run as a DAG, we invoke the Python function tutorial_taskflow_api set up using the @dag decorator earlier, as shown below.

airflow/example_dags/tutorial_taskflow_api.py[source]

tutorial_taskflow_api()

But how?

For experienced Airflow DAG authors, this is startlingly simple! Let’s contrast this with how this DAG had to be written before Airflow 2.0 below:

airflow/example_dags/tutorial_dag.py[source]


import json
import textwrap

import pendulum

# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.providers.standard.operators.python import PythonOperator
with DAG(
    "tutorial_dag",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={"retries": 2},
    description="DAG tutorial",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    dag.doc_md = __doc__
    def extract(**kwargs):
        ti = kwargs["ti"]
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        ti.xcom_push("order_data", data_string)
    def transform(**kwargs):
        ti = kwargs["ti"]
        extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data")
        order_data = json.loads(extract_data_string)

        total_order_value = 0
        for value in order_data.values():
            total_order_value += value

        total_value = {"total_order_value": total_order_value}
        total_value_json_string = json.dumps(total_value)
        ti.xcom_push("total_order_value", total_value_json_string)
    def load(**kwargs):
        ti = kwargs["ti"]
        total_value_string = ti.xcom_pull(task_ids="transform", key="total_order_value")
        total_order_value = json.loads(total_value_string)

        print(total_order_value)
    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )
    extract_task.doc_md = textwrap.dedent(
        """\
    #### Extract task
    A simple Extract task to get data ready for the rest of the data pipeline.
    In this case, getting data is simulated by reading from a hardcoded JSON string.
    This data is then put into xcom, so that it can be processed by the next task.
    """
    )

    transform_task = PythonOperator(
        task_id="transform",
        python_callable=transform,
    )
    transform_task.doc_md = textwrap.dedent(
        """\
    #### Transform task
    A simple Transform task which takes in the collection of order data from xcom
    and computes the total order value.
    This computed value is then put into xcom, so that it can be processed by the next task.
    """
    )

    load_task = PythonOperator(
        task_id="load",
        python_callable=load,
    )
    load_task.doc_md = textwrap.dedent(
        """\
    #### Load task
    A simple Load task which takes in the result of the Transform task, by reading it
    from xcom and instead of saving it to end user review, just prints it out.
    """
    )

    extract_task >> transform_task >> load_task

All of the processing shown above is being done in the new Airflow 2.0 DAG as well, but it is all abstracted from the DAG developer.

Let’s examine this in detail by looking at the Transform task in isolation since it is in the middle of the data pipeline. In Airflow 1.x, this task is defined as shown below:

airflow/example_dags/tutorial_dag.py[source]

def transform(**kwargs):
    ti = kwargs["ti"]
    extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data")
    order_data = json.loads(extract_data_string)

    total_order_value = 0
    for value in order_data.values():
        total_order_value += value

    total_value = {"total_order_value": total_order_value}
    total_value_json_string = json.dumps(total_value)
    ti.xcom_push("total_order_value", total_value_json_string)

As we see here, the data being processed in the Transform function is passed to it using XCom variables. In turn, the summarized data from the Transform function is also placed into another XCom variable which will then be used by the Load task.

Contrast that with the TaskFlow API in Airflow 2.0, shown below.

airflow/example_dags/tutorial_taskflow_api.py[source]

@task(multiple_outputs=True)
def transform(order_data_dict: dict):
    """
    #### Transform task
    A simple Transform task which takes in the collection of order data and
    computes the total order value.
    """
    total_order_value = 0

    for value in order_data_dict.values():
        total_order_value += value

    return {"total_order_value": total_order_value}

All of the XCom usage for data passing between these tasks is abstracted away from the DAG author in Airflow 2.0. However, XCom variables are used behind the scenes and can be viewed using the Airflow UI as necessary for debugging or DAG monitoring.

Similarly, task dependencies are automatically generated within TaskFlow based on the functional invocation of tasks. In Airflow 1.x, tasks had to be explicitly created and dependencies specified as shown below.

airflow/example_dags/tutorial_dag.py[source]

extract_task = PythonOperator(
    task_id="extract",
    python_callable=extract,
)
extract_task.doc_md = textwrap.dedent(
    """\
#### Extract task
A simple Extract task to get data ready for the rest of the data pipeline.
In this case, getting data is simulated by reading from a hardcoded JSON string.
This data is then put into xcom, so that it can be processed by the next task.
"""
)

transform_task = PythonOperator(
    task_id="transform",
    python_callable=transform,
)
transform_task.doc_md = textwrap.dedent(
    """\
#### Transform task
A simple Transform task which takes in the collection of order data from xcom
and computes the total order value.
This computed value is then put into xcom, so that it can be processed by the next task.
"""
)

load_task = PythonOperator(
    task_id="load",
    python_callable=load,
)
load_task.doc_md = textwrap.dedent(
    """\
#### Load task
A simple Load task which takes in the result of the Transform task, by reading it
from xcom and instead of saving it to end user review, just prints it out.
"""
)

extract_task >> transform_task >> load_task

In contrast, with the TaskFlow API in Airflow 2.0, the invocation itself automatically generates the dependencies as shown below.

airflow/example_dags/tutorial_taskflow_api.py[source]

order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])

Reusing a decorated task

Decorated tasks are flexible. You can reuse a decorated task in multiple DAGs, overriding the task parameters such as the task_id, queue, pool, etc.

Below is an example of how you can reuse a decorated task in multiple DAGs:

from airflow.decorators import task, dag
from datetime import datetime


@task
def add_task(x, y):
    print(f"Task args: x={x}, y={y}")
    return x + y


@dag(start_date=datetime(2022, 1, 1))
def mydag():
    start = add_task.override(task_id="start")(1, 2)
    for i in range(3):
        start >> add_task.override(task_id=f"add_start_{i}")(start, i)


@dag(start_date=datetime(2022, 1, 1))
def mydag2():
    start = add_task(1, 2)
    for i in range(3):
        start >> add_task.override(task_id=f"new_add_task_{i}")(start, i)


first_dag = mydag()
second_dag = mydag2()

You can also import the above add_task and use it in another DAG file. Suppose the add_task code lives in a file called common.py. You can do this:

from common import add_task
from airflow.decorators import dag
from datetime import datetime


@dag(start_date=datetime(2022, 1, 1))
def use_add_task():
    start = add_task.override(priority_weight=3)(1, 2)
    for i in range(3):
        start >> add_task.override(task_id=f"new_add_task_{i}", retries=4)(start, i)


created_dag = use_add_task()

Using the TaskFlow API with complex/conflicting Python dependencies

If you have tasks that require complex or conflicting requirements, you can use the TaskFlow API with either a Python virtual environment (since 2.0.2), a Docker container (since 2.2.0), the ExternalPythonOperator (since 2.4.0), or the KubernetesPodOperator (since 2.4.0).

This functionality allows a much more comprehensive range of use-cases for the TaskFlow API, as you are not limited to the packages and system libraries of the Airflow worker. For all of the decorated functions described below, you have to make sure the functions are serializable and that they only use local imports for any additional dependencies. Those imported additional libraries must be available in the target environment; they do not need to be available in the main Airflow environment.

Which of the operators you should use depends on several factors:

  • whether you are running Airflow with access to Docker engine or Kubernetes

  • whether you can afford an overhead to dynamically create a virtual environment with the new dependencies

  • whether you can deploy a pre-existing, immutable Python environment for all Airflow components.

These options should allow for far greater flexibility for users who wish to keep their workflows simpler and more Pythonic, and allow you to keep the complete logic of your DAG in the DAG file itself.

You can also get more context about the approach to managing conflicting dependencies, including a more detailed explanation of the boundaries and consequences of each option, in Best practices for handling conflicting/complex Python dependencies.

Virtualenv created dynamically for each task

The simplest approach is to dynamically create (every time a task is run) a separate virtual environment on the same machine; for this you can use the @task.virtualenv decorator. The decorator allows you to dynamically create a new virtualenv with custom libraries and even a different Python version in which to run your function.

Example (dynamically created virtualenv):

airflow/example_dags/example_python_operator.py[source]

def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing at the function level ensures that it will not attempt to import the
    library before it is installed.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(4):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(1)
    print("Finished")

virtualenv_task = PythonVirtualenvOperator(
    task_id="virtualenv_python",
    python_callable=callable_virtualenv,
    requirements=["colorama==0.4.0"],
    system_site_packages=False,
)
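
The same work can be expressed with the @task.virtualenv decorator described above. A minimal sketch, reusing the requirements from the operator example (the task and function names are illustrative):

@task.virtualenv(
    task_id="virtualenv_python_decorated",  # illustrative task_id
    requirements=["colorama==0.4.0"],
    system_site_packages=False,
)
def callable_virtualenv_decorated():
    # Import inside the function so the module is resolved in the freshly
    # created virtualenv rather than in the main Airflow environment.
    from colorama import Fore, Style

    print(Fore.RED + "some red text")
    print(Style.RESET_ALL)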

Using Python environment with pre-installed dependencies

A bit more involved is the @task.external_python decorator, which allows you to run an Airflow task in a pre-defined, immutable virtualenv (or a Python binary installed at the system level without a virtualenv). This virtualenv or system Python can have a different set of custom libraries installed, and it must be made available at the same location on all workers that can execute the tasks.

Example with @task.external_python (using immutable, pre-existing virtualenv):

airflow/example_dags/example_python_operator.py[source]

def callable_external_python():
    """
    Example function that will be performed in a virtual environment.

    Importing at the function level ensures that it will not attempt to import the
    library before it is installed.
    """
    import sys
    from time import sleep

    print(f"Running task via {sys.executable}")
    print("Sleeping")
    for _ in range(4):
        print("Please wait...", flush=True)
        sleep(1)
    print("Finished")

external_python_task = ExternalPythonOperator(
    task_id="external_python",
    python_callable=callable_external_python,
    python=PATH_TO_PYTHON_BINARY,
)
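
The decorator form follows the same pattern. A minimal sketch, assuming PATH_TO_PYTHON_BINARY points at the pre-existing interpreter as in the operator example above (the function name is illustrative):

@task.external_python(python=PATH_TO_PYTHON_BINARY)
def callable_external_python_decorated():
    # Runs inside the pre-existing, immutable environment referenced by
    # PATH_TO_PYTHON_BINARY rather than in the main Airflow interpreter.
    import sys

    print(f"Running task via {sys.executable}")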

Dependency separation using Docker Operator

If your Airflow workers have access to a docker engine, you can instead use a DockerOperator and add any needed arguments to correctly run the task. Please note that the docker image must have a working Python installed and take in a bash command as the command argument.

It is worth noting that the Python source code (extracted from the decorated function) and any callable args are sent to the container via (encoded and pickled) environment variables so the length of these is not boundless (the exact limit depends on system settings).

Below is an example of using the @task.docker decorator to run a Python task.

tests/system/docker/example_taskflow_api_docker_virtualenv.py[source]

@task.docker(image="python:3.9-slim-bookworm", multiple_outputs=True)
def transform(order_data_dict: dict):
    """
    #### Transform task
    A simple Transform task which takes in the collection of order data and
    computes the total order value.
    """
    total_order_value = 0

    for value in order_data_dict.values():
        total_order_value += value

    return {"total_order_value": total_order_value}

Notes on using the operator:

Note

Using the @task.docker decorator in earlier Airflow versions

Since the @task.docker decorator is available in the docker provider, you might be tempted to use it in an Airflow version before 2.2, but this will not work. You will get this error if you try:

AttributeError: '_TaskDecorator' object has no attribute 'docker'

You should upgrade to Airflow 2.2 or above in order to use it.

Dependency separation using Kubernetes Pod Operator

If your Airflow workers have access to Kubernetes, you can instead use a KubernetesPodOperator and add any needed arguments to correctly run the task.

Below is an example of using the @task.kubernetes decorator to run a Python task.

tests/system/cncf/kubernetes/example_kubernetes_decorator.py[source]

@task.kubernetes(
    image="python:3.9-slim-buster",
    name="k8s_test",
    namespace="default",
    in_cluster=False,
    config_file="/path/to/.kube/config",
)
def execute_in_k8s_pod():
    import time

    print("Hello from k8s pod")
    time.sleep(2)

@task.kubernetes(image="python:3.9-slim-buster", namespace="default", in_cluster=False)
def print_pattern():
    n = 5
    for i in range(n):
        # inner loop to handle number of columns
        # values changing acc. to outer loop
        for _ in range(i + 1):
            # printing stars
            print("* ", end="")

        # ending line after each row
        print("\r")

execute_in_k8s_pod_instance = execute_in_k8s_pod()
print_pattern_instance = print_pattern()

execute_in_k8s_pod_instance >> print_pattern_instance

Notes on using the operator:

Note

Using the @task.kubernetes decorator in earlier Airflow versions

Since the @task.kubernetes decorator is available in the cncf.kubernetes provider, you might be tempted to use it in an Airflow version before 2.4, but this will not work. You will get this error if you try:

AttributeError: '_TaskDecorator' object has no attribute 'kubernetes'

You should upgrade to Airflow 2.4 or above in order to use it.

Using the TaskFlow API with Sensor operators

You can apply the @task.sensor decorator to convert a regular Python function to an instance of the BaseSensorOperator class. The Python function implements the poke logic and returns an instance of the PokeReturnValue class, just as the poke() method in the BaseSensorOperator does. Since Airflow 2.3, sensor operators are able to return XCom values. This is achieved by returning an instance of the PokeReturnValue object at the end of the poke() method:

from typing import Union

from airflow.sensors.base import BaseSensorOperator, PokeReturnValue
from airflow.utils.context import Context


class SensorWithXcomValue(BaseSensorOperator):
    def poke(self, context: Context) -> Union[bool, PokeReturnValue]:
        # ...
        is_done = ...  # set to true if the sensor should stop poking.
        xcom_value = ...  # return value of the sensor operator to be pushed to XCOM.
        return PokeReturnValue(is_done, xcom_value)

To implement a sensor operator that pushes an XCom value and supports both Airflow 2.3 and pre-2.3 versions, you need to explicitly push the XCom value if the version is pre-2.3.

from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context

try:
    from airflow.sensors.base import PokeReturnValue
except ImportError:
    # PokeReturnValue is only available in Airflow 2.3+
    PokeReturnValue = None


class SensorWithXcomValue(BaseSensorOperator):
    def poke(self, context: Context) -> bool:
        # ...
        is_done = ...  # set to true if the sensor should stop poking.
        xcom_value = ...  # return value of the sensor operator to be pushed to XCOM.
        if PokeReturnValue is not None:
            return PokeReturnValue(is_done, xcom_value)
        else:
            if is_done:
                context["ti"].xcom_push(key="xcom_key", value=xcom_value)
            return is_done

Alternatively, in cases where the sensor doesn’t need to push XCom values, both poke() and the wrapped function can return a boolean-like value, where True designates the sensor’s operation as complete and False designates it as incomplete.

airflow/example_dags/example_sensor_decorator.py[source]


import pendulum

from airflow.decorators import dag, task
from airflow.sensors.base import PokeReturnValue
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def example_sensor_decorator():
    # Using a sensor operator to wait for the upstream data to be ready.
    @task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
    def wait_for_upstream() -> PokeReturnValue:
        return PokeReturnValue(is_done=True, xcom_value="xcom_value")
    @task
    def dummy_operator() -> None:
        pass
    wait_for_upstream() >> dummy_operator()
tutorial_etl_dag = example_sensor_decorator()
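
For the boolean-only case described above, the decorated sensor function can simply return True or False. A minimal sketch (the flag-file path and task name are hypothetical):

@task.sensor(poke_interval=30, timeout=300, mode="reschedule")
def wait_for_flag_file() -> bool:
    import os

    # The sensor keeps poking (here, rescheduling) until this condition is True.
    return os.path.exists("/tmp/ready.flag")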

Multiple outputs inference

Tasks can also infer multiple outputs by using dict Python typing.

@task
def identity_dict(x: int, y: int) -> dict[str, int]:
    return {"x": x, "y": y}

By using dict typing, or any other class that conforms to the typing.Mapping protocol, for the function return type, the multiple_outputs parameter is automatically set to True.

Note: if you manually set the multiple_outputs parameter, the inference is disabled and the parameter value is used.
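
For illustration, a minimal sketch of overriding the inference, assuming you want the whole dict stored as a single XCom value instead of being split into separate keys:

@task(multiple_outputs=False)
def identity_dict_single(x: int, y: int) -> dict[str, int]:
    # multiple_outputs is explicitly disabled, so the dict return-type hint is
    # ignored and the whole dict is pushed as one XCom value.
    return {"x": x, "y": y}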

Adding dependencies between decorated and traditional tasks

The above tutorial shows how to create dependencies between TaskFlow functions. However, dependencies can also be set between traditional tasks (such as BashOperator or FileSensor) and TaskFlow functions.

Building this dependency is shown in the code below:

import pandas as pd

from airflow.decorators import task

# FileSensor import path assumed here for recent Airflow versions (standard provider);
# in older versions it is available from airflow.sensors.filesystem.
from airflow.providers.standard.sensors.filesystem import FileSensor


@task()
def extract_from_file():
    """
    #### Extract from file task
    A simple Extract task to get data ready for the rest of the data
    pipeline, by reading the data from a file into a pandas dataframe
    """
    order_data_file = "/tmp/order_data.csv"
    order_data_df = pd.read_csv(order_data_file)
    return order_data_df


file_task = FileSensor(task_id="check_file", filepath="/tmp/order_data.csv")
order_data = extract_from_file()

file_task >> order_data

In the above code block, a new TaskFlow function is defined as extract_from_file, which reads the data from a known file location. In the main DAG, a new FileSensor task is defined to check for this file. Note that this is a Sensor task which waits for the file. The result of the TaskFlow function call is stored in the variable order_data. Finally, the dependency between this Sensor task and the TaskFlow function is specified using that variable.

Consuming XComs between decorated and traditional tasks

As noted above, the TaskFlow API allows XComs to be consumed or passed between tasks in a manner that is abstracted away from the DAG author. This section dives further into detailed examples of how this is possible not only between TaskFlow functions but between both TaskFlow functions and traditional tasks.

You may find it necessary to consume an XCom from traditional tasks, either pushed within the task’s execution or via its return value, as an input into downstream tasks. You can access the pushed XCom (also known as an XComArg) by utilizing the .output property exposed for all operators.

By default, using the .output property to retrieve an XCom result is the equivalent of:

task_instance.xcom_pull(task_ids="my_task_id", key="return_value")

To retrieve an XCom result for a key other than return_value, you can use:

my_op = MyOperator(...)
my_op_output = my_op.output["some_other_xcom_key"]
# OR
my_op_output = my_op.output.get("some_other_xcom_key")

Note

Using the .output property as an input to another task is supported only for operator parameters listed as a template_field.

In the code example below, an HttpOperator result is captured via XComs. This XCom result, which is the task output, is then passed to a TaskFlow function which parses the response as JSON.

import json

from airflow.decorators import task

# HttpOperator is provided by the apache-airflow-providers-http package
# (older provider versions expose it as SimpleHttpOperator).
from airflow.providers.http.operators.http import HttpOperator

get_api_results_task = HttpOperator(
    task_id="get_api_results",
    endpoint="/api/query",
    do_xcom_push=True,
    http_conn_id="http",
)


@task
def parse_results(api_results):
    return json.loads(api_results)


parsed_results = parse_results(api_results=get_api_results_task.output)

The reverse can also be done: passing the output of a TaskFlow function as an input to a traditional task.

from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.sqs import SqsHook
from airflow.providers.amazon.aws.operators.sqs import SqsPublishOperator


@task(retries=3)
def create_queue():
    """This is a Python function that creates an SQS queue"""
    hook = SqsHook()
    result = hook.create_queue(queue_name="sample-queue")

    return result["QueueUrl"]


sqs_queue = create_queue()

publish_to_queue = SqsPublishOperator(
    task_id="publish_to_queue",
    sqs_queue=sqs_queue,
    message_content="{{ task_instance }}-{{ execution_date }}",
    message_attributes=None,
    delay_seconds=0,
)

Note that in the code example above, the output from the create_queue TaskFlow function (the URL of a newly created Amazon SQS queue) is passed to the SqsPublishOperator task as the sqs_queue argument.

Finally, not only can you use traditional operator outputs as inputs for TaskFlow functions, but also as inputs to other traditional operators. In the example below, the output from the SalesforceToS3Operator task (which is an S3 URI for a destination file location) is used as an input for the S3CopyObjectOperator task to copy the same file to a date-partitioned storage location in S3 for long-term storage in a data lake.

from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
from airflow.providers.amazon.aws.transfers.salesforce_to_s3 import SalesforceToS3Operator

BASE_PATH = "salesforce/customers"
FILE_NAME = "customer_daily_extract_{{ ds_nodash }}.csv"


upload_salesforce_data_to_s3_landing = SalesforceToS3Operator(
    task_id="upload_salesforce_data_to_s3",
    salesforce_query="SELECT Id, Name, Company, Phone, Email, LastModifiedDate, IsActive FROM Customers",
    s3_bucket_name="landing-bucket",
    s3_key=f"{BASE_PATH}/{FILE_NAME}",
    salesforce_conn_id="salesforce",
    aws_conn_id="s3",
    replace=True,
)


store_to_s3_data_lake = S3CopyObjectOperator(
    task_id="store_to_s3_data_lake",
    aws_conn_id="s3",
    source_bucket_key=upload_salesforce_data_to_s3_landing.output,
    dest_bucket_name="data_lake",
    dest_bucket_key=f"""{BASE_PATH}/{"{{ execution_date.strftime('%Y/%m/%d') }}"}/{FILE_NAME}""",
)

Accessing context variables in decorated tasks

When running your callable, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs corresponds exactly to what you can use in your Jinja templates. For this to work, add the context keys you would like to receive as keyword arguments in the function signature.

For example, the callable in the code block below will get values of the ti and next_ds context variables:

@task
def my_python_callable(*, ti, next_ds):
    pass

Changed in version 2.8: Previously, the context key arguments had to provide a default, e.g. ti=None. This is no longer needed.

You can also choose to receive the entire context with **kwargs. Note that this can incur a slight performance penalty, since Airflow will need to expand the entire context, which likely contains many things you don’t actually need. It is therefore recommended that you use explicit arguments, as demonstrated in the previous paragraph.

@task
def my_python_callable(**kwargs):
    ti = kwargs["ti"]
    next_ds = kwargs["next_ds"]

Also, sometimes you might want to access the context somewhere deep in the stack without passing the context variables down from the task callable. You can still access the execution context via the get_current_context method.

from airflow.providers.standard.operators.python import get_current_context


def some_function_in_your_library():
    context = get_current_context()
    ti = context["ti"]

The current context is accessible only during task execution; it is not accessible during pre_execute or post_execute. Calling this method outside of an execution context will raise an error.

Using templates in decorated tasks

Arguments passed to your decorated function are automatically templated.
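
For example, a string argument containing a Jinja expression is rendered before the function runs. A minimal sketch (the task name is illustrative):

@task
def print_logical_date(templated_date):
    # The "{{ ds }}" expression passed below is rendered to the run's logical
    # date string before this function is called.
    print(f"logical date: {templated_date}")


print_logical_date(templated_date="{{ ds }}")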

You can also use the templates_exts parameter to template entire files.

@task(templates_exts=[".sql"])
def template_test(sql):
    print(f"sql: {sql}")


template_test(sql="sql/test.sql")

This will read the content of sql/test.sql and replace all template variables. You can also pass a list of files and all of them will be templated.

You can pass additional parameters to the template engine through the params parameter.

However, the params parameter must be passed to the decorator and not to your function directly, for example @task(templates_exts=[".sql"], params={"my_param": "my_value"}); it can then be used with {{ params.my_param }} in your templated files and function parameters.

Alternatively, you can also pass it using the .override() method:

@task()
def template_test(input_var):
    print(f"input_var: {input_var}")


template_test.override(params={"my_param": "wow"})(
    input_var="my param is: {{ params.my_param }}",
)

Finally, you can also manually render templates:

@task(params={"my_param": "wow"})
def template_test():
    template_str = "run_id: {{ run_id }}; params.my_param: {{ params.my_param }}"

    context = get_current_context()
    rendered_template = context["task"].render_template(
        template_str,
        context,
    )

Here is a full example that demonstrates everything above:

airflow/example_dags/tutorial_taskflow_templates.py[source]


import pendulum

from airflow.decorators import dag, task
from airflow.providers.standard.operators.python import get_current_context
@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
    params={"foobar": "param_from_dag", "other_param": "from_dag"},
)
def tutorial_taskflow_templates():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the templates in the TaskFlow API.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """
    @task(
        # Causes variables that end with `.sql` to be read and templates
        # within to be rendered.
        templates_exts=[".sql"],
    )
    def template_test(sql, test_var, data_interval_end):
        context = get_current_context()

        # Will print...
        # select * from test_data
        # where 1=1
        #     and run_id = 'scheduled__2024-10-09T00:00:00+00:00'
        #     and something_else = 'param_from_task'
        print(f"sql: {sql}")

        # Will print `scheduled__2024-10-09T00:00:00+00:00`
        print(f"test_var: {test_var}")

        # Will print `2024-10-10 00:00:00+00:00`.
        # Note how we didn't pass this value when calling the task. Instead
        # it was passed by the decorator from the context
        print(f"data_interval_end: {data_interval_end}")

        # Will print...
        # run_id: scheduled__2024-10-09T00:00:00+00:00; params.other_param: from_dag
        template_str = "run_id: {{ run_id }}; params.other_param: {{ params.other_param }}"
        rendered_template = context["task"].render_template(
            template_str,
            context,
        )
        print(f"rendered template: {rendered_template}")

        # Will print the full context dict
        print(f"context: {context}")
    template_test.override(
        # Will be merged with the dict defined in the dag
        # and override existing parameters.
        #
        # Must be passed into the decorator's parameters
        # through `.override()` not into the actual task
        # function
        params={"foobar": "param_from_task"},
    )(
        sql="sql/test.sql",
        test_var="{{ run_id }}",
    )
tutorial_taskflow_templates()

Conditionally skipping tasks

run_if() and skip_if() are syntactic sugar for TaskFlow that allow you to skip a task based on a condition. You can use them to set execution conditions without changing the structure of the DAG or task.

They also allow you to set conditions using the context, which is essentially the same as using pre_execute.

An example usage of run_if() is as follows:

@task.run_if(lambda context: context["task_instance"].task_id == "run")
@task.bash()
def echo() -> str:
    return "echo 'run'"

The echo task defined in the above code is only executed when the task_id is “run”.

If you want to leave a log message when you skip a task, you have two options. The first is to pass a skip_message to the decorator:

@task.run_if(lambda context: context["task_instance"].task_id == "run", skip_message="only task_id is 'run'")
@task.bash()
def echo() -> str:
    return "echo 'run'"

The second is to have the condition callable return a tuple of the condition and the message:

@task.run_if(
    lambda context: (context["task_instance"].task_id == "run", f"{context['ts']}: only task_id is 'run'")
)
@task.bash()
def echo() -> str:
    return "echo 'run'"

There is also a skip_if() decorator that works in the opposite way to run_if(), and it is used in the same manner.

@task.skip_if(lambda context: context["task_instance"].task_id == "skip")
@task.bash()
def echo() -> str:
    return "echo 'run'"

What’s Next?

You have seen how simple it is to write DAGs using the TaskFlow API paradigm within Airflow 2.0.
