LLMSQLQueryOperator

Use LLMSQLQueryOperator to generate SQL queries from natural-language prompts with an LLM.

The operator generates SQL but does not execute it. The generated query is pushed to XCom and can be passed to SQLExecuteQueryOperator or used in downstream tasks.

Basic Usage

Provide a natural language prompt and the operator generates a SQL query:

airflow/providers/common/ai/example_dags/example_llm_sql.py[source]

@dag
def example_llm_sql_basic():
    LLMSQLQueryOperator(
        task_id="generate_sql",
        prompt="Find the top 10 customers by total revenue",
        llm_conn_id="pydanticai_default",
        schema_context=(
            "Table: customers\n"
            "Columns: id INT, name TEXT, email TEXT\n\n"
            "Table: orders\n"
            "Columns: id INT, customer_id INT, total DECIMAL, created_at TIMESTAMP"
        ),
    )


With Schema Introspection

Use db_conn_id and table_names to automatically include the database schema in the LLM’s context. This produces more accurate queries because the LLM sees the actual column names and types:

airflow/providers/common/ai/example_dags/example_llm_sql.py[source]

@dag
def example_llm_sql_schema_introspection():
    LLMSQLQueryOperator(
        task_id="generate_sql",
        prompt="Calculate monthly revenue for 2024",
        llm_conn_id="pydanticai_default",
        db_conn_id="postgres_default",
        table_names=["orders", "customers"],
        dialect="postgres",
    )


With Object Storage

Use datasource_config to generate queries for data stored in object storage (e.g., S3, local filesystem) via DataFusion. The operator uses DataSourceConfig to register the object storage source as a table so the LLM can include it in the schema context.

airflow/providers/common/ai/example_dags/example_llm_sql.py[source]

@dag
def example_llm_sql_with_object_storage():
    datasource_config = DataSourceConfig(
        conn_id="aws_default",
        table_name="sales_data",
        uri="s3://my-bucket/data/sales/",
        format="parquet",
    )

    LLMSQLQueryOperator(
        task_id="generate_sql",
        prompt="Find the top 5 products by total sales amount",
        llm_conn_id="pydanticai_default",
        datasource_config=datasource_config,
    )


Once the SQL is generated, you can execute it against object storage data using AnalyticsOperator. Chain the two operators so that the generated query flows into the analytics execution step:

from airflow.providers.common.ai.operators.llm_sql import LLMSQLQueryOperator
from airflow.providers.common.sql.config import DataSourceConfig
from airflow.providers.common.sql.operators.analytics import AnalyticsOperator

datasource_config = DataSourceConfig(
    conn_id="aws_default",
    table_name="sales_data",
    uri="s3://my-bucket/data/sales/",
    format="parquet",
)

generate_sql = LLMSQLQueryOperator(
    task_id="generate_sql",
    prompt="Find the top 5 products by total sales amount",
    llm_conn_id="pydanticai_default",
    datasource_config=datasource_config,
)

run_query = AnalyticsOperator(
    task_id="run_query",
    datasource_configs=[datasource_config],
    queries=["{{ ti.xcom_pull(task_ids='generate_sql') }}"],
)

generate_sql >> run_query

TaskFlow Decorator

The @task.llm_sql decorator lets you write a function that returns the prompt. The decorator handles LLM connection, schema introspection, SQL generation, and safety validation:

airflow/providers/common/ai/example_dags/example_llm_sql.py[source]

@dag
def example_llm_sql_decorator():
    @task.llm_sql(
        llm_conn_id="pydanticai_default",
        schema_context="Table: users\nColumns: id INT, name TEXT, signup_date DATE",
    )
    def build_churn_query(ds=None):
        return f"Find users who signed up before {ds} and have no orders"

    build_churn_query()


Dynamic Task Mapping

Generate SQL for multiple prompts in parallel using expand():

airflow/providers/common/ai/example_dags/example_llm_sql.py[source]

@dag
def example_llm_sql_expand():
    LLMSQLQueryOperator.partial(
        task_id="generate_sql",
        llm_conn_id="pydanticai_default",
        schema_context=(
            "Table: orders\nColumns: id INT, customer_id INT, total DECIMAL, created_at TIMESTAMP"
        ),
    ).expand(
        prompt=[
            "Total revenue by month",
            "Top 10 customers by order count",
            "Average order value by day of week",
        ]
    )


Human-in-the-Loop Approval

Set require_approval=True to pause the task after SQL generation and wait for a human reviewer to approve the query before it is returned. When allow_modifications=True, the reviewer can also edit the SQL; the modified query is automatically re-validated against the same safety rules:

airflow/providers/common/ai/example_dags/example_llm_sql.py[source]

@dag
def example_llm_sql_approval():
    from datetime import timedelta

    LLMSQLQueryOperator(
        task_id="generate_sql_with_approval",
        prompt="Find the top 10 customers by total revenue in the last quarter",
        llm_conn_id="pydanticai_default",
        schema_context=(
            "Table: customers\n"
            "Columns: id INT, name TEXT\n\n"
            "Table: orders\n"
            "Columns: id INT, customer_id INT, total DECIMAL, created_at TIMESTAMP"
        ),
        require_approval=True,
        approval_timeout=timedelta(hours=1),
        allow_modifications=True,
    )


SQL Safety Validation

By default, the operator validates generated SQL using an allowlist approach:

  • Only SELECT, UNION, INTERSECT, and EXCEPT statements are allowed.

  • Multi-statement SQL (semicolon-separated) is rejected.

  • Disallowed statements (INSERT, UPDATE, DELETE, DROP, etc.) raise SQLSafetyError.

You can disable validation with validate_sql=False or customize the allowed statement types with allowed_sql_types.
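The rules above can be approximated in a few lines of plain Python. This is a minimal sketch of the allowlist idea, not the operator's actual implementation; the helper name `validate_sql` and the leading-keyword classification are assumptions, though the `allowed_sql_types` parameter mirrors the operator option of the same name:

```python
import re


class SQLSafetyError(Exception):
    """Raised when a generated query fails the allowlist check."""


def validate_sql(sql: str, allowed_sql_types=frozenset({"SELECT"})) -> str:
    """Illustrative allowlist check: single statement, read-only leading keyword.

    Compound queries built with UNION/INTERSECT/EXCEPT begin with SELECT,
    so the leading-keyword check admits them as well.
    """
    # A single trailing semicolon is harmless; strip it before checking.
    stripped = sql.strip().rstrip("; \t\n")
    # Any remaining semicolon means multi-statement SQL: reject outright.
    if ";" in stripped:
        raise SQLSafetyError("multi-statement SQL is rejected")
    # Classify the statement by its first keyword.
    match = re.match(r"\s*([A-Za-z]+)", stripped)
    statement_type = match.group(1).upper() if match else ""
    if statement_type not in allowed_sql_types:
        raise SQLSafetyError(f"statement type {statement_type!r} is not allowed")
    return stripped
```

A production validator would use a real SQL parser rather than a leading-keyword heuristic, since comments or parenthesized set operations can precede the first keyword.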

Logging

After each LLM call, the operator logs a summary with model name, token usage, and request count at INFO level. At DEBUG level, the generated SQL is also logged (truncated to 500 characters). See AgentOperator — Logging for details on the log format.
