airflow.providers.common.ai.utils.hitl_review

Shared data models, exceptions, and XCom key constants for HITL Review.

Used by both the API-server-side plugin (plugins.hitl_review) and the worker-side operator mixin (mixins.hitl_review). Depends only on pydantic and the standard library.

Storage: all session state is persisted as XCom entries on the running task instance. See the XCom key constants below for the key naming scheme.

Attributes

HumanActionType

These xcom keys are reserved for agentic operator with HITL feedback loop.

XCOM_AGENT_SESSION

Session metadata written by the worker.

XCOM_HUMAN_ACTION

Human action command written by the plugin.

XCOM_AGENT_OUTPUT_PREFIX

Per-iteration AI output (append-only, written by worker).

XCOM_HUMAN_FEEDBACK_PREFIX

Per-iteration human feedback (append-only, written by plugin).

Classes

SessionStatus

Lifecycle states of a HITL review session.

ConversationEntry

Single turn in the feedback conversation.

AgentSessionData

Session metadata stored in the airflow_hitl_review_agent_session XCom.

HumanActionData

Human action payload stored in the airflow_hitl_review_human_action XCom.

HumanFeedbackRequest

Payload for the POST .../feedback endpoint.

HITLReviewResponse

API response for a HITL review session (combined from multiple XCom entries).

Module Contents

airflow.providers.common.ai.utils.hitl_review.HumanActionType[source]

These xcom keys are reserved for agentic operator with HITL feedback loop.

airflow.providers.common.ai.utils.hitl_review.XCOM_AGENT_SESSION = 'airflow_hitl_review_agent_session'[source]

Session metadata written by the worker.

Value: {"status": "...", "iteration": N, "max_iterations": M, "prompt": "...", "current_output": "..."}.

airflow.providers.common.ai.utils.hitl_review.XCOM_HUMAN_ACTION = 'airflow_hitl_review_human_action'[source]

Human action command written by the plugin.

Value: {"action": "approve"|"reject"|"changes_requested", "feedback": "...", "iteration": N}.

airflow.providers.common.ai.utils.hitl_review.XCOM_AGENT_OUTPUT_PREFIX = 'airflow_hitl_review_agent_output_'[source]

Per-iteration AI output (append-only, written by worker).

Actual key: airflow_hitl_review_agent_output_1, _2, …

airflow.providers.common.ai.utils.hitl_review.XCOM_HUMAN_FEEDBACK_PREFIX = 'airflow_hitl_review_human_feedback_'[source]

Per-iteration human feedback (append-only, written by plugin).

Actual key: airflow_hitl_review_human_feedback_1, _2, …

class airflow.providers.common.ai.utils.hitl_review.SessionStatus[source]

Bases: str, enum.Enum

Lifecycle states of a HITL review session.

PENDING_REVIEW = 'pending_review'[source]
CHANGES_REQUESTED = 'changes_requested'[source]
APPROVED = 'approved'[source]
REJECTED = 'rejected'[source]
MAX_ITERATIONS_EXCEEDED = 'max_iterations_exceeded'[source]
TIMEOUT_EXCEEDED = 'timeout_exceeded'[source]
class airflow.providers.common.ai.utils.hitl_review.ConversationEntry(/, **data)[source]

Bases: pydantic.BaseModel

Single turn in the feedback conversation.

role: Literal['assistant', 'human'][source]
content: str[source]
iteration: int[source]
timestamp: datetime.datetime = None[source]
class airflow.providers.common.ai.utils.hitl_review.AgentSessionData(/, **data)[source]

Bases: pydantic.BaseModel

Session metadata stored in the airflow_hitl_review_agent_session XCom.

Written by the worker only.

status: SessionStatus[source]
iteration: int = 1[source]
max_iterations: int = 5[source]
prompt: str = ''[source]
current_output: str = ''[source]
class airflow.providers.common.ai.utils.hitl_review.HumanActionData(/, **data)[source]

Bases: pydantic.BaseModel

Human action payload stored in the airflow_hitl_review_human_action XCom.

Written by the plugin only. Invalid action values (e.g. typos like “approved”) fail validation at parse time instead of causing the worker to loop indefinitely.

action: HumanActionType[source]
feedback: str = ''[source]
iteration: int = 1[source]
class airflow.providers.common.ai.utils.hitl_review.HumanFeedbackRequest(/, **data)[source]

Bases: pydantic.BaseModel

Payload for the POST .../feedback endpoint.

feedback: str[source]
class airflow.providers.common.ai.utils.hitl_review.HITLReviewResponse(/, **data)[source]

Bases: pydantic.BaseModel

API response for a HITL review session (combined from multiple XCom entries).

dag_id: str[source]
run_id: str[source]
task_id: str[source]
status: SessionStatus[source]
iteration: int[source]
max_iterations: int = 5[source]
prompt: str[source]
current_output: str[source]
conversation: list[ConversationEntry] = [][source]
task_completed: bool = False[source]
static from_xcom(dag_id, run_id, task_id, session, outputs, human_entries, *, task_completed=False)[source]

Combine a response from separate XCom values.

Was this entry helpful?