airflow.triggers.shared_stream
Shared underlying I/O between BaseEventTrigger instances in the triggerer.
When multiple triggers declare the same non-None shared_stream_key(), the triggerer routes them through SharedStreamManager so that one underlying poll loop produces raw events that are broadcast to every participating trigger. Each trigger then runs filter_shared_stream() to convert the broadcast into its own TriggerEvent instances. Triggers that opt out (the default) keep their independent run()-based poll loops untouched.
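The fan-out described above can be sketched with plain asyncio: one task drives the upstream iterator and copies each raw event into every subscriber's queue, and each subscriber applies its own filter. This is a minimal illustration under stated assumptions: poll_loop, subscriber, demo and the _END sentinel are hypothetical names, not Airflow APIs, and the real manager adds ack bookkeeping, overflow handling and failure propagation that this sketch omits.

```python
from __future__ import annotations

import asyncio
from typing import AsyncIterator, Callable

# Hypothetical sketch: none of these names are Airflow APIs.
_END = object()  # terminal sentinel pushed to every queue when the poll ends


async def poll_loop(source: AsyncIterator[str], queues: list[asyncio.Queue]) -> None:
    """Drive the single upstream iterator and copy each raw event to every queue."""
    try:
        async for raw in source:
            for q in queues:
                q.put_nowait(raw)
    finally:
        for q in queues:
            q.put_nowait(_END)


async def subscriber(q: asyncio.Queue, keep: Callable[[str], bool]) -> list[str]:
    """Stand-in for filter_shared_stream: turn broadcast raw events into own events."""
    produced = []
    while (raw := await q.get()) is not _END:
        if keep(raw):
            produced.append(f"event:{raw}")
    return produced


async def demo() -> tuple[list[str], list[str]]:
    async def source():
        for raw in ["a1", "b1", "a2"]:
            yield raw

    qa, qb = asyncio.Queue(), asyncio.Queue()
    poll = asyncio.create_task(poll_loop(source(), [qa, qb]))
    results = await asyncio.gather(
        subscriber(qa, lambda r: r.startswith("a")),
        subscriber(qb, lambda r: r.startswith("b")),
    )
    await poll
    return results[0], results[1]
```

Running asyncio.run(demo()) returns (['event:a1', 'event:a2'], ['event:b1']): both subscribers see all three raw events from a single poll, but each keeps only its own.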
Producer-side ack channel
When a trigger overrides create_shared_stream_producer(), the manager switches that stream to ack mode.
Subscribers receive the same stream of raw events in both modes, so the same filter_shared_stream code works unchanged on the fast path and in ack mode. What differs is the resolution bookkeeping and the broker advance, both of which the manager derives from consumption progress; subscribers need no extra API.
The factory is called once per group and returns a SharedStreamProducer that owns the broker connection for the lifetime of one poll. The manager drives the producer’s open_stream, which yields (raw_event, broker_payload) tuples, and broadcasts each raw event to every subscriber’s queue.
A subscriber resolves an event once it has moved past it (pulled the next raw event, or unsubscribed) and every TriggerEvent it derived from the event has been confirmed persisted to the metadata database. An event the filter skips without yielding anything resolves cleanly as soon as the filter loops back for the next raw event. Force-failure resolves the subscriber immediately.
When every subscriber that was online at broadcast time has resolved an event, the event is fully resolved and eligible for the broker advance: the manager calls await producer.advance(batch) with one lane’s contiguous prefix of fully resolved events. This is where the producer commits, deletes, or acks on the broker. Each AdvanceItem in the batch carries the event’s broker_payload and an AdvanceOutcome with per-event counts of how the subscribers resolved it.
Advances are dispatched by a single pump task, and the producer’s get_advance_lane() assigns each event to a lane. Within a lane, batch items arrive in fan-out order, and the next batch is awaited only after the call for the previous one has returned. If advance raises, the whole group is terminated and the broker redelivers from the never-committed offset. Lanes do not wait for one another, but at any moment at most one advance call is awaited globally. The default lane assignment (every event in the same lane) preserves the original single global order. When the poll ends, the manager awaits producer.aclose() once, best-effort.
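A minimal sketch of the per-lane batching rule, assuming a hypothetical AdvancePump class that is not Airflow’s internal structure. Each lane keeps events in fan-out order, only the contiguous resolved prefix of a lane is handed to advance, and calls are awaited one at a time. pump_once is a simplification of the single pump task.

```python
from __future__ import annotations

import asyncio
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Hashable


@dataclass
class Pending:
    payload: Any
    resolved: bool = False


class AdvancePump:
    """Hypothetical per-lane bookkeeping; illustrates the rule, not Airflow's internals."""

    def __init__(self, advance: Callable[[Hashable, list[Any]], Awaitable[None]]) -> None:
        self._advance = advance
        self._lanes: dict[Hashable, deque[Pending]] = defaultdict(deque)

    def fan_out(self, lane: Hashable, payload: Any) -> Pending:
        item = Pending(payload)
        self._lanes[lane].append(item)  # fan-out order is preserved per lane
        return item

    def _take_prefix(self, lane: Hashable) -> list[Any]:
        queue = self._lanes[lane]
        batch = []
        while queue and queue[0].resolved:  # stop at the first unresolved head
            batch.append(queue.popleft().payload)
        return batch

    async def pump_once(self) -> None:
        for lane in list(self._lanes):
            batch = self._take_prefix(lane)
            if batch:  # advance is never called with an empty batch
                await self._advance(lane, batch)  # awaited one call at a time


async def demo() -> list[tuple[Hashable, list[Any]]]:
    calls = []

    async def advance(lane, batch):
        calls.append((lane, batch))

    pump = AdvancePump(advance)
    head = pump.fan_out("p0", 10)
    pump.fan_out("p0", 11).resolved = True
    pump.fan_out("p1", 20).resolved = True
    await pump.pump_once()  # p0 is blocked by its unresolved head; p1 advances
    head.resolved = True
    await pump.pump_once()  # p0's prefix [10, 11] now goes out as one batch
    return calls
```

Running asyncio.run(demo()) returns [('p1', [20]), ('p0', [10, 11])]: lane p1 advances while lane p0 waits on its unresolved head, then p0’s two events go out together.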
Subscriber reject: instead of yielding a trigger event, a subscriber’s filter can call reject_shared_stream_event() to terminally refuse the raw event it is currently processing. A reject resolves immediately (there is nothing to persist) and is counted in AdvanceOutcome.rejected, separately from an involuntary failure, which is counted in failed. The producer is expected to dead-letter or nack rejects but redeliver failures. The framework only reports the counts; the per-broker decision lives in advance.
Snapshot-at-fan-out: the set of subscribers that must resolve an event is frozen at broadcast time. A subscriber that joins after the event was broadcast is not added to that event’s pending set.
Persistence-gated advance: the trigger events a subscriber derives from a raw event are assigned sequence numbers as they leave the runner; the supervisor confirms each one after the event is stored in the metadata database, and the confirmation reaches the runner on the next state sync (typically one sync round, a second or two — well within the ack timeout). The subscriber’s resolution only completes once all of its sequence numbers for the event are confirmed. If a confirmation never arrives — the persist failed, or the triggerer crashed in between — the ack timeout fails the event, the producer does not commit, and the broker redelivers. The binding between a trigger event and the raw event it came from relies on the filter yielding each derived event before pulling the next raw event from the shared stream (the natural way to write a filter).
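The persistence gate can be modelled as a set of outstanding sequence numbers per (subscriber, raw event) pair. Binding, PersistTracker and their methods are hypothetical; confirm_persisted only borrows its name from SharedStreamManager.confirm_persisted(), and the real bookkeeping also handles timeouts and group shutdown.

```python
from __future__ import annotations

import itertools


class Binding:
    """Hypothetical state for one subscriber's handling of one raw event."""

    def __init__(self) -> None:
        self.pending_seqs: set[int] = set()
        self.moved_past = False  # set when the filter pulls the next raw event

    @property
    def resolved(self) -> bool:
        # Both conditions from the text: moved past the event AND every derived
        # trigger event confirmed persisted.
        return self.moved_past and not self.pending_seqs


class PersistTracker:
    """Hypothetical seq allocator and confirmation sink."""

    def __init__(self) -> None:
        self._next_seq = itertools.count(1)
        self._owner: dict[int, Binding] = {}

    def bind(self, binding: Binding) -> int:
        """Assign a seq to a trigger event as it leaves the runner."""
        seq = next(self._next_seq)
        binding.pending_seqs.add(seq)
        self._owner[seq] = binding
        return seq

    def confirm_persisted(self, seqs) -> None:
        """Apply confirmations from a state sync; unknown seqs are ignored."""
        for seq in seqs:
            binding = self._owner.pop(seq, None)
            if binding is not None:
                binding.pending_seqs.discard(seq)
```

A subscriber that derived two trigger events from one raw event stays unresolved until it has moved on and both seqs have been confirmed, in any order and across any number of syncs.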
Per-event ack timeout: a background task scans outstanding events.
Any subscriber that has not finished processing an event within
ack_timeout seconds — still on the event, or its derived trigger
events not yet confirmed persisted — is force-failed via the existing
_PollFailure path (exception type AckTimeout). Other
subscribers are not affected; once they resolve, the producer advances
normally.
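One pass of the ack-timeout scan can be sketched as a pure function with an injectable clock, echoing the manager’s _now=time.monotonic parameter. Outstanding, scan and the local AckTimeout class are stand-ins for illustration; the real scan runs in a background task and force-fails through _PollFailure.

```python
from __future__ import annotations

import time
from dataclasses import dataclass
from typing import Callable


class AckTimeout(Exception):
    """Local stand-in named after the documented exception (not imported from Airflow)."""


@dataclass
class Outstanding:
    subscriber_id: str
    event_id: int
    started_at: float  # clock reading when the subscriber started on the event


def scan(
    outstanding: list[Outstanding],
    ack_timeout: float,
    now: Callable[[], float] = time.monotonic,
) -> list[tuple[str, AckTimeout]]:
    """Return (subscriber_id, error) for every entry older than ack_timeout."""
    current = now()
    expired = []
    for entry in outstanding:
        if current - entry.started_at > ack_timeout:
            msg = f"event {entry.event_id} unresolved after {ack_timeout}s"
            expired.append((entry.subscriber_id, AckTimeout(msg)))
    return expired
```

Only the slow subscriber is reported; the other entry keeps waiting, matching the rule that other subscribers are not affected.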
Triggerer restart: resolution state is in-memory only. After a triggerer restart, the broker will redeliver events that were never advanced. Subscribers must therefore be idempotent. The same applies when a group stops while events are still awaiting persist confirmation (for example, the last subscriber unsubscribes right after producing an event): the pending advances are abandoned and the broker redelivers those events.
shared_stream_subscriber_queue_size in ack mode: the bound is
still “unprocessed raw events per subscriber”. The manager does not
wait for outstanding resolutions before pulling the next event from the
producer’s stream; back-pressure is purely queue-bound — a subscriber
whose queue is full is force-failed via _SubscriberOverflow. The
queue mainly protects against burst delivery before a subscriber’s filter
has had a chance to run. Broker advances, however, are dispatched by a
single pump task in per-lane order: while a lane’s head event still has
pending subscribers, every later event in that same lane waits — resolved
events behind the head accumulate into the next batch (the ack timeout is
the backstop that bounds this per-lane head-of-line wait).
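The queue-bound back-pressure rule can be sketched with asyncio.Queue.put_nowait: the broadcasting side never waits, and a subscriber whose bounded queue is full is dropped and failed. broadcast, demo and the local SubscriberOverflow class are hypothetical stand-ins for the manager’s internals.

```python
from __future__ import annotations

import asyncio


class SubscriberOverflow(Exception):
    """Local stand-in for the documented _SubscriberOverflow failure."""


def broadcast(raw, queues: dict[str, asyncio.Queue], failed: dict[str, Exception]) -> None:
    """Never wait on a slow subscriber: a full queue force-fails only that subscriber."""
    for trigger_id, q in list(queues.items()):  # copy so deletion is safe
        try:
            q.put_nowait(raw)
        except asyncio.QueueFull:
            failed[trigger_id] = SubscriberOverflow(f"queue full ({q.maxsize}) for {trigger_id}")
            del queues[trigger_id]


async def demo() -> tuple[list[str], list[str]]:
    queues = {"fast": asyncio.Queue(maxsize=2), "slow": asyncio.Queue(maxsize=2)}
    failed: dict[str, Exception] = {}
    for raw in range(3):
        broadcast(raw, queues, failed)
        queues["fast"].get_nowait()  # "fast" keeps up; "slow" never reads
    return sorted(queues), sorted(failed)
```

With a bound of 2, the third broadcast overflows the idle subscriber, so asyncio.run(demo()) returns (['fast'], ['slow']).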
Triggers that do not override create_shared_stream_producer run the
fast path: no event IDs and no resolution bookkeeping. Subscribers
see the exact same stream shape in both modes, so opting a trigger into
ack mode never changes its filter code (backward-compatible).
Lifecycle invariants
The manager and groups cooperate to keep a single invariant true at every
await-point:
A key is present in SharedStreamManager._groups only while its group’s poll task is alive and accepting new subscribers.
This rules out the late-subscriber races that the naive design admits — a new subscriber for a key whose poll has died or is in the middle of being torn down always falls through to “create a fresh group” rather than attaching to a dead one and hanging on an empty queue. The invariant is maintained synchronously:
- When _poll ends for any reason other than cancellation (the upstream iterator raised, or returned), the group’s finally block evicts the key from _groups and broadcasts a terminal sentinel to current subscribers, all without yielding, so no other coroutine can interleave.
- When the last subscriber leaves, SharedStreamManager.unsubscribe() evicts the key from _groups before awaiting group.stop(), so a new subscriber arriving while we wait for cancellation creates a fresh group.
- SharedStreamManager.stop_all() clears _groups in one synchronous step before awaiting any stop, applying the same rule to shutdown.
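The eviction-before-await rule for the last unsubscribe can be demonstrated with a toy manager. Group, Manager and demo are hypothetical and only mirror the documented ordering; the real group’s stop() cancels a poll task rather than sleeping.

```python
from __future__ import annotations

import asyncio


class Group:
    def __init__(self, key: str) -> None:
        self.key = key
        self.subscribers: set[str] = set()
        self.stopped = False

    async def stop(self) -> None:
        await asyncio.sleep(0.01)  # stands in for cancelling the poll task
        self.stopped = True


class Manager:
    def __init__(self) -> None:
        self._groups: dict[str, Group] = {}

    def subscribe(self, trigger_id: str, key: str) -> Group:
        group = self._groups.get(key)
        if group is None:
            group = self._groups[key] = Group(key)
        group.subscribers.add(trigger_id)
        return group

    async def unsubscribe(self, trigger_id: str, key: str) -> None:
        group = self._groups.get(key)
        if group is None:
            return
        group.subscribers.discard(trigger_id)
        if not group.subscribers:
            # Evict synchronously BEFORE the first await, so a subscriber that
            # arrives during stop() builds a fresh group instead of joining this one.
            del self._groups[key]
            await group.stop()


async def demo() -> bool:
    manager = Manager()
    old = manager.subscribe("t1", "topic")
    leaving = asyncio.create_task(manager.unsubscribe("t1", "topic"))
    await asyncio.sleep(0)  # let unsubscribe run up to its await
    new = manager.subscribe("t2", "topic")  # arrives mid-teardown
    await leaving
    return new is not old and old.stopped and not new.stopped
```

If the del moved below await group.stop(), the late subscriber would attach to the dying group and demo() would return False.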
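Exceptions
| AckTimeout | Raised in a subscriber that did not finish processing an event within the per-event timeout. |
Classes
| AdvanceOutcome | Per-event resolution counts handed to SharedStreamProducer.advance(). |
| AdvanceItem | One resolved event in the batch handed to SharedStreamProducer.advance(). |
| SharedStreamProducer | Broker-side half of a shared stream running in ack mode. |
| SharedStreamManager | Coordinate BaseEventTrigger instances that share underlying I/O. |
Functions
| reject_shared_stream_event() | Reject the shared-stream raw event the calling filter is currently processing. |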
Module Contents
- airflow.triggers.shared_stream.reject_shared_stream_event()
Reject the shared-stream raw event the calling filter is currently processing.
Call this from inside filter_shared_stream() when, instead of yielding a TriggerEvent, you want the broker to treat the raw event as terminally refused: dead-letter it (Azure Service Bus), nack it (Pub/Sub), and so on. A reject is distinct from an involuntary failure: it counts toward AdvanceOutcome.rejected rather than failed, so a producer can dead-letter rejects while still redelivering failures. The reject resolves the event immediately; there is no derived trigger event to persist first.
Only meaningful while a raw event’s binding window is open, that is, inside filter_shared_stream of an ack-mode stream, right after the filter received a raw event. Called anywhere else (the fast path, standalone run(), or between two raw events) it logs a warning and is a no-op, because there is no broker advance to influence.
Invariant: this relies on the filter running in the same asyncio task as the one driving the raw-event binding window (a task-local context variable carries the open window). Driving the filter iteration from a different task (for example via asyncio.to_thread() or a freshly created task) would make the open window invisible here and turn every reject into a no-op.
- exception airflow.triggers.shared_stream.AckTimeout
Bases: Exception
Raised in a subscriber that did not finish processing an event within the per-event timeout.
The subscriber is either still on the event, or some of its derived trigger events were never confirmed persisted. Treated the same as _SubscriberOverflow: the subscriber’s trigger fails through the standard trigger-failure path. Other subscribers in the same group are unaffected; once they resolve, the producer advances normally.
- class airflow.triggers.shared_stream.AdvanceOutcome
Per-event resolution counts handed to SharedStreamProducer.advance().
Every subscriber that was online when the event was broadcast is counted in exactly one field:
  - acked: the subscriber moved past the event (pulled the next raw event, or unsubscribed while the event was outstanding) and every trigger event it derived from the event was confirmed persisted.
  - failed: force-failed by the manager (ack timeout, including a persist confirmation that never arrived, or queue overflow).
  - rejected: the subscriber actively refused the event by calling reject_shared_stream_event() from its filter. Terminal: the broker should dead-letter or nack such events rather than redeliver them, which is what distinguishes a reject from an involuntary failed (where redelivery is the right response).
A producer reconciles these per broker in SharedStreamProducer.advance(); the framework only reports the counts. For example, a Service Bus producer dead-letters when rejected is non-zero, abandons (redelivers) when only failed is non-zero, and completes when every subscriber accepted the event; a Pub/Sub producer nacks on a reject and acks otherwise.
An event broadcast while no subscribers were online carries all-zero counts and is not clean: nothing was accepted, so there is nothing the producer should commit on.
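A producer’s per-event decision from the three counts might look like this, following the Service Bus example above: a reject wins over a failure, a failure wins over acceptance, and an all-zero outcome is never committed. Outcome, Action and decide are hypothetical local names, and the action values are illustrative rather than any broker SDK’s API.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


@dataclass(frozen=True)
class Outcome:
    """Local stand-in mirroring the three documented AdvanceOutcome counts."""
    acked: int = 0
    failed: int = 0
    rejected: int = 0


class Action(Enum):
    DEAD_LETTER = "dead_letter"  # terminal refusal by at least one subscriber
    REDELIVER = "redeliver"      # involuntary failure only: let the broker retry
    COMPLETE = "complete"        # every counted subscriber accepted the event
    NO_COMMIT = "no_commit"      # all-zero counts: nothing was accepted


def decide(outcome: Outcome) -> Action:
    if outcome.rejected:
        return Action.DEAD_LETTER
    if outcome.failed:
        return Action.REDELIVER
    if outcome.acked:
        return Action.COMPLETE
    return Action.NO_COMMIT
```

Because each online subscriber lands in exactly one field, acked > 0 with the other two counts at zero means every subscriber accepted the event.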
- class airflow.triggers.shared_stream.AdvanceItem
Bases: NamedTuple
One resolved event in the batch handed to SharedStreamProducer.advance().
- broker_payload: Any
The opaque object the producer yielded with the raw event from open_stream.
- outcome: AdvanceOutcome
How the subscribers resolved the event.
- class airflow.triggers.shared_stream.SharedStreamProducer
Bases: abc.ABC
Broker-side half of a shared stream running in ack mode.
Returned by create_shared_stream_producer(); one instance owns the broker connection for the lifetime of one poll.
- abstract open_stream()
Open the broker connection and yield (raw_event, broker_payload) pairs.
Implement as an async generator. broker_payload is any opaque object this producer needs later to advance the broker (e.g. a Kafka offset, SQS receipt handle, Pub/Sub ack ID). Called once per poll; open the broker connection here, not in the factory or __init__.
Implementations are expected to run for the lifetime of the group. Returning without raising is treated as an error and propagated to every subscriber, so the contract is “yield forever, or raise”.
- abstract async advance(batch)
Advance the broker for one lane’s batch of fully resolved events.
batch is never empty. Its items are in fan-out order and form a contiguous resolved prefix of one lane, that is, events for which get_advance_lane() returned equal values. Within a lane, batches arrive strictly in order: the next batch is awaited only after the call for the previous one has returned. Across lanes the relative order is not guaranteed, but at any moment at most one advance call is awaited globally. This makes cumulative schemes such as a Kafka offset commit safe within a lane: committing the offset of the batch’s last item covers the whole batch. Each item’s AdvanceOutcome carries the per-event resolution counts; use them to decide whether to commit, skip, or trigger a broker-side redelivery.
If this method raises, the error is logged and the whole shared-stream group is terminated: every subscriber receives a failure sentinel and the broker redelivers from the never-committed offset. Terminating is the safe default; finer-grained per-lane retry would require tracking which offsets are safe to recommit.
- get_advance_lane(broker_payload)
Return the advance lane for one event.
Events whose lane values compare equal are advanced strictly in fan-out order relative to each other; events in different lanes do not wait for one another. At any moment at most one advance call is awaited globally, regardless of how many lanes exist. The default implementation puts every event in the same lane, which preserves the single global fan-out order.
Called synchronously once per event before fan-out, so it must be cheap (O(1)) and must not block. If it raises, the whole poll is treated as failed: the group terminates and the error propagates to every subscriber.
Example: a Kafka producer can return (topic, partition) here. A cumulative offset commit only needs ordering within a partition, so a slow partition no longer delays commits on the others.
- async aclose()
Release broker resources; called once when the poll ends.
Best-effort: a raised exception is logged and not propagated. The default implementation does nothing.
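To tie the producer methods together, here is an in-memory, Kafka-like stand-in that follows the documented shape: open_stream yields (raw_event, broker_payload) pairs, get_advance_lane returns (topic, partition), and advance makes a cumulative commit per lane. It does not subclass the real SharedStreamProducer or use a Kafka client; Offset, Item, Outcome, the committed dictionary and demo are assumptions for illustration. Committing last.offset + 1 follows Kafka’s convention that the committed offset is the next one to read. As one possible design choice, advance refuses to commit past an unclean event and raises, which per the advance contract terminates the group so the broker redelivers.

```python
from __future__ import annotations

import asyncio
from typing import AsyncIterator, NamedTuple


class Offset(NamedTuple):
    topic: str
    partition: int
    offset: int


class Outcome(NamedTuple):  # local stand-in for AdvanceOutcome
    acked: int = 0
    failed: int = 0
    rejected: int = 0


class Item(NamedTuple):  # local stand-in for AdvanceItem
    broker_payload: Offset
    outcome: Outcome


class InMemoryKafkaLikeProducer:
    """Hypothetical producer shaped like SharedStreamProducer; no real Kafka client."""

    def __init__(self, records: list[tuple[str, int, int, bytes]]) -> None:
        self._records = records
        self.committed: dict[tuple[str, int], int] = {}

    async def open_stream(self) -> AsyncIterator[tuple[bytes, Offset]]:
        for topic, partition, offset, value in self._records:
            yield value, Offset(topic, partition, offset)
        # Contract: "yield forever, or raise". A finite source must raise.
        raise RuntimeError("in-memory source exhausted")

    def get_advance_lane(self, broker_payload: Offset) -> tuple[str, int]:
        return broker_payload.topic, broker_payload.partition  # cheap, O(1)

    async def advance(self, batch: list[Item]) -> None:
        clean = 0
        for item in batch:
            o = item.outcome
            if o.failed or o.rejected or not o.acked:
                break
            clean += 1
        if clean:
            last = batch[clean - 1].broker_payload
            self.committed[(last.topic, last.partition)] = last.offset + 1
        if clean < len(batch):
            # Assumption: never commit past an unclean event; raising ends the
            # group so the broker redelivers from the last committed offset.
            raise RuntimeError(f"unclean event at {batch[clean].broker_payload}")

    async def aclose(self) -> None:
        pass  # a real producer would close its consumer here


async def demo() -> dict[tuple[str, int], int]:
    producer = InMemoryKafkaLikeProducer(
        [("orders", 0, 5, b"a"), ("orders", 1, 9, b"b"), ("orders", 0, 6, b"c")]
    )
    lanes: dict[tuple[str, int], list[Item]] = {}
    try:
        async for _raw, payload in producer.open_stream():
            lane = producer.get_advance_lane(payload)
            lanes.setdefault(lane, []).append(Item(payload, Outcome(acked=1)))
    except RuntimeError:
        pass  # end of the in-memory source
    for batch in lanes.values():
        await producer.advance(batch)  # one call awaited at a time
    await producer.aclose()
    return producer.committed
```

Running asyncio.run(demo()) returns {('orders', 0): 7, ('orders', 1): 10}: each partition commits past its own last resolved offset.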
- class airflow.triggers.shared_stream.SharedStreamManager(*, log=None, max_subscriber_queue=DEFAULT_SUBSCRIBER_QUEUE_MAX, ack_timeout=DEFAULT_ACK_TIMEOUT, _now=time.monotonic)
Coordinate BaseEventTrigger instances that share underlying I/O.
The manager owns one _SharedStreamGroup per distinct shared_stream_key. Each group runs a single async task that drives open_shared_stream; subscribers receive raw events through their own asyncio queues and convert them to TriggerEvent instances independently.
The manager is single-event-loop and not thread-safe. The triggerer’s TriggerRunner is its sole owner.
- log
- subscribe(*, trigger_id, trigger, key)
Subscribe a trigger to the shared stream identified by key.
On the first subscriber for a given key, the group is created and the underlying poll loop is started. Returns an async iterator of raw events the trigger should feed into filter_shared_stream.
- bind_pending_event(*, trigger_id, key)
Bind a trigger event the subscriber just emitted to its shared-stream ack state.
Returns the persist-confirmation seq the runner must report through confirm_persisted() once the event is stored, or None when there is nothing to gate: the group is gone, the stream is not in ack mode, or the subscriber has no open binding window. Synchronous and O(1); call it between taking the event off the trigger and queueing it outbound, with no await in between.
- confirm_persisted(seqs)
Record that the trigger events behind seqs were persisted.
Broadcast to every live group; each group resolves the sequence numbers it owns and ignores the rest. Sequence numbers whose binding already resolved (timeout, overflow) or whose group has stopped are ignored.
- async unsubscribe(trigger_id, key)
Remove a subscriber.
When the last subscriber for key leaves, the key is evicted from _groups synchronously and the underlying poll task is cancelled. Eviction happens before awaiting stop() so that a new subscriber arriving while we wait for cancellation builds a fresh group rather than attaching to the dying one.
- async stop_all()
Cancel every active group; used during triggerer shutdown.