airflow.triggers.base

Attributes

log

Operator

DiscrimatedTriggerEvent

Classes

StartTriggerArgs

Arguments required to start task execution from the triggerer.

BaseTrigger

Base class for all triggers.

BaseEventTrigger

Base class for triggers used to schedule DAGs based on external events.

TriggerEvent

Something that a trigger can fire when its conditions are met.

TaskSuccessEvent

Yield this event in order to end the task successfully.

TaskFailedEvent

Yield this event in order to end the task with failure.

TaskSkippedEvent

Yield this event in order to end the task with status 'skipped'.

Functions

trigger_event_discriminator(v)

Module Contents

airflow.triggers.base.log[source]
type airflow.triggers.base.Operator = MappedOperator | SerializedBaseOperator[source]
class airflow.triggers.base.StartTriggerArgs[source]

Arguments required to start task execution from the triggerer.

trigger_cls: str[source]
next_method: str[source]
trigger_kwargs: dict[str, Any] | None = None[source]
next_kwargs: dict[str, Any] | None = None[source]
timeout: datetime.timedelta | None = None[source]
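
As a rough illustration of how these fields fit together, the sketch below uses a stand-in dataclass that only mirrors the attribute names and defaults listed above, so it runs without Airflow installed. The class name `StartTriggerArgsSketch`, the trigger class path, and the `execute_complete` method name are assumptions made for the example.

```python
from __future__ import annotations

import datetime
from dataclasses import dataclass
from typing import Any


@dataclass
class StartTriggerArgsSketch:
    """Stand-in mirroring the fields listed above; not the Airflow class."""

    trigger_cls: str
    next_method: str
    trigger_kwargs: dict[str, Any] | None = None
    next_kwargs: dict[str, Any] | None = None
    timeout: datetime.timedelta | None = None


# Hypothetical values: the class path and method name are illustrative only.
args = StartTriggerArgsSketch(
    trigger_cls="my_package.triggers.FileAppearedTrigger",
    next_method="execute_complete",
    trigger_kwargs={"path": "/data/incoming/report.csv"},
    timeout=datetime.timedelta(minutes=30),
)
```

Only `trigger_cls` and `next_method` have no default, so they are the two values every caller has to supply.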
class airflow.triggers.base.BaseTrigger(**kwargs)[source]

Bases: abc.ABC, airflow.sdk.definitions._internal.templater.Templater, airflow.utils.log.logging_mixin.LoggingMixin

Base class for all triggers.

A trigger has two contexts it can exist in:

  • Inside an Operator, when it’s passed to TaskDeferred

  • Actively running in a trigger worker

We use the same class for both situations, and rely on every trigger class being able to return the arguments (encodable with Airflow-JSON) that let it be re-instantiated elsewhere.

supports_triggerer_queue: bool = True[source]
trigger_id = None[source]
template_fields = ()[source]
template_ext = ()[source]
task_id = None[source]
property task: Operator | None[source]
property task_instance: airflow.models.taskinstance.TaskInstance[source]
render_template_fields(context, jinja_env=None)[source]

Template all attributes listed in self.template_fields.

This mutates the attributes in-place and is irreversible.

Parameters:
  • context (airflow.sdk.definitions.context.Context) – Context dict with values to apply on content.

  • jinja_env (jinja2.Environment | None) – Jinja’s environment to use for rendering.

abstract serialize()[source]

Return the information needed to reconstruct this Trigger.

Returns:

Tuple of (class path, keyword arguments needed to re-instantiate).

Return type:

tuple[str, dict[str, Any]]
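
The round trip this contract implies can be sketched as follows. The class is a plain-Python stand-in (a real trigger would subclass BaseTrigger), and the name `FileAppearedTrigger`, its class path, and its parameters are hypothetical.

```python
from __future__ import annotations

from typing import Any


class FileAppearedTrigger:
    """Hypothetical trigger; a real one would subclass BaseTrigger."""

    def __init__(self, path: str, poll_interval: float = 5.0) -> None:
        self.path = path
        self.poll_interval = poll_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # Class path first, then exactly the kwargs that __init__ accepts.
        return (
            "my_package.triggers.FileAppearedTrigger",
            {"path": self.path, "poll_interval": self.poll_interval},
        )


original = FileAppearedTrigger(path="/data/incoming/report.csv", poll_interval=2.0)
classpath, kwargs = original.serialize()

# Rebuilding from the serialized kwargs should give an equivalent trigger.
rebuilt = FileAppearedTrigger(**kwargs)
```

Keeping the returned kwargs in one-to-one correspondence with the constructor parameters is what makes re-instantiation elsewhere possible.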

abstract async run()[source]

Run the trigger in an asynchronous context.

The trigger should yield an Event whenever it wants to fire off an event, and return None if it is finished. Single-event triggers should thus yield and then immediately return.

If it yields, it is likely that it will be resumed very quickly, but it may not be (e.g. if the workload is being moved to another triggerer process, or if a multi-event trigger is being used for a single-event task deferral).

In either case, Trigger classes should assume they will be persisted, and then rely on cleanup() being called when they are no longer needed.
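
A minimal sketch of the single-event shape described above ("yield and then immediately return") is shown here. `EventSketch` and `DelayTriggerSketch` are stand-ins invented for the example so it runs without Airflow; in real code the trigger would subclass BaseTrigger and yield a TriggerEvent.

```python
import asyncio
from typing import Any, AsyncIterator


class EventSketch:
    """Stand-in for TriggerEvent: carries a payload and nothing else."""

    def __init__(self, payload: Any) -> None:
        self.payload = payload


class DelayTriggerSketch:
    """Hypothetical single-event trigger; not an Airflow class."""

    def __init__(self, delay_seconds: float) -> None:
        self.delay_seconds = delay_seconds

    async def run(self) -> AsyncIterator[EventSketch]:
        # Wait without blocking the event loop, fire once, then finish.
        await asyncio.sleep(self.delay_seconds)
        yield EventSketch({"status": "done"})
        return


async def collect(trigger: DelayTriggerSketch) -> list:
    # Drive the async generator the way a caller iterating run() would.
    return [event async for event in trigger.run()]


events = asyncio.run(collect(DelayTriggerSketch(0.01)))
```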

async cleanup()[source]

Cleanup the trigger.

Called when the trigger is no longer needed, and it’s being removed from the active triggerer process.

This method is a coroutine so that cleanup can run in the triggerer’s main event loop. Exceptions raised by cleanup() are ignored, so if you want to debug them or be notified when cleanup fails, wrap your code in a try/except block and handle the error in an async-compatible way.
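
One way to follow the try/except advice above is sketched below. The class, the `_close_connection` helper, and the `cleanup_errors` list are hypothetical names used only to make the failure observable; a real trigger would subclass BaseTrigger and would probably just log.

```python
import asyncio
import logging

log = logging.getLogger("cleanup_sketch")


class ConnectionTriggerSketch:
    """Hypothetical trigger holding a local resource; not an Airflow class."""

    def __init__(self):
        self.cleanup_errors = []

    async def _close_connection(self):
        # Hypothetical helper that simulates a failing close.
        raise ConnectionError("socket already closed")

    async def cleanup(self):
        try:
            await self._close_connection()
        except Exception as exc:
            # Record the failure ourselves, since the caller ignores exceptions.
            log.warning("cleanup failed: %s", exc)
            self.cleanup_errors.append(str(exc))


trigger = ConnectionTriggerSketch()
asyncio.run(trigger.cleanup())
```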

async on_kill()[source]

Kill the external job managed by this trigger when the task is killed by a user.

Symmetric with BaseOperator.on_kill() on the worker side: override this method to stop external work (e.g. cancel a BigQuery job, terminate a Databricks run) when a user explicitly acts on the deferred task via mark-failed, clear, or mark-succeeded.

Distinction from cleanup():

  • cleanup() runs on every trigger exit — success, timeout, shutdown, and user kill. It is meant for releasing local resources held by this trigger instance. Putting external job cancellation in cleanup() would cancel in-flight work on every triggerer restart or rolling deploy.

  • on_kill() runs only when a user explicitly kills the task. It is the right place to cancel external work you do not want to keep running after the user performs an action.

This only fires when a user acts on the task. It does not fire on:

  • Triggerer shutdown or restart — the trigger is redistributed, not cancelled.

  • Triggerer redistribution to another triggerer process.

  • Trigger timeout (the trigger is killed, but not by a user action).

  • Normal trigger completion (the trigger fired an event).

This method runs in the triggerer’s asyncio event loop, so it must be async-safe. Use await for any I/O; do not block the event loop.

Exceptions raised here are logged as warnings and do not propagate — they will not affect the task state or the triggerer. Implement your own retry or error handling inside this method if needed.

on_kill() is given a bounded time to complete. Implementations that call slow external APIs should apply their own timeouts rather than relying on the framework bound.
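
Applying your own timeout, as suggested above, might look like the following sketch. It uses `asyncio.wait_for` from the standard library; the class name, the `_cancel_remote_job` helper, the 0.05 second bound, and the `kill_outcome` attribute are assumptions for illustration, and the external call is simulated with a sleep.

```python
import asyncio


class RemoteJobTriggerSketch:
    """Hypothetical trigger managing an external job; not an Airflow class."""

    def __init__(self, job_id, cancel_timeout=0.05):
        self.job_id = job_id
        self.cancel_timeout = cancel_timeout
        self.kill_outcome = None

    async def _cancel_remote_job(self):
        # Hypothetical slow external API call, simulated with a sleep.
        await asyncio.sleep(1.0)

    async def on_kill(self):
        try:
            # Bound the external call ourselves instead of relying on the framework.
            await asyncio.wait_for(
                self._cancel_remote_job(), timeout=self.cancel_timeout
            )
            self.kill_outcome = "cancelled"
        except asyncio.TimeoutError:
            self.kill_outcome = "timed_out"


trigger = RemoteJobTriggerSketch("job-123")
asyncio.run(trigger.on_kill())
```

Because the simulated cancel call takes longer than the bound, this run ends with `kill_outcome` set to `"timed_out"` rather than hanging.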

static repr(classpath, kwargs)[source]
__repr__()[source]
class airflow.triggers.base.BaseEventTrigger(**kwargs)[source]

Bases: BaseTrigger

Base class for triggers used to schedule DAGs based on external events.

BaseEventTrigger is a subclass of BaseTrigger designed to identify triggers compatible with event-driven scheduling.

Sharing an underlying I/O stream between triggers

A subclass that polls an upstream resource which can be safely consumed by multiple sibling triggers (e.g. a directory scan, a polling REST API) may opt in to having the triggerer run a single underlying poll loop and fan its raw events out to every trigger in the group. To do so, override:

  • shared_stream_key() — return a key identifying the shared stream (a tuple of strings is a common choice). Triggers whose key compares equal share one poll.

  • open_shared_stream() — open the shared stream and yield raw events. Called once per group in the triggerer.

  • filter_shared_stream() — convert the shared raw stream into this trigger’s own TriggerEvent instances, applying any per-trigger filtering or transformation. The stream has the same shape in ack mode as on the fast path, so the filter needs no mode-specific branches.

Triggers whose shared_stream_key returns None (the default) keep the existing behavior: each trigger gets its own poll loop via run().

Producer factory (ack mode)

Override create_shared_stream_producer() to enable ack mode. The manager calls this classmethod once per group; the returned SharedStreamProducer owns the broker connection, supplies events through its open_stream method (instead of open_shared_stream()), and is told to advance the broker — commit, delete, or ack the messages — through per-lane batches of events whose subscribers have all moved past them and whose derived trigger events were persisted to the metadata database. Subscribers receive raw events in both modes; ack mode changes only the bookkeeping behind the stream, never the filter code.

See airflow.triggers.shared_stream for the full ack-mode design, including snapshot-at-fan-out semantics, per-event timeout behavior, and notes on redelivery after a triggerer restart.

supports_triggerer_queue: bool = False[source]
static hash(classpath, kwargs)[source]

Return the hash of the trigger classpath and kwargs. This is used to uniquely identify a trigger.

We do not want to have this logic in BaseTrigger because, when used to defer tasks, two triggers can have the same classpath and kwargs. This is not true for event-driven scheduling.

shared_stream_key()[source]

Identify an underlying I/O stream that can be shared with sibling triggers.

Two trigger instances whose shared_stream_key() return values compare equal (and are not None) will share a single underlying poll loop in the triggerer. Each instance still receives the events it cares about through its own filter_shared_stream() call.

Returning None (the default) opts out of sharing — the trigger runs its own independent poll loop via run(), exactly as today.

The return value is read once when run_trigger first starts this trigger; any change to the key afterwards has no effect on group membership for this instance. To share one poll across a set of sibling triggers, ensure every trigger in the set returns the same key from the outset.

The key must be deterministic — derive it from configuration fields, never from per-call values such as time.time() or uuid.uuid4(), because the comparison must be stable across the lifetime of the group.

Note

This method is called after render_template_fields(), so any templated attribute (for example a directory derived from a Jinja expression) is already resolved when the key is constructed. Two sibling triggers that render to the same path will correctly share their poll.
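
A deterministic key derived from configuration fields could be sketched like this. The class is a plain stand-in rather than a BaseEventTrigger subclass, and the `directory` and `pattern` fields and the `"directory-watch"` prefix are hypothetical.

```python
class DirectoryWatchTriggerSketch:
    """Hypothetical event trigger; not an Airflow class."""

    def __init__(self, directory, pattern):
        self.directory = directory
        self.pattern = pattern

    def shared_stream_key(self):
        # Only the directory identifies the shared poll. The pattern is a
        # per-trigger filter, so it is deliberately left out of the key.
        return ("directory-watch", self.directory)


csv_watcher = DirectoryWatchTriggerSketch("/data/incoming", "*.csv")
json_watcher = DirectoryWatchTriggerSketch("/data/incoming", "*.json")
archive_watcher = DirectoryWatchTriggerSketch("/data/archive", "*.csv")
```

Under these assumptions the first two instances return equal keys and would share one poll, while the third would poll separately.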

abstract async classmethod open_shared_stream(kwargs)[source]

Open the shared underlying stream and yield raw events.

Called once per shared-stream group in the triggerer. kwargs is taken from one trigger in the group; implementations should rely only on fields whose values participate in shared_stream_key(), because other fields may differ between siblings in the group.

Implementations are expected to run for the lifetime of the group — the triggerer drives the iterator from a single task and cancels it when the last subscriber leaves. Returning without raising (e.g. because the upstream resource closed) is treated as an error and propagated to every subscriber, so the contract is “yield forever, or raise”. If an upstream EOF is a meaningful end-of-life condition, raise an exception that conveys it.

Declared as a classmethod (not staticmethod) so subclasses can compose via super().open_shared_stream(kwargs) and reach cls for class-scoped state or diagnostics.

Required only when shared_stream_key() returns non-None and create_shared_stream_producer() is not overridden. In ack mode the events come from the producer’s open_stream instead and this method is not called.
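
The "yield forever, or raise" contract can be sketched as an async generator classmethod. Everything here is a stand-in: the class does not subclass BaseEventTrigger, `UpstreamClosed` and `_poll_once` are hypothetical, and the canned `_fake_polls` list replaces a real directory scan so the example terminates.

```python
import asyncio


class UpstreamClosed(Exception):
    """Hypothetical exception signalling that the upstream resource is gone."""


class DirectoryWatchTriggerSketch:
    # Canned poll results standing in for real scans; None means "closed".
    _fake_polls = [["a.csv", "b.json"], ["c.csv"], None]

    @classmethod
    async def _poll_once(cls, directory, attempt):
        await asyncio.sleep(0)  # yield control, as real I/O would
        return cls._fake_polls[attempt]

    @classmethod
    async def open_shared_stream(cls, kwargs):
        # Use only fields that take part in the shared key (here: directory).
        directory = kwargs["directory"]
        attempt = 0
        while True:
            names = await cls._poll_once(directory, attempt)
            if names is None:
                # Never return quietly: raise to signal end of life.
                raise UpstreamClosed(f"{directory} is no longer available")
            for name in names:
                yield {"directory": directory, "name": name}
            attempt += 1


async def drain():
    seen = []
    stream = DirectoryWatchTriggerSketch.open_shared_stream(
        {"directory": "/data/incoming"}
    )
    try:
        async for raw in stream:
            seen.append(raw["name"])
    except UpstreamClosed as exc:
        return seen, str(exc)
    return seen, None


names, error = asyncio.run(drain())
```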

abstract classmethod create_shared_stream_producer(kwargs)[source]

Build the broker-side producer for this trigger’s shared stream (ack mode).

Overriding this classmethod opts the shared stream into ack mode. The manager calls it once per shared-stream group; the returned SharedStreamProducer owns the broker connection for the lifetime of one poll — it supplies events through open_stream, is told to advance the broker in per-lane batches of events whose subscribers have all moved past them and had their derived trigger events confirmed persisted, and is closed when the poll ends. Do not open the broker connection here; open it lazily inside the producer’s open_stream.

Triggers that do not override this method run the fast path: subscribers receive raw events from open_shared_stream() exactly as before. The stream shape is the same in both modes.

abstract async filter_shared_stream(shared_stream)[source]

Transform the shared raw event stream into this trigger’s events.

The triggerer calls this method (instead of run()) when this trigger participates in a shared-stream group. Iterate shared_stream to receive raw events from the shared poll, and yield a TriggerEvent for each one that should fire this trigger.

In ack mode, to have the broker terminally drop a raw event rather than producing a trigger event from it, call reject_shared_stream_event() while handling that raw event instead of yielding.

Required only when shared_stream_key() returns non-None.
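
Per-trigger filtering over a shared raw stream might be sketched as below. `EventSketch` stands in for TriggerEvent, `fake_shared_stream` stands in for the stream the triggerer would pass, and the raw event shape (a dict with a `name` key) is an assumption made for the example.

```python
import asyncio
import fnmatch


class EventSketch:
    """Stand-in for TriggerEvent: carries a payload and nothing else."""

    def __init__(self, payload):
        self.payload = payload


class DirectoryWatchTriggerSketch:
    """Hypothetical event trigger; not an Airflow class."""

    def __init__(self, pattern):
        self.pattern = pattern

    async def filter_shared_stream(self, shared_stream):
        # Keep only the raw events this trigger cares about.
        async for raw in shared_stream:
            if fnmatch.fnmatch(raw["name"], self.pattern):
                yield EventSketch({"name": raw["name"]})


async def fake_shared_stream():
    # Hypothetical raw events, as a shared poll might produce them.
    for name in ("a.csv", "b.json", "c.csv"):
        yield {"name": name}


async def collect(trigger):
    stream = trigger.filter_shared_stream(fake_shared_stream())
    return [event.payload["name"] async for event in stream]


csv_names = asyncio.run(collect(DirectoryWatchTriggerSketch("*.csv")))
```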

class airflow.triggers.base.TriggerEvent(payload, **kwargs)[source]

Bases: pydantic.BaseModel

Something that a trigger can fire when its conditions are met.

Events must have a uniquely identifying value that is the same wherever the trigger is run. This ensures that if the same trigger is running in two locations (for HA reasons), its events can be deduplicated.

payload: Any = None[source]

The payload for the event to send back to the task.

Must be natively JSON-serializable, or registered with the Airflow serialization code.
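
A quick way to check the "natively JSON-serializable" half of this requirement before yielding an event is sketched below, using only the standard library. The helper name is hypothetical, and the check says nothing about types registered with Airflow's own serialization code, which may accept values this check rejects.

```python
import datetime
import json


def is_natively_json_serializable(payload):
    """Return True if the standard json module can encode the payload."""
    try:
        json.dumps(payload)
    except (TypeError, ValueError):
        return False
    return True


# Plain dicts, lists, strings, and numbers pass.
plain_ok = is_natively_json_serializable({"status": "done", "rows": 42})

# A datetime is not natively JSON-serializable.
datetime_ok = is_natively_json_serializable(
    {"finished_at": datetime.datetime(2024, 1, 1)}
)
```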

__repr__()[source]
class airflow.triggers.base.TaskSuccessEvent(*, xcoms=None, **kwargs)[source]

Bases: BaseTaskEndEvent

Yield this event in order to end the task successfully.

task_instance_state: airflow.utils.state.TaskInstanceState[source]
class airflow.triggers.base.TaskFailedEvent(*, xcoms=None, **kwargs)[source]

Bases: BaseTaskEndEvent

Yield this event in order to end the task with failure.

task_instance_state: airflow.utils.state.TaskInstanceState[source]
class airflow.triggers.base.TaskSkippedEvent(*, xcoms=None, **kwargs)[source]

Bases: BaseTaskEndEvent

Yield this event in order to end the task with status ‘skipped’.

task_instance_state: airflow.utils.state.TaskInstanceState[source]
airflow.triggers.base.trigger_event_discriminator(v)[source]
airflow.triggers.base.DiscrimatedTriggerEvent[source]
