Source code for structure

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Private service for dag structure.

:meta private:
"""

from __future__ import annotations

from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAll, AssetAny, BaseAsset


[docs]def get_upstream_assets( asset_condition: BaseAsset, entry_node_ref: str, level=0 ) -> tuple[list[dict], list[dict]]: edges: list[dict] = [] nodes: list[dict] = [] asset_condition_type: str | None = None assets: list[Asset | AssetAlias] = [] nested_expression: AssetAll | AssetAny | None = None if isinstance(asset_condition, AssetAny): asset_condition_type = "or-gate" elif isinstance(asset_condition, AssetAll): asset_condition_type = "and-gate" if hasattr(asset_condition, "objects"): for obj in asset_condition.objects: if isinstance(obj, (AssetAll, AssetAny)): nested_expression = obj elif isinstance(obj, (Asset, AssetAlias)): assets.append(obj) else: raise TypeError(f"Unsupported type: {type(obj)}") if asset_condition_type and assets: asset_condition_id = f"{asset_condition_type}-{level}" edges.append( { "source_id": asset_condition_id, "target_id": entry_node_ref, "is_source_asset": level == 0, } ) nodes.append( { "id": asset_condition_id, "label": asset_condition_id, "type": "asset-condition", "asset_condition_type": asset_condition_type, } ) for asset in assets: edges.append( { "source_id": asset.name, "target_id": asset_condition_id, } ) nodes.append( { "id": asset.name, "label": asset.name, "type": "asset-alias" if isinstance(asset, AssetAlias) else "asset", } ) if nested_expression is not None: n, e = get_upstream_assets(nested_expression, asset_condition_id, level=level + 1) nodes = nodes + n edges = edges + e return nodes, edges

Was this entry helpful?