Listeners

You can write listeners to enable Airflow to notify you when events happen. Pluggy powers these listeners.

Warning

Listeners are an advanced feature of Airflow. They are not isolated from the Airflow components they run in and can slow down or, in some cases, take down your Airflow instance. As such, take extra care when writing listeners.

Airflow supports notifications for the following events:

Lifecycle Events

  • on_starting

  • before_stopping

Lifecycle events allow you to react to start and stop events for an Airflow Job, like SchedulerJob.
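As a sketch, a minimal lifecycle listener might look like the following. The parameter name (component, assumed here to match the lifecycle hookspec for your Airflow version) must agree with the spec, and the ImportError fallback exists only so the snippet runs without Airflow installed:

```python
try:
    from airflow.listeners import hookimpl
except ImportError:
    # Stand-in so the sketch runs without Airflow installed; a real
    # listener always imports hookimpl from airflow.listeners.
    def hookimpl(func):
        return func


@hookimpl
def on_starting(component):
    # "component" is the Airflow Job that is starting, e.g. a SchedulerJob.
    print(f"Job starting: {type(component).__name__}")


@hookimpl
def before_stopping(component):
    print(f"Job stopping: {type(component).__name__}")
```

A listener module like this only implements the hooks it cares about; unimplemented hooks are simply never called.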

DagRun State Change Events

DagRun state change events occur when a DagRun changes state. Beginning with Airflow 3, listeners are also notified whenever a state change is triggered through the API (for on_dag_run_success and on_dag_run_failed), for example when a DagRun is marked as success from the Airflow UI.

  • on_dag_run_running

src/airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_dag_run_running(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to RUNNING.
    """
    print("Dag run in running state")
    queued_at = dag_run.queued_at

    version = dag_run.version_number

    print(f"Dag information Queued at: {queued_at} version: {version}")


  • on_dag_run_success

src/airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to SUCCESS.
    """
    print("Dag run in success state")
    start_date = dag_run.start_date
    end_date = dag_run.end_date

    print(f"Dag run start:{start_date} end:{end_date}")


  • on_dag_run_failed

src/airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to FAILED.
    """
    print("Dag run in failure state")
    dag_id = dag_run.dag_id
    run_id = dag_run.run_id
    run_type = dag_run.run_type

    print(f"Dag information:{dag_id} Run id: {run_id} Run type: {run_type}")
    print(f"Failed with message: {msg}")


TaskInstance State Change Events

TaskInstance state change events occur when a RuntimeTaskInstance changes state. You can use these events to react to LocalTaskJob state changes. Starting with Airflow 3, listeners are also notified when a state change is triggered through the API (for on_task_instance_success and on_task_instance_failed), for example when marking a task instance as success from the Airflow UI. In such cases, the listener receives a TaskInstance instance instead of a RuntimeTaskInstance instance.

  • on_task_instance_running

src/airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: RuntimeTaskInstance):
    """
    Called when task state changes to RUNNING.

    previous_task_state and task_instance object can be used to retrieve more information about current
    task_instance that is running, its dag_run, task and dag information.
    """
    print("Task instance is in running state")
    print(" Previous state of the Task instance:", previous_state)

    name: str = task_instance.task_id

    context = task_instance.get_template_context()

    task = context["task"]

    if TYPE_CHECKING:
        assert task

    dag = task.dag
    dag_name = None
    if dag:
        dag_name = dag.dag_id
    print(f"Current task name:{name}")
    print(f"Dag name:{dag_name}")


  • on_task_instance_success

src/airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_task_instance_success(
    previous_state: TaskInstanceState, task_instance: RuntimeTaskInstance | TaskInstance
):
    """
    Called when task state changes to SUCCESS.

    previous_task_state and task_instance object can be used to retrieve more information about current
    task_instance that has succeeded, its dag_run, task and dag information.

    A RuntimeTaskInstance is provided in most cases, except when the task's state change is triggered
    through the API. In that case, the TaskInstance available on the API server will be provided instead.
    """
    print("Task instance in success state")
    print(" Previous state of the Task instance:", previous_state)

    if isinstance(task_instance, TaskInstance):
        print("Task instance's state was changed through the API.")

        print(f"Task operator:{task_instance.operator}")
        return

    context = task_instance.get_template_context()
    operator = context["task"]

    print(f"Task operator:{operator}")


  • on_task_instance_failed

src/airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_task_instance_failed(
    previous_state: TaskInstanceState,
    task_instance: RuntimeTaskInstance | TaskInstance,
    error: None | str | BaseException,
):
    """
    Called when task state changes to FAILED.

    previous_task_state, task_instance object and error can be used to retrieve more information about current
    task_instance that has failed, its dag_run, task and dag information.

    A RuntimeTaskInstance is provided in most cases, except when the task's state change is triggered
    through the API. In that case, the TaskInstance available on the API server will be provided instead.
    """
    print("Task instance in failure state")

    if isinstance(task_instance, TaskInstance):
        print("Task instance's state was changed through the API.")

        print(f"Task operator:{task_instance.operator}")
        if error:
            print(f"Failure caused by {error}")
        return

    context = task_instance.get_template_context()
    task = context["task"]

    if TYPE_CHECKING:
        assert task

    print("Task start")
    print(f"Task:{task}")
    if error:
        print(f"Failure caused by {error}")


Asset Events

  • on_asset_created

  • on_asset_alias_created

  • on_asset_changed

Asset events occur when Asset management operations are run.
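A sketch of an asset listener, assuming the asset hookspec passes the affected Asset as a parameter named asset (verify the parameter name against the hookspec for your Airflow version). The ImportError fallback exists only so the snippet runs without Airflow installed:

```python
try:
    from airflow.listeners import hookimpl
except ImportError:
    # Stand-in so the sketch runs without Airflow installed.
    def hookimpl(func):
        return func


@hookimpl
def on_asset_changed(asset):
    # "asset" is the Asset object that was updated.
    print(f"Asset changed: {asset.uri}")
```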

Dag Import Error Events

  • on_new_dag_import_error

  • on_existing_dag_import_error

Dag import error events occur when the dag processor finds an import error in Dag code and updates the metadata database table.

This is an experimental feature.
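As a sketch, assuming the import-error hookspec passes the offending file path and traceback as parameters named filename and stacktrace (verify against the hookspec for your Airflow version). The ImportError fallback exists only so the snippet runs without Airflow installed:

```python
try:
    from airflow.listeners import hookimpl
except ImportError:
    # Stand-in so the sketch runs without Airflow installed.
    def hookimpl(func):
        return func


@hookimpl
def on_new_dag_import_error(filename, stacktrace):
    # Called when an import error first appears for a Dag file.
    print(f"New import error in {filename}:\n{stacktrace}")


@hookimpl
def on_existing_dag_import_error(filename, stacktrace):
    # Called when an already-recorded import error is seen again.
    print(f"Import error still present in {filename}")
```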

Usage

To create a listener:

  • import hookimpl from airflow.listeners

  • implement the hookimpl functions for the events you’d like to be notified about

Airflow defines the specification as hookspec. Your implementation must accept the same named parameters as defined in hookspec. If you don’t use the same parameters as hookspec, Pluggy throws an error when you try to use your plugin. But you don’t need to implement every method. Many listeners only implement one method, or a subset of methods.

To include the listener in your Airflow installation, include it as a part of an Airflow Plugin.
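For example, a plugin module might register a listener like this. The my_listeners module is hypothetical (it would contain your @hookimpl functions) and is created inline here only to keep the sketch self-contained, as is the ImportError fallback:

```python
from types import ModuleType

try:
    from airflow.plugins_manager import AirflowPlugin
except ImportError:
    # Stand-in base class so the sketch runs without Airflow installed.
    class AirflowPlugin:
        pass


# In a real project this would be:  import my_listeners
# i.e. the module containing your @hookimpl functions.
my_listeners = ModuleType("my_listeners")


class MyListenerPlugin(AirflowPlugin):
    name = "my_listener_plugin"
    # Airflow calls the hookimpl functions found in each listed listener.
    listeners = [my_listeners]
```

Place the plugin module in your plugins folder (or expose it via the airflow.plugins entry point) so Airflow discovers it.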

The listener API is meant to be called across all dags and all operators. You can’t listen to events generated by specific dags; for that behavior, use methods like on_success_callback and pre_execute, which provide callbacks for particular Dag authors or operator creators. Logs and print() calls made inside listeners are handled as part of the listeners.

Compatibility note

The listener interface might change over time. Because we use Pluggy specifications, listener implementations written for older versions of the interface should be forward-compatible with future versions of Airflow.

However, the opposite is not guaranteed, so if your listener is implemented against a newer version of the interface, it might not work with older versions of Airflow. This is not a problem if you target a single version of Airflow, because you can adjust your implementation to the version you use, but it matters if you are writing plugins or extensions that could be used with different versions of Airflow.

For example, when a new field is added to the interface (like the error field added to on_task_instance_failed in 2.10.0), a listener implemented against the new signature cannot handle the case where the field is absent from the event, so such a listener will only work with Airflow 2.10.0 and later.

To implement a listener that is compatible with multiple versions of Airflow, including features and fields added in newer versions, check the Airflow version in use and provide the newer interface implementation on newer versions and the older implementation on older versions.

For example, if you want to implement a listener that uses the error field in on_task_instance_failed, you can use code like this:

from importlib.metadata import version
from packaging.version import Version
from airflow.listeners import hookimpl

airflow_version = Version(version("apache-airflow"))
if airflow_version >= Version("2.10.0"):

    class ClassBasedListener:
        ...

        @hookimpl
        def on_task_instance_failed(self, previous_state, task_instance, error: None | str | BaseException):
            # Handle error case here
            pass

else:

    class ClassBasedListener:  # type: ignore[no-redef]
        ...

        @hookimpl
        def on_task_instance_failed(self, previous_state, task_instance):
            # Handle no error case here
            pass

List of changes in the listener interfaces since 2.8.0, when they were introduced:

Airflow Version | Affected method                                   | Change
2.10.0          | on_task_instance_failed                           | An error field was added to the interface
3.0.0           | on_task_instance_running                          | The session argument was removed from task instance listeners; task_instance is now an instance of RuntimeTaskInstance
3.0.0           | on_task_instance_failed, on_task_instance_success | The session argument was removed from task instance listeners; task_instance is now an instance of RuntimeTaskInstance on the worker and TaskInstance on the API server
