Airflow Summit 2026 is coming August 31 - September 2 in Austin, TX. Register now to secure your spot!

Source code for airflow.providers.openlineage.api.emission_policy

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Public authoring-time API for per-task / per-DAG OpenLineage emission control.

The central function is :func:`extend_global_openlineage_emission_policy`. It lets Dag authors
override the global ``emission_policy`` Airflow configuration on individual tasks or entire
Dags at authoring time, *extending* the deployment-wide policy with a per-Dag / per-task delta.

Quick-start examples
--------------------

**Disable all OpenLineage events for a sensitive task**::

    from airflow.providers.openlineage.api.emission_policy import (
        extend_global_openlineage_emission_policy,
    )

    with DAG("my_dag", ...) as dag:
        extract = PythonOperator(task_id="extract", ...)
        sensitive = PythonOperator(task_id="sensitive", ...)

    extend_global_openlineage_emission_policy(sensitive, emit=False)

**Disable source-code capture for an entire Dag, then re-enable for one task**::

    extend_global_openlineage_emission_policy(dag, include_source_code=False)  # all tasks in dag
    extend_global_openlineage_emission_policy(extract, include_source_code=True)  # override

**Use as the return value (fluent / inline style)**::

    with DAG("my_dag", ...) as dag:
        task = extend_global_openlineage_emission_policy(
            PythonOperator(task_id="my_task", python_callable=my_fn),
            include_source_code=False,
            hook_lineage=False,
        )

**Suppress DAG-run events while keeping task events**::

    extend_global_openlineage_emission_policy(dag, emit_dag_events=False)

**Works with XComArg** — flags are applied to the underlying operator::

    result = extend_global_openlineage_emission_policy(
        my_python_task_function(),  # returns XComArg
        extract_operator_metadata=False,
    )

Flags and locked conf rules
----------------------------

These flags sit **above** the ``emission_policy`` Airflow configuration in the resolution stack.
However, if an Airflow admin marks a conf rule with ``locked: true``, that field is protected
and cannot be overridden here — the attempt is silently ignored and an INFO log is emitted.

Example conf rule that locks include_source_code for all tasks::

    [openlineage]
    emission_policy = [{"scope": {}, "controls": {"include_source_code": false}, "locked": true}]

Even if a Dag author calls
``extend_global_openlineage_emission_policy(task, include_source_code=True)`` after that,
the lock wins and source code is NOT included.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, TypeVar

from airflow.providers.common.compat.sdk import DAG, XComArg
from airflow.providers.openlineage.utils.emission_policy import (
    _DAG_FLAG_KEYS,
    _TASK_FLAG_KEYS,
    EMIT,
    EMIT_DAG_EVENTS,
    EMIT_TASK_EVENTS,
    EXTRACT_OPERATOR_METADATA,
    HOOK_LINEAGE,
    INCLUDE_FULL_TASK_INFO,
    INCLUDE_SOURCE_CODE,
    OL_EMISSION_POLICY_PARAM,
    _merge_param,
)

if TYPE_CHECKING:
    from airflow.providers.common.compat.sdk import BaseOperator, MappedOperator

    T = TypeVar("T", bound="DAG | BaseOperator | MappedOperator")

log = logging.getLogger(__name__)

__all__ = [
    "extend_global_openlineage_emission_policy",
]


