Source code for airflow.providers.common.ai.utils.hitl_review

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Shared data models, exceptions, and XCom key constants for HITL Review.

Used by both the API-server-side plugin (``plugins.hitl_review``) and the
worker-side operator mixin (``mixins.hitl_review``).  Depends only on
``pydantic`` and the standard library.

**Storage**: all session state is persisted as XCom entries on the running
task instance.  See the *XCom key constants* below for the key naming scheme.
"""

from __future__ import annotations

from datetime import datetime, timezone
from enum import Enum
from typing import Any, Literal

from pydantic import BaseModel, Field

[docs] HumanActionType = Literal["approve", "reject", "changes_requested"]
""" These xcom keys are reserved for agentic operator with HITL feedback loop. """ _XCOM_PREFIX = "airflow_hitl_review_"
[docs] XCOM_AGENT_SESSION = f"{_XCOM_PREFIX}agent_session"
"""Session metadata written by the **worker**. Value: ``{"status": "...", "iteration": N, "max_iterations": M, "prompt": "...", "current_output": "..."}``. """
[docs] XCOM_HUMAN_ACTION = f"{_XCOM_PREFIX}human_action"
"""Human action command written by the **plugin**. Value: ``{"action": "approve"|"reject"|"changes_requested", "feedback": "...", "iteration": N}``. """
[docs] XCOM_AGENT_OUTPUT_PREFIX = f"{_XCOM_PREFIX}agent_output_"
"""Per-iteration AI output (append-only, written by worker). Actual key: ``airflow_hitl_review_agent_output_1``, ``_2``, ... """
[docs] XCOM_HUMAN_FEEDBACK_PREFIX = f"{_XCOM_PREFIX}human_feedback_"
"""Per-iteration human feedback (append-only, written by plugin). Actual key: ``airflow_hitl_review_human_feedback_1``, ``_2``, ... """
[docs] class SessionStatus(str, Enum): """Lifecycle states of a HITL review session."""
[docs] PENDING_REVIEW = "pending_review"
[docs] CHANGES_REQUESTED = "changes_requested"
[docs] APPROVED = "approved"
[docs] REJECTED = "rejected"
[docs] MAX_ITERATIONS_EXCEEDED = "max_iterations_exceeded"
[docs] TIMEOUT_EXCEEDED = "timeout_exceeded"
[docs] class ConversationEntry(BaseModel): """Single turn in the feedback conversation."""
[docs] role: Literal["assistant", "human"]
[docs] content: str
[docs] iteration: int
[docs] timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
[docs] class AgentSessionData(BaseModel): """ Session metadata stored in the ``airflow_hitl_review_agent_session`` XCom. Written by the **worker** only. """
[docs] status: SessionStatus = SessionStatus.PENDING_REVIEW
[docs] iteration: int = 1
[docs] max_iterations: int = 5
[docs] prompt: str = ""
[docs] current_output: str = ""
[docs] class HumanActionData(BaseModel): """ Human action payload stored in the ``airflow_hitl_review_human_action`` XCom. Written by the **plugin** only. Invalid ``action`` values (e.g. typos like "approved") fail validation at parse time instead of causing the worker to loop indefinitely. """
[docs] action: HumanActionType
[docs] feedback: str = ""
[docs] iteration: int = 1
[docs] class HumanFeedbackRequest(BaseModel): """Payload for the ``POST .../feedback`` endpoint."""
[docs] feedback: str
[docs] class HITLReviewResponse(BaseModel): """API response for a HITL review session (combined from multiple XCom entries)."""
[docs] dag_id: str
[docs] run_id: str
[docs] task_id: str
[docs] status: SessionStatus
[docs] iteration: int
[docs] max_iterations: int = 5
[docs] prompt: str
[docs] current_output: str
[docs] conversation: list[ConversationEntry] = []
[docs] task_completed: bool = False
@staticmethod
[docs] def from_xcom( dag_id: str, run_id: str, task_id: str, session: AgentSessionData, outputs: dict[int, Any], human_entries: dict[int, Any], *, task_completed: bool = False, ) -> HITLReviewResponse: """Combine a response from separate XCom values.""" conversation: list[ConversationEntry] = [] for i in range(1, session.iteration + 1): if i in outputs: conversation.append(ConversationEntry(role="assistant", content=str(outputs[i]), iteration=i)) if i in human_entries: conversation.append( ConversationEntry(role="human", content=str(human_entries[i]), iteration=i) ) return HITLReviewResponse( dag_id=dag_id, run_id=run_id, task_id=task_id, status=session.status, iteration=session.iteration, max_iterations=session.max_iterations, prompt=session.prompt, current_output=session.current_output, conversation=conversation, task_completed=task_completed, )

Was this entry helpful?