airflow.providers.common.ai.operators.agent

Operator for running pydantic-ai agents with tools and multi-turn reasoning.

Classes

HITLReviewLink

Link that opens the live chat window for a running feedback session.

AgentOperator

Run a pydantic-ai Agent with tools and multi-turn reasoning.

Module Contents

Bases: airflow.providers.common.compat.sdk.BaseOperatorLink

Link that opens the live chat window for a running feedback session.

The URL is constructed directly from the task instance key so that the link is available immediately — even while the task is still running — without waiting for an XCom value to be committed.

name = 'HITL Review'[source]

Name of the link. This will be the button name on the task UI.

Link to external system.

Parameters:
  • operator (airflow.providers.common.compat.sdk.BaseOperator) – The Airflow operator object this link is associated to.

  • ti_key (airflow.providers.common.compat.sdk.TaskInstanceKey) – TaskInstance ID to return link for.

Returns:

link to external system

Return type:

str

class airflow.providers.common.ai.operators.agent.AgentOperator(*, prompt, llm_conn_id, model_id=None, system_prompt='', output_type=str, toolsets=None, enable_tool_logging=True, agent_params=None, durable=False, enable_hitl_review=False, max_hitl_iterations=5, hitl_timeout=None, hitl_poll_interval=10.0, **kwargs)[source]

Bases: airflow.providers.common.compat.sdk.BaseOperator, airflow.providers.common.ai.mixins.hitl_review.HITLReviewMixin

Run a pydantic-ai Agent with tools and multi-turn reasoning.

Provide llm_conn_id and optional toolsets to let the operator build and run the agent. The agent reasons about the prompt, calls tools in a multi-turn loop, and returns a final answer.

Parameters:
  • prompt (str) – The prompt to send to the agent.

  • llm_conn_id (str) – Connection ID for the LLM provider.

  • model_id (str | None) – Model identifier (e.g. "openai:gpt-5"). Overrides the model stored in the connection’s extra field.

  • system_prompt (str) – System-level instructions for the agent.

  • output_type (type) – Expected output type. Default str. Set to a Pydantic BaseModel subclass for structured output.

  • toolsets (list[pydantic_ai.toolsets.abstract.AbstractToolset] | None) – List of pydantic-ai toolsets the agent can use (e.g. SQLToolset, HookToolset).

  • enable_tool_logging (bool) – When True (default), wraps each toolset in a LoggingToolset that logs tool calls with timing at INFO level and arguments at DEBUG level. Set to False to disable.

  • agent_params (dict[str, Any] | None) – Additional keyword arguments passed to the pydantic-ai Agent constructor (e.g. retries, model_settings).

  • durable (bool) – When True, enables step-level caching of model responses and tool results for durable execution. On retry, cached steps are replayed instead of re-executing. Default False. Requires [common.ai] durable_cache_path to be set.

HITL Review parameters (requires the hitl_review plugin):

Parameters:
  • enable_hitl_review (bool) – When True, the operator enters an iterative review loop after the first generation. A human reviewer can approve, reject, or request changes via the plugin’s REST API at /hitl-review or through the HITL Review extra link on the task instance. Default False.

  • max_hitl_iterations (int) – Maximum outputs shown to the reviewer (1 = initial output). When the reviewer requests changes at iteration >= this limit, the task fails with HITLMaxIterationsError without calling the LLM. E.g. 5 allows changes at iterations 1–4. Default 5.

  • hitl_timeout (datetime.timedelta | None) – Maximum wall-clock time to wait for all review rounds combined. None means no timeout (the operator blocks until a terminal action).

  • hitl_poll_interval (float) – Seconds between XCom polls while waiting for a human response. Default 10.

template_fields: collections.abc.Sequence[str] = ('prompt', 'llm_conn_id', 'model_id', 'system_prompt', 'agent_params')[source]
prompt[source]
llm_conn_id[source]
model_id = None[source]
system_prompt = ''[source]
output_type[source]
toolsets = None[source]
enable_tool_logging = True[source]
agent_params[source]
durable = False[source]
enable_hitl_review = False[source]
max_hitl_iterations = 5[source]
hitl_timeout = None[source]
hitl_poll_interval = 10.0[source]
property llm_hook: airflow.providers.common.ai.hooks.pydantic_ai.PydanticAIHook[source]

Return PydanticAIHook for the configured LLM connection.

execute(context)[source]

Derive when creating an operator.

The main method to execute the task. Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

regenerate_with_feedback(*, feedback, message_history)[source]

Re-run the agent with feedback appended to the conversation history.

Was this entry helpful?