Listeners
You can write listeners to enable Airflow to notify you when events happen. Pluggy powers these listeners.
Warning
Listeners are an advanced feature of Airflow. They are not isolated from the Airflow components they run in, and can slow down or, in some cases, take down your Airflow instance. Take extra care when writing listeners.
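One way to take that care is to keep each hook body small and to catch and log unexpected exceptions instead of letting them propagate. The following is a minimal sketch, not a prescribed pattern: `notify` is a hypothetical helper standing in for whatever external call a listener makes, and the `ImportError` fallback exists only so the snippet runs without Airflow installed.

```python
import logging

try:
    from airflow.listeners import hookimpl
except ImportError:  # fallback so the sketch runs without Airflow installed
    def hookimpl(func):
        return func

log = logging.getLogger(__name__)


def notify(message: str) -> None:
    """Hypothetical helper: stands in for a call to an external system."""
    raise ConnectionError("notification service unreachable")


@hookimpl
def on_dag_run_failed(dag_run, msg: str):
    # Catch and log failures so that a broken notification channel
    # does not propagate into the Airflow component calling the hook.
    try:
        notify(f"Dag run {dag_run.run_id} failed: {msg}")
    except Exception:
        log.exception("Listener on_dag_run_failed could not send its notification")
```

Here the simulated outage is logged and the hook returns normally, so the calling component is not interrupted by the listener's own failure.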
Airflow supports notifications for the following events:
Lifecycle Events
on_starting
before_stopping
Lifecycle events allow you to react to start and stop events for an Airflow Job, like SchedulerJob.
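A lifecycle listener could look like the sketch below. It assumes that both hooks receive a single `component` argument (the job being started or stopped); check the hookspec of your Airflow version before relying on that name. The `ImportError` fallback is only there so the snippet runs without Airflow installed.

```python
try:
    from airflow.listeners import hookimpl
except ImportError:  # fallback so the sketch runs without Airflow installed
    def hookimpl(func):
        return func


@hookimpl
def on_starting(component):
    """Called before the Airflow component (for example a SchedulerJob) starts."""
    # `component` is assumed to be the job object; only its class name is used here.
    print(f"Starting component: {type(component).__name__}")


@hookimpl
def before_stopping(component):
    """Called before the Airflow component stops."""
    print(f"Stopping component: {type(component).__name__}")
```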
DagRun State Change Events
DagRun state change events occur when a DagRun changes state.
on_dag_run_running
@hookimpl
def on_dag_run_running(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to RUNNING.
    """
    print("Dag run in running state")
    queued_at = dag_run.queued_at
    version = dag_run.dag_version.version
    print(f"Dag information Queued at: {queued_at} version: {version}")
on_dag_run_success
@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to SUCCESS.
    """
    print("Dag run in success state")
    start_date = dag_run.start_date
    end_date = dag_run.end_date
    print(f"Dag run start:{start_date} end:{end_date}")
on_dag_run_failed
@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to FAILED.
    """
    print("Dag run in failure state")
    dag_id = dag_run.dag_id
    run_id = dag_run.run_id
    external_trigger = dag_run.external_trigger
    print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")
    print(f"Failed with message: {msg}")
TaskInstance State Change Events
TaskInstance state change events occur when a TaskInstance changes state.
You can use these events to react to LocalTaskJob state changes.
on_task_instance_running
@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
    """
    This method is called when task state changes to RUNNING.
    Through callback, parameters like previous_task_state, task_instance object can be accessed.
    This will give more information about current task_instance that is running its dag_run,
    task and dag information.
    """
    print("Task instance is in running state")
    print(" Previous state of the Task instance:", previous_state)
    state: TaskInstanceState = task_instance.state
    name: str = task_instance.task_id
    start_date = task_instance.start_date
    dagrun = task_instance.dag_run
    dagrun_status = dagrun.state
    task = task_instance.task
    if TYPE_CHECKING:
        assert task
    dag = task.dag
    dag_name = None
    if dag:
        dag_name = dag.dag_id
    print(f"Current task name:{name} state:{state} start_date:{start_date}")
    print(f"Dag name:{dag_name} and current dag run status:{dagrun_status}")
on_task_instance_success
@hookimpl
def on_task_instance_success(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
    """
    This method is called when task state changes to SUCCESS.
    Through callback, parameters like previous_task_state, task_instance object can be accessed.
    This will give more information about current task_instance that has succeeded its
    dag_run, task and dag information.
    """
    print("Task instance in success state")
    print(" Previous state of the Task instance:", previous_state)
    dag_id = task_instance.dag_id
    hostname = task_instance.hostname
    operator = task_instance.operator
    dagrun = task_instance.dag_run
    queued_at = dagrun.queued_at
    print(f"Dag name:{dag_id} queued_at:{queued_at}")
    print(f"Task hostname:{hostname} operator:{operator}")
on_task_instance_failed
@hookimpl
def on_task_instance_failed(
    previous_state: TaskInstanceState, task_instance: TaskInstance, error: None | str | BaseException, session
):
    """
    This method is called when task state changes to FAILED.
    Through callback, parameters like previous_task_state, task_instance object can be accessed.
    This will give more information about current task_instance that has failed its dag_run,
    task and dag information.
    """
    print("Task instance in failure state")
    start_date = task_instance.start_date
    end_date = task_instance.end_date
    duration = task_instance.duration
    dagrun = task_instance.dag_run
    task = task_instance.task
    if TYPE_CHECKING:
        assert task
    dag = task.dag
    print(f"Task start:{start_date} end:{end_date} duration:{duration}")
    print(f"Task:{task} dag:{dag} dagrun:{dagrun}")
    if error:
        print(f"Failure caused by {error}")
Asset Events
on_asset_created
on_dataset_alias_created
on_asset_changed
Asset events occur when Asset management operations are run.
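An asset listener might be sketched as follows. The assumptions here are that each hook receives one `asset` argument and that the asset object exposes a `uri` attribute; confirm both against the hookspec of your Airflow version. The `ImportError` fallback only lets the snippet run without Airflow installed.

```python
try:
    from airflow.listeners import hookimpl
except ImportError:  # fallback so the sketch runs without Airflow installed
    def hookimpl(func):
        return func


@hookimpl
def on_asset_created(asset):
    """Called when a new asset is registered (assumed signature)."""
    print(f"Asset created: {asset.uri}")


@hookimpl
def on_asset_changed(asset):
    """Called when an existing asset is updated (assumed signature)."""
    print(f"Asset changed: {asset.uri}")
```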
Dag Import Error Events
on_new_dag_import_error
on_existing_dag_import_error
Dag import error events occur when the Dag processor finds an import error in the Dag code and updates the metadata database table.
This is an experimental feature.
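The sketch below shows what an import-error listener could look like. It assumes both hooks are called with `filename` and `stacktrace` arguments; because the feature is experimental, verify the names against the hookspec of your Airflow version. `last_line` is a hypothetical helper, and the `ImportError` fallback only lets the snippet run without Airflow installed.

```python
try:
    from airflow.listeners import hookimpl
except ImportError:  # fallback so the sketch runs without Airflow installed
    def hookimpl(func):
        return func


def last_line(stacktrace: str) -> str:
    """Hypothetical helper: return the final non-empty line of a traceback."""
    lines = stacktrace.strip().splitlines()
    return lines[-1] if lines else ""


@hookimpl
def on_new_dag_import_error(filename, stacktrace):
    """Called when a file produces an import error for the first time (assumed signature)."""
    print(f"New import error in {filename}: {last_line(stacktrace)}")


@hookimpl
def on_existing_dag_import_error(filename, stacktrace):
    """Called when a file that already had an import error fails again (assumed signature)."""
    print(f"Import error still present in {filename}: {last_line(stacktrace)}")
```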
Usage
To create a listener:
import airflow.listeners.hookimpl
implement the hookimpls for the events that you’d like to generate notifications for
Airflow defines the specification as hookspec. Your implementation must accept the same named parameters as defined in hookspec. If you don’t use the same parameters as hookspec, Pluggy throws an error when you try to use your plugin. But you don’t need to implement every method. Many listeners only implement one method, or a subset of methods.
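The name-matching rule can be illustrated with the standard library alone. This is a simplified stand-in for the idea, not pluggy's validation code: `spec_on_task_instance_failed`, `impl_ok`, `impl_bad`, and `unknown_params` are hypothetical names, and the spec's parameter list is copied from the `on_task_instance_failed` example earlier on this page.

```python
import inspect


def spec_on_task_instance_failed(previous_state, task_instance, error, session):
    """Stand-in for the hookspec; the parameter names are what matter."""


def impl_ok(previous_state, task_instance, error, session):
    """Uses the parameter names defined by the spec."""


def impl_bad(previous_state, ti, error, session):
    """Renames task_instance to ti, a name the spec does not define."""


def unknown_params(spec, impl):
    """Return the implementation's parameter names that the spec does not define."""
    spec_names = set(inspect.signature(spec).parameters)
    return [name for name in inspect.signature(impl).parameters if name not in spec_names]


print(unknown_params(spec_on_task_instance_failed, impl_ok))   # []
print(unknown_params(spec_on_task_instance_failed, impl_bad))  # ['ti']
```

A non-empty result corresponds to the situation in which the plugin fails to load because a parameter name does not match the spec.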
To include the listener in your Airflow installation, include it as a part of an Airflow Plugin.
Listener API is meant to be called across all DAGs and all operators. You can’t listen to events generated by specific DAGs. For that behavior, try methods like on_success_callback and pre_execute. These provide callbacks for particular DAG authors or operator creators. The logs and print() calls will be handled as part of the listeners.
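Registering a listener through a plugin could look like the sketch below. It assumes the plugin class exposes a `listeners` list, as `AirflowPlugin` does to my knowledge; `DagRunLogger` and `DagRunLoggerPlugin` are hypothetical names. The `ImportError` fallback defines minimal stand-ins so the snippet runs without Airflow installed.

```python
try:
    from airflow.listeners import hookimpl
    from airflow.plugins_manager import AirflowPlugin
except ImportError:  # minimal stand-ins so the sketch runs without Airflow installed
    def hookimpl(func):
        return func

    class AirflowPlugin:
        name = None
        listeners = []


class DagRunLogger:
    """Hypothetical class-based listener that implements a single hook."""

    @hookimpl
    def on_dag_run_success(self, dag_run, msg):
        print(f"Dag run {dag_run.run_id} succeeded")


class DagRunLoggerPlugin(AirflowPlugin):
    """Hypothetical plugin whose only job is to register the listener."""

    name = "dag_run_logger_plugin"
    # Listener objects (or modules containing hookimpls) go in this list.
    listeners = [DagRunLogger()]
```

Placing a file like this in the plugins folder is the usual way for Airflow to discover the plugin; the listener then receives events for every Dag, in line with the global scope described above.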
Compatibility note
The listeners interface might change over time. We are using pluggy specifications which means that implementation of the listeners written for older versions of the interface should be forward-compatible with future versions of Airflow.
However, the opposite is not guaranteed: if your listener is implemented against a newer version of the interface, it might not work with older versions of Airflow. This is not a problem if you target a single version of Airflow, because you can adjust your implementation to the version you use, but it matters if you are writing plugins or extensions that could be used with different versions of Airflow.
For example, if a new field is added to the interface (like the error field in the on_task_instance_failed method in 2.10.0), a listener implementation that uses it will not handle the case where the field is absent from the event object, so such listeners only work with Airflow 2.10.0 and later.
To implement a listener that is compatible with multiple versions of Airflow while still using features and fields added in newer versions, check the Airflow version in use: implement the newer version of the interface for newer Airflow versions and the older version of the interface for older ones.
For example, if you want to implement a listener that uses the error field in on_task_instance_failed, you should use code like this:
from importlib.metadata import version

from packaging.version import Version

from airflow.listeners import hookimpl

airflow_version = Version(version("apache-airflow"))
if airflow_version >= Version("2.10.0"):

    class ClassBasedListener:
        ...

        @hookimpl
        def on_task_instance_failed(
            self, previous_state, task_instance, error: None | str | BaseException, session
        ):
            # Handle error case here
            pass

else:

    class ClassBasedListener:  # type: ignore[no-redef]
        ...

        @hookimpl
        def on_task_instance_failed(self, previous_state, task_instance, session):
            # Handle no error case here
            pass
List of changes in the listener interfaces since 2.8.0 when they were introduced:
Airflow Version | Affected method | Change |
---|---|---|
2.10.0 | on_task_instance_failed | An error field added to the interface |