airflow.providers.openlineage.api.sql

Public helpers for emitting OpenLineage events describing SQL query executions.

Functions

emit_query_lineage(*[, query_id, ...])

Emit a START + COMPLETE/FAIL OpenLineage event pair describing a single SQL query execution.

Module Contents

airflow.providers.openlineage.api.sql.emit_query_lineage(*, query_id=None, query_source_namespace=None, query_text=None, inputs=None, outputs=None, start_time=None, end_time=None, is_successful=True, error_message=None, default_database=None, default_schema=None, job_name=None, task_instance=None, additional_run_facets=None, additional_job_facets=None, raise_on_error=False)

Emit a START + COMPLETE/FAIL OpenLineage event pair describing a single SQL query execution.

The emitted events carry a parent run facet pointing at the Airflow task run the query is attributed to (by default, the currently executing one). Any OpenLineage root information present on that task instance is propagated to the emitted events, so the entire run hierarchy stays connected. This helper can be called multiple times within a single task; each call produces a distinct query event pair, identified by a sequential counter suffix in its default job name (see job_name).
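Because every call gets its own job name, a task that reports several queries ends up with one job per call. If you would rather choose those names yourself and pass them through job_name, a minimal sequential namer could look like this. QueryJobNamer is a hypothetical helper, not part of the provider; the "query" label and the 1-based numbering are assumptions and are not meant to reproduce the provider's internal counter.

```python
import itertools


class QueryJobNamer:
    """Hypothetical helper (not part of the provider): yields sequential job names."""

    def __init__(self, task_job_name: str, label: str = "query", start: int = 1) -> None:
        # Base name such as "<dag_id>.<task_id>.query"; the label is an assumption.
        self._base = f"{task_job_name}.{label}"
        # itertools.count counts up from `start` indefinitely.
        self._counter = itertools.count(start)

    def next_name(self) -> str:
        """Return the next job name, e.g. "my_dag.my_task.query.1"."""
        return f"{self._base}.{next(self._counter)}"


namer = QueryJobNamer("my_dag.my_task")
print(namer.next_name())  # my_dag.my_task.query.1
print(namer.next_name())  # my_dag.my_task.query.2
```

Each returned value would be passed as job_name=... to one emit_query_lineage call, which keeps the names stable even if the provider's default scheme changes.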

Parameters:
  • query_id (str | None) – Unique identifier of the query in the given query_source_namespace. When both query_id and query_source_namespace are provided, an externalQuery run facet is attached to the emitted events.

  • query_source_namespace (str | None) – OpenLineage namespace of the system that ran the query, e.g. "snowflake://org-acct", "databricks://adb-<id>.azuredatabricks.net", "bigquery".

  • query_text (str | None) – Raw SQL query text. When provided, it is attached as a sql job facet, and the datasets found by parsing it are merged with any inputs and outputs supplied explicitly.

  • inputs (list[openlineage.client.event_v2.Dataset] | None) – Input datasets to report in addition to any extracted from query_text.

  • outputs (list[openlineage.client.event_v2.Dataset] | None) – Output datasets to report in addition to any extracted from query_text.

  • start_time (datetime.datetime | None) – Event time of the START event. Defaults to the current UTC time.

  • end_time (datetime.datetime | None) – Event time of the COMPLETE/FAIL event. Defaults to the current UTC time.

  • is_successful (bool) – Whether the query succeeded. True emits a START + COMPLETE pair; False emits a START + FAIL pair.

  • error_message (str | None) – Optional error message attached as an errorMessage run facet.

  • default_database (str | None) – Default database for resolving unqualified tables in query_text.

  • default_schema (str | None) – Default schema for resolving unqualified tables in query_text.

  • job_name (str | None) – Job name to use in both events. Defaults to <ti_job_name>.manual_query.<counter>.

  • task_instance (airflow.models.taskinstance.TaskInstance | airflow.sdk.execution_time.task_runner.RuntimeTaskInstance | None) – The Airflow task instance to attribute the query to. Defaults to the currently executing task instance obtained from the execution context.

  • additional_run_facets (dict[str, openlineage.client.facet_v2.RunFacet] | None) – Extra run facets to merge into the emitted events.

  • additional_job_facets (dict[str, openlineage.client.facet_v2.JobFacet] | None) – Extra job facets to merge into the emitted events.

  • raise_on_error (bool) – When False (default), any exception raised while building or emitting the events is logged at WARNING level and the function returns silently, so a broken lineage helper never breaks a user’s task. Set to True to let exceptions propagate normally.
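default_database and default_schema matter because SQL lets a query name a table with one, two, or three parts. The sketch below shows the usual filling-in rule under a simplifying assumption: names are split naively on dots, with no quoting or case handling. qualify_table is hypothetical and is not how the provider's SQL parser is implemented.

```python
from typing import Optional


def qualify_table(
    name: str,
    default_database: Optional[str] = None,
    default_schema: Optional[str] = None,
) -> str:
    """Hypothetical helper: fill in missing database/schema parts of a table name."""
    parts = name.split(".")  # naive split; quoted identifiers are not handled
    if len(parts) > 3 or not all(parts):
        raise ValueError(f"unexpected table name: {name!r}")
    # Bare "table": the schema can come from the defaults.
    if len(parts) == 1 and default_schema:
        parts.insert(0, default_schema)
    # "schema.table": the database can come from the defaults.
    if len(parts) == 2 and default_database:
        parts.insert(0, default_database)
    # "database.schema.table" is already fully qualified and is left unchanged.
    return ".".join(parts)


print(qualify_table("users", "analytics", "public"))  # analytics.public.users
```

In terms of the parameters above, default_database="analytics" and default_schema="public" are what allow an unqualified FROM users in query_text to be treated as analytics.public.users.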

Raises:

RuntimeError – If task_instance is not provided and cannot be resolved from the current execution context. Raised only when raise_on_error=True.
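The raise_on_error contract (log at WARNING level and return by default, propagate only on request) is a common pattern for best-effort side effects. A minimal sketch of that pattern, using a hypothetical best_effort wrapper that is not the provider's code:

```python
import logging
from typing import Any, Callable, Optional

log = logging.getLogger(__name__)


def best_effort(action: Callable[[], Any], *, raise_on_error: bool = False) -> Optional[Any]:
    """Hypothetical wrapper: run action, logging and swallowing failures unless asked not to."""
    try:
        return action()
    except Exception:
        if raise_on_error:
            raise  # opt-in: the caller sees the original exception
        # Default: record the problem with its traceback but never fail the caller.
        log.warning("Failed to emit lineage; continuing.", exc_info=True)
        return None
```

With the default, a missing task instance only produces a warning; with raise_on_error=True the same RuntimeError reaches the caller, matching the Raises entry above.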

Example:

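from airflow.providers.openlineage.api import emit_query_lineage
from airflow.sdk import task  # Airflow 3; on Airflow 2 use airflow.decorators


@task
def my_task():
    # The query itself runs elsewhere (e.g. via your own client); this call only reports it.
    emit_query_lineage(
        query_id="acde070d-8c4c-4f0d-9d8a-162843c10333",
        query_source_namespace="databricks://adb-498971240325220.10.azuredatabricks.net",
        query_text="SELECT * FROM analytics.public.users",
    )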
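The example above reports a query without timing or outcome. When the task runs the query itself, a wrapper like the one below is one way to fill in start_time, end_time, is_successful and error_message. It is a sketch: run_with_lineage is hypothetical, and emit stands in for emit_query_lineage so the logic can be exercised without Airflow.

```python
from datetime import datetime, timezone
from typing import Any, Callable


def run_with_lineage(
    run_query: Callable[[], Any],
    emit: Callable[..., None],
    **lineage_kwargs: Any,
) -> Any:
    """Hypothetical helper: time a query and report its outcome through emit."""
    start_time = datetime.now(timezone.utc)  # timezone-aware UTC timestamp
    try:
        result = run_query()
    except Exception as exc:
        # Report the failure (START + FAIL), then re-raise so the task still fails.
        emit(
            start_time=start_time,
            end_time=datetime.now(timezone.utc),
            is_successful=False,
            error_message=str(exc),
            **lineage_kwargs,
        )
        raise
    # Report success (START + COMPLETE) and hand the result back to the caller.
    emit(
        start_time=start_time,
        end_time=datetime.now(timezone.utc),
        is_successful=True,
        **lineage_kwargs,
    )
    return result
```

Inside a task you would pass emit=emit_query_lineage together with keywords such as query_text=... and query_source_namespace=.... Because emit_query_lineage defaults to raise_on_error=False, a lineage failure in the except branch would be logged rather than masking the query's own exception.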
