Source code for airflow.providers.fab.www.security_manager

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Callable

from flask import g
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

from import get_auth_manager
from airflow.providers.fab.www.utils import CustomSQLAInterface, get_method_from_fab_action_map
from airflow.utils.log.logging_mixin import LoggingMixin

[docs] EXISTING_ROLES = { "Admin", "Viewer", "User", "Op", "Public", }
[docs] class AirflowSecurityManagerV2(LoggingMixin): """ Minimal security manager needed to run a Flask application. This one is used to run the Flask application needed to run Airflow 2 plugins unless Fab auth manager is configured in the environment. In that case, ``FabAirflowSecurityManagerOverride`` is used. """ def __init__(self, appbuilder) -> None: super().__init__()
[docs] self.appbuilder = appbuilder
# Setup Flask-Limiter
[docs] self.limiter = self.create_limiter()
# Go and fix up the SQLAInterface used from the stock one to our subclass. # This is needed to support the "hack" where we had to edit # FieldConverter.conversion_table in place in utils for attr in dir(self): if attr.endswith("view"): view = getattr(self, attr, None) if view and getattr(view, "datamodel", None): view.datamodel = CustomSQLAInterface(view.datamodel.obj) @staticmethod
[docs] def before_request(): """Run hook before request.""" g.user = get_auth_manager().get_user()
[docs] def create_limiter(self) -> Limiter: app = self.appbuilder.get_app limiter = Limiter(key_func=app.config.get("RATELIMIT_KEY_FUNC", get_remote_address)) limiter.init_app(app) return limiter
[docs] def has_access( self, action_name: str, resource_name: str, user=None, resource_pk: str | None = None ) -> bool: """ Verify whether a given user could perform a certain action on the given resource. Example actions might include can_read, can_write, can_delete, etc. This function is called by FAB when accessing a view. See :param action_name: action_name on resource (e.g can_read, can_edit). :param resource_name: name of view-menu or resource. :param user: user :param resource_pk: the resource primary key (e.g. the connection ID) :return: Whether user could perform certain action on the resource. """ if not user: user = g.user is_authorized_method = self._get_auth_manager_is_authorized_method(resource_name) return is_authorized_method(action_name, resource_pk, user)
[docs] def add_limit_view(self, baseview): if not baseview.limits: return for limit in baseview.limits: self.limiter.limit( limit_value=limit.limit_value, key_func=limit.key_func, per_method=limit.per_method, methods=limit.methods, error_message=limit.error_message, exempt_when=limit.exempt_when, override_defaults=limit.override_defaults, deduct_when=limit.deduct_when, on_breach=limit.on_breach, cost=limit.cost, )(baseview.blueprint)
def _get_auth_manager_is_authorized_method(self, fab_resource_name: str) -> Callable: # The user is trying to access a page specific to the auth manager # (e.g. the user list view in FabAuthManager) or a page defined in a plugin return lambda action, resource_pk, user: get_auth_manager().is_authorized_custom_view( method=get_method_from_fab_action_map().get(action, action), resource_name=fab_resource_name, user=user, )

Was this entry helpful?