IBM MQ Message Queue
IBM MQ Queue Provider
Implemented by IBMMQMessageQueueProvider
The IBM MQ Queue Provider is a
BaseMessageQueueProvider
that uses IBM MQ as the underlying message queue system.
It allows you to send and receive messages using IBM MQ queues in your Airflow workflows
via the common message queue interface
MessageQueueTrigger.
It uses
ibmmq as the scheme for identifying IBM MQ queues. For parameter definitions, take a look at
AwaitMessageTrigger.

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher

trigger = MessageQueueTrigger(
    queue="ibmmq://mq_default/MY.QUEUE.NAME",
)
asset = Asset("mq_topic_asset", watchers=[AssetWatcher(name="mq_watcher", trigger=trigger)])
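To make the shape of the queue URI concrete, here is a minimal sketch that splits it with the standard library. It is not the provider's own parsing code, which this page does not show. The helper name split_queue_uri is hypothetical, and reading the first component (mq_default) as an Airflow connection ID is an assumption based on its name.

```python
from urllib.parse import urlsplit


def split_queue_uri(uri: str) -> tuple[str, str, str]:
    """Split a queue URI into (scheme, first component, queue name).

    Hypothetical helper for illustration; not part of the IBM MQ provider.
    """
    parts = urlsplit(uri)
    # netloc keeps its original case; the leading "/" of the path is dropped.
    return parts.scheme, parts.netloc, parts.path.lstrip("/")


# Assumption: "mq_default" is an Airflow connection ID.
scheme, conn_id, queue_name = split_queue_uri("ibmmq://mq_default/MY.QUEUE.NAME")
print(scheme, conn_id, queue_name)  # ibmmq mq_default MY.QUEUE.NAME
```

The scheme is what lets the common MessageQueueTrigger select the IBM MQ provider, so a URI with a different scheme would not be handled by it.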
IBM MQ Message Queue Trigger
Implemented by AwaitMessageTrigger
Inherited from
MessageQueueTrigger
Wait for a message in a queue
Warning
AwaitMessageTrigger consumes IBM MQ messages with
MQGMO_NO_SYNCPOINT. This gives the trigger at-most-once delivery
semantics: once IBM MQ returns a message, it is removed from the queue
before Airflow emits the corresponding TriggerEvent. If the trigger is
canceled in that window, the message can be lost.
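The loss window described in the warning can be illustrated with a toy in-memory model. This sketch does not use the IBM MQ API: the deque stands in for a queue, and both function names are hypothetical. The second function exists only for contrast and does not represent an option the trigger offers.

```python
from collections import deque


def consume_no_syncpoint(queue: deque, cancelled: bool) -> list[str]:
    """Toy destructive get: the message leaves the queue before the event is emitted."""
    emitted: list[str] = []
    message = queue.popleft()  # the message is gone from the queue here
    if cancelled:
        return emitted  # cancellation in this window loses the message
    emitted.append(message)
    return emitted


def consume_with_commit(queue: deque, cancelled: bool) -> list[str]:
    """Toy transactional get: the message is removed only after the event is emitted."""
    emitted: list[str] = []
    message = queue[0]  # read without removing
    if cancelled:
        return emitted  # nothing was committed, so the message stays queued
    emitted.append(message)
    queue.popleft()  # "commit": remove only after a successful emit
    return emitted


lost = deque(["order-1"])
consume_no_syncpoint(lost, cancelled=True)
print(len(lost))  # 0: the message was removed but never emitted

kept = deque(["order-1"])
consume_with_commit(kept, cancelled=True)
print(len(kept))  # 1: the message is still available for redelivery
```

In the first case no event was produced and the queue is empty, which is the at-most-once behavior the warning refers to.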
The following example configures an Airflow DAG that is triggered when a message arrives in an IBM MQ queue.
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import DAG, Asset, AssetWatcher, task

# Define a trigger that listens to an external message queue (IBM MQ in this case)
trigger = MessageQueueTrigger(
    queue="ibmmq://mq_default/MY.QUEUE.NAME",
)

# The asset represents the queue; the watcher binds the trigger to it
mq_topic_asset = Asset(
    "mq_topic_asset",
    watchers=[AssetWatcher(name="mq_watcher", trigger=trigger)],
)

# The DAG runs whenever the asset receives an update
with DAG(dag_id="example_ibm_mq_watcher", schedule=[mq_topic_asset]) as dag:

    @task
    def process_message(**context):
        for event in context["triggering_asset_events"][mq_topic_asset]:
            # Get the message from the TriggerEvent payload
            print("Processing event: ", event)
            payload = event.extra["payload"]
            print("Actual payload: ", payload)

    # Call the decorated function so the task is registered in the DAG
    process_message()
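The example prints the payload as-is. If you need structured data inside the task, a small normalization step can help. The sketch below assumes the payload may arrive as bytes or as a string and may contain JSON. This page does not specify the payload type, so treat both points as assumptions. The helper name decode_payload is hypothetical.

```python
import json
from typing import Any


def decode_payload(payload: Any) -> Any:
    """Best-effort decoding of a message payload (hypothetical helper).

    Assumption: payloads are bytes or str and may hold JSON. Anything else
    is returned unchanged.
    """
    if isinstance(payload, (bytes, bytearray)):
        # Replace undecodable bytes instead of failing the task.
        payload = payload.decode("utf-8", errors="replace")
    if isinstance(payload, str):
        try:
            return json.loads(payload)
        except json.JSONDecodeError:
            return payload  # not JSON, keep the plain text
    return payload


print(decode_payload(b'{"order_id": 1}'))  # {'order_id': 1}
print(decode_payload("plain text"))  # plain text
```

Inside process_message, this could be applied as decode_payload(event.extra["payload"]) before any further processing.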
How it works
1. IBM MQ Message Queue Trigger: The AwaitMessageTrigger listens for messages from an IBM MQ queue. It uses a non-transactional MQ get, so the integration is at-most-once rather than at-least-once.
2. Asset and Watcher: The Asset abstracts the external entity, the IBM MQ queue in this example. The AssetWatcher associates a trigger with a name. This name helps you identify which trigger is associated with which asset.
3. Event-Driven DAG: Instead of running on a fixed schedule, the DAG executes when the asset receives an update (for example, when a new message arrives in the queue).
For how to use the trigger, refer to the documentation of the Messaging Trigger.