Messaging Triggers¶
Wait for a message in a queue¶
Use the MessageQueueTrigger
to wait for a message in a
queue. Parameters of the trigger are:
queue
- the queue identifier
Additional parameters can be provided depending on the queue provider. Connections needs to be provided with the relevant
default connection ID, for example, when connecting to a queue in AWS SQS, the connection ID should be
aws_default
.
Below is an example of how you can configure an Airflow DAG to be triggered by a message in Amazon SQS.
tests/system/common/messaging/example_message_queue_trigger.py
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher
# Define a trigger that listens to an external message queue (AWS SQS in this case)
trigger = MessageQueueTrigger(queue="https://sqs.us-east-1.amazonaws.com/0123456789/my-queue")
# Define an asset that watches for messages on the queue
asset = Asset("sqs_queue_asset", watchers=[AssetWatcher(name="sqs_watcher", trigger=trigger)])
with DAG(dag_id="example_msgq_watcher", schedule=[asset]) as dag:
EmptyOperator(task_id="task")
How it works¶
1. Message Queue Trigger: The MessageQueueTrigger
listens for messages from an external queue
(e.g., AWS SQS, Kafka, or another messaging system).
2. Asset and Watcher: The Asset
abstracts the external entity, the SQS queue in this example.
The AssetWatcher
associate a trigger with a name. This name helps you identify which trigger is associated to which
asset.
3. Event-Driven DAG: Instead of running on a fixed schedule, the DAG executes when the asset receives an update (e.g., a new message in the queue).