Listeners

You can write listeners to enable Airflow to notify you when events happen. Pluggy powers these listeners.

Warning

Listeners are an advanced feature of Airflow. They are not isolated from the Airflow components they run in, and can slow down or in some cases take down your Airflow instance. As such, extra care should be taken when writing listeners.

Airflow supports notifications for the following events:

Lifecycle Events

  • on_starting

  • before_stopping

Lifecycle events allow you to react to start and stop events for an Airflow Job, like SchedulerJob.
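A minimal sketch of lifecycle listeners, assuming both hookspecs receive the starting or stopping component as a single parameter named `component`. The `lifecycle_events` list is a hypothetical in-memory record that only makes the calls observable, and the `hookimpl` fallback exists only so the sketch runs outside an Airflow installation.

```python
try:
    from airflow.listeners import hookimpl
except ImportError:
    # Stand-in marker so this sketch runs without Airflow; not Airflow's real hookimpl.
    def hookimpl(func):
        return func


# Hypothetical in-memory record of lifecycle events, for illustration only.
lifecycle_events = []


@hookimpl
def on_starting(component):
    """Called before an Airflow component (for example, a SchedulerJob) starts."""
    name = type(component).__name__
    lifecycle_events.append(("starting", name))
    print(f"Starting {name}")


@hookimpl
def before_stopping(component):
    """Called before an Airflow component stops."""
    name = type(component).__name__
    lifecycle_events.append(("stopping", name))
    print(f"Stopping {name}")
```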

DagRun State Change Events

DagRun state change events occur when a DagRun changes state.

  • on_dag_run_running

airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_dag_run_running(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to RUNNING.
    """
    print("Dag run in running state")
    queued_at = dag_run.queued_at

    version = dag_run.dag_version.version

    print(f"Dag information Queued at: {queued_at} version: {version}")


  • on_dag_run_success

airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to SUCCESS.
    """
    print("Dag run in success state")
    start_date = dag_run.start_date
    end_date = dag_run.end_date

    print(f"Dag run start:{start_date} end:{end_date}")


  • on_dag_run_failed

airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to FAILED.
    """
    print("Dag run in failure state")
    dag_id = dag_run.dag_id
    run_id = dag_run.run_id
    external_trigger = dag_run.external_trigger

    print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")
    print(f"Failed with message: {msg}")


TaskInstance State Change Events

TaskInstance state change events occur when a TaskInstance changes state. You can use these events to react to LocalTaskJob state changes.

  • on_task_instance_running

airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
    """
    This method is called when task state changes to RUNNING.
    Through the callback, parameters like previous_state and the task_instance object can be accessed.
    These give more information about the running task_instance, including its dag_run,
    task and dag.
    """
    print("Task instance is in running state")
    print(" Previous state of the Task instance:", previous_state)

    state: TaskInstanceState = task_instance.state
    name: str = task_instance.task_id
    start_date = task_instance.start_date

    dagrun = task_instance.dag_run
    dagrun_status = dagrun.state

    task = task_instance.task

    if TYPE_CHECKING:
        assert task

    dag = task.dag
    dag_name = None
    if dag:
        dag_name = dag.dag_id
    print(f"Current task name:{name} state:{state} start_date:{start_date}")
    print(f"Dag name:{dag_name} and current dag run status:{dagrun_status}")


  • on_task_instance_success

airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_task_instance_success(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
    """
    This method is called when task state changes to SUCCESS.
    Through the callback, parameters like previous_state and the task_instance object can be accessed.
    These give more information about the succeeded task_instance, including its dag_run,
    task and dag.
    """
    print("Task instance in success state")
    print(" Previous state of the Task instance:", previous_state)

    dag_id = task_instance.dag_id
    hostname = task_instance.hostname
    operator = task_instance.operator

    dagrun = task_instance.dag_run
    queued_at = dagrun.queued_at
    print(f"Dag name:{dag_id} queued_at:{queued_at}")
    print(f"Task hostname:{hostname} operator:{operator}")


  • on_task_instance_failed

airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_task_instance_failed(
    previous_state: TaskInstanceState, task_instance: TaskInstance, error: None | str | BaseException, session
):
    """
    This method is called when task state changes to FAILED.
    Through the callback, parameters like previous_state and the task_instance object can be accessed.
    These give more information about the failed task_instance, including its dag_run,
    task and dag.
    """
    print("Task instance in failure state")

    start_date = task_instance.start_date
    end_date = task_instance.end_date
    duration = task_instance.duration

    dagrun = task_instance.dag_run

    task = task_instance.task

    if TYPE_CHECKING:
        assert task

    dag = task.dag

    print(f"Task start:{start_date} end:{end_date} duration:{duration}")
    print(f"Task:{task} dag:{dag} dagrun:{dagrun}")
    if error:
        print(f"Failure caused by {error}")
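Because the error parameter of on_task_instance_failed may be None, a str, or an exception, a listener usually needs to normalize it before logging. The sketch below shows one way to do that; `describe_error` is a hypothetical helper, not part of Airflow, and the `hookimpl` fallback exists only so the sketch runs without Airflow installed.

```python
from __future__ import annotations  # allows "None | str | BaseException" on Python < 3.10

import traceback

try:
    from airflow.listeners import hookimpl
except ImportError:
    # Stand-in marker so this sketch runs without Airflow; not Airflow's real hookimpl.
    def hookimpl(func):
        return func


def describe_error(error: None | str | BaseException) -> str:
    """Hypothetical helper: turn the error argument into a one-line message."""
    if error is None:
        return "no error details"
    if isinstance(error, BaseException):
        # Produces "ExceptionType: message" without the traceback.
        return "".join(traceback.format_exception_only(type(error), error)).strip()
    return error


@hookimpl
def on_task_instance_failed(previous_state, task_instance, error: None | str | BaseException, session):
    print(f"Task {task_instance.task_id} failed: {describe_error(error)}")
```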


Asset Events

  • on_asset_created

  • on_dataset_alias_created

  • on_asset_changed

Asset events occur when Asset management operations are run.
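A minimal sketch of asset listeners, assuming on_asset_created and on_asset_changed each receive the asset as a parameter named `asset` that exposes a `uri` attribute; check the hookspec of your Airflow version before relying on these names. The `asset_log` list is hypothetical, and the `hookimpl` fallback exists only so the sketch runs without Airflow installed.

```python
try:
    from airflow.listeners import hookimpl
except ImportError:
    # Stand-in marker so this sketch runs without Airflow; not Airflow's real hookimpl.
    def hookimpl(func):
        return func


# Hypothetical record of (event, asset URI) pairs seen by this listener.
asset_log = []


@hookimpl
def on_asset_created(asset):
    """Assumed to be called when a new asset is registered."""
    asset_log.append(("created", asset.uri))


@hookimpl
def on_asset_changed(asset):
    """Assumed to be called when an asset is updated."""
    asset_log.append(("changed", asset.uri))
```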

Dag Import Error Events

  • on_new_dag_import_error

  • on_existing_dag_import_error

Dag import error events occur when the dag processor finds an import error in the Dag code and updates the metadata database table.

This is an experimental feature.
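A minimal sketch of import error listeners, assuming both hookspecs pass the Dag file path as `filename` and the error text as `stacktrace`. The `import_errors` dict is a hypothetical in-memory record, and the `hookimpl` fallback exists only so the sketch runs without Airflow installed.

```python
try:
    from airflow.listeners import hookimpl
except ImportError:
    # Stand-in marker so this sketch runs without Airflow; not Airflow's real hookimpl.
    def hookimpl(func):
        return func


# Hypothetical record of the latest import error per Dag file.
import_errors = {}


@hookimpl
def on_new_dag_import_error(filename, stacktrace):
    """Called when a Dag file starts failing to import."""
    import_errors[filename] = stacktrace
    print(f"New import error in {filename}")


@hookimpl
def on_existing_dag_import_error(filename, stacktrace):
    """Called when a Dag file that already had an import error still fails."""
    import_errors[filename] = stacktrace
    print(f"Import error still present in {filename}")
```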

Usage

To create a listener:

  • import hookimpl: from airflow.listeners import hookimpl

  • implement the hookimpls for the events you’d like to be notified about

Airflow defines the specification as hookspecs. Your implementation must use the same parameter names as the hookspec it implements. If it declares a parameter that the hookspec does not define, Pluggy raises an error when you try to use your plugin. You don’t need to implement every method, however: many listeners implement only one method, or a subset of methods.

To include the listener in your Airflow installation, include it as a part of an Airflow Plugin.
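A minimal sketch of registering a class-based listener through an Airflow plugin's `listeners` attribute. `TaskSuccessListener`, `MyListenerPlugin`, and the plugin name are hypothetical; the `except ImportError` branch defines stand-ins only so the sketch runs without Airflow installed.

```python
try:
    from airflow.listeners import hookimpl
    from airflow.plugins_manager import AirflowPlugin
except ImportError:
    # Stand-ins so this sketch runs without Airflow; not Airflow's real classes.
    def hookimpl(func):
        return func

    class AirflowPlugin:
        name = ""
        listeners = []


class TaskSuccessListener:
    """Hypothetical listener that reacts only to successful task instances."""

    @hookimpl
    def on_task_instance_success(self, previous_state, task_instance, session):
        print(f"Task {task_instance.task_id} succeeded")


class MyListenerPlugin(AirflowPlugin):
    name = "my_listener_plugin"
    # Each entry (a module or an object) is registered as a listener.
    listeners = [TaskSuccessListener()]
```

Placing this file in the plugins folder is the usual way to load a plugin; a module containing module-level hookimpls can also be listed in `listeners` instead of a class instance.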

The listener API is called for events from all DAGs and all operators; you can’t listen only to events generated by specific DAGs. For that behavior, try methods like on_success_callback and pre_execute, which provide callbacks for particular DAG authors or operator creators. The logs and print() calls are handled as part of the listeners.

Compatibility note

The listeners interface might change over time. Because it is defined with Pluggy specifications, listener implementations written for older versions of the interface should be forward-compatible with future versions of Airflow.

However, the opposite is not guaranteed: a listener implemented against a newer version of the interface might not work with older versions of Airflow. This is not a problem if you target a single version of Airflow, because you can adjust your implementation to the version you use, but it matters if you are writing plugins or extensions that could be used with different versions of Airflow.

For example, when a new field is added to the interface (like the error field in the on_task_instance_failed method in 2.10.0), older versions of Airflow do not provide that field, so a listener implementation that declares it will only work with Airflow 2.10.0 and later.

To implement a listener that is compatible with multiple versions of Airflow while still using fields added in newer versions, check which Airflow version is installed: use the newer version of the interface on newer Airflow versions, and the older version of the interface on older ones.

For example, to implement a listener that uses the error field in on_task_instance_failed, you can use code like this:

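from __future__ import annotations  # allows the "None | str | BaseException" annotation on Python < 3.10

from importlib.metadata import version
from packaging.version import Version
from airflow.listeners import hookimpl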

airflow_version = Version(version("apache-airflow"))
if airflow_version >= Version("2.10.0"):

    class ClassBasedListener:
        ...

        @hookimpl
        def on_task_instance_failed(
            self, previous_state, task_instance, error: None | str | BaseException, session
        ):
            # Handle error case here
            pass

else:

    class ClassBasedListener:  # type: ignore[no-redef]
        ...

        @hookimpl
        def on_task_instance_failed(self, previous_state, task_instance, session):
            # Handle no error case here
            pass

List of changes in the listener interfaces since 2.8.0, when they were introduced:

Airflow Version | Affected method         | Change
2.10.0          | on_task_instance_failed | An error field added to the interface
