#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from enum import Enum
class JobState(str, Enum):
    """
    All possible states that a Job can be in.

    Members are also ``str`` instances, so they compare equal to their raw
    string value (e.g. ``JobState.RESTARTING == "restarting"``).
    """

    # NOTE(review): only RESTARTING is present in this view of the module;
    # confirm whether further job states were lost and need restoring.
    RESTARTING = "restarting"

    def __str__(self) -> str:
        """Return the raw state string instead of the ``ClassName.MEMBER`` form."""
        return self.value
class TerminalTIState(str, Enum):
    """
    States that a Task Instance can be in that indicate it has reached a terminal state.

    Members are also ``str`` instances, so they compare equal to their raw
    string value. ``TaskInstanceState`` re-exports SUCCESS, FAILED, SKIPPED and
    REMOVED from this enum, so all four must be defined here.
    """

    SUCCESS = "success"
    FAILED = "failed"
    # A user can raise an AirflowSkipException from a task & it will be marked as skipped
    SKIPPED = "skipped"
    REMOVED = "removed"

    def __str__(self) -> str:
        """Return the raw state string instead of the ``ClassName.MEMBER`` form."""
        return self.value
class TaskInstanceState(str, Enum):
    """
    All possible states that a Task Instance can be in.

    Note that None is also allowed, so always use this in a type hint with Optional.
    """

    # The scheduler sets a TaskInstance state to None when it's created but not
    # yet run, but we don't list it here since TaskInstance is a string enum.
    # Use None instead if you need this state.

    # Most members below are defined in terms of TerminalTIState /
    # IntermediateTIState members, presumably so the string values stay in
    # sync with those enums.
    # NOTE(review): IntermediateTIState is not defined in the visible part of
    # this module -- confirm it is declared before this class.

    # Set by the scheduler
    REMOVED = TerminalTIState.REMOVED  # Task vanished from DAG before it ran
    SCHEDULED = IntermediateTIState.SCHEDULED  # Task should run and will be handed to executor soon

    # Set by the task instance itself
    QUEUED = IntermediateTIState.QUEUED  # Executor has enqueued the task
    RUNNING = "running"  # Task is executing
    SUCCESS = TerminalTIState.SUCCESS  # Task completed
    RESTARTING = IntermediateTIState.RESTARTING  # External request to restart (e.g. cleared when running)
    FAILED = TerminalTIState.FAILED  # Task errored out
    UP_FOR_RETRY = IntermediateTIState.UP_FOR_RETRY  # Task failed but has retries left
    UP_FOR_RESCHEDULE = IntermediateTIState.UP_FOR_RESCHEDULE  # A waiting `reschedule` sensor
    UPSTREAM_FAILED = IntermediateTIState.UPSTREAM_FAILED  # One or more upstream deps failed
    SKIPPED = TerminalTIState.SKIPPED  # Skipped by branching or some other mechanism
    DEFERRED = IntermediateTIState.DEFERRED  # Deferrable operator waiting on a trigger

    def __str__(self) -> str:
        """Return the raw state string instead of the ``ClassName.MEMBER`` form."""
        return self.value
class DagRunState(str, Enum):
    """
    All possible states that a DagRun can be in.

    These are "shared" with TaskInstanceState in some parts of the code,
    so please ensure that their values always match the ones with the
    same name in TaskInstanceState.
    """

    # The State helper class builds its DagRun state collections from exactly
    # these four members, so they must all be defined here.
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"

    def __str__(self) -> str:
        """Return the raw state string instead of the ``ClassName.MEMBER`` form."""
        return self.value
