airflow.providers.common.ai.toolsets.sql

Curated SQL toolset wrapping DbApiHook for agentic database workflows.

Classes

SQLToolset

Curated toolset that gives an LLM agent safe access to a SQL database.

Module Contents

class airflow.providers.common.ai.toolsets.sql.SQLToolset(db_conn_id, *, allowed_tables=None, schema=None, allow_writes=False, max_rows=50)

Bases: pydantic_ai.toolsets.abstract.AbstractToolset[Any]

Curated toolset that gives an LLM agent safe access to a SQL database.

Provides four tools — list_tables, get_schema, query, and check_query — inspired by LangChain’s SQLDatabaseToolkit pattern.

Uses a DbApiHook resolved lazily from the given db_conn_id.
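
As a rough illustration of the signature above, construction might look like the sketch below. The connection ID "my_warehouse", the table names, and the helper build_toolset are hypothetical; only the keyword names and defaults come from the documented signature. The import sits inside the function so the snippet loads even where the provider is not installed.

```python
# Keyword arguments follow the documented constructor signature.
# The table and schema names are made up for illustration.
TOOLSET_KWARGS = {
    "allowed_tables": ["ORDERS", "ANALYTICS.DAILY_REVENUE"],  # second entry is schema-qualified
    "schema": "SALES",       # default schema for unqualified entries
    "allow_writes": False,   # documented default: SELECT-family statements only
    "max_rows": 50,          # documented default row cap for the query tool
}


def build_toolset(db_conn_id="my_warehouse"):
    """Build the toolset for a (hypothetical) Airflow connection ID."""
    # Deferred import: requires the common.ai provider to be installed.
    from airflow.providers.common.ai.toolsets.sql import SQLToolset

    return SQLToolset(db_conn_id, **TOOLSET_KWARGS)
```

Because the hook is resolved lazily, constructing the toolset this way should not by itself open a database connection.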

When a tool fails, the database’s error message is returned to the agent as a retry (pydantic_ai.ModelRetry) so the model can correct its SQL within the run instead of failing the task. pydantic-ai bounds this by the tool’s max_retries, so an unrecoverable error – a bad connection or an auth failure – exhausts the retries and fails the task for Airflow to retry. The toolset does not inspect the error type or message.
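
The retry flow described above can be sketched without Airflow or pydantic-ai. In this minimal sketch, ModelRetry is a local stand-in for pydantic_ai.ModelRetry, sqlite3 stands in for the hook's database, and run_query, run_with_retries, the attempts list, and the max_retries=1 value are all hypothetical choices for illustration. The real retry loop lives inside pydantic-ai and is not reproduced here.

```python
import sqlite3


class ModelRetry(Exception):
    """Local stand-in for pydantic_ai.ModelRetry so the sketch runs on its own."""


def run_query(conn, sql):
    """Run SQL and convert any database error into a retry request."""
    try:
        return conn.execute(sql).fetchall()
    except sqlite3.Error as exc:
        # The database message is passed through unchanged; the error
        # type and text are not inspected.
        raise ModelRetry(str(exc)) from exc


def run_with_retries(conn, attempts, max_retries=1):
    """Try each SQL string in order, as if the model corrected itself.

    `attempts` is a hypothetical stand-in for successive model outputs.
    Returns (rows, feedback), where feedback lists the error messages
    that were shown to the "model" along the way.
    """
    feedback = []
    for retries_used, sql in enumerate(attempts):
        try:
            return run_query(conn, sql), feedback
        except ModelRetry as retry:
            feedback.append(str(retry))
            if retries_used >= max_retries:
                # Retries exhausted: the run fails and the task-level
                # retry policy takes over.
                raise RuntimeError("tool exceeded max retries") from retry
    raise RuntimeError("no attempts left")
```

A recoverable mistake such as a misspelled column succeeds on the second attempt, while an error that persists across attempts uses up the retries and surfaces as a failure.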

Parameters:
  • db_conn_id (str) – Airflow connection ID for the database.

  • allowed_tables (list[str] | None) –

    Restrict which tables the agent can discover via list_tables and get_schema. None (default) exposes all tables in schema. Entries may be schema-qualified ("SCHEMA.TABLE") to span multiple schemas in one database – common on warehouses such as Snowflake. list_tables then introspects each referenced schema and returns the matching tables fully qualified, and get_schema routes to the table’s own schema. Unqualified entries use schema. Matching is case-insensitive, since databases reflect identifiers in their own case.

    Note

    allowed_tables controls metadata visibility only. It does not parse or validate table references in SQL queries. An LLM can still query tables outside this list if it guesses the name. For query-level restrictions, use database-level permissions (e.g. a read-only role with grants limited to specific tables).

  • schema (str | None) – Default schema/namespace for table listing and introspection, used for unqualified allowed_tables entries and unqualified get_schema calls. Schema-qualified allowed_tables entries override it per table.

  • allow_writes (bool) – Allow data-modifying SQL (INSERT, UPDATE, DELETE, etc.). Default False — only SELECT-family statements are permitted.

  • max_rows (int) – Maximum number of rows returned from the query tool. Default 50.
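
The allowed_tables matching rules can be illustrated with a small, self-contained sketch. The functions split_entry and filter_reflected and the sample schema and table names are hypothetical and are not the provider's implementation; the sketch only assumes the documented behaviour: unqualified entries fall back to schema, matching ignores case, and results are reported in the database's own case. For simplicity it qualifies every returned name.

```python
def split_entry(entry, default_schema):
    """Split 'SCHEMA.TABLE' into (schema, table).

    Unqualified entries fall back to the default schema.
    """
    schema, sep, table = entry.rpartition(".")
    if not sep:
        return default_schema, entry
    return schema, table


def filter_reflected(reflected, allowed_tables, default_schema):
    """Return the allowed tables, qualified, in the database's own case.

    `reflected` maps each schema name to the table names the database
    reports for it (a stand-in for real introspection).
    """
    wanted = set()
    for entry in allowed_tables:
        schema, table = split_entry(entry, default_schema)
        # casefold() gives a case-insensitive comparison key.
        wanted.add(((schema or "").casefold(), table.casefold()))

    matches = []
    for schema, tables in reflected.items():
        for table in tables:
            if (schema.casefold(), table.casefold()) in wanted:
                matches.append(f"{schema}.{table}")
    return sorted(matches)


# Sample data: the database reflects identifiers in upper case.
reflected = {
    "SALES": ["ORDERS", "CUSTOMERS"],
    "HR": ["EMPLOYEES", "SALARIES"],
}
visible = filter_reflected(reflected, ["orders", "hr.employees"], "sales")
```

As the note above stresses, a filter like this only limits what the agent can discover; it does nothing to stop a query that names SALARIES directly, which is why database-level grants remain the actual access control.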

property id: str

An ID for the toolset that is unique among all toolsets registered with the same agent.

If you’re implementing a concrete implementation that users can instantiate more than once, you should let them optionally pass a custom ID to the constructor and return that here.

A toolset needs to have an ID in order to be used in a durable execution environment like Temporal, in which case the ID will be used to identify the toolset’s activities within the workflow.

async get_tools(ctx)

The tools that are available in this toolset.

async call_tool(name, tool_args, ctx, tool)

Call a tool with the given arguments.

Parameters:
  • name – The name of the tool to call.

  • tool_args – The arguments to pass to the tool.

  • ctx – The run context.

  • tool – The tool definition returned by get_tools that was called.
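
The relationship between get_tools and call_tool can be pictured with a toy dispatcher. MiniToolset below is a hypothetical stand-in, not the real AbstractToolset interface: in pydantic-ai the tool argument is a tool definition object rather than a bare coroutine function, and ctx is a run context rather than None. The sketch only shows the general shape, where the entry returned by get_tools is handed back to call_tool along with its name and arguments.

```python
import asyncio


class MiniToolset:
    """Toy stand-in showing name-based tool dispatch (not the real API)."""

    def __init__(self):
        # Two of the four documented tool names, mapped to toy handlers.
        self._tools = {
            "list_tables": self._list_tables,
            "get_schema": self._get_schema,
        }

    async def get_tools(self, ctx):
        """Return the available tools keyed by name."""
        return dict(self._tools)

    async def call_tool(self, name, tool_args, ctx, tool):
        """Invoke the tool previously returned by get_tools."""
        return await tool(**tool_args)

    async def _list_tables(self):
        return ["orders", "customers"]

    async def _get_schema(self, table):
        return f"{table}(id INTEGER)"


async def demo():
    toolset = MiniToolset()
    tools = await toolset.get_tools(ctx=None)
    return await toolset.call_tool(
        "get_schema", {"table": "orders"}, None, tools["get_schema"]
    )


result = asyncio.run(demo())
```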
