Human-in-the-Loop (HITL) Review For Agentic Operators

HITL Review adds an interactive feedback loop to agentic operators. After the LLM Agent produces an initial output, a human reviewer can approve, reject, or request changes through a chat UI. The operator blocks until a terminal action, or until a timeout is reached or max_iterations reached.

This document describes the architecture, workflow, API, XCom schema, and usage.

Overview

Components

  • HITL Review Plugin — FastAPI app mounted at /hitl-review on the Airflow API server. Provides REST endpoints and a chat UI for reviewers.

Storage — All state is stored in XCom on the running task instance. The worker writes session and agent outputs; the plugin writes human feedback and actions. Both sides read and write the same keys.

Compatibility — Requires Airflow 3.1+. Uses the Task SDK execution model where workers communicate via the Execution API and XCom; the plugin runs on the API server and accesses the metadata database.

Important

Worker slot usage — Each HITL task holds a worker slot for the entire review duration (until approve, reject, or timeout or max_iterations). The operator polls XCom with time.sleep; it does not defer. With a 10-second poll interval and review times of 30+ minutes, the worker is occupied for the duration.

Implementation: XCom polling vs deferral — This implementation uses XCom polling with time.sleep rather than the deferral/Triggerer pattern (as used by the standard provider’s HITLOperator). Deferral would free the worker during review but adds cross-process coordination complexity: the agent state (message history, tool results) lives in the worker process and would need to be serialized and restored across defer/resume. XCom polling keeps the flow simple and keeps all agent context in-process. Future versions may have different approaches.

Workflow

[Operator]                    [API Server / Plugin]
     |                                 |
     | 1. Generate output              |
     | 2. Push session + output_1      |
     |    to XCom                      |
     |                                 |
     | 3. Poll XCOM_HUMAN_ACTION       |
     |    (sleep, poll, repeat)        |
     |                                 | 4. Reviewer opens chat UI,
     |                                 |    submits feedback / approve / reject
     |                                 | 5. Plugin writes human action
     |                                 |    to XCom
     | 6. Read action from XCom        |
     |                                 |
     | 7a. approve → return output     |
     | 7b. reject  → raise HITLRejectException
     | 7c. changes_requested           |
     |     → regenerate_with_feedback  |
     |     → push output_2, loop to 3  |
     | 7d. max_iterations reached      |
     |     (iteration >= max, human requests changes) |
     |     → push status max_iterations_exceeded, raise HITLMaxIterationsError
     | 7e. hitl_timeout elapsed        |
     |     → push status timeout_exceeded, raise HITLTimeoutError

Using HITL Review with AgentOperator

Enable the review loop with enable_hitl_review=True:

from airflow.providers.common.ai.operators.agent import AgentOperator
from airflow.providers.common.ai.toolsets.sql import SQLToolset
from datetime import timedelta

AgentOperator(
    task_id="summarize",
    prompt="Summarize the sales data",
    llm_conn_id="openai",
    toolsets=[SQLToolset(db_conn_id="postgres")],
    enable_hitl_review=True,
    hitl_timeout=timedelta(minutes=30),
    hitl_poll_interval=10.0,
)

Parameters

  • enable_hitl_review — When True, the operator enters the review loop after the first generation. Default False.

  • max_hitl_iterations — Maximum outputs the reviewer can see (1 = initial output plus subsequent regenerations). When the reviewer requests changes at iteration >= max_hitl_iterations, the task fails with HITLMaxIterationsError without running the LLM. For example, 5 allows changes at iterations 1–4; the fifth output must be either approved or rejected. Default 5.

  • hitl_timeout — Maximum wall-clock time to wait for all review rounds. None = no timeout (blocks until a terminal action).

  • hitl_poll_interval — Seconds between XCom polls while waiting for a human response. Default 10.

Accessing the chat UI — The chat loads as a React plugin on the task instance page. Use the HITL Review extra link on the task instance, or navigate to /dags/{dag_id}/runs/{run_id}/tasks/{task_id}/plugin/hitl-review.

Example DAG

airflow/providers/common/ai/example_dags/example_agent.py[source]

@dag
def example_agent_operator_hitl_review():
    """AgentOperator with HITL review — a human approves output via hitl-review plugin UI."""
    AgentOperator(
        task_id="summarize_with_review",
        prompt="Summarize the Q4 sales report in 3 bullet points.",
        llm_conn_id="pydantic_ai_default",
        system_prompt="You are a concise business analyst.",
        enable_hitl_review=True,
        max_hitl_iterations=5,
        hitl_timeout=timedelta(minutes=30),
        hitl_poll_interval=10.0,
    )


REST API

The plugin exposes a FastAPI app at /hitl-review. Base URL:

{AIRFLOW_BASE_URL}/hitl-review

Common query parameters (where applicable):

  • dag_id — DAG ID.

  • run_id — DAG run ID.

  • task_id — Task ID.

  • map_index — Map index for mapped tasks. Use -1 for non-mapped tasks or index for dynamic mapping.

Endpoints

Method

Path

Description

GET

/health

Liveness check. Returns {"status": "ok"}.

GET

/sessions/find

Find the feedback session for a task instance. Returns HITLReviewResponse or 404 if no session.

POST

/sessions/feedback

Request changes. Body: {"feedback": "..."}. Session must be pending_review. Returns updated session.

POST

/sessions/approve

Approve the current output. Session must be pending_review.

POST

/sessions/reject

Reject the output. Session must be pending_review.

Response model: HITLReviewResponse

{
    "dag_id": str,
    "run_id": str,
    "task_id": str,
    "status": "pending_review"
    | "changes_requested"
    | "approved"
    | "rejected"
    | "max_iterations_exceeded"
    | "timeout_exceeded",
    "iteration": int,
    "max_iterations": int,
    "prompt": str,
    "current_output": str,
    "conversation": [{"role": "assistant" | "human", "content": str, "iteration": int}],
    "task_completed": bool,
}

XCom keys and storage

All keys use the prefix airflow_hitl_review_.

Key

Writer

Value

airflow_hitl_review_agent_session

Worker

AgentSessionData: status, iteration, prompt, current_output

airflow_hitl_review_human_action

Plugin

HumanActionData: action (approve|reject|changes_requested), feedback, iteration

airflow_hitl_review_agent_output_1, _2, …

Worker

Per-iteration AI output (string or JSON)

airflow_hitl_review_human_feedback_1, _2, …

Plugin

Per-iteration human feedback text

Session lifecycle

  • pending_review — Awaiting human action. Plugin accepts approve, reject, or feedback.

  • changes_requested — Feedback submitted; worker is regenerating (or polling for the next action). Plugin does not accept new actions until the worker pushes a new output and status returns to pending_review.

  • approved / rejected — Terminal. Worker has exited the loop.

Chat UI

The plugin provides an interactive chat UI that loads in the task instance page. The UI:

  • Fetches session and conversation from the REST API

  • Displays the current output and feedback history

  • Submits approve, reject, or feedback via POST endpoints

Was this entry helpful?