airflow.triggers.shared_stream

Shared underlying I/O between BaseEventTrigger instances in the triggerer.

When multiple triggers declare the same non-None shared_stream_key(), the triggerer routes them through SharedStreamManager so that one underlying poll loop produces raw events that are broadcast to every participating trigger. Each trigger then runs filter_shared_stream() to convert the broadcast into its own TriggerEvent instances. Triggers that opt out (the default) keep their independent run()-based poll loops untouched.
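The fan-out described above can be illustrated with a minimal, self-contained sketch. It does not use Airflow: upstream, make_filter, broadcast, consume and the _DONE marker are hypothetical stand-ins for open_shared_stream, filter_shared_stream and the manager's internals, whose real signatures are not shown on this page.

```python
import asyncio

_DONE = object()  # hypothetical end-of-stream marker used only by this toy


async def upstream():
    """Stand-in for open_shared_stream: the single poll loop yielding raw events."""
    for raw in ({"path": "a.csv"}, {"path": "b.json"}, {"path": "c.csv"}):
        yield raw


def make_filter(suffix):
    """Stand-in for a trigger's filter_shared_stream: keep only matching raw events."""
    def _filter(raw):
        return raw["path"] if raw["path"].endswith(suffix) else None
    return _filter


async def broadcast(queues):
    """Drive the one upstream iterator and copy each raw event to every subscriber."""
    async for raw in upstream():
        for q in queues:
            q.put_nowait(raw)
    for q in queues:
        q.put_nowait(_DONE)  # tell every subscriber the stream has ended


async def consume(queue, keep):
    """Drain one subscriber queue, applying that subscriber's own filter."""
    accepted = []
    while (raw := await queue.get()) is not _DONE:
        result = keep(raw)
        if result is not None:
            accepted.append(result)
    return accepted


async def demo():
    queues = [asyncio.Queue(), asyncio.Queue()]
    producer = asyncio.create_task(broadcast(queues))
    csv_events, json_events = await asyncio.gather(
        consume(queues[0], make_filter(".csv")),
        consume(queues[1], make_filter(".json")),
    )
    await producer
    return csv_events, json_events
```

Running asyncio.run(demo()) polls the upstream once, yet each subscriber sees every raw event and keeps only the ones its own filter accepts.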

Scope and the missing ack channel

The shared-stream channel is one-way: events flow from open_shared_stream out to each subscriber’s filter_shared_stream, with no path back. Subscribers cannot tell the producer “I accepted this event; please advance / commit / ack”. The pattern is therefore only safe for upstreams whose consumption does not need a producer-side side effect tied to a subscriber’s accept / reject decision:

  • Idempotent / read-only reads (filesystem listings, polling REST APIs).

  • Auto-commit Kafka consumers (enable.auto.commit=true).

  • Subscriber-side-effect cleanup (unlink, local marking, …) where the per-event action goes through APIs the subscriber owns independently.

Kafka manual-commit consumers, SQS delete-on-process / visibility extension, and similar message-broker patterns where progress is per-message and tied to the subscriber’s decision are not in scope here today. A producer-side ack channel to cover them is a follow-up that should be designed against a concrete Kafka or SQS consumer rather than against an abstract API. See BaseEventTrigger for the matching subclass-facing notes.

Lifecycle invariants

The manager and groups cooperate to keep a single invariant true at every await-point:

A key is present in SharedStreamManager._groups only while its group’s poll task is alive and accepting new subscribers.

This rules out the late-subscriber races that a naive design admits: a new subscriber for a key whose poll has died or is being torn down always falls through to “create a fresh group” rather than attaching to a dead one and hanging on an empty queue. The invariant is maintained synchronously:

  • When _poll ends for any reason other than cancellation (the upstream iterator raised, or returned), the group’s finally block evicts the key from _groups and broadcasts a terminal sentinel to current subscribers — all without yielding, so no other coroutine can interleave.

  • When the last subscriber leaves, SharedStreamManager.unsubscribe() evicts the key from _groups before awaiting group.stop(), so a new subscriber arriving while we wait for cancellation creates a fresh group.

  • SharedStreamManager.stop_all() clears _groups in one synchronous step before awaiting any stop, applying the same rule to shutdown.
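The evict-before-await rule in the list above can be sketched with a toy manager. ToyGroup and ToyManager are hypothetical stand-ins written for illustration, not the real SharedStreamManager or _SharedStreamGroup; the only point is the ordering of the dictionary eviction and the await.

```python
import asyncio


class ToyGroup:
    """Hypothetical group: one poll task plus a set of subscriber ids."""

    def __init__(self):
        self.subscribers = set()
        self.task = asyncio.create_task(self._poll())

    async def _poll(self):
        while True:
            await asyncio.sleep(3600)  # placeholder for real upstream I/O

    async def stop(self):
        self.task.cancel()
        try:
            await self.task
        except asyncio.CancelledError:
            pass  # expected: we cancelled the poll task ourselves


class ToyManager:
    def __init__(self):
        self.groups = {}

    def subscribe(self, trigger_id, key):
        group = self.groups.get(key)
        if group is None:
            group = self.groups[key] = ToyGroup()
        group.subscribers.add(trigger_id)
        return group

    async def unsubscribe(self, trigger_id, key):
        group = self.groups.get(key)
        if group is None:
            return
        group.subscribers.discard(trigger_id)
        if not group.subscribers:
            del self.groups[key]  # evict synchronously, before any await
            await group.stop()    # other coroutines may run while we wait here


async def demo():
    manager = ToyManager()
    first = manager.subscribe(1, "k")
    leaving = asyncio.create_task(manager.unsubscribe(1, "k"))
    await asyncio.sleep(0)              # let unsubscribe reach its await
    second = manager.subscribe(2, "k")  # arrives in the middle of teardown
    await leaving
    fresh = second is not first and not second.task.done()
    await manager.unsubscribe(2, "k")
    return fresh
```

In this sketch the late subscriber gets a new group with a live poll task. If the del were moved after the await, the same subscriber would attach to the group that is being cancelled.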

Attributes

log

DEFAULT_SUBSCRIBER_QUEUE_MAX

Default per-subscriber queue size for shared streams.

Classes

SharedStreamManager

Coordinate BaseEventTrigger instances that share underlying I/O.

Module Contents

airflow.triggers.shared_stream.log
airflow.triggers.shared_stream.DEFAULT_SUBSCRIBER_QUEUE_MAX = 1024

Default per-subscriber queue size for shared streams.

The SharedStreamManager admits up to this many unconsumed raw events per subscriber before treating the subscriber as too slow to keep up — at which point the subscriber’s trigger is failed with _SubscriberOverflow rather than the queue growing without bound.
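A bounded per-subscriber queue of this kind could look roughly like the following sketch. SubscriberOverflow and offer are hypothetical names standing in for the private _SubscriberOverflow signal and whatever the manager does internally; only asyncio.Queue and asyncio.QueueFull are real standard-library APIs here.

```python
import asyncio


class SubscriberOverflow(Exception):
    """Hypothetical stand-in for the private _SubscriberOverflow signal."""


def offer(queue, raw_event):
    """Enqueue without blocking; a full queue marks the subscriber as too slow."""
    try:
        queue.put_nowait(raw_event)
    except asyncio.QueueFull:
        raise SubscriberOverflow(
            f"more than {queue.maxsize} unconsumed events"
        ) from None


async def demo(max_queue=2, events=3):
    """Push events at a subscriber that never consumes; report what happened."""
    queue = asyncio.Queue(maxsize=max_queue)
    delivered = 0
    try:
        for i in range(events):
            offer(queue, i)
            delivered += 1
    except SubscriberOverflow:
        return delivered, True   # subscriber would be failed at this point
    return delivered, False
```

The producer never blocks on a slow subscriber in this sketch: it either enqueues immediately or fails that one subscriber, so the shared poll loop keeps serving the others.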

Used as the fallback when no value is passed to SharedStreamManager; in the triggerer this is overridden from the [triggerer] shared_stream_subscriber_queue_size config option.

class airflow.triggers.shared_stream.SharedStreamManager(*, log=None, max_subscriber_queue=DEFAULT_SUBSCRIBER_QUEUE_MAX)

Coordinate BaseEventTrigger instances that share underlying I/O.

The manager owns one _SharedStreamGroup per distinct shared_stream_key. Each group runs a single async task that drives open_shared_stream; subscribers receive raw events through their own asyncio queues and convert them to TriggerEvent instances independently.

The manager is bound to a single event loop and is not thread-safe. The triggerer’s TriggerRunner is its sole owner.

log
subscribe(*, trigger_id, trigger, key)

Subscribe a trigger to the shared stream identified by key.

On the first subscriber for a given key, the group is created and the underlying poll loop is started. Returns an async iterator of raw events that the trigger should feed into filter_shared_stream.

async unsubscribe(trigger_id, key)

Remove a subscriber.

When the last subscriber for key leaves, the key is evicted from _groups synchronously and the underlying poll task is cancelled. Eviction happens before awaiting stop() so that a new subscriber arriving while we wait for cancellation builds a fresh group rather than attaching to the dying one.

async stop_all()

Cancel every active group; used during triggerer shutdown.
