Google Cloud Messaging Queues¶
Google Cloud Pub/Sub Queue Provider¶
Implemented by PubSubMessageQueueEventTriggerContainer
The Google Cloud Pub/Sub Queue Provider is a message queue provider that uses Google Cloud Pub/Sub as the underlying message queue system.
It allows you to send and receive messages using Cloud Pub/Sub in your Airflow workflows
with MessageQueueTrigger common message queue interface.
It uses
google+pubsubas the scheme for identifying the provider.For parameter definitions, take a look at
PubsubPullTrigger.from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger from airflow.sdk import Asset, AssetWatcher trigger = MessageQueueTrigger( scheme="google+pubsub", # Additional PubsubPullTrigger parameters as needed project_id="my_project", subscription="my_subscription", ack_messages=True, max_messages=1, gcp_conn_id="google_cloud_default", poke_interval=60.0, ) asset = Asset("pubsub_queue_asset", watchers=[AssetWatcher(name="pubsub_watcher", trigger=trigger)])
Pub/Sub Message Queue Trigger¶
Implemented by PubsubPullTrigger
Inherited from MessageQueueTrigger
Wait for a message in a queue¶
Below is an example of how you can configure an Airflow DAG to be triggered by a message in Pub/Sub.
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher
# Define a trigger that listens to a Google Cloud Pub/Sub subscription
trigger = MessageQueueTrigger(
scheme="google+pubsub",
project_id="my-project",
subscription="test-subscription",
ack_messages=True,
max_messages=1,
gcp_conn_id="google_cloud_default",
poke_interval=60.0,
)
# Define an asset that watches for messages on the Pub/Sub subscription
asset = Asset(
"event_schedule_pubsub_asset_1",
watchers=[AssetWatcher(name="event_schedule_pubsub_watcher_1", trigger=trigger)],
)
with DAG(
dag_id="example_event_schedule_pubsub",
schedule=[asset],
) as dag:
process_message_task = EmptyOperator(task_id="process_pubsub_message")
How it works¶
Pub/Sub Message Queue Trigger: The
PubsubPullTriggerlistens for messages from a Google Cloud Pub/Sub subscription.Asset and Watcher: The
Assetabstracts the external entity, the Pub/Sub subscription in this example. TheAssetWatcherassociate a trigger with a name. This name helps you identify which trigger is associated to which asset.Event-Driven DAG: Instead of running on a fixed schedule, the DAG executes when the asset receives an update (e.g., a new message in the queue).
For how to use the trigger, refer to the documentation of the Messaging Trigger