Callbacks

A valuable component of logging and monitoring is the use of task callbacks to act upon changes in state of a given Dag or task, or across all tasks in a given Dag. For example, you may wish to alert when certain tasks have failed, or invoke a callback when your Dag succeeds.

There are three different places where callbacks can be defined.

  • Callbacks set in the Dag definition will be applied at the Dag level.

  • Using default_args, callbacks can be set for each task in a Dag.

  • Individual callbacks can be set for a task by setting that callback within the task definition itself.

Note

Callback functions are only invoked when the Dag or task state changes due to execution by a worker. As such, Dag and task changes set by the command line interface (CLI) or user interface (UI) do not execute callback functions.

Warning

Callback functions are executed after tasks are completed. Errors in callback functions will show up in dag processor logs rather than task logs. By default, dag processor logs do not show up in the UI and instead can be found in $AIRFLOW_HOME/logs/dag_processor/latest/dags-folder/<the_path_for_your_dag>/DAG_FILE.py.log
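Because callback errors surface outside the task logs, one defensive pattern is to catch and log exceptions inside the callback itself. The following is a minimal sketch in plain Python rather than an Airflow API: the log_callback_errors decorator and the logger name "callbacks" are hypothetical, and a plain dict stands in for the context.

```python
import functools
import logging

logger = logging.getLogger("callbacks")  # hypothetical logger name


def log_callback_errors(func):
    """Wrap a callback so that any exception is logged instead of propagating."""

    @functools.wraps(func)
    def wrapper(context):
        try:
            return func(context)
        except Exception:
            # logger.exception records the message plus the full traceback.
            logger.exception("Callback %s failed", func.__name__)
            return None

    return wrapper


@log_callback_errors
def task_failure_alert(context):
    # Deliberately fragile: raises KeyError if the key is missing.
    return f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}"
```

Whether swallowing the exception is appropriate depends on how the callback's own failures should be handled; the sketch only shows one way to make them easier to find.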

Note

As of Airflow 2.6.0, callback arguments accept a list of callback functions, so multiple functions can be executed for the same event. Pass a list of callback functions to the callback argument when defining your Dag or task callbacks, e.g. on_failure_callback=[callback_func_1, callback_func_2]
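To illustrate the idea, the sketch below accepts either a single callback or a list of callbacks and invokes each one with the same context. It is a plain-Python illustration, not Airflow's internal implementation: run_callbacks is a hypothetical helper, calling the functions in list order is an assumption of this sketch, and a plain dict stands in for the context.

```python
from typing import Any, Callable, Union

Callback = Callable[[dict], Any]


def run_callbacks(callbacks: Union[Callback, list, None], context: dict) -> list:
    """Call one callback, or each callback in a list in order, and collect results."""
    if callbacks is None:
        return []
    if callable(callbacks):
        # A single function is treated as a one-element list.
        callbacks = [callbacks]
    return [callback(context) for callback in callbacks]


def callback_func_1(context):
    return f"first: {context['run_id']}"


def callback_func_2(context):
    return f"second: {context['run_id']}"


results = run_callbacks([callback_func_1, callback_func_2], {"run_id": "manual__1"})
```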

Callback Types

There are five types of events that can trigger a callback:

Name | Description | Availability
--- | --- | ---
on_success_callback | Invoked when the Dag or task succeeds. | Dag or Task
on_failure_callback | Invoked when the Dag or task fails. | Dag or Task
on_retry_callback | Invoked when the task is up for retry. | Task
on_execute_callback | Invoked right before the task begins executing. | Task
on_skipped_callback | Invoked when the task is running and AirflowSkipException is raised. It is NOT called if the task never starts executing, for example because a preceding branching decision in the Dag or a trigger rule causes it to be skipped so that its execution is never scheduled. | Task

Context Mapping

A context mapping that contains runtime information about a task instance is passed to every callback. The full list of variables available in the context can be found in the docs and in the code.
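As a minimal sketch of reading the context inside a callback, the function below builds a one-line summary from two keys. The keys task_instance_key_str and run_id are taken from the examples on this page; describe_context is a hypothetical name, and a plain dict stands in for the mapping that Airflow passes at runtime.

```python
def describe_context(context):
    """Build a one-line summary from a callback context mapping."""
    # .get() avoids a KeyError if a key is absent from a given context.
    run_id = context.get("run_id", "<unknown run>")
    ti_key = context.get("task_instance_key_str", "<unknown task instance>")
    return f"run_id={run_id} task_instance={ti_key}"


# A plain dict stands in for the real context; the values are made up.
sample_context = {
    "run_id": "manual__2024-01-01",
    "task_instance_key_str": "example_callback__task1__20240101",
}
summary = describe_context(sample_context)
```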

Dag Callbacks

As the context mapping describes execution of a task instance, contexts passed to Dag callbacks will also contain task instance variables, and the task selected depends on the state of a Dag:

  1. On regular failure, the latest failed task is selected.

  2. On Dag run timeout, the latest started but not finished task is passed.

  3. If tasks are deadlocked, a task that should have run next but couldn’t is passed.

  4. On success, the latest succeeded task is passed.

It’s not recommended to rely on task instance variables in Dag callbacks except for human analysis, as they reflect only partial information about the Dag’s state. For example, a timeout may be caused by a number of stalling tasks, but only one will eventually be selected for context.
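One way to follow this recommendation is to key the Dag-level message on the run and include the selected task instance only as a hint for a human reader. The sketch below assumes the run_id and task_instance_key_str keys used elsewhere on this page; dag_failure_alert is a hypothetical callback, and it returns the message instead of sending it so that it can be tried with a plain dict.

```python
def dag_failure_alert(context):
    """Compose a Dag-level failure message keyed on the run, not on a task."""
    lines = [f"Dag run failed, run_id: {context['run_id']}"]
    ti_key = context.get("task_instance_key_str")
    if ti_key is not None:
        # Only one task instance is selected for the context, so it is
        # presented as a hint rather than as the cause of the failure.
        lines.append(f"One affected task instance (others may exist): {ti_key}")
    return "\n".join(lines)


message = dag_failure_alert(
    {"run_id": "manual__1", "task_instance_key_str": "example__task2__20240101"}
)
```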

Note

Before Airflow 3.2.0, the rules above did not apply: the task instance passed to a Dag callback was unrelated to the Dag's state and was instead the lexicographically latest task in the Dag.

Examples

Using Custom Callback Methods

In the following example, failures in task1 call the task_failure_alert function, and success at Dag level calls the dag_success_alert function. Before each task begins to execute, the task_execute_callback function will be called:

from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator


def task_execute_callback(context):
    print(f"Task has begun execution, task_instance_key_str: {context['task_instance_key_str']}")


def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


def dag_success_alert(context):
    print(f"Dag has succeeded, run_id: {context['run_id']}")


with DAG(
    dag_id="example_callback",
    on_success_callback=dag_success_alert,
    default_args={"on_execute_callback": task_execute_callback},
):
    task1 = EmptyOperator(task_id="task1", on_failure_callback=[task_failure_alert])
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3")
    task1 >> task2 >> task3

Using Notifiers

You can use Notifiers in your Dag definition by passing them as arguments to the on_*_callback parameters. For example, you can pass a Notifier to on_success_callback or on_failure_callback to send notifications based on the status of a task or a Dag run.

Here’s an example of using a custom notifier:

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator

from myprovider.notifier import MyNotifier

with DAG(
    dag_id="example_notifier",
    on_success_callback=MyNotifier(message="Success!"),
    on_failure_callback=MyNotifier(message="Failure!"),
):
    task = BashOperator(
        task_id="example_task",
        bash_command="exit 1",
        on_success_callback=MyNotifier(message="Task Succeeded!"),
    )

For a list of community-managed Notifiers, see Notifications. For more information on writing a custom Notifier, see the Notifiers how-to page.
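To show why a notifier instance can be passed where a callback function is expected, here is a plain-Python stand-in for the MyNotifier class used above. It is a sketch, not the real implementation: an actual notifier would subclass Airflow's BaseNotifier and implement notify(context), whereas this version avoids importing Airflow so that it runs anywhere. The send parameter is a hypothetical delivery hook added for illustration.

```python
class MyNotifier:
    """Stand-in for a custom notifier; a real one would subclass BaseNotifier."""

    def __init__(self, message, send=print):
        self.message = message
        # `send` is a hypothetical delivery hook; it defaults to print.
        self.send = send

    def notify(self, context):
        # Deliver the configured message together with the run identifier.
        self.send(f"{self.message} (run_id: {context.get('run_id')})")

    def __call__(self, context):
        # Being callable lets an instance be used like a callback function.
        self.notify(context)


sent = []
notifier = MyNotifier(message="Success!", send=sent.append)
notifier({"run_id": "manual__1"})
```

Configuring the message in the constructor and doing the work in notify keeps the Dag definition declarative: each callback argument receives an already configured object.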

Deadline Alert Callbacks

In addition to the Dag/task lifecycle callbacks above, Airflow supports Deadline Alert callbacks which trigger when a Dag run exceeds a configured time threshold. Deadline Alert callbacks use AsyncCallback (runs in the Triggerer) or SyncCallback (runs in the executor) and are configured on the Dag via the deadline parameter.

For full details, see Deadline Alerts.
