
Messaging Triggers

Wait for a message in a queue

Use the MessageQueueTrigger to wait for a message in a queue. Mandatory parameters of the trigger are:

  • scheme - the queue scheme (e.g., 'kafka', 'redis+pubsub', 'sqs') you are using

Additional parameters can be provided depending on the queue provider. A connection needs to be provided with the relevant default connection ID; for example, when connecting to a queue in AWS SQS, the connection ID should be aws_default.
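One common way to supply such a default connection is through an environment variable, following Airflow's AIRFLOW_CONN_<CONN_ID> naming convention. The sketch below illustrates this convention; the region and URI shown are placeholder values, not taken from this page.

```python
import os

# Airflow resolves a connection ID to an environment variable named
# AIRFLOW_CONN_ followed by the connection ID in upper case.
conn_id = "aws_default"
env_var = f"AIRFLOW_CONN_{conn_id.upper()}"

# Placeholder connection URI for illustration; in practice this would carry
# your real credentials or rely on the default AWS credential chain.
os.environ[env_var] = "aws://?region_name=us-east-1"
print(env_var)  # AIRFLOW_CONN_AWS_DEFAULT
```

The same convention applies to any provider: for a Kafka connection ID of kafka_default, the variable would be AIRFLOW_CONN_KAFKA_DEFAULT.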

Below is an example of how you can configure an Airflow Dag to be triggered by a message in Amazon SQS.

tests/system/common/messaging/example_message_queue_trigger.py[source]

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher

# Define a trigger that listens to an external message queue (AWS SQS in this case)
trigger = MessageQueueTrigger(scheme="sqs", sqs_queue="https://sqs.us-east-1.amazonaws.com/0123456789/Test")
# Define an asset that watches for messages on the queue
asset = Asset("sqs_queue_asset", watchers=[AssetWatcher(name="sqs_watcher", trigger=trigger)])

with DAG(dag_id="example_msgq_watcher", schedule=[asset]) as dag:
    EmptyOperator(task_id="task")

How it works

1. Message Queue Trigger: The MessageQueueTrigger listens for messages from an external queue (e.g., AWS SQS, Kafka, or another messaging system).

2. Asset and Watcher: The Asset abstracts the external entity, the SQS queue in this example. The AssetWatcher associates a trigger with a name. This name helps you identify which trigger is associated with which asset.

3. Event-Driven Dag: Instead of running on a fixed schedule, the Dag executes when the asset receives an update (e.g., a new message in the queue).
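The role of the scheme parameter in step 1 can be pictured as a lookup that routes the request to the matching provider. This is an illustrative sketch, not the provider's actual implementation; the registry contents here are assumptions for demonstration.

```python
# Illustrative sketch (not Airflow's real dispatch code): map a queue scheme
# string to the provider package assumed to handle it.
def resolve_queue_provider(scheme: str) -> str:
    registry = {
        "sqs": "amazon",
        "kafka": "apache-kafka",
        "redis+pubsub": "redis",
    }
    if scheme not in registry:
        raise ValueError(f"Unsupported queue scheme: {scheme!r}")
    return registry[scheme]

print(resolve_queue_provider("sqs"))  # amazon
```

An unrecognized scheme fails fast with a ValueError, which mirrors the general pattern of validating the scheme before any provider-specific work starts.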

Use message payload in the Dag

When a message is received from the queue, the trigger passes the message payload as part of the trigger event. You can access this payload in your DAG tasks using the triggering_asset_events parameter.

from airflow.decorators import task
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import DAG, Asset, AssetWatcher, chain

# Define the asset with trigger
trigger = MessageQueueTrigger(scheme="kafka", topics=["my-kafka-topic"])
asset = Asset("kafka_queue_asset", watchers=[AssetWatcher(name="kafka_watcher", trigger=trigger)])

with DAG(dag_id="example_msgq_payload", schedule=[asset]) as dag:

    @task
    def process_message(triggering_asset_events):
        for event in triggering_asset_events[asset]:
            # Access the message payload
            payload = event.extra["payload"]
            # Process the payload as needed
            print(f"Received message: {payload}")

    chain(process_message())

The triggering_asset_events parameter contains the events that triggered the DAG run, indexed by asset. Each event includes an extra dictionary where the message payload is stored under the payload key.
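To make the shape of that structure concrete, the snippet below simulates it with plain dictionaries. The nesting is an assumption made only for illustration; in a real task, triggering_asset_events is supplied by Airflow and the events are richer objects than bare dicts.

```python
# Simulated stand-in for triggering_asset_events: events indexed by asset,
# each carrying an `extra` dict with the message payload under "payload".
triggering_asset_events = {
    "kafka_queue_asset": [
        {"extra": {"payload": '{"order_id": 42}'}},
    ],
}

for event in triggering_asset_events["kafka_queue_asset"]:
    payload = event["extra"]["payload"]
    print(f"Received message: {payload}")
```

Iterating per asset also handles the case where several messages arrive before the Dag run starts: each message appears as its own event in the list.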
