LLMSQLQueryOperator¶
Use LLMSQLQueryOperator to generate
SQL queries from natural language using an LLM.
The operator generates SQL but does not execute it. The generated query is returned
as XCom and can be passed to SQLExecuteQueryOperator or used in downstream tasks.
Basic Usage¶
Provide a natural language prompt and the operator generates a SQL query:
@dag
def example_llm_sql_basic():
    LLMSQLQueryOperator(
        task_id="generate_sql",
        prompt="Find the top 10 customers by total revenue",
        llm_conn_id="pydanticai_default",
        schema_context=(
            "Table: customers\n"
            "Columns: id INT, name TEXT, email TEXT\n\n"
            "Table: orders\n"
            "Columns: id INT, customer_id INT, total DECIMAL, created_at TIMESTAMP"
        ),
    )
With Schema Introspection¶
Use db_conn_id and table_names to automatically include database schema
in the LLM’s context. This produces more accurate queries because the LLM knows
the actual column names and types:
@dag
def example_llm_sql_schema_introspection():
    LLMSQLQueryOperator(
        task_id="generate_sql",
        prompt="Calculate monthly revenue for 2024",
        llm_conn_id="pydanticai_default",
        db_conn_id="postgres_default",
        table_names=["orders", "customers"],
        dialect="postgres",
    )
With Object Storage¶
Use datasource_config to generate queries for data stored in object storage
(e.g., S3, local filesystem) via DataFusion.
The operator uses DataSourceConfig
to register the object storage source as a table so the LLM can include it in
the schema context.
@dag
def example_llm_sql_with_object_storage():
    datasource_config = DataSourceConfig(
        conn_id="aws_default",
        table_name="sales_data",
        uri="s3://my-bucket/data/sales/",
        format="parquet",
    )
    LLMSQLQueryOperator(
        task_id="generate_sql",
        prompt="Find the top 5 products by total sales amount",
        llm_conn_id="pydanticai_default",
        datasource_config=datasource_config,
    )
Once the SQL is generated, you can execute it against object storage data using
AnalyticsOperator.
Chain the two operators so that the generated query flows into the analytics
execution step:
from airflow.providers.common.ai.operators.llm_sql import LLMSQLQueryOperator
from airflow.providers.common.sql.config import DataSourceConfig
from airflow.providers.common.sql.operators.analytics import AnalyticsOperator

datasource_config = DataSourceConfig(
    conn_id="aws_default",
    table_name="sales_data",
    uri="s3://my-bucket/data/sales/",
    format="parquet",
)

generate_sql = LLMSQLQueryOperator(
    task_id="generate_sql",
    prompt="Find the top 5 products by total sales amount",
    llm_conn_id="pydanticai_default",
    datasource_config=datasource_config,
)

run_query = AnalyticsOperator(
    task_id="run_query",
    datasource_configs=[datasource_config],
    queries=["{{ ti.xcom_pull(task_ids='generate_sql') }}"],
)

generate_sql >> run_query
TaskFlow Decorator¶
The @task.llm_sql decorator lets you write a function that returns the
prompt. The decorator handles LLM connection, schema introspection, SQL generation,
and safety validation:
@dag
def example_llm_sql_decorator():
    @task.llm_sql(
        llm_conn_id="pydanticai_default",
        schema_context="Table: users\nColumns: id INT, name TEXT, signup_date DATE",
    )
    def build_churn_query(ds=None):
        return f"Find users who signed up before {ds} and have no orders"

    build_churn_query()
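Because the decorator wraps a plain Python function, the prompt-building logic can be checked on its own before it is decorated. A minimal sketch (no Airflow required; this mirrors the undecorated build_churn_query above):

```python
def build_churn_query(ds=None):
    # Same prompt logic as the decorated task above, kept as a plain function.
    return f"Find users who signed up before {ds} and have no orders"


# Call it directly with a fixed logical date to inspect the rendered prompt.
prompt = build_churn_query(ds="2024-01-01")
print(prompt)  # Find users who signed up before 2024-01-01 and have no orders
```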
Dynamic Task Mapping¶
Generate SQL for multiple prompts in parallel using expand():
@dag
def example_llm_sql_expand():
    LLMSQLQueryOperator.partial(
        task_id="generate_sql",
        llm_conn_id="pydanticai_default",
        schema_context=(
            "Table: orders\nColumns: id INT, customer_id INT, total DECIMAL, created_at TIMESTAMP"
        ),
    ).expand(
        prompt=[
            "Total revenue by month",
            "Top 10 customers by order count",
            "Average order value by day of week",
        ]
    )
Human-in-the-Loop Approval¶
Set require_approval=True to pause the task after SQL generation and wait
for a human reviewer to approve the query before it is returned.
When allow_modifications=True, the reviewer can also edit the SQL — the
modified query is re-validated against the same safety rules automatically:
from datetime import timedelta


@dag
def example_llm_sql_approval():
    LLMSQLQueryOperator(
        task_id="generate_sql_with_approval",
        prompt="Find the top 10 customers by total revenue in the last quarter",
        llm_conn_id="pydanticai_default",
        schema_context=(
            "Table: customers\n"
            "Columns: id INT, name TEXT\n\n"
            "Table: orders\n"
            "Columns: id INT, customer_id INT, total DECIMAL, created_at TIMESTAMP"
        ),
        require_approval=True,
        approval_timeout=timedelta(hours=1),
        allow_modifications=True,
    )
SQL Safety Validation¶
By default, the operator validates generated SQL using an allowlist approach:
- Only SELECT, UNION, INTERSECT, and EXCEPT statements are allowed.
- Multi-statement SQL (semicolon-separated) is rejected.
- Disallowed statements (INSERT, UPDATE, DELETE, DROP, etc.) raise SQLSafetyError.

You can disable validation with validate_sql=False or customize the allowed
statement types with allowed_sql_types.
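The allowlist check can be pictured as roughly the following. This is an illustrative, stdlib-only sketch, not the provider's actual implementation; the names SQLSafetyError and allowed mirror the docs above:

```python
import re


class SQLSafetyError(Exception):
    """Raised when generated SQL falls outside the allowlist."""


ALLOWED_SQL_TYPES = {"SELECT", "UNION", "INTERSECT", "EXCEPT"}


def validate_sql(sql, allowed=ALLOWED_SQL_TYPES):
    # Reject multi-statement SQL: any semicolon other than a trailing one.
    if ";" in sql.rstrip().rstrip(";"):
        raise SQLSafetyError("multi-statement SQL is not allowed")
    # The leading keyword determines the statement type.
    match = re.match(r"\s*(\w+)", sql)
    keyword = match.group(1).upper() if match else ""
    if keyword not in allowed:
        raise SQLSafetyError(f"{keyword} statements are not allowed")
    return sql


validate_sql("SELECT id FROM orders")  # passes unchanged
try:
    validate_sql("DROP TABLE orders")
except SQLSafetyError as exc:
    print(exc)  # DROP statements are not allowed
```

Passing a custom set via the allowed parameter corresponds to the operator's allowed_sql_types option; skipping the call entirely corresponds to validate_sql=False.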
Logging¶
After each LLM call, the operator logs a summary with model name, token usage, and request count at INFO level. At DEBUG level, the generated SQL is also logged (truncated to 500 characters). See AgentOperator — Logging for details on the log format.