[docs] def extend_global_openlineage_emission_policy( obj: T, *, emit: bool | None = None, emit_task_events: bool | None = None, emit_dag_events: bool | None = None, extract_operator_metadata: bool | None = None, include_source_code: bool | None = None, hook_lineage: bool | None = None, include_full_task_info: bool | None = None, ) -> T: """ Extend the global OpenLineage emission policy with per-task / per-DAG overrides. Flags left as ``None`` are not set and will fall through to the ``emission_policy`` Airflow configuration (or its built-in defaults). Only explicitly provided flags are stored — successive calls **merge** into any previously stored flags. When called on a **DAG**, flags are applied as follows: - Task-relevant flags (``emit``, ``emit_task_events``, ``extract_operator_metadata``, ``include_source_code``, ``hook_lineage``, ``include_full_task_info``) are **propagated to all tasks** in the Dag at the time of the call. - DAG-run-level flags (``emit``, ``emit_dag_events``) are stored on the Dag itself. - ``emit_dag_events`` is meaningless on a task and logs a warning if provided. .. warning:: **DAG-level calls only propagate to tasks that already exist on the DAG when the call runs.** Tasks added later (for example, when ``extend_global_openlineage_emission_policy`` is called inside ``with DAG(...) as dag:`` before the operators are defined) will **not** inherit the flags. Call ``extend_global_openlineage_emission_policy(dag, ...)`` **after** all task definitions, or set flags per-task. .. note:: There is no "unset" API. Passing ``None`` for a flag is treated as "not provided" — it does not remove a previously-stored value. To override, pass the explicit boolean you want, or call again with the new value (successive calls merge, later wins per key). ``emit`` is a shorthand that affects both task and Dag events: - ``emit=False`` — disables **all** OpenLineage events (task + Dag). - ``emit=True, emit_task_events=False`` — Dag-run events only (task events off). - ``emit=True, emit_dag_events=False`` — task events only (Dag-run events off). These flags sit **above** the ``emission_policy`` Airflow configuration in priority. If an admin marks a conf rule with ``locked: true``, that field is protected and cannot be overridden by this function. .. note:: **No global authoring scope.** ``extend_global_openlineage_emission_policy`` only accepts a single DAG, operator, or :class:`XComArg` — there is no equivalent of "apply globally to every DAG in the deployment." Deployment-wide changes are an admin concern and belong in the ``emission_policy`` Airflow configuration with an empty ``"scope": {}``. Passing any other object type raises :class:`TypeError`. :param obj: An Airflow Dag, operator, or XComArg. :param emit: Enable/disable all OpenLineage events (shorthand for both scopes). :param emit_task_events: Enable/disable task-level events only; takes precedence over ``emit`` for task events. :param emit_dag_events: Enable/disable Dag-run-level events only; takes precedence over ``emit`` for Dag events. Ignored (with a warning) when called on a task. :param extract_operator_metadata: Whether to run operator-specific extractor-based metadata collection. :param include_source_code: Whether to include operator source code in Python/Bash operator events. :param hook_lineage: Whether to use ``HookLineageCollector`` as a fallback. :param include_full_task_info: Whether to include the full serialized operator state. :return: The same *obj* — allows use as a decorator or in chained calls. """ if isinstance(obj, XComArg): extend_global_openlineage_emission_policy( obj.operator, # type: ignore[arg-type] emit=emit, emit_task_events=emit_task_events, emit_dag_events=emit_dag_events, extract_operator_metadata=extract_operator_metadata, include_source_code=include_source_code, hook_lineage=hook_lineage, include_full_task_info=include_full_task_info, ) return obj # Type guard: only DAGs and task/operator objects are supported here. # There is no global authoring scope — see the function docstring and # the 'emission_policy' Airflow configuration for deployment-wide changes. if not isinstance(obj, DAG) and not hasattr(obj, "task_id"): raise TypeError( "extend_global_openlineage_emission_policy() must be called on a DAG, an Operator, " f"or an XComArg; got {type(obj).__name__!s}. There is no global authoring scope — " "for deployment-wide changes, use the [openlineage] 'emission_policy' Airflow " "configuration with a global rule ('scope': {}) instead." ) provided: dict[str, bool] = { k: v for k, v in { EMIT: emit, EMIT_TASK_EVENTS: emit_task_events, EMIT_DAG_EVENTS: emit_dag_events, EXTRACT_OPERATOR_METADATA: extract_operator_metadata, INCLUDE_SOURCE_CODE: include_source_code, HOOK_LINEAGE: hook_lineage, INCLUDE_FULL_TASK_INFO: include_full_task_info, }.items() if v is not None } if not provided: log.warning( "OpenLineage extend_global_openlineage_emission_policy(): no emission-control flags were " "provided for %r — the call has no effect. Pass at least one flag (e.g. emit=False) to " "store an override.", getattr(obj, "task_id", None) or getattr(obj, "dag_id", None) or repr(obj), ) return obj if isinstance(obj, DAG): dag_flags = {k: v for k, v in provided.items() if k in _DAG_FLAG_KEYS} if dag_flags: _merge_param(obj, OL_EMISSION_POLICY_PARAM, dag_flags) task_flags = {k: v for k, v in provided.items() if k in _TASK_FLAG_KEYS} if task_flags: for task in obj.task_dict.values(): _merge_param(task, OL_EMISSION_POLICY_PARAM, task_flags) else: # Task / operator call if EMIT_DAG_EVENTS in provided: log.warning( "OpenLineage extend_global_openlineage_emission_policy(): 'emit_dag_events' has no effect " "on a task (task_id=%r) — set it on the Dag instead.", getattr(obj, "task_id", repr(obj)), ) task_flags = {k: v for k, v in provided.items() if k in _TASK_FLAG_KEYS} if task_flags: _merge_param(obj, OL_EMISSION_POLICY_PARAM, task_flags) return obj

Was this entry helpful?