Airflow Summit 2025 is coming October 07-09. Register now to secure your spot!

Source code for airflow.providers.fab.auth_manager.cli_commands.permissions_command

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Permissions cleanup command."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from airflow.utils import cli as cli_utils
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.strings import to_boolean

if TYPE_CHECKING:
    from sqlalchemy.orm import Session

[docs] log = logging.getLogger(__name__)
@provide_session
[docs] def cleanup_dag_permissions(dag_id: str, session: Session = NEW_SESSION) -> None: """ Clean up DAG-specific permissions from Flask-AppBuilder tables. When a DAG is deleted, we need to clean up the corresponding permissions to prevent orphaned entries in the ab_view_menu table. This addresses issue #50905: Deleted DAGs not removed from ab_view_menu table and show up in permissions. :param dag_id: Specific DAG ID to clean up. :param session: Database session. """ from sqlalchemy import delete, select from airflow.providers.fab.auth_manager.models import Permission, Resource, assoc_permission_role from airflow.security.permissions import RESOURCE_DAG_PREFIX, RESOURCE_DAG_RUN, RESOURCE_DETAILS_MAP # Clean up specific DAG permissions dag_resources = session.scalars( select(Resource).filter( Resource.name.in_( [ f"{RESOURCE_DAG_PREFIX}{dag_id}", # DAG:dag_id f"{RESOURCE_DETAILS_MAP[RESOURCE_DAG_RUN]['prefix']}{dag_id}", # DAG_RUN:dag_id ] ) ) ).all() log.info("Cleaning up DAG-specific permissions for dag_id: %s", dag_id) if not dag_resources: return dag_resource_ids = [resource.id for resource in dag_resources] # Find all permissions associated with these resources dag_permissions = session.scalars( select(Permission).filter(Permission.resource_id.in_(dag_resource_ids)) ).all() if not dag_permissions: # Delete resources even if no permissions exist session.execute(delete(Resource).where(Resource.id.in_(dag_resource_ids))) return dag_permission_ids = [permission.id for permission in dag_permissions] # Delete permission-role associations first (foreign key constraint) session.execute( delete(assoc_permission_role).where( assoc_permission_role.c.permission_view_id.in_(dag_permission_ids) ) ) # Delete permissions session.execute(delete(Permission).where(Permission.resource_id.in_(dag_resource_ids))) # Delete resources (ab_view_menu entries) session.execute(delete(Resource).where(Resource.id.in_(dag_resource_ids))) log.info("Cleaned up %d DAG-specific permissions", len(dag_permissions))
@cli_utils.action_cli @providers_configuration_loaded
[docs] def permissions_cleanup(args): """Clean up DAG permissions in Flask-AppBuilder tables.""" from sqlalchemy import select from airflow.models import DagModel from airflow.providers.fab.auth_manager.cli_commands.utils import get_application_builder from airflow.providers.fab.auth_manager.models import Resource from airflow.security.permissions import ( RESOURCE_DAG_PREFIX, RESOURCE_DAG_RUN, RESOURCE_DETAILS_MAP, ) from airflow.utils.session import create_session with get_application_builder() as _: with create_session() as session: # Get all existing DAG IDs from DagModel existing_dag_ids = {dag.dag_id for dag in session.scalars(select(DagModel)).all()} # Get all DAG-related resources from FAB tables dag_resources = session.scalars( select(Resource).filter( Resource.name.like(f"{RESOURCE_DAG_PREFIX}%") | Resource.name.like(f"{RESOURCE_DETAILS_MAP[RESOURCE_DAG_RUN]['prefix']}%") ) ).all() orphaned_resources = [] orphaned_dag_ids = set() for resource in dag_resources: # Extract DAG ID from resource name dag_id = None if resource.name.startswith(RESOURCE_DAG_PREFIX): dag_id = resource.name[len(RESOURCE_DAG_PREFIX) :] elif resource.name.startswith(RESOURCE_DETAILS_MAP[RESOURCE_DAG_RUN]["prefix"]): dag_id = resource.name[len(RESOURCE_DETAILS_MAP[RESOURCE_DAG_RUN]["prefix"]) :] # Check if this DAG ID still exists if dag_id and dag_id not in existing_dag_ids: orphaned_resources.append(resource) orphaned_dag_ids.add(dag_id) # Filter by specific DAG ID if provided if args.dag_id: if args.dag_id in orphaned_dag_ids: orphaned_dag_ids = {args.dag_id} print(f"Filtering to clean up permissions for DAG: {args.dag_id}") else: print( f"DAG '{args.dag_id}' not found in orphaned permissions or still exists in database." ) return if not orphaned_dag_ids: if args.dag_id: print(f"No orphaned permissions found for DAG: {args.dag_id}") else: print("No orphaned DAG permissions found.") return print(f"Found orphaned permissions for {len(orphaned_dag_ids)} deleted DAG(s):") for dag_id in sorted(orphaned_dag_ids): print(f" - {dag_id}") if args.dry_run: print("\nDry run mode: No changes will be made.") print(f"Would clean up permissions for {len(orphaned_dag_ids)} orphaned DAG(s).") return # Perform cleanup if not in dry run mode if not args.yes: action = ( f"clean up permissions for {len(orphaned_dag_ids)} DAG(s)" if not args.dag_id else f"clean up permissions for DAG '{args.dag_id}'" ) confirm = input(f"\nDo you want to {action}? [y/N]: ") if not to_boolean(confirm): print("Cleanup cancelled.") return # Perform the actual cleanup cleanup_count = 0 for dag_id in orphaned_dag_ids: try: cleanup_dag_permissions(dag_id, session) cleanup_count += 1 if args.verbose: print(f"Cleaned up permissions for DAG: {dag_id}") except Exception as e: print(f"Failed to clean up permissions for DAG {dag_id}: {e}") print(f"\nSuccessfully cleaned up permissions for {cleanup_count} DAG(s).")

Was this entry helpful?