class State:
    """
    Static class with task instance state constants and color methods to avoid hard-coding.

    The class is a namespace only: it exposes aliases for the
    ``TaskInstanceState`` members, pre-built collections of related states,
    and helpers that map a state to a display color.
    """

    # Backwards-compat constants for code that does not yet use the enum
    # These first three are shared by DagState and TaskState
    SUCCESS = TaskInstanceState.SUCCESS
    RUNNING = TaskInstanceState.RUNNING
    FAILED = TaskInstanceState.FAILED

    # These are TaskState only
    REMOVED = TaskInstanceState.REMOVED
    SCHEDULED = TaskInstanceState.SCHEDULED
    QUEUED = TaskInstanceState.QUEUED
    RESTARTING = TaskInstanceState.RESTARTING
    UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
    UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
    UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
    SKIPPED = TaskInstanceState.SKIPPED
    DEFERRED = TaskInstanceState.DEFERRED

    finished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.SUCCESS, DagRunState.FAILED])
    unfinished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.QUEUED, DagRunState.RUNNING])

    # None stands for "no state yet" and is listed first, followed by every enum member.
    task_states: tuple[TaskInstanceState | None, ...] = (None, *TaskInstanceState)

    dag_states: tuple[DagRunState, ...] = (
        DagRunState.QUEUED,
        DagRunState.SUCCESS,
        DagRunState.RUNNING,
        DagRunState.FAILED,
    )

    state_color: dict[TaskInstanceState | None, str] = {
        None: "lightblue",
        TaskInstanceState.QUEUED: "gray",
        TaskInstanceState.RUNNING: "lime",
        TaskInstanceState.SUCCESS: "green",
        TaskInstanceState.RESTARTING: "violet",
        TaskInstanceState.FAILED: "red",
        TaskInstanceState.UP_FOR_RETRY: "gold",
        TaskInstanceState.UP_FOR_RESCHEDULE: "turquoise",
        TaskInstanceState.UPSTREAM_FAILED: "orange",
        TaskInstanceState.SKIPPED: "hotpink",
        TaskInstanceState.REMOVED: "lightgrey",
        TaskInstanceState.SCHEDULED: "tan",
        TaskInstanceState.DEFERRED: "mediumpurple",
    }

    @classmethod
    def color(cls, state: str | None) -> str:
        """
        Return color for a state.

        :param state: a state (enum member or its raw string value) or None
        :return: the color registered in ``state_color``, or ``"white"`` when
            the state has no entry
        """
        return cls.state_color.get(state, "white")

    @classmethod
    def color_fg(cls, state: str | None) -> str:
        """
        Black&white colors for a state.

        :param state: a state (enum member or its raw string value) or None
        :return: ``"white"`` when the state's color is green or red,
            ``"black"`` otherwise
        """
        return "white" if cls.color(state) in ("green", "red") else "black"

    finished: frozenset[TaskInstanceState] = frozenset(
        [
            TaskInstanceState.SUCCESS,
            TaskInstanceState.FAILED,
            TaskInstanceState.SKIPPED,
            TaskInstanceState.UPSTREAM_FAILED,
            TaskInstanceState.REMOVED,
        ]
    )
    """
    A set of states indicating a task has reached a terminal state (i.e. it has "finished") and needs no
    further action.

    Note that the attempt could have resulted in failure or have been
    interrupted; or perhaps never run at all (skip, or upstream_failed); in any
    case, it is no longer running.
    """

    unfinished: frozenset[TaskInstanceState | None] = frozenset(
        [
            None,
            TaskInstanceState.SCHEDULED,
            TaskInstanceState.QUEUED,
            TaskInstanceState.RUNNING,
            TaskInstanceState.RESTARTING,
            TaskInstanceState.UP_FOR_RETRY,
            TaskInstanceState.UP_FOR_RESCHEDULE,
            TaskInstanceState.DEFERRED,
        ]
    )
    """
    A set of states indicating that a task either has not completed
    a run or has not even started.
    """

    failed_states: frozenset[TaskInstanceState] = frozenset(
        [TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED]
    )
    """
    A set of states indicating that a task or dag is in a failed state.
    """

    success_states: frozenset[TaskInstanceState] = frozenset(
        [TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED]
    )
    """
    A set of states indicating that a task or dag is in a success state.
    """

    adoptable_states: frozenset[TaskInstanceState] = frozenset(
        [TaskInstanceState.QUEUED, TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING]
    )
    """
    A set of states indicating that a task can be adopted or reset by a scheduler job
    if it was queued by another scheduler job that is not running anymore.
    """