airflow.providers.common.ai.mixins.hitl_review
Attributes
Classes
HITLReviewProtocol | Attributes that the host operator must provide.
HITLReviewMixin | Mixin that drives an iterative HITL review loop inside execute().
Module Contents
- class airflow.providers.common.ai.mixins.hitl_review.HITLReviewProtocol
Bases: Protocol
Attributes that the host operator must provide.
- hitl_timeout: datetime.timedelta | None
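The idea of a protocol that lists attributes a host class must provide can be illustrated with a minimal sketch. It uses only the standard library; the names ReviewHost, DemoOperator, and describe_timeout are hypothetical and are not part of the provider package. Only the hitl_timeout attribute comes from the listing above.

```python
from __future__ import annotations

from datetime import timedelta
from typing import Protocol, runtime_checkable


@runtime_checkable
class ReviewHost(Protocol):
    """Hypothetical stand-in for a protocol that declares required attributes."""

    hitl_timeout: timedelta | None


class DemoOperator:
    """Hypothetical host class; it satisfies ReviewHost structurally, without inheriting."""

    def __init__(self, hitl_timeout: timedelta | None = None) -> None:
        self.hitl_timeout = hitl_timeout


def describe_timeout(host: ReviewHost) -> str:
    """Format the host's timeout in whole seconds, or report that it is not set."""
    if host.hitl_timeout is None:
        return "not set"
    return f"{host.hitl_timeout.total_seconds():.0f}s"


operator = DemoOperator(hitl_timeout=timedelta(minutes=30))
print(isinstance(operator, ReviewHost))  # True: the attribute is present
print(describe_timeout(operator))        # 1800s
```

A static type checker would flag a host class that lacks hitl_timeout when it is passed to describe_timeout; the runtime isinstance check only confirms that the attribute exists.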
- class airflow.providers.common.ai.mixins.hitl_review.HITLReviewMixin
Mixin that drives an iterative HITL review loop inside execute().
After the operator generates its first output, the mixin:
  - Pushes session metadata and the first agent output to XCom.
  - Polls the human action XCom (airflow_hitl_review_human_action) every hitl_poll_interval seconds.
  - When a human sets action to changes_requested (via the plugin API), calls regenerate_with_feedback() and pushes the new agent output.
  - When a human sets action to approved, returns the output.
  - When a human sets action to rejected, raises a HITLRejectException.
The loop stops after hitl_timeout or max_hitl_iterations.
Max iterations: iteration counts outputs shown to the reviewer (1 = initial, 2 = first regeneration, etc.). When the reviewer requests changes at iteration >= max_hitl_iterations, the mixin raises HITLMaxIterationsError without calling the LLM. With max_hitl_iterations=5, the reviewer can request changes at most 4 times (iterations 1–4); the fifth output is the last chance to approve or reject.
All agent outputs and human feedback are persisted as iteration-keyed XCom entries (airflow_hitl_review_agent_output_1, airflow_hitl_review_human_feedback_1, etc.) for full auditability.
Operators using this mixin must set:
  - enable_hitl_review (bool)
  - hitl_timeout (timedelta | None)
  - hitl_poll_interval (float, seconds)
  - prompt (str)
And must implement regenerate_with_feedback().
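The iteration counting and XCom key naming described above can be traced with a small simulation. This is a sketch only, not the mixin's implementation: a plain dict stands in for XCom, a list of pre-recorded (action, feedback) pairs replaces polling, and review_loop, MaxIterationsReached, and Rejected are hypothetical stand-ins. The point at which feedback is stored relative to the max-iterations check is also an assumption.

```python
from typing import Callable, Iterable, Optional


class MaxIterationsReached(Exception):
    """Hypothetical stand-in for HITLMaxIterationsError."""


class Rejected(Exception):
    """Hypothetical stand-in for HITLRejectException."""


def review_loop(
    first_output: str,
    actions: Iterable[tuple[str, Optional[str]]],
    regenerate: Callable[[str, str], str],
    max_hitl_iterations: int,
    store: dict[str, str],
) -> str:
    """Simulate the review loop; `store` plays the role of XCom."""
    iteration = 1  # counts outputs shown to the reviewer (1 = initial)
    output = first_output
    store[f"airflow_hitl_review_agent_output_{iteration}"] = output

    for action, feedback in actions:  # replaces polling the human action XCom
        if action == "approved":
            return output
        if action == "rejected":
            raise Rejected(f"rejected at iteration {iteration}")
        if action == "changes_requested":
            if iteration >= max_hitl_iterations:
                # No regeneration happens once the limit is reached.
                raise MaxIterationsReached(f"limit {max_hitl_iterations} reached")
            # Assumption: feedback is stored only when a regeneration follows.
            store[f"airflow_hitl_review_human_feedback_{iteration}"] = feedback or ""
            output = regenerate(output, feedback or "")
            iteration += 1
            store[f"airflow_hitl_review_agent_output_{iteration}"] = output

    # Running out of recorded actions stands in for hitl_timeout elapsing.
    raise TimeoutError("no further reviewer action")


calls: list[str] = []


def regenerate(output: str, feedback: str) -> str:
    """Fake regeneration that just numbers the drafts."""
    calls.append(feedback)
    return f"draft v{len(calls) + 1}"


store: dict[str, str] = {}
actions = [("changes_requested", f"fix {i}") for i in range(1, 5)] + [("approved", None)]
final = review_loop("draft v1", actions, regenerate, max_hitl_iterations=5, store=store)
print(final)       # draft v5
print(len(calls))  # 4
```

With max_hitl_iterations=5, four change requests produce outputs 2 through 5, and a fifth change request would raise the max-iterations error without invoking regenerate again.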
- run_hitl_review(context, output, *, message_history=None)
Execute the full HITL review loop.
- Parameters:
context (airflow.sdk.Context) – Airflow task context.
output (Any) – Initial LLM output (str or BaseModel).
message_history (Any) – Provider-specific conversation state (e.g. pydantic-ai list[ModelMessage]). Passed to regenerate_with_feedback() on each iteration.
- Returns:
The final approved output as a string.
- Raises:
HITLMaxIterationsError – When max iterations reached without approval.
HITLRejectException – When the reviewer rejects the output.
HITLTimeoutError – When hitl_timeout elapses with no response.
- Return type: