airflow.providers.openlineage.sqlparser

Attributes

log

DEFAULT_NAMESPACE

DEFAULT_INFORMATION_SCHEMA_COLUMNS

DEFAULT_INFORMATION_SCHEMA_TABLE_NAME

Classes

GetTableSchemasParams

get_table_schemas params.

DatabaseInfo

Contains database-specific information needed to process the SQL statement parse result.

SQLParser

Interface for openlineage-sql.

Functions

default_normalize_name_method(name)

from_table_meta(table_meta, database, namespace, ...)

get_openlineage_facets_with_sql(hook, sql, conn_id, ...)

Module Contents

airflow.providers.openlineage.sqlparser.log[source]
airflow.providers.openlineage.sqlparser.DEFAULT_NAMESPACE = 'default'[source]
airflow.providers.openlineage.sqlparser.DEFAULT_INFORMATION_SCHEMA_COLUMNS = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'udt_name'][source]
airflow.providers.openlineage.sqlparser.DEFAULT_INFORMATION_SCHEMA_TABLE_NAME = 'information_schema.columns'[source]
airflow.providers.openlineage.sqlparser.default_normalize_name_method(name)[source]
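Given that normalize_name_method is documented below as defaulting to name.lower(), the default normalization presumably amounts to simple lowercasing; a minimal sketch:

```python
def default_normalize_name_method(name: str) -> str:
    # Sketch of the documented default: lowercase the identifier.
    # Mirrors the "Defaults to name.lower()" note on normalize_name_method.
    return name.lower()

print(default_normalize_name_method("MY_TABLE"))  # my_table
```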
class airflow.providers.openlineage.sqlparser.GetTableSchemasParams[source]

Bases: TypedDict

get_table_schemas params.

normalize_name: Callable[[str], str][source]
is_cross_db: bool[source]
information_schema_columns: list[str][source]
information_schema_table: str[source]
use_flat_cross_db_query: bool[source]
is_uppercase_names: bool[source]
database: str | None[source]
class airflow.providers.openlineage.sqlparser.DatabaseInfo[source]

Contains database-specific information needed to process the SQL statement parse result.

Parameters:
  • scheme – Scheme part of URI in OpenLineage namespace.

  • authority – Authority part of URI in OpenLineage namespace. In most cases it should be the {host}:{port} part of the Airflow connection. See: https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md

  • database – Takes precedence over parsed database name.

  • information_schema_columns – List of column names from the information schema table.

  • information_schema_table_name – Information schema table name.

  • use_flat_cross_db_query

    Specifies whether a single, “global” information schema table should be used for cross-database queries (e.g., in Redshift), or if multiple, per-database “local” information schema tables should be queried individually.

    If True, assumes a single, universal information schema table is available (for example, the SVV_REDSHIFT_COLUMNS view in Redshift: https://docs.aws.amazon.com/redshift/latest/dg/r_SVV_REDSHIFT_COLUMNS.html). In this mode, only information_schema_table_name is queried directly. Depending on the is_information_schema_cross_db argument, you can also filter by database name in the WHERE clause.

    If False, treats each database as having its own local information schema table containing metadata for that database only. As a result, one query per database may be generated and then combined (often via UNION ALL). This approach is necessary for dialects that do not maintain a single global view of all metadata or that require per-database queries. Depending on the is_information_schema_cross_db argument, queries can include or omit database information in both identifiers and filters.

    See is_information_schema_cross_db which also affects how final queries are constructed.

  • is_information_schema_cross_db

    Specifies whether database information should be tracked and included in queries that retrieve schema information from the information_schema_table. In short, this determines whether queries are capable of spanning multiple databases.

    If True, database identifiers are included wherever applicable, allowing retrieval of metadata from more than one database. For instance, in Snowflake or MS SQL (where each database is treated as a top-level namespace), you might have a query like:

        SELECT ... FROM db1.information_schema.columns WHERE ...
        UNION ALL
        SELECT ... FROM db2.information_schema.columns WHERE ...

    In Redshift, setting this to True together with use_flat_cross_db_query=True allows adding database filters to the query, for example:

        SELECT ... FROM SVV_REDSHIFT_COLUMNS
        WHERE SVV_REDSHIFT_COLUMNS.database = db1  -- this filter is skipped when False
          AND SVV_REDSHIFT_COLUMNS.schema = schema1
          AND SVV_REDSHIFT_COLUMNS.table IN (table1, table2)
          OR ...

    However, certain databases (e.g., PostgreSQL) do not permit true cross-database queries. In such dialects, enabling cross-database support may lead to errors or be unnecessary. Always consult your dialect’s documentation or test sample queries to confirm if cross-database querying is supported.

    If False, database qualifiers are ignored, effectively restricting queries to a single database (or making the database-level qualifier optional). This is typically safer for databases that do not support cross-database operations or only provide a two-level namespace (schema + table) instead of a three-level one (database + schema + table). For example, some MySQL or PostgreSQL contexts might not need or permit cross-database queries at all.

    See use_flat_cross_db_query which also affects how final queries are constructed.
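To make the interaction of use_flat_cross_db_query and is_information_schema_cross_db concrete, here is a simplified, illustrative query builder. The function name and the query shape are hypothetical; the provider's create_information_schema_query additionally selects specific columns, filters schemas and tables, and handles quoting:

```python
def build_information_schema_query(
    tables_by_db: dict[str, list[str]],
    information_schema_table: str,
    use_flat_cross_db_query: bool,
    is_cross_db: bool,
) -> str:
    # Simplified sketch only: real queries also project specific columns
    # and filter by schema and table names.
    if use_flat_cross_db_query:
        # One global metadata table (e.g. SVV_REDSHIFT_COLUMNS in Redshift);
        # optionally narrow it down by database name.
        where = ""
        if is_cross_db:
            dbs = ", ".join(sorted(tables_by_db))
            where = f" WHERE database IN ({dbs})"
        return f"SELECT * FROM {information_schema_table}{where}"
    # Otherwise: one query per database, combined with UNION ALL.
    selects = []
    for db in sorted(tables_by_db):
        # Database qualifier is only included when cross-db tracking is on.
        prefix = f"{db}." if is_cross_db else ""
        selects.append(f"SELECT * FROM {prefix}{information_schema_table}")
    return " UNION ALL ".join(selects)

print(build_information_schema_query(
    {"db1": ["t1"], "db2": ["t2"]},
    "information_schema.columns",
    use_flat_cross_db_query=False,
    is_cross_db=True,
))
```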

  • is_uppercase_names – Specifies whether the database accepts only uppercase names (e.g. Snowflake).

  • normalize_name_method – Method to normalize database, schema and table names. Defaults to name.lower().

scheme: str[source]
authority: str | None = None[source]
database: str | None = None[source]
information_schema_columns: list[str] = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'udt_name'][source]
information_schema_table_name: str = 'information_schema.columns'[source]
use_flat_cross_db_query: bool = False[source]
is_information_schema_cross_db: bool = False[source]
is_uppercase_names: bool = False[source]
normalize_name_method: Callable[[str], str][source]
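As an illustration of these fields and their documented defaults, the stand-in dataclass below mirrors DatabaseInfo so it runs without Airflow installed; it is not the provider's class, and the connection details in the example are invented:

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable


@dataclass
class DatabaseInfoSketch:
    # Stand-in mirroring the documented fields and defaults of
    # airflow.providers.openlineage.sqlparser.DatabaseInfo.
    scheme: str
    authority: str | None = None
    database: str | None = None
    information_schema_columns: list[str] = field(default_factory=lambda: [
        "table_schema", "table_name", "column_name", "ordinal_position", "udt_name",
    ])
    information_schema_table_name: str = "information_schema.columns"
    use_flat_cross_db_query: bool = False
    is_information_schema_cross_db: bool = False
    is_uppercase_names: bool = False
    normalize_name_method: Callable[[str], str] = str.lower


# Example: a Postgres-style database reachable at host:port (illustrative values).
info = DatabaseInfoSketch(scheme="postgres", authority="localhost:5432")
print(info.normalize_name_method("MY_TABLE"))  # my_table
```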
airflow.providers.openlineage.sqlparser.from_table_meta(table_meta, database, namespace, is_uppercase)[source]
class airflow.providers.openlineage.sqlparser.SQLParser(dialect=None, default_schema=None)[source]

Bases: airflow.utils.log.logging_mixin.LoggingMixin

Interface for openlineage-sql.

Parameters:
  • dialect (str | None) – dialect specific to the database

  • default_schema (str | None) – schema applied to each table with no schema parsed

dialect = None[source]
default_schema = None[source]
parse(sql)[source]

Parse a single or a list of SQL statements.

parse_table_schemas(hook, inputs, outputs, database_info, namespace=DEFAULT_NAMESPACE, database=None, sqlalchemy_engine=None)[source]

Parse schemas for input and output tables.

get_metadata_from_parser(inputs, outputs, database_info, namespace=DEFAULT_NAMESPACE, database=None)[source]
attach_column_lineage(datasets, database, parse_result)[source]

Attach the column lineage facet to the list of datasets.

Note that currently each dataset gets the same column lineage information. This may change after OpenLineage SQL parser improvements.

generate_openlineage_metadata_from_sql(sql, hook, database_info, database=None, sqlalchemy_engine=None, use_connection=True)[source]

Parse SQL statement(s) and generate OpenLineage metadata.

Generated OpenLineage metadata contains:

  • input tables with schemas parsed

  • output tables with schemas parsed

  • run facets

  • job facets.

Parameters:
  • sql (list[str] | str) – a SQL statement or list of SQL statements to be parsed

  • hook (airflow.hooks.base.BaseHook) – Airflow Hook used to connect to the database

  • database_info (DatabaseInfo) – database-specific information

  • database (str | None) – when passed it takes precedence over parsed database name

  • sqlalchemy_engine (sqlalchemy.engine.Engine | None) – when passed, engine’s dialect is used to compile SQL queries

static create_namespace(database_info)[source]
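create_namespace carries no docstring here, but given the scheme/authority description above and the OpenLineage naming spec, a plausible sketch is the following (an assumption, not the provider's exact code, and it takes the two parts directly rather than a DatabaseInfo object):

```python
from typing import Optional


def create_namespace(scheme: str, authority: Optional[str]) -> str:
    # Sketch: combine scheme and authority into an OpenLineage namespace,
    # e.g. "postgres://localhost:5432"; fall back to the scheme alone
    # when no authority is available.
    return f"{scheme}://{authority}" if authority else scheme


print(create_namespace("postgres", "localhost:5432"))  # postgres://localhost:5432
```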
classmethod normalize_sql(sql)[source]

Return the SQL statement(s) as a single semicolon-separated string.
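A minimal sketch of the described behavior, assuming statements are stripped and re-joined with semicolons (not the exact implementation):

```python
def normalize_sql(sql) -> str:
    # Accept a single string or a list of statements and return one
    # semicolon-separated string.
    statements = [sql] if isinstance(sql, str) else list(sql)
    return ";\n".join(s.strip().rstrip(";") for s in statements if s.strip())


print(normalize_sql(["SELECT 1;", "SELECT 2"]))
```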

classmethod split_sql_string(sql)[source]

Split SQL string into list of statements.

Tries to use DbApiHook.split_sql_string if available; otherwise falls back to equivalent splitting logic.
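The fallback splitting can be approximated naively as below; the caveat is that proper splitting must respect semicolons inside quoted literals and comments, which this sketch ignores:

```python
def split_sql_string(sql: str) -> list[str]:
    # Naive fallback sketch: split on semicolons and drop empty fragments.
    # Does not handle semicolons inside string literals or comments.
    return [s.strip() for s in sql.split(";") if s.strip()]


print(split_sql_string("SELECT 1; SELECT 2;"))  # ['SELECT 1', 'SELECT 2']
```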

create_information_schema_query(tables, normalize_name, is_cross_db, information_schema_columns, information_schema_table, is_uppercase_names, use_flat_cross_db_query, database=None, sqlalchemy_engine=None)[source]

Create SELECT statement to query information schema table.

airflow.providers.openlineage.sqlparser.get_openlineage_facets_with_sql(hook, sql, conn_id, database)[source]
