airflow.triggers.shared_stream
Shared underlying I/O between BaseEventTrigger instances in the triggerer.
When multiple triggers declare the same non-None
shared_stream_key(), the
triggerer routes them through SharedStreamManager so that one
underlying poll loop produces raw events that are broadcast to every
participating trigger. Each trigger then runs
filter_shared_stream() to
convert the broadcast into its own TriggerEvent
instances. Triggers that opt out (the default) keep their independent
run()-based poll loops untouched.
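The fan-out described above can be illustrated with a toy model that does not import Airflow. Everything here is an assumption made for illustration: `upstream` stands in for `open_shared_stream`, `subscriber` stands in for a trigger's `filter_shared_stream`, and `broadcast` is a hypothetical helper, not the manager's actual code.

```python
import asyncio

_DONE = object()  # terminal sentinel telling subscribers the poll loop ended


async def upstream():
    # Hypothetical stand-in for open_shared_stream: one poll loop, raw events.
    for raw in ({"topic": "a", "n": 1}, {"topic": "b", "n": 2}, {"topic": "a", "n": 3}):
        await asyncio.sleep(0)
        yield raw


async def broadcast(source, queues):
    # Copy every raw event to every subscriber queue, then signal the end.
    try:
        async for raw in source:
            for queue in queues:
                queue.put_nowait(raw)
    finally:
        for queue in queues:
            queue.put_nowait(_DONE)


async def subscriber(queue, topic):
    # Hypothetical stand-in for filter_shared_stream: keep only matching events.
    accepted = []
    while (raw := await queue.get()) is not _DONE:
        if raw["topic"] == topic:
            accepted.append(raw["n"])
    return accepted


async def demo():
    queue_a, queue_b = asyncio.Queue(), asyncio.Queue()
    poll = asyncio.create_task(broadcast(upstream(), [queue_a, queue_b]))
    result_a, result_b = await asyncio.gather(
        subscriber(queue_a, "a"), subscriber(queue_b, "b")
    )
    await poll
    return result_a, result_b
```

Both subscribers see all three raw events, but each keeps only the ones it cares about; the upstream is polled once regardless of how many subscribers attach.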
Scope and the missing ack channel
The shared-stream channel is one-way: events flow from
open_shared_stream out to each subscriber’s filter_shared_stream,
with no path back. Subscribers cannot tell the producer “I accepted this
event; please advance / commit / ack”. The pattern is therefore only safe
for upstreams whose consumption does not need a producer-side side effect
tied to a subscriber’s accept / reject decision:
- Idempotent / read-only reads (filesystem listings, polling REST APIs).
- Auto-commit Kafka consumers (enable.auto.commit=true).
- Subscriber-side-effect cleanup (unlink, local marking, …) where the per-event action goes through APIs the subscriber owns independently.
Kafka manual-commit consumers, SQS delete-on-process / visibility
extension, and similar message-broker patterns where progress is per-message
and tied to the subscriber’s decision are not in scope here today. A
producer-side ack channel to cover them is a follow-up that should be
designed against a concrete Kafka or SQS consumer rather than against an
abstract API. See BaseEventTrigger for the
matching subclass-facing notes.
Lifecycle invariants
The manager and groups cooperate to keep a single invariant true at every
await-point:
A key is present in SharedStreamManager._groups only while its group’s poll task is alive and accepting new subscribers.
This rules out the late-subscriber races that the naive design admits — a new subscriber for a key whose poll has died or is in the middle of being torn down always falls through to “create a fresh group” rather than attaching to a dead one and hanging on an empty queue. The invariant is maintained synchronously:
- When _poll ends for any reason other than cancellation (the upstream iterator raised, or returned), the group’s finally block evicts the key from _groups and broadcasts a terminal sentinel to current subscribers, all without yielding, so no other coroutine can interleave.
- When the last subscriber leaves, SharedStreamManager.unsubscribe() evicts the key from _groups before awaiting group.stop(), so a new subscriber arriving while we wait for cancellation creates a fresh group.
- SharedStreamManager.stop_all() clears _groups in one synchronous step before awaiting any stop, applying the same rule to shutdown.
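The evict-before-await rule can be sketched with a toy manager. `ToyGroup` and `ToyManager` are hypothetical classes written for this illustration; they mirror the ordering described above, not the real implementation.

```python
import asyncio


class ToyGroup:
    """Hypothetical group: one poll task plus the set of subscriber ids."""

    def __init__(self):
        self.subscribers = set()
        self.task = asyncio.create_task(self._poll())

    async def _poll(self):
        await asyncio.Event().wait()  # pretend to poll an upstream forever

    async def stop(self):
        self.task.cancel()
        try:
            await self.task
        except asyncio.CancelledError:
            pass  # expected: we cancelled the poll task ourselves


class ToyManager:
    def __init__(self):
        self._groups = {}

    def subscribe(self, trigger_id, key):
        group = self._groups.get(key)
        if group is None:
            group = self._groups[key] = ToyGroup()
        group.subscribers.add(trigger_id)
        return group

    async def unsubscribe(self, trigger_id, key):
        group = self._groups.get(key)
        if group is None:
            return
        group.subscribers.discard(trigger_id)
        if not group.subscribers:
            del self._groups[key]  # evict synchronously, before any await
            await group.stop()     # only now may other coroutines interleave


async def demo():
    manager = ToyManager()
    old = manager.subscribe(1, "k")
    leaving = asyncio.create_task(manager.unsubscribe(1, "k"))
    await asyncio.sleep(0)            # let unsubscribe run up to its await
    new = manager.subscribe(2, "k")   # arrives while the old group is stopping
    await leaving
    result = (new is not old, old.task.cancelled(), manager._groups["k"] is new)
    await manager.unsubscribe(2, "k")
    return result
```

If the `del` and the `await` were swapped, the late subscriber would find the dying group still registered and attach to it.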
Attributes
- DEFAULT_SUBSCRIBER_QUEUE_MAX: Default per-subscriber queue size for shared streams.
Classes
- SharedStreamManager: Coordinate BaseEventTrigger instances that share underlying I/O.
Module Contents
- airflow.triggers.shared_stream.log
- airflow.triggers.shared_stream.DEFAULT_SUBSCRIBER_QUEUE_MAX = 1024
Default per-subscriber queue size for shared streams.
The SharedStreamManager admits up to this many unconsumed raw events per subscriber before treating the subscriber as too slow to keep up, at which point the subscriber’s trigger is failed with _SubscriberOverflow rather than the queue growing without bound.
Used as the fallback when no value is passed to SharedStreamManager; in the triggerer this is overridden from the [triggerer] shared_stream_subscriber_queue_size config option.
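A minimal sketch of the bounded-queue policy, assuming the producer enqueues without blocking and treats a full queue as a failure of that one subscriber. `SubscriberOverflow` and `deliver` are hypothetical names standing in for the module's private `_SubscriberOverflow` handling, whose actual mechanics are not shown here.

```python
import asyncio


class SubscriberOverflow(Exception):
    """Hypothetical stand-in for the module's private _SubscriberOverflow."""


def deliver(queue, raw):
    # put_nowait never blocks, so one slow subscriber cannot stall the
    # shared poll loop; a full queue is converted into an explicit failure.
    try:
        queue.put_nowait(raw)
    except asyncio.QueueFull:
        raise SubscriberOverflow(
            f"subscriber has {queue.maxsize} unconsumed events"
        ) from None


async def demo(max_queue=2):
    queue = asyncio.Queue(maxsize=max_queue)
    delivered = 0
    try:
        for raw in range(5):  # nobody consumes, so the queue fills up
            deliver(queue, raw)
            delivered += 1
    except SubscriberOverflow:
        return delivered, queue.qsize(), True
    return delivered, queue.qsize(), False
```

With a limit of 2, the third event triggers the overflow; with a limit at least as large as the backlog, every event is queued.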
- class airflow.triggers.shared_stream.SharedStreamManager(*, log=None, max_subscriber_queue=DEFAULT_SUBSCRIBER_QUEUE_MAX)
Coordinate BaseEventTrigger instances that share underlying I/O.
The manager owns one _SharedStreamGroup per distinct shared_stream_key. Each group runs a single async task that drives open_shared_stream; subscribers receive raw events through their own asyncio queues and convert them to TriggerEvent instances independently.
The manager is single-event-loop and not thread-safe. The triggerer’s TriggerRunner is its sole owner.
- log
- subscribe(*, trigger_id, trigger, key)
Subscribe a trigger to the shared stream identified by key.
On the first subscriber for a given key, the group is created and the underlying poll loop is started. Returns an async iterator of raw events the trigger should feed into filter_shared_stream.
- async unsubscribe(trigger_id, key)
Remove a subscriber.
When the last subscriber for key leaves, the key is evicted from _groups synchronously and the underlying poll task is cancelled. Eviction happens before awaiting stop() so that a new subscriber arriving while we wait for cancellation builds a fresh group rather than attaching to the dying one.
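One way a caller might pair subscribe with unsubscribe is a try/finally around the iteration, so the subscriber is always removed. This is a sketch under stated assumptions: `consume` is a hypothetical helper, and `FakeManager` is an in-memory stand-in that only copies the two method signatures documented above so the example runs without Airflow. It does not show how the triggerer itself drives the manager.

```python
import asyncio


class FakeManager:
    """Hypothetical stand-in exposing the documented subscribe/unsubscribe shape."""

    def __init__(self, events):
        self.events = events
        self.unsubscribed = []

    def subscribe(self, *, trigger_id, trigger, key):
        async def stream():
            for raw in self.events:
                yield raw

        return stream()  # an async iterator of raw events

    async def unsubscribe(self, trigger_id, key):
        self.unsubscribed.append((trigger_id, key))


async def consume(manager, *, trigger_id, trigger, key):
    # Collect raw events, and always unsubscribe, even if iteration fails.
    seen = []
    stream = manager.subscribe(trigger_id=trigger_id, trigger=trigger, key=key)
    try:
        async for raw in stream:
            seen.append(raw)
    finally:
        await manager.unsubscribe(trigger_id, key)
    return seen


async def demo():
    manager = FakeManager(["e1", "e2"])
    seen = await consume(manager, trigger_id=7, trigger=None, key="files:/tmp")
    return seen, manager.unsubscribed
```

In a real trigger the raw events would be passed to filter_shared_stream rather than collected in a list.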
- async stop_all()
Cancel every active group; used during triggerer shutdown.