airflow.triggers.shared_stream

Shared underlying I/O between BaseEventTrigger instances in the triggerer.

When multiple triggers declare the same non-None shared_stream_key(), the triggerer routes them through SharedStreamManager so that one underlying poll loop produces raw events that are broadcast to every participating trigger. Each trigger then runs filter_shared_stream() to convert the broadcast into its own TriggerEvent instances. Triggers that opt out (the default) keep their independent run()-based poll loops untouched.
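The fan-out described above can be illustrated with plain asyncio. This is a minimal sketch, not the triggerer's implementation: `poll_upstream`, `filter_topic`, and `fan_out_demo` are hypothetical names, and the `None` sentinel is an assumption made for the example.

```python
import asyncio
from collections.abc import AsyncIterator


async def poll_upstream() -> AsyncIterator[dict]:
    """Hypothetical stand-in for the single shared poll loop."""
    for n in range(4):
        yield {"id": n, "topic": "orders" if n % 2 == 0 else "refunds"}
        await asyncio.sleep(0)


async def filter_topic(raw_events: asyncio.Queue, topic: str) -> list[int]:
    """Per-subscriber filter: keep only the raw events this subscriber wants."""
    matched = []
    while (raw := await raw_events.get()) is not None:
        if raw["topic"] == topic:
            matched.append(raw["id"])
    return matched


async def fan_out_demo() -> dict[str, list[int]]:
    # One queue per subscriber, as in the description above.
    queues = {"orders": asyncio.Queue(), "refunds": asyncio.Queue()}
    filters = {t: asyncio.create_task(filter_topic(q, t)) for t, q in queues.items()}
    async for raw in poll_upstream():      # one poll loop ...
        for q in queues.values():          # ... broadcast to every subscriber
            q.put_nowait(raw)
    for q in queues.values():
        q.put_nowait(None)                 # assumed terminal sentinel
    return {t: await task for t, task in filters.items()}
```

Each subscriber sees every raw event and decides independently which ones become its own events.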

Producer-side ack channel

When a trigger overrides create_shared_stream_producer(), the manager switches to ack mode for that stream.

Subscribers receive the same stream of raw events in both modes: the same filter_shared_stream code works unchanged on the fast path and in ack mode. The difference is resolution bookkeeping and the broker advance, both of which the manager derives from consumption progress — no extra subscriber API.

The factory is called once per group and returns a SharedStreamProducer that owns the broker connection for the lifetime of one poll. The manager drives the producer’s open_stream, which yields (raw_event, broker_payload) tuples, and broadcasts each raw event to every subscriber’s queue.

A subscriber resolves an event once it has moved past it (pulled the next raw event, or unsubscribed) and every TriggerEvent it derived from the event has been confirmed persisted to the metadata database. An event the filter skips without yielding anything resolves cleanly as soon as the filter loops back for the next raw event. Force-failure resolves the subscriber immediately.

When every subscriber that was online at broadcast time has resolved an event, the event is fully resolved and eligible for the broker advance: the manager calls await producer.advance(batch) with one lane’s contiguous prefix of fully resolved events. This is where the producer commits, deletes, or acks on the broker. Each AdvanceItem in the batch carries the event’s broker_payload and an AdvanceOutcome with per-event counts of how the subscribers resolved.

Advances are dispatched by a single pump task. The producer’s get_advance_lane() assigns each event to a lane. Within a lane, batch items arrive in fan-out order and the next batch is awaited only after the call for the previous one has returned. If an advance call raises, the whole group is terminated and the broker redelivers from the never-committed offset. Lanes do not wait for one another, but at any moment at most one advance call is awaited globally. The default lane assignment (every event in the same lane) preserves the original single global order.

When the poll ends, the manager awaits producer.aclose() once, best-effort.
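The "contiguous prefix of fully resolved events" rule can be sketched as a per-lane FIFO. The `LaneTracker` and `PendingEvent` names below are hypothetical and only illustrate the batching rule; the manager's real bookkeeping may differ.

```python
from collections import OrderedDict, defaultdict
from collections.abc import Hashable
from typing import Any, NamedTuple


class PendingEvent(NamedTuple):
    event_id: int
    lane: Hashable
    payload: Any


class LaneTracker:
    """Hypothetical bookkeeping: one FIFO per lane, kept in fan-out order."""

    def __init__(self) -> None:
        # lane -> {event_id: (payload, fully_resolved)}
        self._lanes = defaultdict(OrderedDict)

    def broadcast(self, event: PendingEvent) -> None:
        self._lanes[event.lane][event.event_id] = (event.payload, False)

    def mark_resolved(self, lane: Hashable, event_id: int) -> None:
        payload, _ = self._lanes[lane][event_id]
        # Reassigning an existing key keeps its position in the OrderedDict.
        self._lanes[lane][event_id] = (payload, True)

    def pop_ready_batch(self, lane: Hashable) -> list:
        """Pop the resolved prefix; stop at the first still-pending event."""
        batch = []
        events = self._lanes[lane]
        while events:
            event_id, (payload, resolved) = next(iter(events.items()))
            if not resolved:
                break
            batch.append(payload)
            del events[event_id]
        return batch
```

In this sketch, events resolved behind a pending head stay queued and join the next batch once the head resolves, which matches the head-of-line behaviour described for a lane.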

Subscriber reject: instead of yielding a trigger event, a subscriber’s filter can call reject_shared_stream_event() to terminally refuse the raw event it is currently processing. A reject resolves immediately (there is nothing to persist) and is counted in AdvanceOutcome.rejected, separate from an involuntary failed: the producer is expected to dead-letter / nack rejects but redeliver failures. The framework only reports the counts — the per-broker decision lives in advance.

Snapshot-at-fan-out: the set of subscribers that must resolve an event is frozen at broadcast time. A subscriber that joins after the event was broadcast is not added to that event’s pending set.
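A small sketch of the snapshot rule, using a hypothetical `FanOutSnapshot` class: the pending set is copied at broadcast time, so later joiners never appear in it.

```python
class FanOutSnapshot:
    """Hypothetical illustration of freezing the pending set at broadcast time."""

    def __init__(self) -> None:
        self.subscribers: set[str] = set()
        self.pending: dict[int, set[str]] = {}

    def broadcast(self, event_id: int) -> None:
        # Copy, do not alias: later joins must not change this event's set.
        self.pending[event_id] = set(self.subscribers)

    def resolve(self, event_id: int, subscriber: str) -> bool:
        """Return True once every snapshotted subscriber has resolved."""
        self.pending[event_id].discard(subscriber)
        return not self.pending[event_id]
```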

Persistence-gated advance: the trigger events a subscriber derives from a raw event are assigned sequence numbers as they leave the runner; the supervisor confirms each one after the event is stored in the metadata database, and the confirmation reaches the runner on the next state sync (typically one sync round, a second or two — well within the ack timeout). The subscriber’s resolution only completes once all of its sequence numbers for the event are confirmed. If a confirmation never arrives — the persist failed, or the triggerer crashed in between — the ack timeout fails the event, the producer does not commit, and the broker redelivers. The binding between a trigger event and the raw event it came from relies on the filter yielding each derived event before pulling the next raw event from the shared stream (the natural way to write a filter).
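The gating can be pictured with a small tracker for one subscriber. This is a sketch under assumptions: `PersistGate` and its method names are hypothetical, and the real runner and supervisor exchange confirmations through the state sync rather than direct calls.

```python
import itertools


class PersistGate:
    """Hypothetical per-subscriber tracker for persist confirmations."""

    def __init__(self) -> None:
        self._next_seq = itertools.count(1)
        self._seq_to_event: dict[int, str] = {}
        self._unconfirmed: dict[str, set[int]] = {}
        self._moved_past: set[str] = set()

    def bind(self, event_id: str) -> int:
        """Assign a sequence number to a trigger event derived from event_id."""
        seq = next(self._next_seq)
        self._seq_to_event[seq] = event_id
        self._unconfirmed.setdefault(event_id, set()).add(seq)
        return seq

    def move_past(self, event_id: str) -> None:
        """The filter pulled the next raw event (or unsubscribed)."""
        self._moved_past.add(event_id)

    def confirm_persisted(self, seqs) -> None:
        for seq in seqs:
            event_id = self._seq_to_event.pop(seq, None)
            if event_id is None:
                continue  # unknown or already handled: ignored
            self._unconfirmed[event_id].discard(seq)

    def is_resolved(self, event_id: str) -> bool:
        # Resolved only when moved past AND nothing is left unconfirmed.
        return event_id in self._moved_past and not self._unconfirmed.get(event_id)
```

An event the filter skips without yielding has no sequence numbers, so in this sketch it resolves as soon as the filter moves past it.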

Per-event ack timeout: a background task scans outstanding events. Any subscriber that has not finished processing an event within ack_timeout seconds — still on the event, or its derived trigger events not yet confirmed persisted — is force-failed via the existing _PollFailure path (exception type AckTimeout). Other subscribers are not affected; once they resolve, the producer advances normally.
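A deadline scan of this kind can be sketched with an injectable monotonic clock, which also makes it easy to test. `AckDeadlines` is a hypothetical name, and treating an elapsed time equal to the timeout as expired is an assumption of the sketch.

```python
import time
from typing import Callable


class AckDeadlines:
    """Hypothetical scanner for (event, subscriber) pairs past the ack timeout."""

    def __init__(self, ack_timeout: float, now: Callable[[], float] = time.monotonic):
        self._timeout = ack_timeout
        self._now = now
        # (event_id, subscriber) -> time the event was broadcast
        self._started: dict[tuple[int, str], float] = {}

    def broadcast(self, event_id: int, subscribers) -> None:
        started = self._now()
        for subscriber in subscribers:
            self._started[(event_id, subscriber)] = started

    def resolve(self, event_id: int, subscriber: str) -> None:
        self._started.pop((event_id, subscriber), None)

    def scan(self) -> list[tuple[int, str]]:
        """Return and forget every pair whose deadline has passed."""
        now = self._now()
        expired = [k for k, t in self._started.items() if now - t >= self._timeout]
        for key in expired:
            del self._started[key]
        return expired
```

Only the late subscriber is returned by the scan; subscribers that already resolved are untouched, mirroring the isolation described above.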

Triggerer restart: resolution state is in-memory only. After a triggerer restart, the broker will redeliver events that were never advanced. Subscribers must therefore be idempotent. The same applies when a group stops while events are still awaiting persist confirmation (for example, the last subscriber unsubscribes right after producing an event): the pending advances are abandoned and the broker redelivers those events.

shared_stream_subscriber_queue_size in ack mode: the bound is still “unprocessed raw events per subscriber”. The manager does not wait for outstanding resolutions before pulling the next event from the producer’s stream; back-pressure is purely queue-bound, and a subscriber whose queue is full is force-failed via _SubscriberOverflow. The queue mainly protects against burst delivery before a subscriber’s filter has had a chance to run. Broker advances, however, are dispatched by a single pump task in per-lane order: while a lane’s head event still has pending subscribers, every later event in that same lane waits, and resolved events behind the head accumulate into the next batch. The ack timeout is the backstop that bounds this per-lane head-of-line wait.
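Queue-bound back-pressure can be sketched with bounded asyncio queues. The `broadcast` helper and the `failed` set are hypothetical; the point is only that a full queue fails that one subscriber instead of blocking the poll loop.

```python
import asyncio


def broadcast(raw_event, queues: dict, failed: set) -> None:
    """Deliver without waiting; a full queue force-fails only that subscriber."""
    for subscriber, queue in queues.items():
        if subscriber in failed:
            continue
        try:
            queue.put_nowait(raw_event)
        except asyncio.QueueFull:
            failed.add(subscriber)  # stand-in for the overflow failure path


async def overflow_demo():
    queues = {"fast": asyncio.Queue(maxsize=2), "slow": asyncio.Queue(maxsize=2)}
    failed: set = set()
    for n in range(3):
        broadcast(n, queues, failed)
        queues["fast"].get_nowait()  # the fast subscriber keeps up
    return failed, queues["slow"].qsize()
```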

Triggers that do not override create_shared_stream_producer run the fast path: no event IDs and no resolution bookkeeping. Subscribers see the exact same stream shape in both modes, so opting a trigger into ack mode never changes its filter code (backward-compatible).

Lifecycle invariants

The manager and groups cooperate to keep a single invariant true at every await-point:

A key is present in SharedStreamManager._groups only while its group’s poll task is alive and accepting new subscribers.

This rules out the late-subscriber races that the naive design admits — a new subscriber for a key whose poll has died or is in the middle of being torn down always falls through to “create a fresh group” rather than attaching to a dead one and hanging on an empty queue. The invariant is maintained synchronously:

  • When _poll ends for any reason other than cancellation (the upstream iterator raised, or returned), the group’s finally block evicts the key from _groups and broadcasts a terminal sentinel to current subscribers — all without yielding, so no other coroutine can interleave.

  • When the last subscriber leaves, SharedStreamManager.unsubscribe() evicts the key from _groups before awaiting group.stop(), so a new subscriber arriving while we wait for cancellation creates a fresh group.

  • SharedStreamManager.stop_all() clears _groups in one synchronous step before awaiting any stop, applying the same rule to shutdown.
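The "evict before await" rule from the list above can be sketched as follows. `GroupSketch` and `ManagerSketch` are hypothetical simplifications; the only detail that matters is that the key leaves `_groups` before the first `await` in `unsubscribe`.

```python
import asyncio


class GroupSketch:
    def __init__(self, key: str) -> None:
        self.key = key
        self.subscribers: set = set()
        self.task = asyncio.create_task(self._poll())

    async def _poll(self) -> None:
        await asyncio.Event().wait()  # stand-in poll loop: runs until cancelled

    async def stop(self) -> None:
        self.task.cancel()
        try:
            await self.task
        except asyncio.CancelledError:
            pass


class ManagerSketch:
    def __init__(self) -> None:
        self._groups: dict = {}

    def subscribe(self, trigger_id: int, key: str) -> GroupSketch:
        group = self._groups.get(key)
        if group is None:
            group = self._groups[key] = GroupSketch(key)
        group.subscribers.add(trigger_id)
        return group

    async def unsubscribe(self, trigger_id: int, key: str) -> None:
        group = self._groups.get(key)
        if group is None:
            return
        group.subscribers.discard(trigger_id)
        if not group.subscribers:
            del self._groups[key]  # evict synchronously first ...
            await group.stop()     # ... and only then yield to the loop


async def late_subscriber_demo() -> bool:
    manager = ManagerSketch()
    old = manager.subscribe(1, "k")
    leaving = asyncio.create_task(manager.unsubscribe(1, "k"))
    await asyncio.sleep(0)             # let unsubscribe reach its first await
    new = manager.subscribe(2, "k")    # arrives while the old group is stopping
    await leaving
    fresh = new is not old and not new.task.done()
    await manager.unsubscribe(2, "k")
    return fresh
```

Swapping the two lines in `unsubscribe` would let the late subscriber attach to the dying group, which is exactly the race the invariant rules out.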

Exceptions

AckTimeout

Raised in a subscriber that did not finish processing an event within the per-event timeout.

Classes

AdvanceOutcome

Per-event resolution counts handed to SharedStreamProducer.advance().

AdvanceItem

One resolved event in the batch handed to SharedStreamProducer.advance().

SharedStreamProducer

Broker-side half of a shared stream running in ack mode.

SharedStreamManager

Coordinate BaseEventTrigger instances that share underlying I/O.

Functions

reject_shared_stream_event()

Reject the shared-stream raw event the calling filter is currently processing.

Module Contents

airflow.triggers.shared_stream.reject_shared_stream_event()

Reject the shared-stream raw event the calling filter is currently processing.

Call this from inside filter_shared_stream() when, instead of yielding a TriggerEvent, you want the broker to treat the raw event as terminally refused — dead-letter it (Azure Service Bus), nack it (Pub/Sub), and so on. A reject is distinct from an involuntary failure: it counts toward AdvanceOutcome.rejected rather than failed, so a producer can dead-letter rejects while still redelivering failures.

The reject resolves the event immediately; there is no derived trigger event to persist first.

This call is only meaningful while a raw event’s binding window is open, that is, inside filter_shared_stream of an ack-mode stream, right after the filter received a raw event. Called anywhere else (the fast path, standalone run(), or between two raw events), it logs a warning and is a no-op, because there is no broker advance to influence.

Invariant: this relies on the filter running in the same asyncio task as the one driving the raw-event binding window (a task-local context variable carries the open window). Driving the filter iteration from a different task (for example via asyncio.to_thread() or a freshly created task) would make the open window invisible here and turn every reject into a no-op.

exception airflow.triggers.shared_stream.AckTimeout

Bases: Exception

Raised in a subscriber that did not finish processing an event within the per-event timeout.

The subscriber is either still on the event, or some of its derived trigger events were never confirmed persisted. Treated the same as _SubscriberOverflow — the subscriber’s trigger fails through the standard trigger-failure path. Other subscribers in the same group are unaffected; once they resolve, the producer advances normally.

class airflow.triggers.shared_stream.AdvanceOutcome

Per-event resolution counts handed to SharedStreamProducer.advance().

Every subscriber that was online when the event was broadcast is counted in exactly one field:

  • acked — the subscriber moved past the event (pulled the next raw event, or unsubscribed while the event was outstanding) and every trigger event it derived from the event was confirmed persisted.

  • failed — force-failed by the manager (ack timeout — including a persist confirmation that never arrived — or queue overflow).

  • rejected — the subscriber actively refused the event by calling reject_shared_stream_event() from its filter. Terminal: the broker should dead-letter / nack such events rather than redeliver them, which is what distinguishes a reject from an involuntary failed (where redelivery is the right response).

A producer reconciles these per-broker in SharedStreamProducer.advance(); the framework only reports the counts. For example, a Service Bus producer dead-letters when rejected is non-zero, abandons (redelivers) when only failed is non-zero, and completes when every subscriber accepted the event; a Pub/Sub producer nacks on a reject and acks otherwise.

An event broadcast while no subscribers were online carries all-zero counts and is not clean — nothing was accepted, so there is nothing the producer should commit on.

acked: int
failed: int
rejected: int = 0
property is_clean: bool

Whether every subscriber accepted the event.

No active reject, no involuntary failure, and at least one subscriber acknowledged. A zero-subscriber broadcast (all-zero counts) is not clean.
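The counts and the is_clean rule can be mirrored in a local stand-in to show how a producer might branch on them. `OutcomeSketch` and `service_bus_action` are hypothetical; the branch order follows the Service Bus example in the class description, and the final "skip" branch for a zero-subscriber broadcast is an assumption of the sketch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OutcomeSketch:
    """Local stand-in mirroring the documented AdvanceOutcome fields."""

    acked: int
    failed: int
    rejected: int = 0

    @property
    def is_clean(self) -> bool:
        # No reject, no failure, and at least one subscriber acknowledged.
        return self.rejected == 0 and self.failed == 0 and self.acked > 0


def service_bus_action(outcome: OutcomeSketch) -> str:
    """Pick a broker action for one event (action names are illustrative)."""
    if outcome.rejected:
        return "dead_letter"
    if outcome.failed:
        return "abandon"  # redeliver
    if outcome.is_clean:
        return "complete"
    return "skip"  # assumption: zero-subscriber broadcast, nothing to commit on
```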

class airflow.triggers.shared_stream.AdvanceItem

Bases: NamedTuple

One resolved event in the batch handed to SharedStreamProducer.advance().

broker_payload: Any

The opaque object the producer yielded with the raw event from open_stream.

outcome: AdvanceOutcome

How the subscribers resolved the event.

class airflow.triggers.shared_stream.SharedStreamProducer

Bases: abc.ABC

Broker-side half of a shared stream running in ack mode.

Returned by create_shared_stream_producer(); one instance owns the broker connection for the lifetime of one poll.

abstract open_stream()

Open the broker connection and yield (raw_event, broker_payload) pairs.

Implement as an async generator. broker_payload is any opaque object this producer needs later to advance the broker (e.g. a Kafka offset, SQS receipt handle, Pub/Sub ack ID). Called once per poll; open the broker connection here, not in the factory or __init__.

Implementations are expected to run for the lifetime of the group — returning without raising is treated as an error and propagated to every subscriber, so the contract is “yield forever, or raise”.

abstract async advance(batch)

Advance the broker for one lane’s batch of fully resolved events.

batch is never empty. Its items are in fan-out order and form a contiguous resolved prefix of one lane — events for which get_advance_lane() returned equal values. Within a lane, batches arrive strictly in order: the next batch is awaited only after the call for the previous one returned. Across lanes the relative order is not guaranteed, but at any moment at most one advance call is awaited globally. This makes cumulative schemes such as a Kafka offset commit safe within a lane — committing the offset of the batch’s last item covers the whole batch. Each item’s AdvanceOutcome carries the per-event resolution counts; use them to decide whether to commit, skip, or trigger a broker-side redeliver.

If this method raises, the error is logged and the whole shared-stream group is terminated: every subscriber receives a failure sentinel and the broker redelivers from the never-committed offset. Terminating is the safe default; finer-grained per-lane retry would require tracking which offsets are safe to recommit.

get_advance_lane(broker_payload)

Return the advance lane for one event.

Events whose lane values compare equal are advanced strictly in fan-out order relative to each other; events in different lanes do not wait for one another. At any moment at most one advance call is awaited globally, regardless of how many lanes exist. The default implementation puts every event in the same lane, which preserves the single global fan-out order.

Called synchronously once per event before fan-out, so it must be cheap (O(1)) and must not block. If it raises, the whole poll is treated as failed: the group terminates and the error propagates to every subscriber.

Example: a Kafka producer can return (topic, partition) here — a cumulative offset commit only needs ordering within a partition, so a slow partition no longer delays commits on the others.

async aclose()

Release broker resources; called once when the poll ends.

Best-effort: a raised exception is logged and not propagated. The default implementation does nothing.
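To make the producer contract concrete, here is a sketch against a local stand-in base class rather than the real SharedStreamProducer, so that it runs without Airflow. `ProducerSketch`, `ItemSketch`, `Offset`, and `InMemoryLogProducer` are all hypothetical, and the tiny driver in `run_producer_demo` only imitates what the manager does. The advance method ignores outcomes for brevity; a real producer would inspect each item's outcome first.

```python
import abc
import asyncio
from typing import Any, AsyncIterator, Hashable, NamedTuple


class ProducerSketch(abc.ABC):
    """Local stand-in shaped like the documented producer interface."""

    @abc.abstractmethod
    def open_stream(self) -> AsyncIterator[tuple[Any, Any]]: ...

    @abc.abstractmethod
    async def advance(self, batch: list) -> None: ...

    def get_advance_lane(self, broker_payload: Any) -> Hashable:
        return None  # default: every event shares one lane

    async def aclose(self) -> None:
        return None


class Offset(NamedTuple):
    topic: str
    partition: int
    offset: int


class ItemSketch(NamedTuple):
    broker_payload: Any
    outcome: Any


class InMemoryLogProducer(ProducerSketch):
    def __init__(self, records) -> None:
        self._records = records
        self.committed: dict = {}
        self.closed = False

    async def open_stream(self):
        for payload, value in self._records:
            yield value, payload  # (raw_event, broker_payload)
        # Contract: yield forever, or raise. A finite test log has to raise.
        raise ConnectionError("in-memory log exhausted")

    async def advance(self, batch) -> None:
        # Cumulative commit: the last offset of the batch covers the whole batch.
        last = batch[-1].broker_payload
        self.committed[(last.topic, last.partition)] = last.offset + 1

    def get_advance_lane(self, broker_payload):
        return (broker_payload.topic, broker_payload.partition)

    async def aclose(self) -> None:
        self.closed = True


async def run_producer_demo():
    records = [(Offset("t", 0, 0), "a"), (Offset("t", 1, 0), "b"), (Offset("t", 0, 1), "c")]
    producer = InMemoryLogProducer(records)
    lanes: dict = {}
    try:
        async for _raw, payload in producer.open_stream():
            lane = producer.get_advance_lane(payload)
            lanes.setdefault(lane, []).append(ItemSketch(payload, None))
    except ConnectionError:
        pass  # the real manager would terminate the group here
    for batch in lanes.values():
        await producer.advance(batch)  # one lane per call, in fan-out order
    await producer.aclose()
    return producer.committed, producer.closed
```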

class airflow.triggers.shared_stream.SharedStreamManager(*, log=None, max_subscriber_queue=DEFAULT_SUBSCRIBER_QUEUE_MAX, ack_timeout=DEFAULT_ACK_TIMEOUT, _now=time.monotonic)

Coordinate BaseEventTrigger instances that share underlying I/O.

The manager owns one _SharedStreamGroup per distinct shared_stream_key. Each group runs a single async task that drives open_shared_stream; subscribers receive raw events through their own asyncio queues and convert them to TriggerEvent instances independently.

The manager is single-event-loop and not thread-safe. The triggerer’s TriggerRunner is its sole owner.

log
subscribe(*, trigger_id, trigger, key)

Subscribe a trigger to the shared stream identified by key.

On first subscriber for a given key the group is created and the underlying poll loop is started. Returns an async iterator of raw events the trigger should feed into filter_shared_stream.

bind_pending_event(*, trigger_id, key)

Bind a trigger event the subscriber just emitted to its shared-stream ack state.

Returns the persist-confirmation seq the runner must report through confirm_persisted() once the event is stored, or None when there is nothing to gate — the group is gone, the stream is not in ack mode, or the subscriber has no open binding window. Synchronous and O(1); call it between taking the event off the trigger and queueing it outbound, with no await in between.

confirm_persisted(seqs)

Record that the trigger events behind seqs were persisted.

Broadcast to every live group; each group resolves the sequence numbers it owns and ignores the rest. Sequence numbers whose binding already resolved (timeout, overflow) or whose group has stopped are ignored.

async unsubscribe(trigger_id, key)

Remove a subscriber.

When the last subscriber for key leaves, the key is evicted from _groups synchronously and the underlying poll task is cancelled. Eviction happens before awaiting stop() so that a new subscriber arriving while we wait for cancellation builds a fresh group rather than attaching to the dying one.

async stop_all()

Cancel every active group; used during triggerer shutdown.
