AgentOperator & @task.agent

Use AgentOperator or the @task.agent decorator to run an LLM agent with tools — the agent reasons about the prompt, calls tools (database queries, API calls, etc.) in a multi-turn loop, and returns a final answer.

This is different from LLMOperator, which sends a single prompt and returns the output. AgentOperator manages a stateful tool-call loop where the LLM decides which tools to call and when to stop.
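
The loop can be pictured with a minimal sketch. This is illustrative only — the real loop is driven by pydantic-ai, and every name below is invented for the example:

```python
# Illustrative sketch of a multi-turn tool-call loop -- NOT the actual
# AgentOperator internals. "llm" is any callable that, given the message
# history, returns either a tool call or a final answer.
def agent_loop(llm, tools, prompt):
    history = [{"role": "user", "content": prompt}]
    while True:
        step = llm(history)                       # the model picks the next action
        if step["type"] == "final":
            return step["content"]                # the model decided to stop
        result = tools[step["tool"]](**step["args"])
        history.append({"role": "tool", "content": result})  # feed output back
```

The key point is that the model, not the DAG author, decides which tool to call next and when the answer is complete.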

SQL Agent

The most common pattern: give an agent access to a database so it can answer questions by writing and executing SQL.

airflow/providers/common/ai/example_dags/example_agent.py[source]

@dag
def example_agent_operator_sql():
    AgentOperator(
        task_id="analyst",
        prompt="What are the top 5 customers by order count?",
        llm_conn_id="pydantic_ai_default",
        system_prompt=(
            "You are a SQL analyst. Use the available tools to explore "
            "the schema and answer the question with data."
        ),
        toolsets=[
            SQLToolset(
                db_conn_id="postgres_default",
                allowed_tables=["customers", "orders"],
                max_rows=20,
            )
        ],
    )


The SQLToolset provides four tools to the agent:

  • list_tables: Lists available table names (filtered by allowed_tables if set)

  • get_schema: Returns column names and types for a table

  • query: Executes a SQL query and returns rows as JSON

  • check_query: Validates SQL syntax without executing it

Hook-based Tools

Wrap any Airflow Hook’s methods as agent tools using HookToolset. Only methods you explicitly list are exposed — there is no auto-discovery.

airflow/providers/common/ai/example_dags/example_agent.py[source]

@dag
def example_agent_operator_hook():
    from airflow.providers.http.hooks.http import HttpHook

    http_hook = HttpHook(http_conn_id="my_api")

    AgentOperator(
        task_id="api_explorer",
        prompt="What endpoints are available and what does /status return?",
        llm_conn_id="pydantic_ai_default",
        system_prompt="You are an API explorer. Use the tools to discover and call endpoints.",
        toolsets=[
            HookToolset(
                http_hook,
                allowed_methods=["run"],
                tool_name_prefix="http_",
            )
        ],
    )


TaskFlow Decorator

The @task.agent decorator wraps AgentOperator. The function returns the prompt string; all other parameters are passed to the operator.

airflow/providers/common/ai/example_dags/example_agent.py[source]

@dag
def example_agent_decorator():
    @task.agent(
        llm_conn_id="pydantic_ai_default",
        system_prompt="You are a data analyst. Use tools to answer questions.",
        toolsets=[
            SQLToolset(
                db_conn_id="postgres_default",
                allowed_tables=["orders"],
            )
        ],
    )
    def analyze(question: str):
        return f"Answer this question about our orders data: {question}"

    analyze("What was our total revenue last month?")


Structured Output

Set output_type to a Pydantic BaseModel subclass to get structured data back. The result is serialized via model_dump() for XCom.

airflow/providers/common/ai/example_dags/example_agent.py[source]

@dag
def example_agent_structured_output():
    from pydantic import BaseModel

    class Analysis(BaseModel):
        summary: str
        top_items: list[str]
        row_count: int

    @task.agent(
        llm_conn_id="pydantic_ai_default",
        system_prompt="You are a data analyst. Return structured results.",
        output_type=Analysis,
        toolsets=[SQLToolset(db_conn_id="postgres_default")],
    )
    def analyze(question: str):
        return f"Analyze: {question}"

    analyze("What are the trending products this week?")
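
Because the result is serialized via model_dump(), what downstream tasks pull from XCom is a plain dict rather than a model instance. Using the Analysis model above:

```python
from pydantic import BaseModel

class Analysis(BaseModel):
    summary: str
    top_items: list[str]
    row_count: int

# The value pushed to XCom is the plain dict produced by model_dump():
result = Analysis(summary="Growth", top_items=["a", "b"], row_count=2)
print(result.model_dump())
# {'summary': 'Growth', 'top_items': ['a', 'b'], 'row_count': 2}
```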


Chaining with Downstream Tasks

The agent’s output is pushed to XCom like any other operator, so downstream tasks can consume it.

airflow/providers/common/ai/example_dags/example_agent.py[source]

@dag
def example_agent_chain():
    @task.agent(
        llm_conn_id="pydantic_ai_default",
        system_prompt="You are a SQL analyst.",
        toolsets=[SQLToolset(db_conn_id="postgres_default", allowed_tables=["orders"])],
    )
    def investigate(question: str):
        return f"Investigate: {question}"

    @task
    def send_report(analysis: str):
        """Send the agent's analysis to a downstream system."""
        print(f"Report: {analysis}")
        return analysis

    result = investigate("Summarize order trends for last quarter")
    send_report(result)


Parameters

  • prompt: The prompt to send to the agent (operator) or the return value of the decorated function (decorator).

  • llm_conn_id: Airflow connection ID for the LLM provider.

  • model_id: Model identifier (e.g. "openai:gpt-5"). Overrides the connection’s extra field.

  • system_prompt: System-level instructions for the agent. Supports Jinja templating.

  • output_type: Expected output type (default: str). Set to a Pydantic BaseModel for structured output.

  • toolsets: List of pydantic-ai toolsets (SQLToolset, HookToolset, etc.).

  • enable_tool_logging: Wrap each toolset in LoggingToolset so that every tool call is logged in real time. Default True.

  • agent_params: Additional keyword arguments passed to the pydantic-ai Agent constructor (e.g. retries, model_settings).
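
These parameters compose freely. A hypothetical call combining several of them might look like this (the connection IDs, model string, and retry count are assumptions for illustration, not recommended values):

```python
AgentOperator(
    task_id="pinned_model_agent",
    prompt="What are the top 5 customers by order count?",
    llm_conn_id="pydantic_ai_default",
    model_id="openai:gpt-5",            # overrides the connection's extra field
    system_prompt="You are a SQL analyst.",
    toolsets=[SQLToolset(db_conn_id="postgres_default")],
    agent_params={"retries": 2},        # forwarded to the pydantic-ai Agent
)
```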

Logging

All AI operators automatically log a post-run summary after run_sync() completes. AgentOperator additionally wraps toolsets for real-time per-tool-call logging (controlled by enable_tool_logging).

Real-time tool call logging (AgentOperator only) — each tool call is logged as it happens:

INFO - Tool call: list_tables
INFO - Tool list_tables returned in 0.12s
INFO - Tool call: get_schema
INFO - Tool get_schema returned in 0.08s
INFO - Tool call: query
INFO - Tool query returned in 0.34s

Tool arguments are logged at DEBUG level to avoid leaking sensitive data at the default log level.
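
To surface those DEBUG-level entries, raise Airflow's task logging level through the standard logging configuration, e.g. via the environment-variable form of the [logging] option:

```shell
# Equivalent to setting logging_level = DEBUG in the [logging]
# section of airflow.cfg
export AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG
```

Note this raises verbosity for all task logs, not just agent tool calls.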

Post-run summary (all operators) — after the LLM run finishes, a summary is logged with model name, token usage, and the full tool call sequence:

INFO - LLM run complete: model=gpt-5, requests=4, tool_calls=3, input_tokens=2847, output_tokens=512, total_tokens=3359
INFO - Tool call sequence: list_tables -> get_schema -> query

At DEBUG level, the LLM output is also logged (truncated to 500 characters).

Both layers use Airflow’s ::group:: / ::endgroup:: log markers, which render as collapsible sections in the Airflow UI task log viewer.

To disable real-time tool logging while keeping the post-run summary:

AgentOperator(
    task_id="my_agent",
    prompt="...",
    llm_conn_id="my_llm",
    toolsets=[SQLToolset(db_conn_id="my_db")],
    enable_tool_logging=False,
)

Security

See also

Toolsets — Security for defense layers, allowed_tables limitations, HookToolset guidelines, recommended configurations, and the production checklist.
