Airflow Summit 2025 is coming October 07-09. Register now for early bird ticket!

Source code for airflow.providers.keycloak.auth_manager.cli.commands

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import json
import logging
from typing import get_args

from keycloak import KeycloakAdmin, KeycloakError

from airflow.api_fastapi.auth.managers.base_auth_manager import ResourceMethod
from airflow.api_fastapi.common.types import MenuItem
from airflow.configuration import conf
from airflow.providers.keycloak.auth_manager.constants import (
    CONF_CLIENT_ID_KEY,
    CONF_REALM_KEY,
    CONF_SECTION_NAME,
    CONF_SERVER_URL_KEY,
)
from airflow.providers.keycloak.auth_manager.resources import KeycloakResource
from airflow.utils import cli as cli_utils
from airflow.utils.providers_configuration_loader import providers_configuration_loaded

[docs] log = logging.getLogger(__name__)
@cli_utils.action_cli @providers_configuration_loaded
[docs] def create_scopes_command(args): """Create Keycloak auth manager scopes in Keycloak.""" client = _get_client(args) client_uuid = _get_client_uuid(args) _create_scopes(client, client_uuid)
@cli_utils.action_cli @providers_configuration_loaded
[docs] def create_resources_command(args): """Create Keycloak auth manager resources in Keycloak.""" client = _get_client(args) client_uuid = _get_client_uuid(args) _create_resources(client, client_uuid)
@cli_utils.action_cli @providers_configuration_loaded
[docs] def create_permissions_command(args): """Create Keycloak auth manager permissions in Keycloak.""" client = _get_client(args) client_uuid = _get_client_uuid(args) _create_permissions(client, client_uuid)
@cli_utils.action_cli @providers_configuration_loaded
[docs] def create_all_command(args): """Create all Keycloak auth manager entities in Keycloak.""" client = _get_client(args) client_uuid = _get_client_uuid(args) _create_scopes(client, client_uuid) _create_resources(client, client_uuid) _create_permissions(client, client_uuid)
def _get_client(args): server_url = conf.get(CONF_SECTION_NAME, CONF_SERVER_URL_KEY) realm = conf.get(CONF_SECTION_NAME, CONF_REALM_KEY) return KeycloakAdmin( server_url=server_url, username=args.username, password=args.password, realm_name=realm, user_realm_name=args.user_realm, client_id=args.client_id, verify=True, ) def _get_client_uuid(args): client = _get_client(args) clients = client.get_clients() client_id = conf.get(CONF_SECTION_NAME, CONF_CLIENT_ID_KEY) matches = [client for client in clients if client["clientId"] == client_id] if not matches: raise ValueError(f"Client with ID='{client_id}' not found in realm '{client.realm_name}'") return matches[0]["id"] def _create_scopes(client: KeycloakAdmin, client_uuid: str): scopes = [{"name": method} for method in get_args(ResourceMethod)] for scope in scopes: client.create_client_authz_scopes(client_id=client_uuid, payload=scope) print("Scopes created successfully.") def _create_resources(client: KeycloakAdmin, client_uuid: str): all_scopes = client.get_client_authz_scopes(client_uuid) scopes = [ {"id": scope["id"], "name": scope["name"]} for scope in all_scopes if scope["name"] in ["GET", "POST", "PUT", "DELETE"] ] for resource in KeycloakResource: client.create_client_authz_resource( client_id=client_uuid, payload={ "name": resource.value, "scopes": scopes, }, skip_exists=True, ) # Create menu item resources scopes = [{"id": scope["id"], "name": scope["name"]} for scope in all_scopes if scope["name"] == "MENU"] for item in MenuItem: client.create_client_authz_resource( client_id=client_uuid, payload={ "name": item.value, "scopes": scopes, }, skip_exists=True, ) print("Resources created successfully.") def _create_permissions(client: KeycloakAdmin, client_uuid: str): _create_read_only_permission(client, client_uuid) _create_admin_permission(client, client_uuid) _create_user_permission(client, client_uuid) _create_op_permission(client, client_uuid) print("Permissions created successfully.") def _create_read_only_permission(client: KeycloakAdmin, client_uuid: str): all_scopes = client.get_client_authz_scopes(client_uuid) scopes = [scope["id"] for scope in all_scopes if scope["name"] in ["GET", "MENU"]] payload = { "name": "ReadOnly", "type": "scope", "logic": "POSITIVE", "decisionStrategy": "UNANIMOUS", "scopes": scopes, } _create_permission(client, client_uuid, payload) def _create_admin_permission(client: KeycloakAdmin, client_uuid: str): all_scopes = client.get_client_authz_scopes(client_uuid) scopes = [scope["id"] for scope in all_scopes if scope["name"] in get_args(ResourceMethod)] payload = { "name": "Admin", "type": "scope", "logic": "POSITIVE", "decisionStrategy": "UNANIMOUS", "scopes": scopes, } _create_permission(client, client_uuid, payload) def _create_user_permission(client: KeycloakAdmin, client_uuid: str): _create_resource_based_permission( client, client_uuid, "User", [KeycloakResource.DAG.value, KeycloakResource.ASSET.value] ) def _create_op_permission(client: KeycloakAdmin, client_uuid: str): _create_resource_based_permission( client, client_uuid, "Op", [ KeycloakResource.CONNECTION.value, KeycloakResource.POOL.value, KeycloakResource.VARIABLE.value, KeycloakResource.BACKFILL.value, ], ) def _create_permission(client: KeycloakAdmin, client_uuid: str, payload: dict): try: client.create_client_authz_scope_permission( client_id=client_uuid, payload=payload, ) except KeycloakError as e: if e.response_body: error = json.loads(e.response_body.decode("utf-8")) if error.get("error_description") == "Conflicting policy": print(f"Policy creation skipped. {error.get('error')}") def _create_resource_based_permission( client: KeycloakAdmin, client_uuid: str, name: str, allowed_resources: list[str] ): all_resources = client.get_client_authz_resources(client_uuid) resources = [resource["_id"] for resource in all_resources if resource["name"] in allowed_resources] payload = { "name": name, "type": "scope", "logic": "POSITIVE", "decisionStrategy": "UNANIMOUS", "resources": resources, } client.create_client_authz_resource_based_permission( client_id=client_uuid, payload=payload, skip_exists=True, )

Was this entry helpful?