IBM MQ Message Queue

IBM MQ Queue Provider

Implemented by IBMMQMessageQueueProvider

The IBM MQ Queue Provider is a BaseMessageQueueProvider that uses IBM MQ as the underlying message queue system.

It allows you to send and receive messages using IBM MQ queues in your Airflow workflows via the common message queue interface MessageQueueTrigger.

  • It uses ibmmq as scheme for identifying IBM MQ queues.

  • For parameter definitions take a look at AwaitMessageTrigger.

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher

trigger = MessageQueueTrigger(
    queue="ibmmq://mq_default/MY.QUEUE.NAME",
)

asset = Asset("mq_topic_asset", watchers=[AssetWatcher(name="mq_watcher", trigger=trigger)])

IBM MQ Message Queue Trigger

Implemented by AwaitMessageTrigger

Inherited from MessageQueueTrigger

Wait for a message in a queue

Warning

AwaitMessageTrigger consumes IBM MQ messages with MQGMO_NO_SYNCPOINT. This gives the trigger at-most-once delivery semantics: once IBM MQ returns a message, it is removed from the queue before Airflow emits the corresponding TriggerEvent. If the trigger is canceled in that window, the message can be lost.

Below is an example of how you can configure an Airflow DAG to be triggered by a message arriving in an IBM MQ queue.

tests/system/ibm/mq/example_dag_message_queue_trigger.py[source]

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import DAG, Asset, AssetWatcher, task

# Define a trigger that listens to an external message queue (IBM MQ in this case)
trigger = MessageQueueTrigger(
    queue="ibmmq://mq_default/MY.QUEUE.NAME",
)

mq_topic_asset = Asset(
    "mq_topic_asset",
    watchers=[AssetWatcher(name="mq_watcher", trigger=trigger)],
)

with DAG(dag_id="example_ibm_mq_watcher", schedule=[mq_topic_asset]) as dag:

    @task
    def process_message(**context):
        for event in context["triggering_asset_events"][mq_topic_asset]:
            # Get the message from the TriggerEvent payload
            print("Processing event: ", event)
            payload = event.extra["payload"]
            print("Actual payload: ", payload)

How it works

  1. IBM MQ Message Queue Trigger The AwaitMessageTrigger listens for messages from an IBM MQ queue. It uses a non-transactional MQ get, so the integration is at-most-once rather than at-least-once.

  2. Asset and Watcher The Asset abstracts the external entity, the IBM MQ queue in this example. The AssetWatcher associates a trigger with a name. This name helps you identify which trigger is associated with which asset.

  3. Event-Driven DAG Instead of running on a fixed schedule, the DAG executes when the asset receives an update (for example, when a new message arrives in the queue).

For how to use the trigger, refer to the documentation of the Messaging Trigger.

Was this entry helpful?