Source code for airflow.providers.keycloak.auth_manager.cli.commands

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import json
import logging
from typing import get_args

from keycloak import KeycloakAdmin, KeycloakError

from airflow.api_fastapi.auth.managers.base_auth_manager import ResourceMethod

try:
    from airflow.api_fastapi.auth.managers.base_auth_manager import ExtendedResourceMethod
except ImportError:
    from airflow.api_fastapi.auth.managers.base_auth_manager import ResourceMethod as ExtendedResourceMethod
from airflow.api_fastapi.common.types import MenuItem
from airflow.configuration import conf
from airflow.providers.keycloak.auth_manager.cli.utils import dry_run_message_wrap, dry_run_preview
from airflow.providers.keycloak.auth_manager.constants import (
    CONF_CLIENT_ID_KEY,
    CONF_REALM_KEY,
    CONF_SECTION_NAME,
    CONF_SERVER_URL_KEY,
)
from airflow.providers.keycloak.auth_manager.resources import KeycloakResource
from airflow.utils import cli as cli_utils
from airflow.utils.providers_configuration_loader import providers_configuration_loaded

[docs] log = logging.getLogger(__name__)
@cli_utils.action_cli @providers_configuration_loaded @dry_run_message_wrap
[docs] def create_scopes_command(args): """Create Keycloak auth manager scopes in Keycloak.""" client = _get_client(args) client_uuid = _get_client_uuid(args) _create_scopes(client, client_uuid, _dry_run=args.dry_run)
@cli_utils.action_cli @providers_configuration_loaded @dry_run_message_wrap
[docs] def create_resources_command(args): """Create Keycloak auth manager resources in Keycloak.""" client = _get_client(args) client_uuid = _get_client_uuid(args) _create_resources(client, client_uuid, _dry_run=args.dry_run)
@cli_utils.action_cli @providers_configuration_loaded @dry_run_message_wrap
[docs] def create_permissions_command(args): """Create Keycloak auth manager permissions in Keycloak.""" client = _get_client(args) client_uuid = _get_client_uuid(args) _create_permissions(client, client_uuid, _dry_run=args.dry_run)
@cli_utils.action_cli @providers_configuration_loaded @dry_run_message_wrap
[docs] def create_all_command(args): """Create all Keycloak auth manager entities in Keycloak.""" client = _get_client(args) client_uuid = _get_client_uuid(args) _create_scopes(client, client_uuid, _dry_run=args.dry_run) _create_resources(client, client_uuid, _dry_run=args.dry_run) _create_permissions(client, client_uuid, _dry_run=args.dry_run)
def _get_client(args): server_url = conf.get(CONF_SECTION_NAME, CONF_SERVER_URL_KEY) realm = conf.get(CONF_SECTION_NAME, CONF_REALM_KEY) return KeycloakAdmin( server_url=server_url, username=args.username, password=args.password, realm_name=realm, user_realm_name=args.user_realm, client_id=args.client_id, verify=True, ) def _get_client_uuid(args): client = _get_client(args) clients = client.get_clients() client_id = conf.get(CONF_SECTION_NAME, CONF_CLIENT_ID_KEY) matches = [client for client in clients if client["clientId"] == client_id] if not matches: raise ValueError(f"Client with ID='{client_id}' not found in realm '{client.realm_name}'") return matches[0]["id"] def _get_scopes_to_create() -> list[dict]: """Get the list of scopes to be created.""" scopes = [{"name": method} for method in get_args(ResourceMethod)] scopes.extend([{"name": "MENU"}, {"name": "LIST"}]) return scopes def _preview_scopes(*args, **kwargs): """Preview scopes that would be created.""" scopes = _get_scopes_to_create() print("Scopes to be created:") for scope in scopes: print(f" - {scope['name']}") print() @dry_run_preview(_preview_scopes) def _create_scopes(client: KeycloakAdmin, client_uuid: str, *, _dry_run: bool = False): """Create Keycloak scopes.""" scopes = _get_scopes_to_create() for scope in scopes: client.create_client_authz_scopes(client_id=client_uuid, payload=scope) print("Scopes created successfully.") def _get_resources_to_create( client: KeycloakAdmin, client_uuid: str, ) -> tuple[list[tuple[str, list[dict]]], list[tuple[str, list[dict]]]]: """ Get the list of resources to be created. Returns a tuple of (standard_resources, menu_resources). Each is a list of tuples (resource_name, scopes_list). """ all_scopes = client.get_client_authz_scopes(client_uuid) scopes = [ {"id": scope["id"], "name": scope["name"]} for scope in all_scopes if scope["name"] in ["GET", "POST", "PUT", "DELETE", "LIST"] ] menu_scopes = [ {"id": scope["id"], "name": scope["name"]} for scope in all_scopes if scope["name"] == "MENU" ] standard_resources = [(resource.value, scopes) for resource in KeycloakResource] menu_resources = [(item.value, menu_scopes) for item in MenuItem] return standard_resources, menu_resources def _preview_resources(client: KeycloakAdmin, client_uuid: str): """Preview resources that would be created.""" standard_resources, menu_resources = _get_resources_to_create(client, client_uuid) print("Resources to be created:") if standard_resources: for resource_name, resource_scopes in standard_resources: actual_scope_names = ", ".join([s["name"] for s in resource_scopes]) print(f" - {resource_name} (scopes: {actual_scope_names})") print("\nMenu item resources to be created:") for resource_name, resource_scopes in menu_resources: actual_scope_names = ", ".join([s["name"] for s in resource_scopes]) print(f" - {resource_name} (scopes: {actual_scope_names})") print() @dry_run_preview(_preview_resources) def _create_resources(client: KeycloakAdmin, client_uuid: str, *, _dry_run: bool = False): """Create Keycloak resources.""" standard_resources, menu_resources = _get_resources_to_create(client, client_uuid) for resource_name, scopes in standard_resources: client.create_client_authz_resource( client_id=client_uuid, payload={ "name": resource_name, "scopes": scopes, }, skip_exists=True, ) # Create menu item resources for resource_name, scopes in menu_resources: client.create_client_authz_resource( client_id=client_uuid, payload={ "name": resource_name, "scopes": scopes, }, skip_exists=True, ) print("Resources created successfully.") def _get_permissions_to_create(client: KeycloakAdmin, client_uuid: str) -> list[dict]: """ Get the actual permissions to be created with filtered scopes/resources. Returns a list of permission descriptors with actual filtered data. """ perm_configs = [ { "name": "ReadOnly", "type": "scope-based", "scope_names": ["GET", "MENU", "LIST"], }, { "name": "Admin", "type": "scope-based", "scope_names": list(get_args(ExtendedResourceMethod)) + ["LIST"], }, { "name": "User", "type": "resource-based", "resources": [KeycloakResource.DAG.value, KeycloakResource.ASSET.value], }, { "name": "Op", "type": "resource-based", "resources": [ KeycloakResource.CONNECTION.value, KeycloakResource.POOL.value, KeycloakResource.VARIABLE.value, KeycloakResource.BACKFILL.value, ], }, ] all_scopes = client.get_client_authz_scopes(client_uuid) all_resources = client.get_client_authz_resources(client_uuid) result = [] for config in perm_configs: perm = {"name": config["name"], "type": config["type"]} if config["type"] == "scope-based": # Filter to get actual scope IDs that exist and match filtered_scope_ids = [s["id"] for s in all_scopes if s["name"] in config["scope_names"]] filtered_scope_names = [s["name"] for s in all_scopes if s["name"] in config["scope_names"]] perm["scope_ids"] = filtered_scope_ids perm["scope_names"] = filtered_scope_names else: # resource-based # Filter to get actual resource IDs that exist and match filtered_resource_ids = [r["_id"] for r in all_resources if r["name"] in config["resources"]] filtered_resource_names = [r["name"] for r in all_resources if r["name"] in config["resources"]] perm["resource_ids"] = filtered_resource_ids perm["resource_names"] = filtered_resource_names result.append(perm) return result def _preview_permissions(client: KeycloakAdmin, client_uuid: str): """Preview permissions that would be created.""" permissions = _get_permissions_to_create(client, client_uuid) print("Permissions to be created:") for perm in permissions: if perm["type"] == "scope-based": scope_names = ", ".join(perm["scope_names"]) print(f" - {perm['name']} (type: scope-based, scopes: {scope_names})") else: # resource-based resource_names = ", ".join(perm["resource_names"]) print(f" - {perm['name']} (type: resource-based, resources: {resource_names})") print() @dry_run_preview(_preview_permissions) def _create_permissions(client: KeycloakAdmin, client_uuid: str, *, _dry_run: bool = False): """Create Keycloak permissions.""" permissions = _get_permissions_to_create(client, client_uuid) for perm in permissions: if perm["type"] == "scope-based": _create_scope_based_permission(client, client_uuid, perm["name"], perm["scope_ids"]) else: # resource-based _create_resource_based_permission(client, client_uuid, perm["name"], perm["resource_ids"]) print("Permissions created successfully.") def _create_scope_based_permission(client: KeycloakAdmin, client_uuid: str, name: str, scope_ids: list[str]): payload = { "name": name, "type": "scope", "logic": "POSITIVE", "decisionStrategy": "UNANIMOUS", "scopes": scope_ids, } try: client.create_client_authz_scope_permission( client_id=client_uuid, payload=payload, ) except KeycloakError as e: if e.response_body: error = json.loads(e.response_body.decode("utf-8")) if error.get("error_description") == "Conflicting policy": print(f"Policy creation skipped. {error.get('error')}") def _create_resource_based_permission( client: KeycloakAdmin, client_uuid: str, name: str, resource_ids: list[str] ): payload = { "name": name, "type": "scope", "logic": "POSITIVE", "decisionStrategy": "UNANIMOUS", "resources": resource_ids, } client.create_client_authz_resource_based_permission( client_id=client_uuid, payload=payload, skip_exists=True, )

Was this entry helpful?