Event-driven scheduling¶
Added in version 3.0.
Apache Airflow allows for event-driven scheduling, enabling DAGs to be triggered based on external events rather than predefined time-based schedules. This is particularly useful in modern data architectures where workflows need to react to real-time data changes, messages, or system signals.
By using assets, as described in Data-aware scheduling, you can configure DAGs to start execution when specific external events occur. Assets provide a mechanism to establish dependencies between external events and DAG execution, ensuring that workflows react dynamically to changes in the external environment.
The AssetWatcher class plays a crucial role in this mechanism. It monitors an external event source, such as a message queue, and triggers an asset update when a relevant event occurs. The watchers parameter in the Asset definition allows you to associate multiple AssetWatcher instances with an asset, enabling it to respond to various event sources.
Example: Triggering a DAG from an external message queue¶
Below is an example of how you can configure an Airflow DAG to be triggered by an external message queue, such as AWS SQS:
from airflow.sdk.definitions.asset import Asset, AssetWatcher
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow import DAG
# Define a trigger that listens to an external message queue (AWS SQS in this case)
trigger = MessageQueueTrigger(queue="https://sqs.us-east-1.amazonaws.com/0123456789/my-queue")
# Define an asset that watches for messages on the queue
asset = Asset("sqs_queue_asset", watchers=[AssetWatcher(name="sqs_watcher", trigger=trigger)])
# Define the DAG that will be triggered when the asset is updated
with DAG(dag_id="event_driven_dag", schedule=[asset], catchup=False) as dag:
...
How it works¶
1. Message Queue Trigger: The MessageQueueTrigger listens for messages from an external queue (e.g., AWS SQS, Kafka, or another messaging system).
2. Asset and Watcher: The Asset abstracts the external entity, the SQS queue in this example. The AssetWatcher associates a trigger with a name; this name helps you identify which trigger is associated with which asset.
3. Event-Driven DAG: Instead of running on a fixed schedule, the DAG executes when the asset receives an update (e.g., a new message in the queue).
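The three steps above can be sketched as a plain-Python simulation of the flow. None of the names below are Airflow APIs: a deque plays the role of the external message source, `watch()` plays the trigger plus AssetWatcher, and `schedule()` plays the scheduler reacting to asset updates.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class SimulatedAsset:
    """Illustrative stand-in for an Airflow Asset, not the real class."""

    name: str
    pending_events: list = field(default_factory=list)


def watch(queue: deque, asset: SimulatedAsset) -> None:
    """Drain the external queue, recording one asset event per message."""
    while queue:
        asset.pending_events.append({"asset": asset.name, "payload": queue.popleft()})


def schedule(asset: SimulatedAsset) -> list:
    """Create one DAG run per pending asset event, then clear them."""
    runs = [f"event_driven_dag run for {event['payload']}" for event in asset.pending_events]
    asset.pending_events.clear()
    return runs


queue = deque(["msg-1", "msg-2"])  # messages arriving on, e.g., an SQS queue
asset = SimulatedAsset("sqs_queue_asset")
watch(queue, asset)  # the watcher observes both messages as asset events
runs = schedule(asset)  # the scheduler starts one DAG run per event
```

The key point the simulation makes is that the DAG itself never polls the queue: messages become asset events, and asset events become DAG runs.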
Supported triggers for event-driven scheduling¶
Not all triggers in Airflow can be used for event-driven scheduling. Of all the triggers that inherit from BaseTrigger, only the subset that inherit from BaseEventTrigger are compatible. The reason for this restriction is that some triggers are not designed for event-driven scheduling, and using them to schedule DAGs could lead to unintended results. BaseEventTrigger ensures that triggers used for scheduling adhere to an event-driven paradigm, reacting appropriately to external events without causing unexpected DAG behavior.
Writing event-driven compatible triggers¶
To make a trigger compatible with event-driven scheduling, it must inherit from BaseEventTrigger. There are three main scenarios for working with triggers in this context:
1. Creating a new event-driven trigger: If you need a new trigger for an unsupported event source, create a new class inheriting from BaseEventTrigger and implement its logic.
2. Adapting an existing compatible trigger: If an existing trigger (inheriting from BaseTrigger) is proven to already be compatible with event-driven scheduling, you just need to change its base class from BaseTrigger to BaseEventTrigger.
3. Adapting an existing incompatible trigger: If an existing trigger is not compatible with event-driven scheduling, a new trigger must be created. This new trigger must inherit from BaseEventTrigger and work correctly with event-driven scheduling. It may also inherit from the existing trigger if the two share common code.
Use cases for event-driven DAGs¶
Data ingestion pipelines: Trigger ETL workflows when new data arrives in a storage system.
Machine learning workflows: Start training models when new datasets become available.
IoT and real-time analytics: React to sensor data, logs, or application events in real-time.
Microservices and event-driven architectures: Orchestrate workflows based on service-to-service messages.