Airflow Summit 2025 is coming October 07-09. Register now for early bird ticket!

Deadline Alerts

Deadline Alerts allow you to set time thresholds for your Dag runs and automatically respond when those thresholds are exceeded. You can set up Deadline Alerts by choosing a built-in reference point, setting an interval, and defining a response using either Airflow’s Notifiers or a custom callback function.

Creating a Deadline Alert

Creating a Deadline Alert requires three mandatory parameters:

  • Reference: When to start counting from

  • Interval: How far before or after the reference point to trigger the alert

  • Callback: A Callback object which contains a path to a callable and optional kwargs to pass to it if the deadline is exceeded

Here is how Deadlines are calculated:

[Reference] ------ [Interval] ------> [Deadline]
    ^                                     ^
    |                                     |
 Start time                          Trigger point

Below is an example Dag implementation. If the Dag has not finished 15 minutes after it was queued, send a Slack message:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference
from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
from airflow.providers.standard.operators.empty import EmptyOperator

with DAG(
    dag_id="deadline_alert_example",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(minutes=15),
        callback=AsyncCallback(
            SlackWebhookNotifier,
            kwargs={
                "slack_conn_id": "slack_default",
                "channel": "#alerts",
                "text": "Dag 'slack_deadline_alert' still running after 30 minutes.",
                "username": "Airflow Alerts",
            },
        ),
    ),
):
    EmptyOperator(task_id="example_task")

The timeline for this example would look like this:

|------|-----------|---------|-----------|--------|
    Scheduled    Queued    Started    Deadline
     00:00       00:03      00:05      00:18

Using Built-in References

Airflow provides several built-in reference points that you can use with DeadlineAlert:

DeadlineReference.DAGRUN_QUEUED_AT

Measures time from when the Dag run was queued. Useful for monitoring resource constraints.

DeadlineReference.DAGRUN_LOGICAL_DATE

References when the Dag run was scheduled to start. For example, setting an interval of timedelta(minutes=15) would trigger the alert if the Dag hasn’t completed 15 minutes after it was scheduled to start, regardless of when (or if) it actually began executing. Useful for ensuring scheduled Dags complete before their next scheduled run.

DeadlineReference.FIXED_DATETIME

Specifies a fixed point in time. Useful when Dags must complete by a specific time.

Here’s an example using a fixed datetime:

tomorrow_at_ten = datetime.combine(datetime.now().date() + timedelta(days=1), time(10, 0))

with DAG(
    dag_id="fixed_deadline_alert",
    deadline=DeadlineAlert(
        reference=DeadlineReference.FIXED_DATETIME(tomorrow_at_ten),
        interval=timedelta(minutes=-30),  # Alert 30 minutes before the reference.
        callback=AsyncCallback(
            SlackWebhookNotifier,
            kwargs={
                "slack_conn_id": "slack_default",
                "channel": "#alerts",
                "text": "Dag 'slack_deadline_alert' still running after 30 minutes.",
                "username": "Airflow Alerts",
            },
        ),
    ),
):
    EmptyOperator(task_id="example_task")

The timeline for this example would look like this:

|------|----------|---------|------------|--------|
     Queued     Start    Deadline    Reference
     09:15      09:17     09:30       10:00

Note

Note that since the interval is a negative value, the deadline is before the reference in this case.

Using Callbacks

When a deadline is exceeded, the callback is executed. You can use an existing Notifier or create a custom callback function. A callback must be either an AsyncCallback or a SyncCallback.

Using Built-in Notifiers

Here’s an example using the Slack Notifier if the Dag run has not finished within 30 minutes of it being queued:

with DAG(
    dag_id="slack_deadline_alert",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(minutes=30),
        callback=AsyncCallback(
            SlackWebhookNotifier,
            kwargs={
                "slack_conn_id": "slack_default",
                "channel": "#alerts",
                "text": "Dag 'slack_deadline_alert' still running after 30 minutes.",
                "username": "Airflow Alerts",
            },
        ),
    ),
):
    EmptyOperator(task_id="example_task")

