Google Cloud Messaging Queues

Google Cloud Pub/Sub Queue Provider

Implemented by PubSubMessageQueueEventTriggerContainer

The Google Cloud Pub/Sub Queue Provider is a message queue provider that uses Google Cloud Pub/Sub as the underlying message queue system.

It allows you to send and receive messages using Cloud Pub/Sub in your Airflow workflows through MessageQueueTrigger, the common message queue interface.

  • It uses google+pubsub as the scheme for identifying the provider.

  • For parameter definitions, take a look at PubsubPullTrigger.

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher

trigger = MessageQueueTrigger(
    scheme="google+pubsub",
    # Additional PubsubPullTrigger parameters as needed
    project_id="my-project",
    subscription="my_subscription",
    ack_messages=True,
    max_messages=1,
    gcp_conn_id="google_cloud_default",
    poke_interval=60.0,
)

asset = Asset("pubsub_queue_asset", watchers=[AssetWatcher(name="pubsub_watcher", trigger=trigger)])
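
To illustrate how a scheme string such as google+pubsub can select a provider, the following is a minimal, self-contained sketch of scheme-based lookup. It does not reproduce Airflow's internal code: FakePubsubProvider, PROVIDERS, and resolve_provider are hypothetical names introduced only for this illustration.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakePubsubProvider:
    """Hypothetical stand-in for a message queue provider (not an Airflow class)."""

    scheme: str = "google+pubsub"

    def trigger_kwargs(self, **kwargs: Any) -> dict[str, Any]:
        # In this sketch, the extra keyword arguments are passed through unchanged,
        # the way project_id, subscription, etc. are forwarded in the example above.
        return dict(kwargs)


# Hypothetical registry of available providers.
PROVIDERS = [FakePubsubProvider()]


def resolve_provider(scheme: str) -> FakePubsubProvider:
    """Return the first registered provider whose scheme matches exactly."""
    for provider in PROVIDERS:
        if provider.scheme == scheme:
            return provider
    raise ValueError(f"No message queue provider registered for scheme {scheme!r}")


provider = resolve_provider("google+pubsub")
kwargs = provider.trigger_kwargs(project_id="my-project", subscription="my_subscription")
```

The point of the sketch is only that the scheme acts as a lookup key, while the remaining keyword arguments are handed to the underlying trigger.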

Pub/Sub Message Queue Trigger

Implemented by PubsubPullTrigger

Inherited from MessageQueueTrigger

Wait for a message in a queue

Below is an example of how you can configure an Airflow DAG to be triggered by a message in Pub/Sub.

tests/system/google/event_scheduling/example_event_schedule_pubsub.py

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher

# Define a trigger that listens to a Google Cloud Pub/Sub subscription
trigger = MessageQueueTrigger(
    scheme="google+pubsub",
    project_id="my-project",
    subscription="test-subscription",
    ack_messages=True,
    max_messages=1,
    gcp_conn_id="google_cloud_default",
    poke_interval=60.0,
)

# Define an asset that watches for messages on the Pub/Sub subscription
asset = Asset(
    "event_schedule_pubsub_asset_1",
    watchers=[AssetWatcher(name="event_schedule_pubsub_watcher_1", trigger=trigger)],
)

with DAG(
    dag_id="example_event_schedule_pubsub",
    schedule=[asset],
) as dag:
    process_message_task = EmptyOperator(task_id="process_pubsub_message")

How it works

  1. Pub/Sub Message Queue Trigger: The PubsubPullTrigger listens for messages from a Google Cloud Pub/Sub subscription.

  2. Asset and Watcher: The Asset abstracts the external entity, which is the Pub/Sub subscription in this example. The AssetWatcher associates a trigger with a name. This name helps you identify which trigger is associated with which asset.

  3. Event-Driven DAG: Instead of running on a fixed schedule, the DAG executes when the asset receives an update (e.g., a new message in the queue).
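
The example DAG uses an EmptyOperator as a placeholder for message processing. A real task would typically decode the pulled messages first. The sketch below shows one way to do that, assuming each message arrives as a dictionary shaped like the JSON form of a Pub/Sub ReceivedMessage (an ack_id plus a message whose data field is base64-encoded). That shape, and the helper name decode_pubsub_messages, are assumptions made for illustration; check the event payload produced by your provider version before relying on it.

```python
import base64
import json
from typing import Any


def decode_pubsub_messages(received: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Decode base64 message data, parsing it as JSON when possible.

    Assumes each item looks like {"ack_id": ..., "message": {"data": <base64 str>, ...}}.
    """
    decoded = []
    for item in received:
        message = item.get("message", {})
        text = base64.b64decode(message.get("data", "")).decode("utf-8")
        try:
            body: Any = json.loads(text)
        except json.JSONDecodeError:
            # Not JSON: keep the payload as plain text.
            body = text
        decoded.append(
            {
                "message_id": message.get("message_id"),
                "attributes": message.get("attributes", {}),
                "body": body,
            }
        )
    return decoded


# Hypothetical sample payload for demonstration only.
sample = [
    {
        "ack_id": "ack-1",
        "message": {
            "data": base64.b64encode(b'{"order": 42}').decode("ascii"),
            "message_id": "m1",
            "attributes": {"source": "demo"},
        },
    }
]
result = decode_pubsub_messages(sample)
```

A function like this could be called from the task that replaces process_pubsub_message, keeping the decoding logic independent of Airflow so it can be unit tested on its own.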

For details on how to use the trigger, refer to the Messaging Trigger documentation.
