Messaging Triggers
Wait for a message in a queue
Use the MessageQueueTrigger to wait for a message in a queue. The trigger has one mandatory parameter:
scheme - the queue scheme you are using (e.g., ‘kafka’, ‘redis+pubsub’, ‘sqs’)
Additional parameters can be provided depending on the queue provider. Connections must be provided under the relevant default connection ID. For example, when connecting to a queue in AWS SQS, the connection ID should be aws_default.
Below is an example of how you can configure an Airflow Dag to be triggered by a message in Amazon SQS.
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher
# Define a trigger that listens to an external message queue (AWS SQS in this case)
trigger = MessageQueueTrigger(scheme="sqs", sqs_queue="https://sqs.us-east-1.amazonaws.com/0123456789/Test")
# Define an asset that watches for messages on the queue
asset = Asset("sqs_queue_asset", watchers=[AssetWatcher(name="sqs_watcher", trigger=trigger)])
with DAG(dag_id="example_msgq_watcher", schedule=[asset]) as dag:
    EmptyOperator(task_id="task")
How it works
1. Message Queue Trigger: The MessageQueueTrigger listens for messages from an external queue (e.g., AWS SQS, Kafka, or another messaging system).
2. Asset and Watcher: The Asset abstracts the external entity, the SQS queue in this example. The AssetWatcher associates a trigger with a name. This name helps you identify which trigger is associated with which asset.
3. Event-Driven Dag: Instead of running on a fixed schedule, the Dag executes when the asset receives an update (e.g., a new message in the queue).
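The three steps above can be pictured with a small toy model. This is only a sketch in plain Python and is not how Airflow implements triggers internally: ToyTrigger, ToyWatcher, ToyAsset, and simulate are hypothetical names invented for illustration, and the in-memory message list stands in for a real queue.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToyTrigger:
    """Hypothetical stand-in for a queue trigger: yields one event per message."""

    messages: list

    async def run(self):
        for message in self.messages:
            await asyncio.sleep(0)  # hand control back to the event loop, as a real poll would
            yield {"payload": message}


@dataclass
class ToyWatcher:
    """Hypothetical stand-in for a watcher: gives a trigger a name."""

    name: str
    trigger: ToyTrigger


@dataclass
class ToyAsset:
    """Hypothetical stand-in for an asset: the external entity plus its watchers."""

    name: str
    watchers: list


async def _watch(asset, on_update):
    # Every event from a watcher's trigger counts as an update to the asset.
    for watcher in asset.watchers:
        async for event in watcher.trigger.run():
            on_update(asset.name, watcher.name, event)


def simulate(asset):
    """Return one (asset, watcher, payload) tuple per simulated Dag run."""
    runs = []
    asyncio.run(_watch(asset, lambda a, w, e: runs.append((a, w, e["payload"]))))
    return runs


if __name__ == "__main__":
    toy_asset = ToyAsset(
        "sqs_queue_asset",
        [ToyWatcher("sqs_watcher", ToyTrigger(["first", "second"]))],
    )
    for run in simulate(toy_asset):
        print(run)
```

In this model each message produces exactly one simulated run, and the watcher name travels with the event so you can tell which trigger fired.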
Use message payload in the Dag
When a message is received from the queue, the trigger passes the message payload as part of the trigger event.
You can access this payload in your Dag tasks using the triggering_asset_events parameter.
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import DAG, Asset, AssetWatcher, chain, task
# Define the asset with trigger
trigger = MessageQueueTrigger(scheme="kafka", topics=["my-kafka-topic"])
asset = Asset("kafka_queue_asset", watchers=[AssetWatcher(name="kafka_watcher", trigger=trigger)])
with DAG(dag_id="example_msgq_payload", schedule=[asset]) as dag:
    @task
    def process_message(triggering_asset_events):
        for event in triggering_asset_events[asset]:
            # Access the message payload
            payload = event.extra["payload"]
            # Process the payload as needed
            print(f"Received message: {payload}")

    chain(process_message())
The triggering_asset_events parameter contains the events that triggered the Dag run, indexed by asset.
Each event includes an extra dictionary where the message payload is stored under the payload key.
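Because the shape of the payload depends on the queue provider and on what the producer sent, a task may want to normalize it before use. The helper below is a minimal sketch, not part of Airflow: extract_payloads is a hypothetical name, and it assumes only that each event exposes an extra mapping as described above and that payloads may arrive as JSON text. The SimpleNamespace objects in the usage part are stand-ins for real asset events.

```python
import json
from types import SimpleNamespace


def extract_payloads(events):
    """Collect the payload of each event, decoding JSON text where possible.

    Hypothetical helper: it relies only on each event having an ``extra``
    mapping that may contain a ``payload`` key.
    """
    payloads = []
    for event in events:
        extra = event.extra or {}
        if "payload" not in extra:
            continue  # skip events that carry no message payload
        payload = extra["payload"]
        if isinstance(payload, (str, bytes)):
            try:
                payload = json.loads(payload)
            except ValueError:
                pass  # not valid JSON: keep the raw value unchanged
        payloads.append(payload)
    return payloads


if __name__ == "__main__":
    # Stand-in events for illustration; real events come from triggering_asset_events.
    sample_events = [
        SimpleNamespace(extra={"payload": '{"order_id": 7}'}),
        SimpleNamespace(extra={"payload": "plain text"}),
    ]
    print(extract_payloads(sample_events))
```

Inside a task, the same helper could be called on triggering_asset_events[asset] in place of the loop that prints each payload.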