airflow.providers.common.ai.mixins.hitl_review

Attributes

log

Classes

HITLReviewProtocol

Attributes that the host operator must provide.

HITLReviewMixin

Mixin that drives an iterative HITL review loop inside execute().

Module Contents

airflow.providers.common.ai.mixins.hitl_review.log
class airflow.providers.common.ai.mixins.hitl_review.HITLReviewProtocol

Bases: Protocol

Attributes that the host operator must provide.

enable_hitl_review: bool
max_hitl_iterations: int
hitl_timeout: datetime.timedelta | None
hitl_poll_interval: float
prompt: str
task_id: str
log: Any
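
The attribute list above can be pictured as a structural type. The following is a minimal sketch, not the provider's own class: ReviewHostSketch and FakeOperator are hypothetical names, and the default values are illustrative assumptions only.

```python
import logging
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class ReviewHostSketch(Protocol):
    """Hypothetical mirror of the attributes listed above."""

    enable_hitl_review: bool
    max_hitl_iterations: int
    hitl_timeout: Optional[timedelta]
    hitl_poll_interval: float
    prompt: str
    task_id: str
    log: Any


@dataclass
class FakeOperator:
    """Stand-in host object; a real host would be an Airflow operator."""

    task_id: str
    prompt: str
    enable_hitl_review: bool = True
    max_hitl_iterations: int = 5  # illustrative default, an assumption
    hitl_timeout: Optional[timedelta] = timedelta(hours=1)  # assumption
    hitl_poll_interval: float = 10.0  # seconds, assumption
    log: Any = logging.getLogger("hitl_sketch")


op = FakeOperator(task_id="summarize", prompt="Summarize the report.")

# isinstance() on a runtime-checkable protocol only verifies that every
# listed attribute is present; it does not validate the attribute types.
conforms = isinstance(op, ReviewHostSketch)
```

Because the check is structural, any object carrying all seven attributes satisfies it, with no inheritance from the protocol required.
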
class airflow.providers.common.ai.mixins.hitl_review.HITLReviewMixin

Mixin that drives an iterative HITL review loop inside execute().

After the operator generates its first output, the mixin:

  1. Pushes session metadata and the first agent output to XCom.

  2. Polls the human action XCom (airflow_hitl_review_human_action) at hitl_poll_interval seconds.

  3. When a human sets action to changes_requested (via the plugin API), calls regenerate_with_feedback() and pushes the new agent output.

  4. When a human sets action to approved, returns the output.

  5. When a human sets action to rejected, raises HITLRejectException.

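The loop also stops when hitl_timeout elapses with no response (HITLTimeoutError) or when max_hitl_iterations is reached without approval (HITLMaxIterationsError).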

Max iterations: iteration counts outputs shown to the reviewer (1 = initial, 2 = first regeneration, etc.). When the reviewer requests changes at iteration >= max_hitl_iterations, the mixin raises HITLMaxIterationsError without calling the LLM. With max_hitl_iterations=5, the reviewer can request changes at most 4 times (iterations 1–4); the fifth output is the last chance to approve or reject.
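
The iteration accounting above can be illustrated with a small simulation. This is a sketch under stated assumptions, not the mixin's implementation: it replays a scripted list of reviewer actions instead of polling XCom, and MaxIterationsReached, Rejected, simulate_review, and fake_regenerate are hypothetical stand-ins.

```python
from typing import Callable


class MaxIterationsReached(Exception):
    """Stand-in for HITLMaxIterationsError in this sketch."""


class Rejected(Exception):
    """Stand-in for HITLRejectException in this sketch."""


def simulate_review(
    first_output: str,
    actions: list[tuple[str, str]],
    regenerate: Callable[[str], str],
    max_hitl_iterations: int,
) -> tuple[str, int]:
    """Replay scripted (action, feedback) pairs; return (output, iteration)."""
    output = first_output
    iteration = 1  # 1 = initial output shown to the reviewer
    for action, feedback in actions:
        if action == "approved":
            return output, iteration
        if action == "rejected":
            raise Rejected(f"rejected at iteration {iteration}")
        if action == "changes_requested":
            if iteration >= max_hitl_iterations:
                # Cap reached: fail before calling the model again.
                raise MaxIterationsReached(f"cap of {max_hitl_iterations} reached")
            output = regenerate(feedback)
            iteration += 1
    raise RuntimeError("script ended without a final decision")


calls: list[str] = []


def fake_regenerate(feedback: str) -> str:
    # Placeholder for an LLM call; records feedback and numbers the drafts.
    calls.append(feedback)
    return f"draft v{len(calls) + 1}"


result = simulate_review(
    "draft v1",
    [("changes_requested", "shorter"), ("approved", "")],
    fake_regenerate,
    max_hitl_iterations=5,
)
# result == ("draft v2", 2)
```

With max_hitl_iterations=5, a script of five consecutive changes_requested actions triggers four regenerations and then raises on the fifth request, matching the description above.
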

All agent outputs and human feedback are persisted as iteration-keyed XCom entries (airflow_hitl_review_agent_output_1, airflow_hitl_review_human_feedback_1, etc.) for full auditability.
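
The iteration-keyed naming can be sketched with two small helpers. The key prefixes come from the text above; the helper names, the plain dict standing in for XCom, and the pairing of feedback N with output N are assumptions made for illustration.

```python
from typing import Optional

AGENT_OUTPUT_PREFIX = "airflow_hitl_review_agent_output_"
HUMAN_FEEDBACK_PREFIX = "airflow_hitl_review_human_feedback_"


def agent_output_key(iteration: int) -> str:
    """Key for the agent output shown at the given iteration (1-based)."""
    return f"{AGENT_OUTPUT_PREFIX}{iteration}"


def human_feedback_key(iteration: int) -> str:
    """Key for the reviewer feedback at the given iteration.

    Assumption: feedback N is the response to output N.
    """
    return f"{HUMAN_FEEDBACK_PREFIX}{iteration}"


def audit_trail(store: dict[str, str]) -> list[tuple[int, str, Optional[str]]]:
    """Collect (iteration, output, feedback) rows from a dict standing in for XCom."""
    trail = []
    iteration = 1
    while agent_output_key(iteration) in store:
        trail.append(
            (
                iteration,
                store[agent_output_key(iteration)],
                store.get(human_feedback_key(iteration)),
            )
        )
        iteration += 1
    return trail


store = {
    "airflow_hitl_review_agent_output_1": "draft v1",
    "airflow_hitl_review_human_feedback_1": "Make it shorter.",
    "airflow_hitl_review_agent_output_2": "draft v2",
}
trail = audit_trail(store)
# trail == [(1, "draft v1", "Make it shorter."), (2, "draft v2", None)]
```
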

Operators using this mixin must set:

  • enable_hitl_review (bool)

  • max_hitl_iterations (int)

  • hitl_timeout (timedelta | None)

  • hitl_poll_interval (float, seconds)

  • prompt (str)

And must implement regenerate_with_feedback().

run_hitl_review(context, output, *, message_history=None)

Execute the full HITL review loop.

Parameters:
  • context (airflow.sdk.Context) – Airflow task context.

  • output (Any) – Initial LLM output (str or BaseModel).

  • message_history (Any) – Provider-specific conversation state (e.g. pydantic-ai list[ModelMessage]). Passed to regenerate_with_feedback() on each iteration.

Returns:

The final approved output as a string.

Raises:
  • HITLMaxIterationsError – When max_hitl_iterations is reached without approval.

  • HITLRejectException – When the reviewer rejects the output.

  • HITLTimeoutError – When hitl_timeout elapses with no response.

Return type:

str

abstract regenerate_with_feedback(*, feedback, message_history)

Re-run the LLM with reviewer feedback.

Parameters:
  • feedback (str) – The reviewer’s feedback text.

  • message_history (Any) – Provider-specific conversation state.

Returns:

(new_output_string, updated_message_history).

Return type:

tuple[str, Any]
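
One possible shape for an implementation is sketched below. It is illustrative only: EchoModelOperator and _call_model are hypothetical, the model call is a placeholder, and the list-of-dicts history is an assumption, since the real conversation state is provider-specific. A real host class would also inherit from HITLReviewMixin and an Airflow operator.

```python
from typing import Any


class EchoModelOperator:
    """Hypothetical host class showing only the method contract."""

    def __init__(self, prompt: str) -> None:
        self.prompt = prompt

    def _call_model(self, messages: list[dict[str, str]]) -> str:
        # Placeholder for a real LLM call; it only echoes the latest feedback.
        return f"revised per: {messages[-1]['content']}"

    def regenerate_with_feedback(
        self, *, feedback: str, message_history: Any
    ) -> tuple[str, Any]:
        # Copy the history so the caller's object is not mutated.
        history = list(message_history or [])
        history.append({"role": "user", "content": feedback})
        new_output = self._call_model(history)
        history.append({"role": "assistant", "content": new_output})
        # Contract from the reference: (new_output_string, updated_message_history).
        return new_output, history


op = EchoModelOperator(prompt="Summarize the report.")
new_output, new_history = op.regenerate_with_feedback(
    feedback="Make it shorter.", message_history=None
)
```

Returning the updated history alongside the output lets the next iteration continue the same conversation instead of starting from the original prompt.
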
