LLMSchemaCompareOperator

Use LLMSchemaCompareOperator to compare schemas across different database systems and detect drift using LLM reasoning.

The operator introspects schemas from multiple data sources and uses an LLM to identify mismatches that would break data loading. The LLM handles complex cross-system type mapping that simple equality checks miss (e.g., varchar(255) vs string, timestamp vs timestamptz).
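To see why an equality check falls short, here is a hypothetical sketch (not the operator's implementation) of a naive comparison that flags every cross-system type pair as a mismatch, even when the pairs load cleanly:

```python
# Hypothetical sketch: naive string-equality type comparison, no cross-system mapping.
postgres_schema = {"id": "bigint", "name": "varchar(255)", "created_at": "timestamptz"}
snowflake_schema = {"id": "NUMBER(38,0)", "name": "STRING", "created_at": "TIMESTAMP_TZ"}


def naive_mismatches(source: dict, target: dict) -> list[str]:
    """Flag every column whose type strings differ exactly."""
    return [col for col in source if source[col].lower() != target.get(col, "").lower()]


# All three columns are flagged, although each pair is compatible in practice.
print(naive_mismatches(postgres_schema, snowflake_schema))  # → ['id', 'name', 'created_at']
```

The LLM-based comparison is meant to avoid exactly these false positives by reasoning about type equivalence instead of string equality.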

The result is a structured SchemaCompareResult containing a list of mismatches with severity levels, descriptions, and suggested actions.

Basic Usage

Provide db_conn_ids pointing to two or more database connections and table_names to compare. The operator introspects each table via DbApiHook.get_table_schema() and sends the schemas to the LLM:

airflow/providers/common/ai/example_dags/example_llm_schema_compare.py[source]

@dag
def example_llm_schema_compare_basic():
    LLMSchemaCompareOperator(
        task_id="detect_schema_drift",
        prompt="Identify schema mismatches that would break data loading between systems",
        llm_conn_id="pydantic_ai_default",
        db_conn_ids=["postgres_default", "snowflake_default"],
        table_names=["customers"],
    )


Full Context Strategy

Set context_strategy="full" to include primary keys, foreign keys, and indexes in the schema context sent to the LLM.

airflow/providers/common/ai/example_dags/example_llm_schema_compare.py[source]

@dag
def example_llm_schema_compare_full_context():
    LLMSchemaCompareOperator(
        task_id="detect_schema_drift",
        prompt=(
            "Compare schemas and generate a migration plan. "
            "Flag any differences that would break nightly ETL loads."
        ),
        llm_conn_id="pydantic_ai_default",
        db_conn_ids=["postgres_source", "snowflake_target"],
        table_names=["customers", "orders"],
        context_strategy="full",
    )


With Object Storage

Use data_sources with DataSourceConfig to include object-storage sources (S3 Parquet, CSV, Iceberg, etc.) in the comparison. These can be freely combined with db_conn_ids:

airflow/providers/common/ai/example_dags/example_llm_schema_compare.py[source]

@dag
def example_llm_schema_compare_with_object_storage():
    s3_source = DataSourceConfig(
        conn_id="aws_default",
        table_name="customers",
        uri="s3://data-lake/customers/",
        format="parquet",
    )

    LLMSchemaCompareOperator(
        task_id="compare_s3_vs_db",
        prompt="Compare S3 Parquet schema against the Postgres table and flag breaking changes",
        llm_conn_id="pydantic_ai_default",
        db_conn_ids=["postgres_default"],
        table_names=["customers"],
        data_sources=[s3_source],
    )


Customizing the System Prompt

The operator ships with a DEFAULT_SYSTEM_PROMPT that teaches the LLM about cross-system type equivalences (e.g., varchar vs string, bigint vs int64) and severity-level definitions (critical, warning, info).

When you pass a custom system_prompt, it replaces the default entirely. To keep the built-in rules and layer project-specific instructions on top, concatenate them:

from airflow.providers.common.ai.operators.llm_schema_compare import (
    DEFAULT_SYSTEM_PROMPT,
    LLMSchemaCompareOperator,
)

LLMSchemaCompareOperator(
    task_id="compare_with_custom_rules",
    prompt="Compare schemas and flag breaking changes",
    llm_conn_id="pydantic_ai_default",
    db_conn_ids=["postgres_source", "snowflake_target"],
    table_names=["customers"],
    system_prompt=DEFAULT_SYSTEM_PROMPT
    + "\n\nProject-specific rules:\n"
    + "- Snowflake VARIANT columns are compatible with PostgreSQL jsonb.\n",
)

TaskFlow Decorator

The @task.llm_schema_compare decorator lets you write a function that returns the prompt. The decorator handles schema introspection, LLM comparison, and structured output:

airflow/providers/common/ai/example_dags/example_llm_schema_compare.py[source]

@dag
def example_llm_schema_compare_decorator():
    @task.llm_schema_compare(
        llm_conn_id="pydantic_ai_default",
        db_conn_ids=["postgres_source", "snowflake_target"],
        table_names=["customers"],
    )
    def check_migration_readiness(ds=None):
        return f"Compare schemas as of {ds}. Flag breaking changes and suggest migration actions."

    check_migration_readiness()


Conditional ETL Based on Schema Compatibility

The operator returns a dict with a compatible boolean. Use it with @task.branch to gate downstream ETL on schema compatibility:

airflow/providers/common/ai/example_dags/example_llm_schema_compare.py[source]

@dag
def example_llm_schema_compare_conditional():
    @task.llm_schema_compare(
        llm_conn_id="pydantic_ai_default",
        db_conn_ids=["postgres_source", "snowflake_target"],
        table_names=["customers"],
        context_strategy="full",
    )
    def check_before_etl():
        return (
            "Compare schemas and flag any mismatches that would break data loading. "
            "No migrations allowed — report only."
        )

    @task.branch
    def decide(comparison_result):
        if comparison_result["compatible"]:
            return "run_etl"
        return "notify_team"

    comparison = check_before_etl()
    decision = decide(comparison)

    @task(task_id="run_etl")
    def run_etl():
        return "ETL completed"

    @task(task_id="notify_team")
    def notify_team():
        return "Schema drift detected — team notified"

    decision >> [run_etl(), notify_team()]


Structured Output

The operator always returns a dict (serialized from SchemaCompareResult) with these fields:

  • compatible (bool): False if any critical mismatches exist.

  • mismatches (list): Each mismatch contains:

    • source / target: The data source labels.

    • column: Column where the mismatch was detected.

    • source_type / target_type: The data types in each system.

    • severity: "critical", "warning", or "info".

    • description: Human-readable explanation.

    • suggested_action: Recommended resolution.

    • migration_query: Suggested migration SQL.

  • summary (str): High-level summary of the comparison.
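An illustrative payload with the shape described above (all field values are hypothetical, invented for this example):

```python
# Hypothetical example of the dict an LLMSchemaCompareOperator task might return.
comparison_result = {
    "compatible": False,
    "mismatches": [
        {
            "source": "postgres_source",
            "target": "snowflake_target",
            "column": "created_at",
            "source_type": "timestamp",
            "target_type": "date",
            "severity": "critical",
            "description": "Loading timestamp values into a date column drops the time component.",
            "suggested_action": "Widen the target column to a timestamp type before loading.",
            "migration_query": "ALTER TABLE customers ALTER COLUMN created_at SET DATA TYPE TIMESTAMP_NTZ;",
        }
    ],
    "summary": "1 critical mismatch found; schemas are not load-compatible.",
}

# Per the field definitions above: any critical mismatch forces compatible=False.
has_critical = any(m["severity"] == "critical" for m in comparison_result["mismatches"])
print(has_critical, comparison_result["compatible"])  # → True False
```

Downstream tasks (for example the @task.branch pattern shown earlier) consume this dict directly from XCom.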

Parameters

  • prompt: Instructions for the LLM on what to compare and flag.

  • llm_conn_id: Airflow connection ID for the LLM provider.

  • model_id: Model identifier (e.g. "openai:gpt-5"). Overrides the connection’s extra field.

  • system_prompt: Instructions included in the LLM system prompt. Defaults to DEFAULT_SYSTEM_PROMPT which contains cross-system type equivalences and severity definitions. Passing a value replaces the default — concatenate with DEFAULT_SYSTEM_PROMPT to extend it (see Customizing the System Prompt above).

  • agent_params: Additional keyword arguments passed to the pydantic-ai Agent constructor.

  • db_conn_ids: List of database connection IDs to compare. Each must resolve to a DbApiHook.

  • table_names: Tables to introspect from each db_conn_id.

  • data_sources: List of DataSourceConfig objects for object-storage or catalog-managed sources.

  • context_strategy: Either "basic" or "full". With "full", the schema context sent to the LLM also includes primary keys, foreign keys, and indexes; strongly recommended for cross-system comparisons. Defaults to "full".

Logging

After each LLM call, the operator logs a summary with model name, token usage, and request count at INFO level. See AgentOperator — Logging for details on the log format.
