Producer-side ack channel
For upstreams where the producer must advance (commit, delete, or ack) only
after all subscribers have processed an event, override
create_shared_stream_producer() to return a SharedStreamProducer.
When this factory is overridden, the manager enters ack mode. The
subscriber side does not change: filter_shared_stream receives raw
events exactly as on the fast path, so the same filter code works in both
modes. The framework infers when the broker may advance from each
subscriber’s consumption progress and from whether the trigger events it
derived have been persisted.
The manager calls create_shared_stream_producer(kwargs) once per
shared-stream group. The returned producer owns the broker connection for
the lifetime of one poll; open the connection lazily inside
open_stream, not in the factory.
The producer’s open_stream yields (event, broker_payload) tuples,
where broker_payload is whatever the producer needs later (e.g. an
SQS receipt handle, a Kafka offset, a Pub/Sub ack ID).
A subscriber resolves an event once two conditions hold: it has moved
past the event (by pulling the next raw event or by unsubscribing), and
every TriggerEvent it derived from the event has been persisted to the
metadata database.
Once every subscriber in the fan-out set has resolved an event (by
moving past it, unsubscribing, timing out, or overflowing its queue),
the manager calls await producer.advance(batch) with the contiguous
prefix of fully resolved events in that event’s lane, and the producer
commits the offsets, deletes the SQS messages, and so on. Each item in the
batch is an AdvanceItem carrying the event’s broker_payload and an
AdvanceOutcome with per-event counts of how the subscribers resolved.
Rejecting an event: instead of yielding a trigger event from a raw event,
a subscriber’s filter can actively refuse it by calling
reject_shared_stream_event() while processing that event. This is
distinct from an involuntary failure. A failed count means a subscriber
did not finish in time (ack timeout) or fell behind (queue overflow); the
right response is usually redelivery. A rejected count means a subscriber
decided the event must not produce a trigger event and should be
terminally discarded; the right response is to dead-letter or nack it, not
to redeliver it. The AdvanceOutcome reports both counts separately so the
producer can apply the right per-broker action in advance:
Azure Service Bus: dead-letter the message when rejected is non-zero,
abandon it (so the broker redelivers) when only failed is non-zero,
and complete it when every subscriber accepted the event.
Pub/Sub: nack the message on a reject, otherwise ack it (Pub/Sub
redelivers a nacked message, so terminal discard relies on a dead-letter
policy on the subscription).
The framework only reports the counts: it never dead-letters, nacks, or
redelivers on its own, and that broker-specific decision lives entirely in
the producer’s advance.
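The per-broker mapping above can be sketched as two pure functions over the
counts. The function and enum names are hypothetical, and the sketch takes
plain integers rather than assuming anything about AdvanceOutcome beyond its
rejected and failed counts.

```python
from enum import Enum


class ServiceBusAction(Enum):
    COMPLETE = "complete"
    ABANDON = "abandon"
    DEAD_LETTER = "dead_letter"


def service_bus_action(rejected: int, failed: int) -> ServiceBusAction:
    """Pick a Service Bus settlement for one event from its outcome counts."""
    if rejected:
        # A reject is terminal: the event must never produce a trigger event.
        return ServiceBusAction.DEAD_LETTER
    if failed:
        # Only involuntary failures: let the broker redeliver.
        return ServiceBusAction.ABANDON
    return ServiceBusAction.COMPLETE


def pubsub_should_ack(rejected: int) -> bool:
    """Pub/Sub: nack on any reject, otherwise ack."""
    return rejected == 0


print(service_bus_action(rejected=1, failed=2))  # ServiceBusAction.DEAD_LETTER
print(service_bus_action(rejected=0, failed=1))  # ServiceBusAction.ABANDON
```

Note that a zero-subscriber broadcast (both counts zero) falls through to
COMPLETE here; a producer that wants to treat that case differently can
check the outcome’s is_clean flag instead of the raw counts.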
reject_shared_stream_event is meaningful only while the filter is
processing a raw event in ack mode (the binding window of that event is
open). Called on the fast path, from a standalone run(), or between two
raw events, it logs a warning and does nothing, because there is no broker
advance to influence. Because a reject resolves the event for that
subscriber immediately, there is nothing to persist, so the reject does not
wait on the persistence gate.
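One way to model a binding window is a context variable that is set only
while a raw event is being processed. The sketch below illustrates that idea
with synchronous handlers; reject_event_sketch, process_in_window and the
context variable are hypothetical and are not the framework’s
implementation, which runs filters as async generators and may track the
window differently.

```python
import contextvars
import logging

logger = logging.getLogger(__name__)

# Hypothetical: state of the raw event currently being processed,
# or None when no binding window is open.
_binding = contextvars.ContextVar("binding", default=None)


def reject_event_sketch() -> bool:
    """Mark the bound raw event as rejected; return False if no window is open."""
    state = _binding.get()
    if state is None:
        # Fast path, standalone run(), or between two raw events.
        logger.warning("reject called outside a binding window; ignoring")
        return False
    state["rejected"] = True
    return True


def process_in_window(handler) -> dict:
    """Open a binding window for one raw event, run handler, then close it."""
    state = {"rejected": False}
    token = _binding.set(state)
    try:
        handler()
    finally:
        _binding.reset(token)  # the window closes even if handler raises
    return state


print(reject_event_sketch())  # False, and a warning is logged
print(process_in_window(reject_event_sketch))  # {'rejected': True}
```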
is_clean is True only when every subscriber in the event’s broadcast
snapshot accepted the event: no rejects, no failures, and at least one
subscriber accepted it. A single reject or failure makes it False, and so
does a zero-subscriber broadcast (all-zero counts): nothing was accepted,
so there is nothing the producer should commit on.
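A hypothetical model of the is_clean rule follows. It assumes an accepted
count alongside rejected and failed; this section only names the latter two,
so the accepted field is an illustration, not the real AdvanceOutcome API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OutcomeSketch:
    """Hypothetical model of an event's per-subscriber resolution counts."""

    accepted: int  # assumed field; the real AdvanceOutcome may differ
    rejected: int
    failed: int

    @property
    def is_clean(self) -> bool:
        # No rejects, no failures, and at least one acceptance, so an
        # all-zero (zero-subscriber) broadcast is not clean.
        return self.rejected == 0 and self.failed == 0 and self.accepted > 0


print(OutcomeSketch(accepted=3, rejected=0, failed=0).is_clean)  # True
print(OutcomeSketch(accepted=0, rejected=0, failed=0).is_clean)  # False
```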
Example — reject inside a filter:
```python
from airflow.triggers.base import TriggerEvent
from airflow.triggers.shared_stream import reject_shared_stream_event


async def filter_shared_stream(self, shared_stream):
    async for raw in shared_stream:
        if raw.get("malformed"):
            # Never produce a trigger event from this; have the broker
            # dead-letter it rather than redeliver it.
            reject_shared_stream_event()
            continue
        yield TriggerEvent(raw)
```
Ordering guarantee: by default every event belongs to the same lane.
The items of a batch are in event order and form their lane’s contiguous
resolved prefix; within a lane, batches arrive strictly in order, because
the next advance is awaited only after the previous call has returned. A
producer can override get_advance_lane to narrow that ordering to within
a lane: events whose lane values compare equal are batched and advanced in
event order relative to each other, while events in different lanes do not
wait for one another. Either way, at most one advance call is awaited at a
time. When a lane matches the broker’s commit granularity (for example,
one Kafka partition), cumulative schemes such as a Kafka offset commit only
need to commit the batch’s last item.
If advance raises, the error is logged and the whole shared-stream group
is terminated: every subscriber receives a failure sentinel, and the broker
redelivers from the last committed offset (a loud, safe failure rather than
a silent data skip). Recovering more gracefully from transient failures
would require the producer to track which offsets are safe to recommit.
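The batching rule can be sketched as a function that groups not-yet-advanced
events by lane and keeps, per lane, the run of resolved events before the
first pending one. resolved_prefixes and its (lane, payload, resolved) input
format are assumptions made for this illustration, not the manager’s API.

```python
from collections.abc import Hashable, Iterable


def resolved_prefixes(
    events: Iterable[tuple[Hashable, str, bool]],
) -> dict[Hashable, list[str]]:
    """Return each lane's contiguous prefix of resolved, not-yet-advanced events.

    ``events`` holds (lane, payload, resolved) tuples in broadcast order.
    """
    per_lane: dict = {}
    for lane, payload, resolved in events:
        per_lane.setdefault(lane, []).append((payload, resolved))

    batches: dict = {}
    for lane, items in per_lane.items():
        prefix = []
        for payload, resolved in items:
            if not resolved:
                break  # a pending event holds back every later event in its lane
            prefix.append(payload)
        if prefix:
            batches[lane] = prefix
    return batches


# Lanes are (topic, partition) pairs, as a Kafka producer might return.
events = [
    (("t", 0), "t0-5", True),
    (("t", 1), "t1-9", False),
    (("t", 0), "t0-6", True),
    (("t", 1), "t1-10", True),
    (("t", 0), "t0-7", False),
    (("t", 0), "t0-8", True),
]
print(resolved_prefixes(events))
# {('t', 0): ['t0-5', 't0-6']}

# One default lane: the pending t1-9 now also holds back t0-6.
print(resolved_prefixes([(None, p, r) for _, p, r in events]))
# {None: ['t0-5']}
```

With per-partition lanes, the pending t1-9 holds back only partition 1;
collapsing everything into the default lane lets that single pending event
delay partition 0 as well.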
When the poll ends, the manager calls await producer.aclose() once,
best-effort.
Example — SQS-like producer:
```python
import json
from collections.abc import AsyncIterator, Sequence
from typing import Any

from airflow.triggers.base import BaseEventTrigger, TriggerEvent
from airflow.triggers.shared_stream import AdvanceItem, SharedStreamProducer


class SqsSharedStreamProducer(SharedStreamProducer):
    def __init__(self, queue_url: str):
        self.queue_url = queue_url
        self.client = None

    async def open_stream(self) -> AsyncIterator[tuple[Any, Any]]:
        # Open the connection here, not in the trigger's factory.
        # create_sqs_client, poll_sqs and delete_sqs_message are placeholders
        # for whatever async SQS client the provider uses.
        self.client = await create_sqs_client()
        while True:
            messages = await poll_sqs(self.client, self.queue_url)
            for msg in messages:
                # The SQS body is a string; decode it so filters can call .get().
                yield json.loads(msg["Body"]), msg["ReceiptHandle"]

    async def advance(self, batch: Sequence[AdvanceItem]) -> None:
        # Called with one lane's batch of fully resolved messages.
        for receipt_handle, outcome in batch:
            if outcome.is_clean:
                await delete_sqs_message(self.client, self.queue_url, receipt_handle)
            # Otherwise leave the message for the visibility timeout to
            # redeliver (a queue redrive policy can move it to a DLQ).

    async def aclose(self) -> None:
        if self.client is not None:
            await self.client.close()


class SqsSharedTrigger(BaseEventTrigger):
    def __init__(self, *, queue_url: str, region: str | None = None):
        super().__init__()
        self.queue_url = queue_url
        self.region = region

    def serialize(self):
        return (
            f"{type(self).__module__}.{type(self).__qualname__}",
            {"queue_url": self.queue_url, "region": self.region},
        )

    def shared_stream_key(self):
        return ("sqs", self.queue_url)

    @classmethod
    def create_shared_stream_producer(cls, kwargs) -> SqsSharedStreamProducer:
        return SqsSharedStreamProducer(kwargs["queue_url"])

    async def filter_shared_stream(self, shared_stream):
        async for raw in shared_stream:
            if self.region is None or raw.get("region") == self.region:
                yield TriggerEvent(raw)

    async def run(self):
        yield TriggerEvent({})
```
Example — Kafka cumulative commit across partitions. A Kafka commit
acknowledges every offset up to the committed one within a partition, so
it is only safe if no later event from the same partition can be committed
while an earlier one is still pending — events on other partitions do not
matter. Returning (topic, partition) from get_advance_lane narrows
the ordering guarantee to exactly that granularity: each partition’s
commits stay in order, and a slow partition no longer delays commits on
the other partitions:
```python
from airflow.triggers.shared_stream import SharedStreamProducer


class KafkaSharedStreamProducer(SharedStreamProducer):
    def __init__(self, topics: list[str]):
        self.topics = topics
        self.consumer = None

    async def open_stream(self):
        # Auto-commit must be off (the Kafka default is on), or the
        # consumer commits on its own schedule and the ack channel
        # no longer controls what the broker considers delivered.
        # create_kafka_consumer is a placeholder; the consumer it returns
        # is assumed to expose commit(topic, partition, offset) and stop().
        self.consumer = await create_kafka_consumer(self.topics, enable_auto_commit=False)
        async for message in self.consumer:
            yield message.value, (message.topic, message.partition, message.offset)

    def get_advance_lane(self, broker_payload):
        topic, partition, _offset = broker_payload
        return topic, partition

    async def advance(self, batch):
        # The batch is one lane's (here, one partition's) contiguous
        # resolved prefix, in event order, so committing the offset of
        # its last item covers the whole batch and can never skip past
        # an event that is still pending.
        #
        # If this method raises, the whole shared-stream group is
        # terminated and the broker redelivers from the last committed
        # offset, so there is no silent data skip.
        #
        # Inspect each item's outcome before committing: non-clean events
        # (rejects, failures, or a zero-subscriber broadcast) are handed to
        # a dead-letter handler first, because the commit below moves past them.
        to_dlq = []
        for broker_payload, outcome in batch:
            if not outcome.is_clean:
                to_dlq.append(broker_payload)
        if to_dlq:
            # handle_dlq is a placeholder, e.g. a write to a dead-letter topic.
            handle_dlq(to_dlq)
        topic, partition, offset = batch[-1].broker_payload
        # Kafka commits the offset of the next message to read, hence + 1.
        await self.consumer.commit(topic, partition, offset + 1)

    async def aclose(self):
        if self.consumer is not None:
            await self.consumer.stop()
```
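Without the get_advance_lane override, one batch can span several
partitions, and committing only batch[-1] would leave the other partitions’
resolved offsets uncommitted. A hypothetical helper for that case, assuming
the (topic, partition, offset) payload layout used above, computes one
cumulative commit per partition:

```python
from collections.abc import Iterable


def commit_offsets(
    payloads: Iterable[tuple[str, int, int]],
) -> dict[tuple[str, int], int]:
    """Map each (topic, partition) in a batch to the offset to commit.

    Kafka's committed offset is the next offset to read, hence offset + 1.
    Because a batch is a contiguous resolved prefix, every partition's share
    of it is also contiguous, so its highest offset is safe to commit.
    """
    commits: dict = {}
    for topic, partition, offset in payloads:
        key = (topic, partition)
        commits[key] = max(commits.get(key, 0), offset + 1)
    return commits


batch = [("orders", 0, 41), ("orders", 1, 7), ("orders", 0, 42)]
print(commit_offsets(batch))
# {('orders', 0): 43, ('orders', 1): 8}
```

With per-partition lanes, as in the producer above, every batch holds a
single key and the helper reduces to committing the last item.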
The one constraint on filter authors is the binding between raw events and
the trigger events derived from them: yield every TriggerEvent derived
from a raw event before pulling the next raw event from the shared
stream — which is what a straightforward filter loop does anyway.
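The binding rule can be checked with a small simulation. RecordingStream
stands in for the shared stream and remembers which raw event was pulled
last; a filter respects the binding if every value it yields comes from that
current raw event. All names here are hypothetical, and the derived values
are the raw events themselves rather than TriggerEvent objects so the sketch
runs without Airflow.

```python
import asyncio


class RecordingStream:
    """Hypothetical stand-in for a shared stream that tracks the current raw event."""

    def __init__(self, raw_events):
        self._raw_events = list(raw_events)
        self.current = None

    async def __aiter__(self):
        for raw in self._raw_events:
            self.current = raw  # the binding window for this raw event opens
            yield raw
        self.current = None


async def straightforward_filter(stream):
    async for raw in stream:
        yield raw  # emitted before the next raw event is pulled


async def buffering_filter(stream):
    pending = []
    async for raw in stream:
        pending.append(raw)
        if len(pending) == 2:
            # The first event of each pair is emitted only after the second
            # was pulled, which breaks the binding.
            for item in pending:
                yield item
            pending.clear()
    for item in pending:
        yield item


async def respects_binding(filter_fn, raw_events) -> bool:
    stream = RecordingStream(raw_events)
    async for derived in filter_fn(stream):
        if derived != stream.current:
            return False
    return True


print(asyncio.run(respects_binding(straightforward_filter, [1, 2, 3, 4])))  # True
print(asyncio.run(respects_binding(buffering_filter, [1, 2, 3, 4])))  # False
```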
Snapshot-at-fan-out: the set of subscribers that must resolve a given
event is frozen at the moment the event is broadcast. A subscriber that
joins after the event was dispatched is not added to that event’s pending
set.
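Snapshot-at-fan-out amounts to copying the current subscriber set at
broadcast time. FanOutTracker below is a hypothetical model of that
bookkeeping, not the manager’s code:

```python
class FanOutTracker:
    """Hypothetical model of snapshot-at-fan-out bookkeeping."""

    def __init__(self) -> None:
        self.subscribers = set()
        self.pending = {}  # event_id -> subscribers that still must resolve it

    def subscribe(self, name: str) -> None:
        # A join only affects events broadcast from now on.
        self.subscribers.add(name)

    def broadcast(self, event_id: int) -> None:
        # Freeze the set of subscribers that must resolve this event.
        self.pending[event_id] = set(self.subscribers)

    def resolve(self, event_id: int, name: str) -> bool:
        """Record one subscriber's resolution; True once the event is fully resolved."""
        waiting = self.pending[event_id]
        waiting.discard(name)
        return not waiting


tracker = FanOutTracker()
tracker.subscribe("a")
tracker.broadcast(1)
tracker.subscribe("b")  # late joiner: not part of event 1's snapshot
tracker.broadcast(2)
print(tracker.resolve(1, "a"))  # True: event 1 only waited on "a"
print(tracker.resolve(2, "a"))  # False: event 2 still waits on "b"
```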
Per-event ack timeout: if a subscriber has not resolved an event within
the ack timeout (default 5 minutes, configurable via the
[triggerer] shared_stream_ack_timeout config option), either because it is
still processing the event or because some of its derived trigger events
were never confirmed persisted, the manager force-fails that subscriber’s
trigger. Other subscribers are not affected; once they resolve, the
producer advances normally. The ack timeout is a manager-level safety net
and does not replace any native broker session or visibility timeout.
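The per-subscriber isolation of the ack timeout can be sketched with
asyncio: each subscriber’s resolution is awaited with its own timeout, so a
stuck subscriber is force-failed while the others still resolve.
await_resolutions and the use of one asyncio.Event per subscriber are
assumptions for illustration, not how the manager is necessarily built, and
the timeout here is a fraction of a second rather than the 5-minute default.

```python
import asyncio


async def await_resolutions(
    resolved: dict, ack_timeout: float
) -> tuple:
    """Wait for each subscriber's resolution of one event, bounded by ack_timeout.

    Returns (resolved_subscribers, force_failed_subscribers). Each subscriber
    is timed out individually, so a stuck one does not fail the others.
    """

    async def wait_one(name: str, event: asyncio.Event) -> tuple:
        try:
            await asyncio.wait_for(event.wait(), timeout=ack_timeout)
            return name, True
        except asyncio.TimeoutError:
            return name, False

    results = await asyncio.gather(*(wait_one(n, e) for n, e in resolved.items()))
    ok = {name for name, done in results if done}
    failed = {name for name, done in results if not done}
    return ok, failed


async def timeout_demo():
    fast, stuck = asyncio.Event(), asyncio.Event()
    asyncio.get_running_loop().call_later(0.01, fast.set)  # resolves quickly
    # "stuck" is never set, e.g. its TriggerEvent was never confirmed persisted.
    return await await_resolutions({"fast": fast, "stuck": stuck}, ack_timeout=0.1)


print(asyncio.run(timeout_demo()))  # ({'fast'}, {'stuck'})
```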
From the subscriber’s perspective, the force-fail surfaces as an AckTimeout
(importable from airflow.triggers.shared_stream) raised by the
shared_stream iterator inside filter_shared_stream. Letting it propagate
is fine, since the trigger then fails through the standard trigger-failure
path; catch it only if the subscriber needs to run cleanup before failing.
Triggerer restart: resolution state lives in memory only. After a
triggerer restart, the broker redelivers messages that were never
advanced. Subscribers must therefore be idempotent.
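Because resolution state is lost on restart, deduplication must survive the
restart too; an in-memory set of seen IDs would not. A minimal sketch,
assuming the side effect of an event can be keyed by a message ID that is
stable across redeliveries (for SQS, the MessageId rather than the receipt
handle, which changes on every receive), uses a primary-key insert so a
redelivered event becomes a no-op. record_once and the processed table are
hypothetical, and the in-memory SQLite database stands in for durable
storage.

```python
import sqlite3


def record_once(conn: sqlite3.Connection, message_id: str, payload: str) -> bool:
    """Apply an event's effect at most once per message ID.

    Returns True if this call stored the row, False for a redelivery.
    The dedup key lives in the database, so it survives a triggerer restart.
    """
    with conn:  # commit on success, roll back on error
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed (message_id, payload) VALUES (?, ?)",
            (message_id, payload),
        )
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")  # use durable storage in practice
conn.execute("CREATE TABLE processed (message_id TEXT PRIMARY KEY, payload TEXT)")
print(record_once(conn, "msg-1", "hello"))  # True
print(record_once(conn, "msg-1", "hello"))  # False: redelivered duplicate
```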
When multiple triggers sharing the same key restart together, the first
to re-subscribe creates a fresh group and polling starts immediately.
Triggers that re-subscribe later join as late subscribers (outside the
snapshot of any already-broadcast event), so they may miss events
committed in the window between the first subscription and their own.
Set [triggerer] shared_stream_cohort_grace_period to a positive
number of seconds (e.g. 2.0) to delay the start of polling after a
new group is created, giving concurrent re-subscriptions time to join
before any event is broadcast. This is a best-effort window — it reduces
but does not eliminate the risk of a slow-rejoining trigger missing
events.
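The effect of the grace period can be simulated with asyncio: the first
re-subscription creates the group at time zero, and the first broadcast
snapshots whoever has joined by the time polling starts. first_snapshot and
the join delays are hypothetical; real re-subscription timing depends on how
quickly each trigger restarts.

```python
import asyncio


async def first_snapshot(creator: str, late_joins: dict, grace_period: float) -> set:
    """Return the subscribers in the snapshot of a new group's first broadcast."""
    subscribers = {creator}  # the first re-subscription creates the group

    async def join(name: str, delay: float) -> None:
        await asyncio.sleep(delay)
        subscribers.add(name)

    joins = [asyncio.create_task(join(n, d)) for n, d in late_joins.items()]
    await asyncio.sleep(grace_period)  # polling starts after the grace period
    snapshot = set(subscribers)  # frozen at the first broadcast
    for task in joins:
        task.cancel()  # end the simulation; real triggers would still join later
    await asyncio.gather(*joins, return_exceptions=True)
    return snapshot


late = {"b": 0.02, "c": 0.5}
print(sorted(asyncio.run(first_snapshot("a", late, grace_period=0.0))))  # ['a']
print(sorted(asyncio.run(first_snapshot("a", late, grace_period=0.2))))  # ['a', 'b']
```

Trigger "c" rejoins after the window closes in both runs, which is the
residual risk the grace period reduces but does not remove.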
Durability: the broker advance is gated on persistence. A subscriber’s
resolution completes only after every TriggerEvent it derived from the
event has been stored in the metadata database; the confirmation reaches
the trigger runner on the next state sync, typically within a second or
two. If the confirmation never arrives — the triggerer crashed, or the
event could not be persisted — the ack timeout fails the event, the
producer does not commit, and the broker redelivers. A failure can
therefore cause duplicate delivery but never a lost event; idempotent
subscribers absorb the duplicates. The same trade-off applies when a
group stops while events are still awaiting confirmation (for example,
the last subscriber unsubscribes right after producing an event): the
pending advances are abandoned and the broker redelivers those events.
``shared_stream_subscriber_queue_size`` in ack mode: the option still
bounds the number of unprocessed raw events queued per subscriber. The
manager does not wait for outstanding resolutions before pulling the next
upstream event; back-pressure comes only from the queue bound, and a
subscriber whose queue is full is force-failed. The queue primarily guards
against burst delivery before a subscriber’s filter runs. Broker advances,
by contrast, are dispatched in order within each lane: an event with
pending subscribers delays the advance of every later event in the same
lane (resolved events behind it accumulate into the next batch), and the
ack timeout bounds that wait.
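Queue-bound back-pressure can be sketched with bounded asyncio queues and
put_nowait: the upstream never waits on a slow subscriber, a full queue is
reported instead, and that subscriber is the one the manager would
force-fail. fan_out and the queue sizes are hypothetical, chosen only to make
the overflow visible.

```python
import asyncio


def fan_out(event: object, queues: dict) -> set:
    """Offer one raw event to every subscriber queue without waiting.

    Returns the subscribers whose queue was full; those are the ones the
    manager would force-fail instead of slowing down the upstream poll.
    """
    overflowed = set()
    for name, queue in queues.items():
        try:
            queue.put_nowait(event)
        except asyncio.QueueFull:
            overflowed.add(name)
    return overflowed


async def overflow_demo():
    # Neither queue is drained here, so the smaller one overflows first.
    queues = {"quick": asyncio.Queue(maxsize=2), "slow": asyncio.Queue(maxsize=1)}
    return [fan_out(i, queues) for i in range(2)]


print(asyncio.run(overflow_demo()))  # [set(), {'slow'}]
```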