Callbacks

Task callbacks are a valuable part of logging and monitoring: they let you act on state changes of a given DAG or task, or across all tasks in a given DAG. For example, you may wish to send an alert when certain tasks fail, or invoke a callback when your DAG succeeds.

There are three different places where callbacks can be defined.

  • Callbacks set in the DAG definition will be applied at the DAG level.

  • Using default_args, callbacks can be set for each task in a DAG.

  • Individual callbacks can be set for a task by setting that callback within the task definition itself.
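When the same callback argument is set both in default_args and on a task, the value passed to the task itself is the one that applies, because explicit operator arguments take precedence over default_args. The plain-Python sketch below only mimics that precedence rule so it can be run without Airflow; resolve_task_callbacks and the three callback functions are hypothetical names used for illustration, not part of Airflow.

```python
# Illustration only: mimics "explicit task arguments win over default_args".
# resolve_task_callbacks is a hypothetical helper, not an Airflow API.
CALLBACK_KEYS = (
    "on_success_callback",
    "on_failure_callback",
    "on_retry_callback",
    "on_execute_callback",
    "on_skipped_callback",
)


def on_execute_default(context):
    print("default execute callback")


def on_failure_default(context):
    print("default failure callback")


def on_failure_task1(context):
    print("task1-specific failure callback")


def resolve_task_callbacks(default_args, task_kwargs):
    """Return the callbacks a task would end up with under the precedence rule."""
    merged = {**default_args, **task_kwargs}  # later keys (task level) win
    return {key: merged[key] for key in CALLBACK_KEYS if key in merged}


default_args = {
    "on_execute_callback": on_execute_default,
    "on_failure_callback": on_failure_default,
}

# task1 overrides only the failure callback; the execute callback is inherited.
task1_callbacks = resolve_task_callbacks(
    default_args, {"task_id": "task1", "on_failure_callback": on_failure_task1}
)
# task2 sets nothing itself, so it inherits both defaults.
task2_callbacks = resolve_task_callbacks(default_args, {"task_id": "task2"})
```

DAG-level callbacks are separate from this rule: they react to the state of the DAG run as a whole rather than to an individual task.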

Note

Callback functions are only invoked when the DAG or task state changes due to execution by a worker. As such, DAG and task changes set by the command line interface (CLI) or user interface (UI) do not execute callback functions.

Warning

Callback functions are executed after tasks are completed. Errors in callback functions will show up in scheduler logs rather than task logs. By default, scheduler logs do not show up in the UI and instead can be found in $AIRFLOW_HOME/logs/scheduler/latest/DAG_FILE.py.log
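Because an error inside a callback is easy to miss, it can help to wrap callbacks defensively. The following is a minimal sketch, assuming you want a failing callback to be logged with its traceback rather than to raise; safe_callback is a hypothetical helper, not an Airflow API, and where the log record ends up still depends on your logging configuration.

```python
import functools
import logging

log = logging.getLogger(__name__)


def safe_callback(func):
    """Wrap a callback so an exception inside it is logged instead of raised.

    Hypothetical helper for illustration; it is not provided by Airflow.
    """

    @functools.wraps(func)
    def wrapper(context):
        try:
            return func(context)
        except Exception:
            # log.exception records the message together with the traceback.
            log.exception(
                "Callback %s failed for %s",
                func.__name__,
                context.get("task_instance_key_str", "<unknown>"),
            )
            return None

    return wrapper


@safe_callback
def task_failure_alert(context):
    # Simulated bug in the alerting code, to show the wrapper's effect.
    raise RuntimeError("alerting backend unreachable")


# Local dry run with a hand-built context; Airflow supplies the real one.
result = task_failure_alert({"task_instance_key_str": "example_callback__task1__20210101"})
```

The wrapped function keeps the same single-argument signature, so it can be passed to on_failure_callback or any other callback argument unchanged.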

Callback Types

The following types of events can trigger a callback:

| Name | Description |
|---|---|
| on_success_callback | Invoked when the DAG or task succeeds. Available at the DAG or task level. |
| on_failure_callback | Invoked when the DAG or task fails. Available at the DAG or task level. |
| on_retry_callback | Invoked when the task is up for retry. Available only at the task level. |
| on_execute_callback | Invoked right before the task begins executing. Available only at the task level. |
| on_skipped_callback | Invoked when the task is running and raises AirflowSkipException. It is explicitly NOT called if the task never starts executing, for example because a preceding branching decision in the DAG or a trigger rule causes it to be skipped before it is scheduled. Available only at the task level. |
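As a rough sketch of the task-level events, the callbacks below build a one-line message from the context for the retry and skipped cases. describe_event and both alert functions are hypothetical names; the context keys used are the ones that appear elsewhere on this page. The operator wiring is shown only as a comment so the block runs without Airflow installed.

```python
def describe_event(event, context):
    """Build a one-line message from the callback context."""
    return f"[{event}] {context['task_instance_key_str']} (run_id={context['run_id']})"


def task_retry_alert(context):
    print(describe_event("retry", context))


def task_skipped_alert(context):
    print(describe_event("skipped", context))


# Wiring inside a DAG definition would look roughly like this:
#
#   EmptyOperator(
#       task_id="task1",
#       retries=2,
#       on_retry_callback=task_retry_alert,
#       on_skipped_callback=task_skipped_alert,
#   )

# Local dry run with a hand-built context; Airflow supplies the real one.
fake_context = {
    "task_instance_key_str": "example_callback__task1__20210101",
    "run_id": "manual__2021-01-01T00:00:00+00:00",
}
task_retry_alert(fake_context)
task_skipped_alert(fake_context)
```

Keeping the message formatting in a separate function makes the callbacks easy to check locally before they are attached to a task.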

Example

In the following example, failures in task1 call the task_failure_alert function, and success at DAG level calls the dag_success_alert function. Before each task begins to execute, the task_execute_callback function will be called:

import datetime
import pendulum

from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator


def task_execute_callback(context):
    print(f"Task has begun execution, task_instance_key_str: {context['task_instance_key_str']}")


def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


def dag_success_alert(context):
    print(f"DAG has succeeded, run_id: {context['run_id']}")


with DAG(
    dag_id="example_callback",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    dagrun_timeout=datetime.timedelta(minutes=60),
    catchup=False,
    on_success_callback=dag_success_alert,
    default_args={"on_execute_callback": task_execute_callback},
    tags=["example"],
):
    task1 = EmptyOperator(task_id="task1", on_failure_callback=[task_failure_alert])
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3")
    task1 >> task2 >> task3

Note

As of Airflow 2.6.0, callbacks support a list of callback functions, allowing users to specify multiple functions to be executed for a given event. Simply pass a list of callback functions to the callback argument when defining your DAG or task callbacks, e.g. on_failure_callback=[callback_func_1, callback_func_2]
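One way to build such a list is with a small factory that returns one callback per destination. This is a sketch under stated assumptions: make_alert and the channel names are hypothetical, and the callbacks only print, standing in for whatever notification code you actually use.

```python
def make_alert(channel):
    """Return a callback that reports a failure to the given (hypothetical) channel."""

    def alert(context):
        message = f"{channel}: task failed, task_instance_key_str: {context['task_instance_key_str']}"
        print(message)
        return message

    return alert


# Two independent callbacks for the same event.
failure_callbacks = [make_alert("email"), make_alert("chat")]

# Wiring inside a DAG definition would look roughly like this:
#
#   EmptyOperator(task_id="task1", on_failure_callback=failure_callbacks)

# Local dry run with a hand-built context; Airflow supplies the real one.
fake_context = {"task_instance_key_str": "example_callback__task1__20210101"}
messages = [callback(fake_context) for callback in failure_callbacks]
```

Each function in the list receives the same context, so the callbacks should not depend on one another's side effects.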

The full list of variables available in context can be found in the docs and in the code.