LLMSchemaCompareOperator¶
Use LLMSchemaCompareOperator
to compare schemas across different database systems and detect drift using LLM reasoning.
The operator introspects schemas from multiple data sources and uses an LLM to identify
mismatches that would break data loading. The LLM handles complex cross-system type
mapping that simple equality checks miss (e.g., varchar(255) vs string,
timestamp vs timestamptz).
The result is a structured SchemaCompareResult
containing a list of mismatches with severity levels, descriptions, and suggested actions.
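To see why simple equality checks fall short, here is a minimal illustrative sketch (not provider code) contrasting a naive string comparison with a hand-maintained equivalence table; keeping such a table complete across database systems is exactly the gap the LLM-based comparison fills:

```python
def naive_match(source_type: str, target_type: str) -> bool:
    # A plain string-equality check: flags varchar(255) vs STRING as a mismatch.
    return source_type.lower() == target_type.lower()

# A tiny, hand-maintained equivalence table; real cross-system mapping
# needs far more entries (precision, parametrized types, aliases, ...).
EQUIVALENT = {
    ("varchar(255)", "string"),
    ("timestamp", "timestamptz"),
    ("bigint", "int64"),
}

def mapped_match(source_type: str, target_type: str) -> bool:
    # Equal strings, or a known equivalence in either direction.
    pair = (source_type.lower(), target_type.lower())
    return naive_match(*pair) or pair in EQUIVALENT or pair[::-1] in EQUIVALENT

print(naive_match("varchar(255)", "STRING"))   # False: naive check reports a false mismatch
print(mapped_match("varchar(255)", "STRING"))  # True: the equivalence table resolves it
```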
Basic Usage¶
Provide db_conn_ids pointing to two or more database connections and
table_names to compare. The operator introspects each table via
DbApiHook.get_table_schema() and sends the schemas to the LLM:
@dag
def example_llm_schema_compare_basic():
    LLMSchemaCompareOperator(
        task_id="detect_schema_drift",
        prompt="Identify schema mismatches that would break data loading between systems",
        llm_conn_id="pydantic_ai_default",
        db_conn_ids=["postgres_default", "snowflake_default"],
        table_names=["customers"],
    )
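As a rough illustration of what gets sent to the LLM, the introspected context under the default strategy amounts to column names and types per connection. The payload shape below is an assumption for illustration; the actual structure built from DbApiHook.get_table_schema() may differ:

```python
import json

# Hypothetical shape of the schema context: one entry per connection,
# mapping table name to {column: type}. Illustrative values only.
schema_context = {
    "postgres_default": {
        "customers": {"id": "bigint", "email": "varchar(255)", "created_at": "timestamp"},
    },
    "snowflake_default": {
        "customers": {"ID": "NUMBER(38,0)", "EMAIL": "STRING", "CREATED_AT": "TIMESTAMP_NTZ"},
    },
}

# The serialized context is embedded in the LLM request alongside the prompt.
print(json.dumps(schema_context, indent=2))
```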
Full Context Strategy¶
Set context_strategy="full" to include primary keys, foreign keys, and indexes
in the schema context sent to the LLM.
@dag
def example_llm_schema_compare_full_context():
    LLMSchemaCompareOperator(
        task_id="detect_schema_drift",
        prompt=(
            "Compare schemas and generate a migration plan. "
            "Flag any differences that would break nightly ETL loads."
        ),
        llm_conn_id="pydantic_ai_default",
        db_conn_ids=["postgres_source", "snowflake_target"],
        table_names=["customers", "orders"],
        context_strategy="full",
    )
With Object Storage¶
Use data_sources with
DataSourceConfig to include
object-storage sources (S3 Parquet, CSV, Iceberg, etc.) in the comparison.
These can be freely combined with db_conn_ids:
@dag
def example_llm_schema_compare_with_object_storage():
    s3_source = DataSourceConfig(
        conn_id="aws_default",
        table_name="customers",
        uri="s3://data-lake/customers/",
        format="parquet",
    )
    LLMSchemaCompareOperator(
        task_id="compare_s3_vs_db",
        prompt="Compare S3 Parquet schema against the Postgres table and flag breaking changes",
        llm_conn_id="pydantic_ai_default",
        db_conn_ids=["postgres_default"],
        table_names=["customers"],
        data_sources=[s3_source],
    )
Customizing the System Prompt¶
The operator ships with a DEFAULT_SYSTEM_PROMPT that teaches the LLM about
cross-system type equivalences (e.g., varchar vs string, bigint vs
int64) and severity-level definitions (critical, warning, info).
When you pass a custom system_prompt, it replaces the default entirely.
If you want to keep the built-in rules and layer project-specific instructions
on top, concatenate them:
from airflow.providers.common.ai.operators.llm_schema_compare import (
    DEFAULT_SYSTEM_PROMPT,
    LLMSchemaCompareOperator,
)

LLMSchemaCompareOperator(
    task_id="compare_with_custom_rules",
    prompt="Compare schemas and flag breaking changes",
    llm_conn_id="pydantic_ai_default",
    db_conn_ids=["postgres_source", "snowflake_target"],
    table_names=["customers"],
    # A blank line separates the appended rules from the built-in prompt.
    system_prompt=DEFAULT_SYSTEM_PROMPT
    + "\n\nProject-specific rules:\n"
    + "- Snowflake VARIANT columns are compatible with PostgreSQL jsonb.\n",
)
TaskFlow Decorator¶
The @task.llm_schema_compare decorator lets you write a function that returns
the prompt. The decorator handles schema introspection, LLM comparison, and
structured output:
@dag
def example_llm_schema_compare_decorator():
    @task.llm_schema_compare(
        llm_conn_id="pydantic_ai_default",
        db_conn_ids=["postgres_source", "snowflake_target"],
        table_names=["customers"],
    )
    def check_migration_readiness(ds=None):
        return f"Compare schemas as of {ds}. Flag breaking changes and suggest migration actions."

    check_migration_readiness()
Conditional ETL Based on Schema Compatibility¶
The operator returns a dict with a compatible boolean. Use it with
@task.branch to gate downstream ETL on schema compatibility:
@dag
def example_llm_schema_compare_conditional():
    @task.llm_schema_compare(
        llm_conn_id="pydantic_ai_default",
        db_conn_ids=["postgres_source", "snowflake_target"],
        table_names=["customers"],
        context_strategy="full",
    )
    def check_before_etl():
        return (
            "Compare schemas and flag any mismatches that would break data loading. "
            "No migrations allowed — report only."
        )

    @task.branch
    def decide(comparison_result):
        if comparison_result["compatible"]:
            return "run_etl"
        return "notify_team"

    comparison = check_before_etl()
    decision = decide(comparison)

    @task(task_id="run_etl")
    def run_etl():
        return "ETL completed"

    @task(task_id="notify_team")
    def notify_team():
        return "Schema drift detected — team notified"

    decision >> [run_etl(), notify_team()]
Structured Output¶
The operator always returns a dict (serialized from
SchemaCompareResult)
with these fields:
- compatible (bool): False if any critical mismatches exist.
- mismatches (list): Each mismatch contains:
  - source / target: The data source labels.
  - column: Column where the mismatch was detected.
  - source_type / target_type: The data types in each system.
  - severity: "critical", "warning", or "info".
  - description: Human-readable explanation.
  - suggested_action: Recommended resolution.
  - migration_query: Suggested migration SQL.
- summary (str): High-level summary of the comparison.
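Downstream tasks can consume this dict directly. A minimal sketch of post-processing, assuming the documented field names (the sample values below are invented for illustration):

```python
# Invented sample result following the documented SchemaCompareResult shape.
result = {
    "compatible": False,
    "mismatches": [
        {
            "source": "postgres_source", "target": "snowflake_target",
            "column": "created_at",
            "source_type": "timestamp", "target_type": "timestamptz",
            "severity": "warning",
            "description": "Timezone semantics differ.",
            "suggested_action": "Normalize to UTC on load.",
            "migration_query": None,
        },
        {
            "source": "postgres_source", "target": "snowflake_target",
            "column": "email",
            "source_type": "varchar(255)", "target_type": "NUMBER",
            "severity": "critical",
            "description": "Incompatible types.",
            "suggested_action": "Fix target column type.",
            "migration_query": "ALTER TABLE customers ...",
        },
    ],
    "summary": "1 critical, 1 warning",
}

# Surface only the critical mismatches, e.g. for an alert message.
critical = [m for m in result["mismatches"] if m["severity"] == "critical"]
if not result["compatible"]:
    print(f"{len(critical)} critical mismatch(es): " + ", ".join(m["column"] for m in critical))
```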
Parameters¶
- prompt: Instructions for the LLM on what to compare and flag.
- llm_conn_id: Airflow connection ID for the LLM provider.
- model_id: Model identifier (e.g. "openai:gpt-5"). Overrides the connection’s extra field.
- system_prompt: Instructions included in the LLM system prompt. Defaults to DEFAULT_SYSTEM_PROMPT, which contains cross-system type equivalences and severity definitions. Passing a value replaces the default; concatenate with DEFAULT_SYSTEM_PROMPT to extend it (see Customizing the System Prompt above).
- agent_params: Additional keyword arguments passed to the pydantic-ai Agent constructor.
- db_conn_ids: List of database connection IDs to compare. Each must resolve to a DbApiHook.
- table_names: Tables to introspect from each db_conn_id.
- data_sources: List of DataSourceConfig objects for object-storage or catalog-managed sources.
- context_strategy: "full" or "basic". "full" fetches primary keys, foreign keys, and indexes and is strongly recommended for cross-system comparisons. Defaults to "full".
Logging¶
After each LLM call, the operator logs a summary with model name, token usage, and request count at INFO level. See AgentOperator — Logging for details on the log format.