Creating Custom Callbacks

You can create custom callbacks for more complex handling. The kwargs specified in the Callback are passed to the callback function, if any are provided. Synchronous callbacks (standard python methods) can be defined in the dag bundle and are run in the Executor. Asynchronous callbacks must be defined somewhere in the Triggerer’s system path.

Note

Regarding Async Custom Deadline callbacks:

  • Async callbacks are executed by the Triggerer, so users must ensure they are importable by the Triggerer.

  • One easy way to do this is to place the callback as a top-level method in a new file in the plugins folder.

  • The Triggerer will need to be restarted when a callback is added or changed in order to reload the file.

A custom synchronous callback might look like this:

from datetime import timedelta

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference, SyncCallback


def custom_synchronous_callback(**kwargs):
    """Handle deadline violation with custom logic."""
    print(f"Deadline exceeded for Dag {kwargs.get("dag_id")}!")
    print(f"Alert type: {kwargs.get("alert_type")}")
    # Additional custom handling here


with DAG(
    dag_id="custom_deadline_alert",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(minutes=15),
        callback=SyncCallback(
            custom_synchronous_callback,
            kwargs={"alert_type": "time_exceeded", "dag_id": "custom_deadline_alert"},
        ),
    ),
):
    EmptyOperator(task_id="example_task")

A custom asynchronous callback is only slightly more work. Note in the following example that the custom callback code is placed in a separate file, and must be imported in the Dag file.

Place this method in /files/plugins/deadline_callbacks.py:

async def custom_async_callback(**kwargs):
    """Handle deadline violation with custom logic."""
    print(f"Deadline exceeded for Dag {kwargs.get("dag_id")}!")
    print(f"Alert type: {kwargs.get("alert_type")}")
    # Additional custom handling here

Place this in a Dag file:

from datetime import timedelta

from deadline_callbacks import custom_async_callback

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference

with DAG(
    dag_id="custom_deadline_alert",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(minutes=15),
        callback=AsyncCallback(
            custom_async_callback,
            kwargs={"alert_type": "time_exceeded", "dag_id": "custom_deadline_alert"},
        ),
    ),
):
    EmptyOperator(task_id="example_task")

Deadline Calculation

A deadline’s trigger time is calculated by adding the interval to the datetime returned by the reference. For FIXED_DATETIME references, negative intervals can be particularly useful to trigger the callback before the reference time.

For example:

next_meeting = datetime(2025, 6, 26, 9, 30)

DeadlineAlert(
    reference=DeadlineReference.FIXED_DATETIME(next_meeting),
    interval=timedelta(hours=-2),
    callback=notify_team,
)

This will trigger the alert 2 hours before the next meeting starts.

For DAGRUN_LOGICAL_DATE, the interval is typically positive, setting a deadline relative to when the Dag was scheduled to run. Here’s an example:

DeadlineAlert(
    reference=DeadlineReference.DAGRUN_LOGICAL_DATE,
    interval=timedelta(hours=1),
    callback=notify_team,
)

In this case, if a Dag is scheduled to run daily at midnight, the deadline would be triggered if the Dag hasn’t completed by 1:00 AM. This is useful for ensuring that scheduled jobs complete within a certain timeframe after their intended start time.

The flexibility of combining different references with positive or negative intervals allows you to create deadlines that suit a wide variety of operational requirements.

Custom References

While the built-in references should cover most use cases, and more will be released over time, you can create custom references by implementing a class that inherits from DeadlineReference. This may be useful if you have calendar integrations or other sources that you want to use as a reference.

class CustomReference(DeadlineReference):
    """A deadline reference that uses a custom data source."""

    # Define any required parameters for your reference
    required_kwargs = {"custom_id"}

    def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
        """
        Evaluate the reference time using the provided session and kwargs.

        The session parameter can be used for database queries, and kwargs
        will contain any required parameters defined in required_kwargs.
        """
        custom_id = kwargs["custom_id"]
        # Your custom logic here to determine the reference time
        return your_datetime

Was this entry helpful?