Messaging Triggers

Wait for a message in a queue

Use the MessageQueueTrigger to wait for a message in a queue. Parameters of the trigger are:

  • queue - the queue identifier

Additional parameters can be provided depending on the queue provider. Connections needs to be provided with the relevant default connection ID, for example, when connecting to a queue in AWS SQS, the connection ID should be aws_default.

Below is an example of how you can configure an Airflow DAG to be triggered by a message in Amazon SQS.

tests/system/common/messaging/example_message_queue_trigger.py

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher

# Define a trigger that listens to an external message queue (AWS SQS in this case)
trigger = MessageQueueTrigger(queue="https://sqs.us-east-1.amazonaws.com/0123456789/my-queue")

# Define an asset that watches for messages on the queue
asset = Asset("sqs_queue_asset", watchers=[AssetWatcher(name="sqs_watcher", trigger=trigger)])

with DAG(dag_id="example_msgq_watcher", schedule=[asset]) as dag:
    EmptyOperator(task_id="task")

How it works

1. Message Queue Trigger: The MessageQueueTrigger listens for messages from an external queue (e.g., AWS SQS, Kafka, or another messaging system).

2. Asset and Watcher: The Asset abstracts the external entity, the SQS queue in this example. The AssetWatcher associate a trigger with a name. This name helps you identify which trigger is associated to which asset.

3. Event-Driven DAG: Instead of running on a fixed schedule, the DAG executes when the asset receives an update (e.g., a new message in the queue).

Was this entry helpful?