Callbacks

A valuable component of logging and monitoring is the use of task callbacks to act upon changes in the state of a given task, or across all tasks in a given DAG. For example, you may wish to alert when certain tasks fail, or have the last task in your DAG invoke a callback when it succeeds.

Note

Callback functions are only invoked when the task state changes due to execution by a worker. As such, task state changes made through the command line interface (CLI) or user interface (UI) do not execute callback functions.

Warning

Callback functions are executed after tasks are completed. Errors in callback functions will show up in scheduler logs rather than task logs. By default, scheduler logs do not show up in the UI and can instead be found in $AIRFLOW_HOME/logs/scheduler/latest/DAG_FILE.py.log.
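Because errors in a callback never surface in the task logs, one defensive pattern (a minimal sketch, not an Airflow API) is to catch and log exceptions inside the callback itself. The send_alert helper below is hypothetical and stands in for your own notification integration.

import logging

logger = logging.getLogger(__name__)


def send_alert(message):
    # Hypothetical notification helper; replace with your own integration.
    print(message)


def task_failure_alert(context):
    # Guard the callback body: an exception raised here would otherwise
    # only be visible in the scheduler logs mentioned above.
    try:
        send_alert(f"Task has failed: {context['task_instance_key_str']}")
    except Exception:
        logger.exception("on_failure_callback itself raised an error")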

Callback Types

There are five types of task events that can trigger a callback:

on_success_callback
    Invoked when the task succeeds.

on_failure_callback
    Invoked when the task fails.

on_retry_callback
    Invoked when the task is up for retry.

on_execute_callback
    Invoked right before the task begins executing.

on_skipped_callback
    Invoked when the task is running and an AirflowSkipException is raised. It is explicitly NOT called when the task never starts executing, for example because a preceding branching decision or a trigger rule skips it, so that the task execution is never scheduled.
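As a rough sketch of how these parameters attach to a task, the snippet below wires a single, hypothetical log_event function into each of the five callback arguments of a PythonOperator (assuming an Airflow version that supports all five; on_skipped_callback is the most recent addition). The DAG id and task body are purely illustrative.

import pendulum

from airflow import DAG
from airflow.operators.python import PythonOperator


def log_event(context):
    # Hypothetical helper: report which task instance fired the callback.
    print(f"Callback fired for {context['task_instance_key_str']}")


def do_work():
    print("working")


with DAG(
    dag_id="example_callback_types",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
):
    PythonOperator(
        task_id="monitored_task",
        python_callable=do_work,
        on_execute_callback=log_event,
        on_success_callback=log_event,
        on_failure_callback=log_event,
        on_retry_callback=log_event,
        on_skipped_callback=log_event,
    )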

Example

In the following example, failures in any task call the task_failure_alert function, and success in the last task calls the dag_success_alert function:

import datetime
import pendulum

from airflow import DAG
from airflow.operators.empty import EmptyOperator


def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


def dag_success_alert(context):
    print(f"DAG has succeeded, run_id: {context['run_id']}")


with DAG(
    dag_id="example_callback",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    dagrun_timeout=datetime.timedelta(minutes=60),
    catchup=False,
    on_success_callback=None,
    on_failure_callback=task_failure_alert,
    tags=["example"],
):
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
    task1 >> task2 >> task3

Note

As of Airflow 2.6.0, callbacks also accept a list of callback functions, allowing users to specify multiple functions to be executed for the desired event. Simply pass a list of callback functions to the callback argument when defining your DAG/task callbacks, e.g. on_failure_callback=[callback_func_1, callback_func_2].
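For example, a task could send failure notifications to two independent destinations. The notify_oncall and record_metric functions below are placeholders for your own integrations, and the sketch assumes Airflow 2.6.0 or later.

import pendulum

from airflow import DAG
from airflow.operators.empty import EmptyOperator


def notify_oncall(context):
    # Placeholder: page the on-call rotation.
    print(f"Paging on-call for {context['task_instance_key_str']}")


def record_metric(context):
    # Placeholder: push a failure metric to a monitoring system.
    print(f"Recording failure for run {context['run_id']}")


with DAG(
    dag_id="example_multiple_callbacks",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
):
    # Both functions are invoked if the task fails.
    EmptyOperator(
        task_id="critical_task",
        on_failure_callback=[notify_oncall, record_metric],
    )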

The full list of variables available in context can be found in the docs and code.
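As a small illustration (not an exhaustive list of keys), a failure callback might read a few common entries from context; note that some keys, such as exception, are only populated for certain events.

def detailed_failure_alert(context):
    # A few commonly used context entries; availability can vary by event.
    ti = context["task_instance"]      # TaskInstance that fired the callback
    dag_run = context["dag_run"]       # the current DagRun
    error = context.get("exception")   # populated for failure callbacks

    print(f"Task {ti.task_id} in run {dag_run.run_id} failed with: {error}")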
