Source code for airflow.providers.common.compat.lineage.hook

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from airflow.providers.common.compat.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS


def _get_asset_compat_hook_lineage_collector():
    """
    Return Airflow's hook lineage collector, patched to expose the Airflow 3 "asset" API.

    Airflow 3.0 renamed "dataset" to "asset". When the collector returned by
    ``airflow.lineage.hook.get_hook_lineage_collector`` does not yet provide
    ``add_input_asset``, ``add_output_asset`` and ``collected_assets``, this function adds:

    * ``create_asset``, ``add_input_asset`` and ``add_output_asset`` on the collector
      *instance*. They wrap the ``*_dataset`` methods and translate the ``asset_kwargs`` /
      ``asset_extra`` keyword arguments to ``dataset_kwargs`` / ``dataset_extra``.
    * A ``collected_assets`` property on the collector's *class*. It returns a copy of
      ``collected_datasets``.
    * An ``asset`` alias for the ``dataset`` field of ``DatasetLineageInfo``.

    If the collector already exposes the asset API (for example because an earlier call
    patched it), it is returned unchanged.

    Note that patching mutates shared objects in place: ``DatasetLineageInfo`` and the
    collector's class.
    """
    import inspect

    from airflow.lineage.hook import get_hook_lineage_collector

    collector = get_hook_lineage_collector()

    # Use a static lookup so descriptors are not invoked. A plain ``getattr`` would evaluate
    # the ``collected_assets`` property installed below. On every call after the first
    # patching, that would read ``collected_datasets`` and copy all collected lineage just to
    # check that the attribute exists.
    if all(
        inspect.getattr_static(collector, asset_attr_name, None) is not None
        for asset_attr_name in ("add_input_asset", "add_output_asset", "collected_assets")
    ):
        return collector

    # dataset is renamed as asset in Airflow 3.0

    from functools import wraps

    from airflow.lineage.hook import DatasetLineageInfo, HookLineage

    # Make ``info.asset`` read the same value as ``info.dataset``.
    DatasetLineageInfo.asset = DatasetLineageInfo.dataset

    def rename_asset_kwargs_to_dataset_kwargs(function):
        """Wrap ``function`` so ``asset_kwargs``/``asset_extra`` are passed as ``dataset_*``."""

        @wraps(function)
        def wrapper(*args, **kwargs):
            if "asset_kwargs" in kwargs:
                kwargs["dataset_kwargs"] = kwargs.pop("asset_kwargs")

            if "asset_extra" in kwargs:
                kwargs["dataset_extra"] = kwargs.pop("asset_extra")

            return function(*args, **kwargs)

        return wrapper

    collector.create_asset = rename_asset_kwargs_to_dataset_kwargs(collector.create_dataset)
    collector.add_input_asset = rename_asset_kwargs_to_dataset_kwargs(collector.add_input_dataset)
    collector.add_output_asset = rename_asset_kwargs_to_dataset_kwargs(collector.add_output_dataset)

    def copy_lineage_infos(items):
        """Return fresh ``DatasetLineageInfo`` objects carrying the same field values."""
        return [
            DatasetLineageInfo(dataset=item.dataset, count=item.count, context=item.context)
            for item in items
        ]

    def collected_assets_compat(collector) -> HookLineage:
        """Get the collected hook lineage information."""
        lineage = collector.collected_datasets
        return HookLineage(copy_lineage_infos(lineage.inputs), copy_lineage_infos(lineage.outputs))

    # Properties only work as class attributes, so this is installed on the collector's
    # class and therefore applies to every instance of that class.
    collector.__class__.collected_assets = property(collected_assets_compat)

    return collector


class _NoOpCollector:
    """
    Hook lineage collector that does nothing.

    Returned by ``get_hook_lineage_collector`` on Airflow versions older than 2.10, so that
    providers that already call ``get_hook_lineage_collector`` keep working there.

    NOTE(review): only the ``add_*`` methods below are stubbed. Other collector attributes
    (e.g. ``create_asset``, ``collected_assets``) are not provided.
    """

    # for providers that support asset rename
    def add_input_asset(self, *_, **__):
        pass

    def add_output_asset(self, *_, **__):
        pass

    # for providers that do not support asset rename
    def add_input_dataset(self, *_, **__):
        pass

    def add_output_dataset(self, *_, **__):
        pass


def get_hook_lineage_collector():
    """
    Return a hook lineage collector suitable for the running Airflow version.

    * Airflow 3.0+: Airflow's own collector (dataset has been renamed to asset there).
    * Airflow 2.10.x: Airflow's collector, patched with ``*_asset`` compatibility aliases.
    * Older Airflow: a collector whose ``add_*`` methods do nothing.
    """
    # Dataset has been renamed as Asset in 3.0
    if AIRFLOW_V_3_0_PLUS:
        from airflow.lineage.hook import get_hook_lineage_collector

        return get_hook_lineage_collector()

    # HookLineageCollector added in 2.10
    if AIRFLOW_V_2_10_PLUS:
        return _get_asset_compat_hook_lineage_collector()

    # For the case that airflow has not yet upgraded to 2.10 or higher,
    # but using the providers that already uses `get_hook_lineage_collector`
    return _NoOpCollector()

Was this entry helpful?