Callbacks
A valuable component of logging and monitoring is the use of task callbacks to act upon changes in state of a given DAG or task, or across all tasks in a given DAG. For example, you may wish to alert when certain tasks have failed, or invoke a callback when your DAG succeeds.
There are three different places where callbacks can be defined:

- Callbacks set in the DAG definition are applied at the DAG level.
- Callbacks set via default_args are applied to every task in the DAG.
- Individual callbacks can be set for a task by passing the callback in the task definition itself.
Note
Callback functions are only invoked when the DAG or task state changes due to execution by a worker. As such, DAG and task changes set by the command line interface (CLI) or user interface (UI) do not execute callback functions.
Warning
Callback functions are executed after tasks are completed.
Errors in callback functions will show up in scheduler logs rather than task logs.
By default, scheduler logs do not show up in the UI; they can instead be found in `$AIRFLOW_HOME/logs/scheduler/latest/DAG_FILE.py.log`.
Callback Types
There are five types of events that can trigger a callback:

| Name | Description |
|---|---|
| `on_success_callback` | Invoked when the DAG succeeds or the task succeeds. Available at the DAG or task level. |
| `on_failure_callback` | Invoked when the task fails. Available at the DAG or task level. |
| `on_retry_callback` | Invoked when the task is up for retry. Available only at the task level. |
| `on_execute_callback` | Invoked right before the task begins executing. Available only at the task level. |
| `on_skipped_callback` | Invoked when a running task raises `AirflowSkipException`. It is explicitly NOT called when the task never starts executing in the first place, e.g. because a preceding branching decision or a trigger rule skips it, so that the task execution is never scheduled. Available only at the task level. |
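Every callback is an ordinary Python callable that receives the Airflow `context` dictionary for the triggering event. As a minimal sketch (the function name, the task attributes, and the hand-built context below are illustrative, not part of Airflow), an `on_retry_callback` might look like:

```python
# A hypothetical on_retry_callback. Airflow passes the same `context`
# dict to every callback; only two of its keys are used here.
def task_retry_alert(context):
    msg = (
        f"Task {context['task_instance_key_str']} is up for retry "
        f"in run {context['run_id']}"
    )
    print(msg)
    return msg


# It would be attached when defining the task, e.g.:
#   EmptyOperator(task_id="flaky", retries=3, on_retry_callback=task_retry_alert)

# Demonstrate with a hand-built context (Airflow supplies the real one):
example_context = {
    "task_instance_key_str": "example_dag__flaky__20210101",
    "run_id": "manual__2021-01-01T00:00:00",
}
task_retry_alert(example_context)
```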
Example
In the following example, failures in task1 invoke the task_failure_alert function, and success at the DAG level invokes the dag_success_alert function. Before each task begins to execute, the task_execute_callback function is called:
```python
import datetime

import pendulum

from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator


def task_execute_callback(context):
    print(f"Task has begun execution, task_instance_key_str: {context['task_instance_key_str']}")


def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


def dag_success_alert(context):
    print(f"DAG has succeeded, run_id: {context['run_id']}")


with DAG(
    dag_id="example_callback",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    dagrun_timeout=datetime.timedelta(minutes=60),
    catchup=False,
    on_success_callback=dag_success_alert,
    default_args={"on_execute_callback": task_execute_callback},
    tags=["example"],
):
    task1 = EmptyOperator(task_id="task1", on_failure_callback=[task_failure_alert])
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3")
    task1 >> task2 >> task3
```
Note
As of Airflow 2.6.0, callbacks support a list of callback functions, allowing users to specify multiple functions to be executed for the desired event. Simply pass a list of callback functions to the callback argument when defining your DAG/task callbacks, e.g. on_failure_callback=[callback_func_1, callback_func_2].
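When a list is supplied, each callable is invoked in turn with the same context for the event. A minimal sketch of that behavior (the function names and the simulated loop are illustrative; Airflow performs the invocation itself):

```python
# Two hypothetical callbacks that record the order they were called in.
calls = []


def callback_func_1(context):
    calls.append(("callback_func_1", context["run_id"]))


def callback_func_2(context):
    calls.append(("callback_func_2", context["run_id"]))


# These would be attached as: on_failure_callback=[callback_func_1, callback_func_2]
# Simulate Airflow invoking each callback in turn with the same context:
context = {"run_id": "manual__2021-01-01T00:00:00"}
for cb in [callback_func_1, callback_func_2]:
    cb(context)

print(calls)
```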
A full list of the variables available in context can be found in the docs and code.