AgentOperator & @task.agent¶
Use AgentOperator or
the @task.agent decorator to run an LLM agent with tools — the agent
reasons about the prompt, calls tools (database queries, API calls, etc.) in
a multi-turn loop, and returns a final answer.
This is different from
LLMOperator, which sends
a single prompt and returns the output. AgentOperator manages a stateful
tool-call loop where the LLM decides which tools to call and when to stop.
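Conceptually, the loop works like this. The sketch below stubs out the model with a plain function so the control flow is visible; the real operator delegates this loop to pydantic-ai, and all names here are illustrative:

```python
# Minimal sketch of an agent tool-call loop. The "model" is a stub that
# inspects the conversation and either requests a tool call or answers;
# a real agent lets the LLM make that decision.

def run_agent(model, tools, prompt):
    """Run the model until it stops requesting tools, then return its answer."""
    messages = [{"role": "user", "content": prompt}]
    while True:
        reply = model(messages)
        if reply["type"] == "tool_call":
            # Execute the requested tool and feed the result back in.
            result = tools[reply["name"]](**reply["args"])
            messages.append({"role": "tool", "name": reply["name"], "content": result})
        else:
            # The model decided it has enough information to answer.
            return reply["content"]

# Stubbed model: request the row count once, then answer with it.
def stub_model(messages):
    tool_results = [m for m in messages if m["role"] == "tool"]
    if not tool_results:
        return {"type": "tool_call", "name": "count_orders", "args": {}}
    return {"type": "answer", "content": f"There are {tool_results[0]['content']} orders."}

answer = run_agent(stub_model, {"count_orders": lambda: 42}, "How many orders?")
print(answer)  # There are 42 orders.
```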
SQL Agent¶
The most common pattern: give an agent access to a database so it can answer questions by writing and executing SQL.
```python
@dag
def example_agent_operator_sql():
    AgentOperator(
        task_id="analyst",
        prompt="What are the top 5 customers by order count?",
        llm_conn_id="pydantic_ai_default",
        system_prompt=(
            "You are a SQL analyst. Use the available tools to explore "
            "the schema and answer the question with data."
        ),
        toolsets=[
            SQLToolset(
                db_conn_id="postgres_default",
                allowed_tables=["customers", "orders"],
                max_rows=20,
            )
        ],
    )
```
The SQLToolset provides four tools to the agent:
| Tool | Description |
|---|---|
| `list_tables` | Lists available table names (filtered by `allowed_tables`) |
| `get_schema` | Returns column names and types for a table |
| `query` | Executes a SQL query and returns rows as JSON |
| *(validation tool, name not shown)* | Validates SQL syntax without executing it |
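The behavior of the first and third tools can be illustrated with a self-contained sketch against SQLite. The class name `GuardedSQLTools` and its details are illustrative, not the provider's implementation; the real toolset runs against an Airflow database connection:

```python
import json
import sqlite3

class GuardedSQLTools:
    """Illustrative stand-in for a SQL toolset: only tables in
    allowed_tables are listed, and query results are capped at max_rows."""

    def __init__(self, conn, allowed_tables, max_rows=20):
        self.conn = conn
        self.allowed_tables = set(allowed_tables)
        self.max_rows = max_rows

    def list_tables(self):
        rows = self.conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table'"
        ).fetchall()
        return sorted(r[0] for r in rows if r[0] in self.allowed_tables)

    def query(self, sql):
        cur = self.conn.execute(sql)
        cols = [d[0] for d in cur.description]
        rows = cur.fetchmany(self.max_rows)  # enforce the row cap
        return json.dumps([dict(zip(cols, r)) for r in rows])

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, total REAL)")
conn.execute("CREATE TABLE secrets (k TEXT)")
conn.execute("INSERT INTO orders VALUES (1, 9.5), (2, 12.0)")

tools = GuardedSQLTools(conn, allowed_tables=["orders"], max_rows=1)
print(tools.list_tables())  # ['orders'] -- 'secrets' is filtered out
print(tools.query("SELECT * FROM orders ORDER BY id"))  # one row, as JSON
```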
Hook-based Tools¶
Wrap any Airflow Hook’s methods as agent tools using HookToolset. Only
methods you explicitly list are exposed — there is no auto-discovery.
```python
@dag
def example_agent_operator_hook():
    from airflow.providers.http.hooks.http import HttpHook

    http_hook = HttpHook(http_conn_id="my_api")

    AgentOperator(
        task_id="api_explorer",
        prompt="What endpoints are available and what does /status return?",
        llm_conn_id="pydantic_ai_default",
        system_prompt="You are an API explorer. Use the tools to discover and call endpoints.",
        toolsets=[
            HookToolset(
                http_hook,
                allowed_methods=["run"],
                tool_name_prefix="http_",
            )
        ],
    )
```
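The allow-list idea behind this can be sketched in plain Python: only methods you name are looked up on the hook and exposed under a prefixed tool name. The helper and class below are illustrative stand-ins, not the provider's code:

```python
# Sketch of explicit method exposure: nothing is auto-discovered; only
# the methods listed in allowed_methods become tools.

def build_tools(obj, allowed_methods, tool_name_prefix=""):
    """Map prefixed tool names to bound methods, skipping everything else."""
    tools = {}
    for name in allowed_methods:
        method = getattr(obj, name)  # raises if the method does not exist
        tools[tool_name_prefix + name] = method
    return tools

class FakeHttpHook:
    def run(self, endpoint):
        return f"GET {endpoint} -> 200"

    def get_conn(self):  # never exposed: not in allowed_methods
        raise RuntimeError("should not be reachable as a tool")

tools = build_tools(FakeHttpHook(), allowed_methods=["run"], tool_name_prefix="http_")
print(sorted(tools))                 # ['http_run']
print(tools["http_run"]("/status"))  # GET /status -> 200
```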
TaskFlow Decorator¶
The @task.agent decorator wraps AgentOperator. The function returns
the prompt string; all other parameters are passed to the operator.
```python
@dag
def example_agent_decorator():
    @task.agent(
        llm_conn_id="pydantic_ai_default",
        system_prompt="You are a data analyst. Use tools to answer questions.",
        toolsets=[
            SQLToolset(
                db_conn_id="postgres_default",
                allowed_tables=["orders"],
            )
        ],
    )
    def analyze(question: str):
        return f"Answer this question about our orders data: {question}"

    analyze("What was our total revenue last month?")
```
Structured Output¶
Set output_type to a Pydantic BaseModel subclass to get structured
data back. The result is serialized via model_dump() for XCom.
```python
@dag
def example_agent_structured_output():
    from pydantic import BaseModel

    class Analysis(BaseModel):
        summary: str
        top_items: list[str]
        row_count: int

    @task.agent(
        llm_conn_id="pydantic_ai_default",
        system_prompt="You are a data analyst. Return structured results.",
        output_type=Analysis,
        toolsets=[SQLToolset(db_conn_id="postgres_default")],
    )
    def analyze(question: str):
        return f"Analyze: {question}"

    analyze("What are the trending products this week?")
```
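What "serialized via `model_dump()`" means in practice is that the structured result becomes a plain, JSON-friendly dict before it lands in XCom. A minimal sketch, using a dataclass as a stand-in for the pydantic model (`asdict()` playing the role of `model_dump()`):

```python
import json
from dataclasses import dataclass, asdict

@dataclass
class Analysis:
    summary: str
    top_items: list
    row_count: int

result = Analysis(summary="3 products trending", top_items=["a", "b", "c"], row_count=3)
payload = asdict(result)  # the plain-dict form is what gets pushed to XCom
print(json.dumps(payload))
```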
Chaining with Downstream Tasks¶
The agent’s output is pushed to XCom like any other operator, so downstream tasks can consume it.
```python
@dag
def example_agent_chain():
    @task.agent(
        llm_conn_id="pydantic_ai_default",
        system_prompt="You are a SQL analyst.",
        toolsets=[SQLToolset(db_conn_id="postgres_default", allowed_tables=["orders"])],
    )
    def investigate(question: str):
        return f"Investigate: {question}"

    @task
    def send_report(analysis: str):
        """Send the agent's analysis to a downstream system."""
        print(f"Report: {analysis}")
        return analysis

    result = investigate("Summarize order trends for last quarter")
    send_report(result)
```
Parameters¶
- `prompt`: The prompt to send to the agent (operator) or the return value of the decorated function (decorator).
- `llm_conn_id`: Airflow connection ID for the LLM provider.
- `model_id`: Model identifier (e.g. `"openai:gpt-5"`). Overrides the connection's extra field.
- `system_prompt`: System-level instructions for the agent. Supports Jinja templating.
- `output_type`: Expected output type (default: `str`). Set to a Pydantic `BaseModel` for structured output.
- `toolsets`: List of pydantic-ai toolsets (`SQLToolset`, `HookToolset`, etc.).
- `enable_tool_logging`: Wrap each toolset in `LoggingToolset` so that every tool call is logged in real time. Default `True`.
- `agent_params`: Additional keyword arguments passed to the pydantic-ai `Agent` constructor (e.g. `retries`, `model_settings`).
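For example, `agent_params` can forward retry and model-settings options to the underlying `Agent`. This is a configuration sketch; the specific values shown are illustrative:

```python
AgentOperator(
    task_id="resilient_agent",
    prompt="...",
    llm_conn_id="pydantic_ai_default",
    toolsets=[SQLToolset(db_conn_id="postgres_default")],
    # Forwarded verbatim to the pydantic-ai Agent constructor.
    agent_params={"retries": 3, "model_settings": {"temperature": 0}},
)
```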
Logging¶
All AI operators automatically log a post-run summary after run_sync()
completes. AgentOperator additionally wraps toolsets for real-time
per-tool-call logging (controlled by enable_tool_logging).
Real-time tool call logging (AgentOperator only) — each tool call is logged as it happens:
INFO - Tool call: list_tables
INFO - Tool list_tables returned in 0.12s
INFO - Tool call: get_schema
INFO - Tool get_schema returned in 0.08s
INFO - Tool call: query
INFO - Tool query returned in 0.34s
Tool arguments are logged at DEBUG level to avoid leaking sensitive data at the default log level.
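The shape of that real-time layer can be sketched with a plain decorator: time each call, log the tool name at INFO, and keep the arguments at DEBUG so they stay out of default logs. This is illustrative only; the provider's `LoggingToolset` wraps pydantic-ai toolsets rather than bare functions:

```python
import logging
import time

log = logging.getLogger("agent.tools")

def with_logging(name, func):
    """Wrap a tool function so each call is logged with its duration."""
    def wrapper(*args, **kwargs):
        log.info("Tool call: %s", name)
        log.debug("Tool %s args: %r %r", name, args, kwargs)  # DEBUG only
        start = time.monotonic()
        result = func(*args, **kwargs)
        log.info("Tool %s returned in %.2fs", name, time.monotonic() - start)
        return result
    return wrapper

query = with_logging("query", lambda sql: [{"n": 1}])
print(query("SELECT 1"))  # [{'n': 1}]
```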
Post-run summary (all operators) — after the LLM run finishes, a summary is logged with model name, token usage, and the full tool call sequence:
INFO - LLM run complete: model=gpt-5, requests=4, tool_calls=3, input_tokens=2847, output_tokens=512, total_tokens=3359
INFO - Tool call sequence: list_tables -> get_schema -> query
At DEBUG level, the LLM output is also logged (truncated to 500 characters).
Both layers use Airflow’s ::group:: / ::endgroup:: log markers, which
render as collapsible sections in the Airflow UI task log viewer.
To disable real-time tool logging while keeping the post-run summary:
```python
AgentOperator(
    task_id="my_agent",
    prompt="...",
    llm_conn_id="my_llm",
    toolsets=[SQLToolset(db_conn_id="my_db")],
    enable_tool_logging=False,
)
```
Security¶
See also
Toolsets — Security for defense layers,
allowed_tables limitations, HookToolset guidelines, recommended
configurations, and the production checklist.