Human-in-the-Loop (HITL) Review for Agentic Operators
HITL Review adds an interactive feedback loop to agentic operators. After the LLM agent produces an initial output, a human reviewer can approve it, reject it, or request changes through a chat UI. The operator blocks until the reviewer takes a terminal action (approve or reject), the timeout elapses, or a change request hits the max_hitl_iterations limit.
This document describes the architecture, workflow, API, XCom schema, and usage.
Overview
Components
HITL Review Plugin — FastAPI app mounted at /hitl-review on the Airflow API server. Provides REST endpoints and a chat UI for reviewers.
Storage — All state is stored in XCom on the running task instance. The worker writes session and agent outputs; the plugin writes human feedback and actions. Both sides read and write the same keys.
Compatibility — Requires Airflow 3.1+. Uses the Task SDK execution model where workers communicate via the Execution API and XCom; the plugin runs on the API server and accesses the metadata database.
Important
Worker slot usage — Each HITL task holds a worker slot for the entire
review duration (until approval, rejection, timeout, or the iteration limit).
The operator polls XCom with time.sleep; it does not defer. With a 10-second
poll interval and review times of 30+ minutes, that is 180 or more polls, and
the worker stays occupied the whole time.
Implementation: XCom polling vs deferral — This implementation uses XCom
polling with time.sleep rather than the deferral/Triggerer pattern (as used
by the standard provider’s HITLOperator). Deferral would free the worker
during review but adds cross-process coordination complexity: the agent state
(message history, tool results) lives in the worker process and would need to
be serialized and restored across defer/resume. XCom polling keeps the flow
simple and keeps all agent context in-process. Future versions may take a different approach.
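The polling approach described above can be sketched as follows. This is a minimal illustration and not the provider's implementation: `wait_for_action`, the `read_action` callback (standing in for an XCom read), and the injectable `sleep` and `clock` parameters are hypothetical names introduced here so the loop can run without Airflow.

```python
import time
from typing import Callable, Optional


def wait_for_action(
    read_action: Callable[[], Optional[str]],
    poll_interval: float = 10.0,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Block until read_action() returns a value or the timeout elapses.

    read_action is a hypothetical stand-in for reading the human action
    from XCom. The caller is blocked for the whole wait, which is why the
    worker slot stays occupied during review.
    """
    deadline = None if timeout is None else clock() + timeout
    while True:
        action = read_action()
        if action is not None:
            return action
        if deadline is not None and clock() >= deadline:
            raise TimeoutError("no human action before the timeout")
        sleep(poll_interval)


# Simulated review: the human action only shows up on the third poll.
responses = iter([None, None, "approve"])
naps = []  # records each sleep instead of actually sleeping
result = wait_for_action(lambda: next(responses), poll_interval=10.0, sleep=naps.append)
print(result, naps)
```

In this simulation the loop sleeps twice before the action appears. A deferrable design would release the worker between polls, at the cost of serializing the agent state as noted above.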
Workflow
[Operator]                                   [API Server / Plugin]
    |                                                 |
    | 1. Generate output                              |
    | 2. Push session + output_1                      |
    |    to XCom                                      |
    |                                                 |
    | 3. Poll XCOM_HUMAN_ACTION                       |
    |    (sleep, poll, repeat)                        |
    |                                                 | 4. Reviewer opens chat UI,
    |                                                 |    submits feedback / approve / reject
    |                                                 | 5. Plugin writes human action
    |                                                 |    to XCom
    | 6. Read action from XCom                        |
    |                                                 |
    | 7a. approve → return output                     |
    | 7b. reject → raise HITLRejectException          |
    | 7c. changes_requested                           |
    |     → regenerate_with_feedback                  |
    |     → push output_2, loop to 3                  |
    | 7d. max_iterations reached                      |
    |     (iteration >= max, human requests changes)  |
    |     → push status max_iterations_exceeded,      |
    |       raise HITLMaxIterationsError              |
    | 7e. hitl_timeout elapsed                        |
    |     → push status timeout_exceeded,             |
    |       raise HITLTimeoutError                    |
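The branching in steps 7a to 7d can be illustrated with a small simulation. Everything here is an assumption made for the sketch: `review_loop`, `fake_llm`, the in-memory `xcom` dict, and the scripted `actions` iterator are hypothetical, and the two exception classes are local stand-ins that reuse the names from the diagram. The timeout branch (7e) and the actual polling are left out.

```python
from typing import Callable, Dict, Iterator, Optional, Tuple


class HITLRejectException(Exception):
    """Local stand-in for the exception named in step 7b."""


class HITLMaxIterationsError(Exception):
    """Local stand-in for the exception named in step 7d."""


def review_loop(
    generate: Callable[[Optional[str]], str],
    actions: Iterator[Tuple[str, Optional[str]]],
    max_iterations: int = 5,
) -> str:
    """Walk through steps 1-7 using scripted human actions instead of XCom polling."""
    xcom: Dict[str, str] = {}  # in-memory substitute for XCom (assumption)
    iteration = 1
    output = generate(None)                  # step 1: initial generation
    xcom[f"output_{iteration}"] = output     # step 2: publish the output
    while True:
        action, feedback = next(actions)     # steps 3-6, without sleeping
        if action == "approve":              # 7a
            return output
        if action == "reject":               # 7b
            raise HITLRejectException("output rejected by reviewer")
        # Anything else is treated as changes_requested (7c or 7d).
        if iteration >= max_iterations:      # 7d: fail without calling the LLM
            xcom["status"] = "max_iterations_exceeded"
            raise HITLMaxIterationsError(f"limit of {max_iterations} reached")
        iteration += 1
        output = generate(feedback)          # 7c: regenerate with feedback
        xcom[f"output_{iteration}"] = output


def fake_llm(feedback: Optional[str]) -> str:
    """Hypothetical generator used in place of a real agent run."""
    return "draft" if feedback is None else f"draft revised for: {feedback}"


script = iter([("changes_requested", "shorter"), ("approve", None)])
final = review_loop(fake_llm, script)
print(final)
```

The scripted reviewer asks for one revision and then approves, so the function returns the second output.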
Using HITL Review with AgentOperator
Enable the review loop with enable_hitl_review=True:
from datetime import timedelta

from airflow.providers.common.ai.operators.agent import AgentOperator
from airflow.providers.common.ai.toolsets.sql import SQLToolset

AgentOperator(
    task_id="summarize",
    prompt="Summarize the sales data",
    llm_conn_id="openai",
    toolsets=[SQLToolset(db_conn_id="postgres")],
    enable_hitl_review=True,
    hitl_timeout=timedelta(minutes=30),
    hitl_poll_interval=10.0,
)
Parameters
enable_hitl_review — When True, the operator enters the review loop after the first generation. Default False.
max_hitl_iterations — Maximum number of outputs the reviewer can see, counting the initial output as iteration 1 and each regeneration after it. When the reviewer requests changes at iteration >= max_hitl_iterations, the task fails with HITLMaxIterationsError without running the LLM. For example, 5 allows changes at iterations 1–4; the fifth output must be either approved or rejected. Default 5.
hitl_timeout — Maximum wall-clock time to wait across all review rounds. None = no timeout (blocks until a terminal action).
hitl_poll_interval — Seconds between XCom polls while waiting for a human response. Default 10.
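The arithmetic behind these parameters can be checked with a short sketch. The helper `changes_allowed` is a hypothetical name used only for illustration; the values match the defaults and the example configuration shown earlier.

```python
import math
from datetime import timedelta


def changes_allowed(iteration: int, max_hitl_iterations: int) -> bool:
    """True if a change request at this iteration would trigger a regeneration."""
    return iteration < max_hitl_iterations


max_hitl_iterations = 5
allowed = [i for i in range(1, max_hitl_iterations + 1) if changes_allowed(i, max_hitl_iterations)]
print(allowed)  # [1, 2, 3, 4]

# Upper bound on XCom polls before hitl_timeout elapses.
hitl_timeout = timedelta(minutes=30)
hitl_poll_interval = 10.0
max_polls = math.ceil(hitl_timeout.total_seconds() / hitl_poll_interval)
print(max_polls)  # 180
```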
Accessing the chat UI — The chat loads as a React plugin on the task
instance page. Use the HITL Review extra link on the task instance, or
navigate to
/dags/{dag_id}/runs/{run_id}/tasks/{task_id}/plugin/hitl-review.
Example DAG
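from datetime import timedelta

from airflow.providers.common.ai.operators.agent import AgentOperator
from airflow.sdk import dag


@dag
def example_agent_operator_hitl_review():
    """AgentOperator with HITL review — a human approves output via hitl-review plugin UI."""
    AgentOperator(
        task_id="summarize_with_review",
        prompt="Summarize the Q4 sales report in 3 bullet points.",
        llm_conn_id="pydantic_ai_default",
        system_prompt="You are a concise business analyst.",
        enable_hitl_review=True,
        max_hitl_iterations=5,
        hitl_timeout=timedelta(minutes=30),
        hitl_poll_interval=10.0,
    )


# Call the decorated function so Airflow registers the DAG.
example_agent_operator_hitl_review()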
REST API
The plugin exposes a FastAPI app at /hitl-review. Base URL:
{AIRFLOW_BASE_URL}/hitl-review
Common query parameters (where applicable):
dag_id — DAG ID.
run_id — DAG run ID.
task_id — Task ID.
map_index — Map index for mapped tasks. Use -1 for non-mapped tasks, or the task's index for dynamically mapped tasks.
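As a small illustration, the common query parameters can be encoded with the standard library. The helper name `hitl_query` is hypothetical; only the parameter names and the -1 default for non-mapped tasks come from the list above.

```python
from urllib.parse import urlencode


def hitl_query(dag_id: str, run_id: str, task_id: str, map_index: int = -1) -> str:
    """Encode the common query parameters; map_index defaults to -1 (non-mapped)."""
    return urlencode(
        {"dag_id": dag_id, "run_id": run_id, "task_id": task_id, "map_index": map_index}
    )


print(hitl_query("sales", "run_1", "summarize"))
# dag_id=sales&run_id=run_1&task_id=summarize&map_index=-1
```

The resulting string would be appended after a `?` to an endpoint path under the base URL.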
Endpoints
| Method | Path | Description |
|---|---|---|
| GET | | Liveness check. Returns |
| GET | | Find the feedback session for a task instance. Returns |
| POST | | Request changes. Body: |
| POST | | Approve the current output. Session must be |
| POST | | Reject the output. Session must be |
Response model: HITLReviewResponse
{
    "dag_id": str,
    "run_id": str,
    "task_id": str,
    "status": "pending_review"
        | "changes_requested"
        | "approved"
        | "rejected"
        | "max_iterations_exceeded"
        | "timeout_exceeded",
    "iteration": int,
    "max_iterations": int,
    "prompt": str,
    "current_output": str,
    "conversation": [{"role": "assistant" | "human", "content": str, "iteration": int}],
    "task_completed": bool,
}
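For client code, the shape above could be mirrored with typing constructs. This is a sketch under assumptions: the plugin's actual model class is not shown in this document, and `Status`, `ConversationEntry`, `validate_status`, and the sample payload are illustrative names and values.

```python
from typing import List, Literal, TypedDict, get_args

Status = Literal[
    "pending_review",
    "changes_requested",
    "approved",
    "rejected",
    "max_iterations_exceeded",
    "timeout_exceeded",
]


class ConversationEntry(TypedDict):
    role: Literal["assistant", "human"]
    content: str
    iteration: int


class HITLReviewResponse(TypedDict):
    dag_id: str
    run_id: str
    task_id: str
    status: Status
    iteration: int
    max_iterations: int
    prompt: str
    current_output: str
    conversation: List[ConversationEntry]
    task_completed: bool


def validate_status(payload: dict) -> bool:
    """Return True when the payload's status is one of the documented values."""
    return payload.get("status") in get_args(Status)


# Made-up payload for illustration only.
sample: HITLReviewResponse = {
    "dag_id": "example_agent_operator_hitl_review",
    "run_id": "manual_run",
    "task_id": "summarize_with_review",
    "status": "pending_review",
    "iteration": 1,
    "max_iterations": 5,
    "prompt": "Summarize the Q4 sales report in 3 bullet points.",
    "current_output": "(agent output)",
    "conversation": [{"role": "assistant", "content": "(agent output)", "iteration": 1}],
    "task_completed": False,
}
print(validate_status(sample))
```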
XCom keys and storage
All keys use the prefix airflow_hitl_review_.
| Key | Writer | Value |
|---|---|---|
| | Worker | |
| | Plugin | |
| | Worker | Per-iteration AI output (string or JSON) |
| | Plugin | Per-iteration human feedback text |
Session lifecycle
pending_review — Awaiting human action. Plugin accepts approve, reject, or feedback.
changes_requested — Feedback submitted; worker is regenerating (or polling for the next action). Plugin does not accept new actions until the worker pushes a new output and status returns to pending_review.
approved / rejected — Terminal. Worker has exited the loop.
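The lifecycle rules above amount to a small state machine. The sketch below is one way to express them; the `ALLOWED` table and both function names are hypothetical, and the action labels (`approve`, `reject`, `feedback`) are shorthand chosen for this example.

```python
# Human actions are only accepted while the session is pending_review.
ALLOWED = {
    "pending_review": {
        "approve": "approved",
        "reject": "rejected",
        "feedback": "changes_requested",
    },
}


def apply_human_action(status: str, action: str) -> str:
    """Return the next status, or raise if the session cannot accept the action."""
    transitions = ALLOWED.get(status, {})
    if action not in transitions:
        raise ValueError(f"action {action!r} not accepted while status is {status!r}")
    return transitions[action]


def worker_pushed_output(status: str) -> str:
    """After regenerating, the worker returns the session to pending_review."""
    if status != "changes_requested":
        raise ValueError(f"unexpected regeneration while status is {status!r}")
    return "pending_review"


status = apply_human_action("pending_review", "feedback")
print(status)
status = worker_pushed_output(status)
print(status)
```

A second action submitted while the status is `changes_requested` raises `ValueError`, matching the rule that the plugin accepts nothing new until the worker pushes the next output.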
Chat UI
The plugin provides an interactive chat UI that loads on the task instance page. The UI:
Fetches session and conversation from the REST API
Displays the current output and feedback history
Submits approve, reject, or feedback via POST endpoints