Callbacks
A valuable component of logging and monitoring is the use of task callbacks to act upon changes in state of a given task, or across all tasks in a given DAG. For example, you may wish to alert when certain tasks have failed, or have the last task in your DAG invoke a callback when it succeeds.
Note
Callback functions are only invoked when the task state changes due to execution by a worker. As such, task changes set by the command line interface (CLI) or user interface (UI) do not execute callback functions.
Warning
Callback functions are executed after tasks are completed.
Errors in callback functions will show up in scheduler logs rather than task logs.
By default, scheduler logs do not show up in the UI; they can be found in
$AIRFLOW_HOME/logs/scheduler/latest/DAG_FILE.py.log
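Because callback errors do not appear in task logs, it can help to make them easy to spot wherever they do land. The following is a minimal sketch, not part of Airflow itself: log_callback_errors and notify_failure are hypothetical names, and the wrapper only uses the standard logging module. Where the log record ends up depends on your Airflow logging configuration.

```python
import functools
import logging

log = logging.getLogger(__name__)


def log_callback_errors(callback):
    """Hypothetical helper: log any exception raised by a callback, then re-raise it."""

    @functools.wraps(callback)
    def wrapper(context):
        try:
            return callback(context)
        except Exception:
            # Include the task key (if present) so the log line is easy to search for.
            log.exception(
                "Callback %s failed for %s",
                callback.__name__,
                context.get("task_instance_key_str", "<unknown task>"),
            )
            raise

    return wrapper


@log_callback_errors
def notify_failure(context):
    # Hypothetical callback body; replace with your own alerting logic.
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")
```

The wrapped function can be passed anywhere a plain callback is accepted, for example on_failure_callback=notify_failure.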
Callback Types
There are five types of task events that can trigger a callback:
Name | Description |
---|---|
on_success_callback | Invoked when the task succeeds |
on_failure_callback | Invoked when the task fails |
on_retry_callback | Invoked when the task is up for retry |
on_execute_callback | Invoked right before the task begins executing |
on_skipped_callback | Invoked when the task is running and raises AirflowSkipException. It is NOT called when the task never starts executing, for example because a preceding branching decision in the DAG or a trigger rule skips it before it is ever scheduled. |
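As a rough illustration of the five events described above, the sketch below builds one callback per event and collects them under the corresponding operator argument names. The helper describe_event and the dict task_callbacks are hypothetical names introduced here; the Airflow wiring is shown only as a comment so the snippet runs without Airflow installed, and on_skipped_callback may not exist in older Airflow versions.

```python
def describe_event(event):
    """Hypothetical factory: build a callback that reports which event fired."""

    def callback(context):
        message = f"{event}: {context['task_instance_key_str']}"
        print(message)
        return message

    return callback


# One callback per task event, keyed by the operator argument that registers it.
task_callbacks = {
    "on_success_callback": describe_event("succeeded"),
    "on_failure_callback": describe_event("failed"),
    "on_retry_callback": describe_event("up for retry"),
    "on_execute_callback": describe_event("about to execute"),
    "on_skipped_callback": describe_event("skipped"),
}

# With Airflow installed, the dict could be unpacked into an operator, e.g.:
#   EmptyOperator(task_id="task1", **task_callbacks)
```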
Example
In the following example, failures in any task call the task_failure_alert
function, and success in the last task calls the dag_success_alert
function:
import datetime

import pendulum

from airflow import DAG
from airflow.operators.empty import EmptyOperator


def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


def dag_success_alert(context):
    print(f"DAG has succeeded, run_id: {context['run_id']}")


with DAG(
    dag_id="example_callback",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    dagrun_timeout=datetime.timedelta(minutes=60),
    catchup=False,
    on_success_callback=None,
    on_failure_callback=task_failure_alert,
    tags=["example"],
):
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
    task1 >> task2 >> task3
Note
As of Airflow 2.6.0, callbacks support a list of callback functions, allowing users to specify multiple functions
to be executed for a given event. Simply pass a list of callback functions to the callback arguments when defining your DAG or task
callbacks, e.g., on_failure_callback=[callback_func_1, callback_func_2].
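A short sketch of the list form follows. The functions log_failure and page_on_call are hypothetical, and the loop over a stand-in context dict is only there so the snippet can be tried locally; it is not a description of how Airflow invokes the list internally.

```python
def log_failure(context):
    """Hypothetical callback: record the failed run."""
    message = f"[log] failure in run {context['run_id']}"
    print(message)
    return message


def page_on_call(context):
    """Hypothetical callback: notify whoever is on call."""
    message = f"[page] failure in run {context['run_id']}"
    print(message)
    return message


# Both functions are registered for the same event by passing them as a list.
failure_callbacks = [log_failure, page_on_call]

# With Airflow installed, the list would be passed like this:
#   EmptyOperator(task_id="task1", on_failure_callback=failure_callbacks)

# Local illustration only: call each callback with a stand-in context.
fake_context = {"run_id": "manual__2021-01-01"}
results = [callback(fake_context) for callback in failure_callbacks]
```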
The full list of variables available in context can be found in the docs and in the code.
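Since the set of context variables can differ between Airflow versions, a callback may prefer to read only the entries it needs and tolerate missing ones. The sketch below assumes nothing beyond the two keys used in the example above (run_id and task_instance_key_str); summarize_context is a hypothetical helper, not an Airflow function.

```python
def summarize_context(context, keys=("run_id", "task_instance_key_str")):
    """Hypothetical helper: return the selected context entries, skipping absent keys."""
    return {key: context[key] for key in keys if key in context}


def task_failure_alert(context):
    # Report only the entries that are actually present in the context.
    summary = summarize_context(context)
    print(f"Task has failed: {summary}")
    return summary
```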