Azure Service Bus Message Queue

Azure Service Bus Queue Provider

Implemented by AzureServiceBusMessageQueueProvider

The Azure Service Bus Queue Provider is a BaseMessageQueueProvider that uses Azure Service Bus as the underlying message queue system. It allows you to send and receive messages using Azure Service Bus queues in your Airflow workflows with MessageQueueTrigger common message queue interface.

  • It uses azure+servicebus as the scheme for identifying the provider.

  • For parameter definitions, take a look at AzureServiceBusQueueTrigger.

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher

trigger = MessageQueueTrigger(
    scheme="azure+servicebus",
    # AzureServiceBusQueueTrigger parameters
    queues=["my-queue"],
    azure_service_bus_conn_id="azure_service_bus_default",
    poll_interval=60,
)

asset = Asset(
    "asb_queue_asset",
    watchers=[AssetWatcher(name="asb_watcher", trigger=trigger)],
)

Azure Service Bus Message Queue Trigger

Implemented by AzureServiceBusQueueTrigger

Inherited from MessageQueueTrigger

Wait for a message in a queue

Below is an example of how you can configure an Airflow DAG to be triggered by a message in Azure Service Bus.

tests/system/microsoft/azure/example_event_schedule_asb.py[source]

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher

# Define a trigger that listens to an Azure Service Bus queue
trigger = MessageQueueTrigger(
    scheme="azure+servicebus",
    queues=["my-queue"],
    azure_service_bus_conn_id="azure_service_bus_default",
    poll_interval=60,
)

# Define an asset that watches for messages on the Azure Service Bus queue
asset = Asset(
    "event_schedule_asb_asset_1",
    watchers=[AssetWatcher(name="event_schedule_asb_watcher_1", trigger=trigger)],
)

with DAG(
    dag_id="example_event_schedule_asb",
    schedule=[asset],
) as dag:
    process_message_task = EmptyOperator(task_id="process_asb_message")

How it works

  1. Azure Service Bus Message Queue Trigger: The AzureServiceBusQueueTrigger listens for messages from Azure Service Bus queue(s).

  2. Asset and Watcher: The Asset abstracts the external entity, the Azure Service Bus queue in this example. The AssetWatcher associate a trigger with a name. This name helps you identify which trigger is associated to which asset.

  3. Event-Driven DAG: Instead of running on a fixed schedule, the DAG executes when the asset receives an update (e.g., a new message in the queue).

For how to use the trigger, refer to the documentation of the Messaging Trigger

Was this entry helpful?