Airflow Summit 2025 is coming October 07-09. Register now to secure your spot!

Source code for airflow.providers.fab.auth_manager.models

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import datetime

# This product contains a modified portion of 'Flask App Builder' developed by Daniel Vaz Gaspar.
# (https://github.com/dpgaspar/Flask-AppBuilder).
# Copyright 2013, Daniel Vaz Gaspar
from typing import TYPE_CHECKING

from flask import current_app, g
from flask_appbuilder import Model
from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    ForeignKey,
    Index,
    Integer,
    Sequence,
    String,
    Table,
    UniqueConstraint,
    event,
    func,
    select,
)
from sqlalchemy.orm import Mapped, backref, declared_attr, relationship

from airflow.api_fastapi.auth.managers.models.base_user import BaseUser

try:
    from sqlalchemy.orm import mapped_column
except ImportError:
    # fallback for SQLAlchemy < 2.0
[docs] def mapped_column(*args, **kwargs): from sqlalchemy import Column return Column(*args, **kwargs)
if TYPE_CHECKING: try: from sqlalchemy import Identity except Exception:
[docs] Identity = None
""" Compatibility note: The models in this file are duplicated from Flask AppBuilder. """
[docs] assoc_group_role = Table( "ab_group_role", Model.metadata, Column( "id", Integer, Sequence("ab_group_role_id_seq", start=1, increment=1, minvalue=1, cycle=False), primary_key=True, ), Column("group_id", Integer, ForeignKey("ab_group.id", ondelete="CASCADE")), Column("role_id", Integer, ForeignKey("ab_role.id", ondelete="CASCADE")), UniqueConstraint("group_id", "role_id"), Index("idx_group_id", "group_id"), Index("idx_group_role_id", "role_id"), )
[docs] assoc_permission_role = Table( "ab_permission_view_role", Model.metadata, Column( "id", Integer, Sequence( "ab_permission_view_role_id_seq", start=1, increment=1, minvalue=1, cycle=False, ), primary_key=True, ), Column( "permission_view_id", Integer, ForeignKey("ab_permission_view.id", ondelete="CASCADE"), ), Column("role_id", Integer, ForeignKey("ab_role.id", ondelete="CASCADE")), UniqueConstraint("permission_view_id", "role_id"), Index("idx_permission_view_id", "permission_view_id"), Index("idx_role_id", "role_id"), )
[docs] assoc_user_role = Table( "ab_user_role", Model.metadata, Column( "id", Integer, Sequence("ab_user_role_id_seq", start=1, increment=1, minvalue=1, cycle=False), primary_key=True, ), Column("user_id", Integer, ForeignKey("ab_user.id", ondelete="CASCADE")), Column("role_id", Integer, ForeignKey("ab_role.id", ondelete="CASCADE")), UniqueConstraint("user_id", "role_id"), )
[docs] assoc_user_group = Table( "ab_user_group", Model.metadata, Column( "id", Integer, Sequence("ab_user_group_id_seq", start=1, increment=1, minvalue=1, cycle=False), primary_key=True, ), Column("user_id", Integer, ForeignKey("ab_user.id", ondelete="CASCADE")), Column("group_id", Integer, ForeignKey("ab_group.id", ondelete="CASCADE")), UniqueConstraint("user_id", "group_id"), )
[docs] class Action(Model): """Represents permission actions such as `can_read`."""
[docs] __tablename__ = "ab_permission"
[docs] id: Mapped[int] = mapped_column( Integer, Sequence("ab_permission_id_seq", start=1, increment=1, minvalue=1, cycle=False), primary_key=True, )
[docs] name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
[docs] def __repr__(self): return self.name
[docs] class Resource(Model): """Represents permission object such as `User` or `Dag`."""
[docs] __tablename__ = "ab_view_menu"
[docs] id: Mapped[int] = mapped_column( Integer, Sequence("ab_view_menu_id_seq", start=1, increment=1, minvalue=1, cycle=False), primary_key=True, )
[docs] name: Mapped[str] = mapped_column(String(250), unique=True, nullable=False)
[docs] def __eq__(self, other): return (isinstance(other, self.__class__)) and (self.name == other.name)
[docs] def __neq__(self, other): return self.name != other.name
[docs] def __repr__(self): return self.name
[docs] class Role(Model): """Represents a user role to which permissions can be assigned."""
[docs] __tablename__ = "ab_role"
[docs] id: Mapped[int] = mapped_column( Integer, Sequence("ab_role_id_seq", start=1, increment=1, minvalue=1, cycle=False), primary_key=True, )
[docs] name: Mapped[str] = mapped_column(String(64), unique=True, nullable=False)
[docs] permissions: Mapped[list[Permission]] = relationship( "Permission", secondary=assoc_permission_role, backref="role", lazy="joined", passive_deletes=True, )
[docs] def __repr__(self): return self.name
[docs] class Permission(Model): """Permission pair comprised of an Action + Resource combo."""
[docs] __tablename__ = "ab_permission_view"
[docs] __table_args__ = (UniqueConstraint("permission_id", "view_menu_id"),)
[docs] id: Mapped[int] = mapped_column( Integer, Sequence("ab_permission_view_id_seq", start=1, increment=1, minvalue=1, cycle=False), primary_key=True, )
[docs] action_id: Mapped[int] = mapped_column("permission_id", Integer, ForeignKey("ab_permission.id"))
[docs] action: Mapped[Action] = relationship("Action", lazy="joined", uselist=False)
[docs] resource_id: Mapped[int] = mapped_column("view_menu_id", Integer, ForeignKey("ab_view_menu.id"))
[docs] resource: Mapped[Resource] = relationship("Resource", lazy="joined", uselist=False)
[docs] def __repr__(self): return str(self.action).replace("_", " ") + f" on {str(self.resource)}"
[docs] class Group(Model): """Represents an Airflow user group."""
[docs] __tablename__ = "ab_group"
[docs] id: Mapped[int] = mapped_column( Integer, Sequence("ab_group_id_seq", start=1, increment=1, minvalue=1, cycle=False), primary_key=True, )
[docs] name: Mapped[str] = Column(String(100), unique=True, nullable=False)
[docs] label: Mapped[str] = Column(String(150))
[docs] description: Mapped[str] = Column(String(512))
[docs] users: Mapped[list[User]] = relationship( "User", secondary=assoc_user_group, backref="groups", passive_deletes=True )
[docs] roles: Mapped[list[Role]] = relationship( "Role", secondary=assoc_group_role, backref="groups", passive_deletes=True )
[docs] def __repr__(self): return self.name
[docs] class User(Model, BaseUser): """Represents an Airflow user which has roles assigned to it."""
[docs] __tablename__ = "ab_user"
[docs] id: Mapped[int] = mapped_column( Integer, Sequence("ab_user_id_seq", start=1, increment=1, minvalue=1, cycle=False), primary_key=True, )
[docs] first_name: Mapped[str] = mapped_column(String(64), nullable=False)
[docs] last_name: Mapped[str] = mapped_column(String(64), nullable=False)
[docs] username: Mapped[str] = mapped_column( String(512).with_variant(String(512, collation="NOCASE"), "sqlite"), unique=True, nullable=False )
[docs] password: Mapped[str | None] = mapped_column(String(256))
[docs] active: Mapped[bool | None] = mapped_column(Boolean, default=True)
[docs] email: Mapped[str] = mapped_column(String(320), unique=True, nullable=False)
[docs] last_login: Mapped[datetime.datetime | None] = mapped_column(DateTime, nullable=True)
[docs] login_count: Mapped[int | None] = mapped_column(Integer, nullable=True)
[docs] fail_login_count: Mapped[int | None] = mapped_column(Integer, nullable=True)
[docs] roles: Mapped[list[Role]] = relationship( "Role", secondary=assoc_user_role, backref="user", lazy="selectin", passive_deletes=True, )
[docs] created_on: Mapped[datetime.datetime | None] = mapped_column( DateTime, default=lambda: datetime.datetime.now(), nullable=True )
[docs] changed_on: Mapped[datetime.datetime | None] = mapped_column( DateTime, default=lambda: datetime.datetime.now(), nullable=True )
@declared_attr
[docs] def created_by_fk(self) -> Column: return Column(Integer, ForeignKey("ab_user.id"), default=self.get_user_id, nullable=True)
@declared_attr
[docs] def changed_by_fk(self) -> Column: return Column(Integer, ForeignKey("ab_user.id"), default=self.get_user_id, nullable=True)
[docs] created_by: Mapped[User] = relationship( "User", backref=backref("created", uselist=True), remote_side=[id], primaryjoin="User.created_by_fk == User.id", uselist=False, )
[docs] changed_by: Mapped[User] = relationship( "User", backref=backref("changed", uselist=True), remote_side=[id], primaryjoin="User.changed_by_fk == User.id", uselist=False, )
@classmethod
[docs] def get_user_id(cls): try: return g.user.get_id() except Exception: return None
@property
[docs] def is_authenticated(self): return True
@property
[docs] def is_active(self): return self.active
@property
[docs] def is_anonymous(self): return False
@property
[docs] def perms(self): if not self._perms: # Using the ORM here is _slow_ (Creating lots of objects to then throw them away) since this is in # the path for every request. Avoid it if we can! if current_app: sm = current_app.appbuilder.sm self._perms: set[tuple[str, str]] = set( sm.session.execute( select(sm.action_model.name, sm.resource_model.name) .join(sm.permission_model.action) .join(sm.permission_model.resource) .join(sm.permission_model.role) .where(sm.role_model.user.contains(self)) ) ) else: self._perms = { (perm.action.name, perm.resource.name) for role in self.roles for perm in role.permissions } return self._perms
[docs] def get_id(self) -> str: return str(self.id)
[docs] def get_name(self) -> str: return self.username or self.email or self.user_id
[docs] def get_full_name(self): return f"{self.first_name} {self.last_name}"
[docs] def __repr__(self): return self.get_full_name()
_perms = None
[docs] class RegisterUser(Model): """Represents a user registration."""
[docs] __tablename__ = "ab_register_user"
[docs] id = mapped_column( Integer, Sequence("ab_register_user_id_seq", start=1, increment=1, minvalue=1, cycle=False), primary_key=True, )
[docs] first_name: Mapped[str] = mapped_column(String(64), nullable=False)
[docs] last_name: Mapped[str] = mapped_column(String(64), nullable=False)
[docs] username: Mapped[str] = mapped_column( String(512).with_variant(String(512, collation="NOCASE"), "sqlite"), unique=True, nullable=False )
[docs] password: Mapped[str | None] = mapped_column(String(256))
[docs] email: Mapped[str] = mapped_column(String(320), unique=True, nullable=False)
[docs] registration_date: Mapped[datetime.datetime | None] = mapped_column( DateTime, default=lambda: datetime.datetime.now(), nullable=True )
[docs] registration_hash: Mapped[str | None] = mapped_column(String(256))
@event.listens_for(User.__table__, "before_create")
[docs] def add_index_on_ab_user_username_postgres(table, conn, **kw): if conn.dialect.name != "postgresql": return index_name = "idx_ab_user_username" if not any(table_index.name == index_name for table_index in table.indexes): table.indexes.add(Index(index_name, func.lower(table.c.username), unique=True))
@event.listens_for(RegisterUser.__table__, "before_create")
[docs] def add_index_on_ab_register_user_username_postgres(table, conn, **kw): if conn.dialect.name != "postgresql": return index_name = "idx_ab_register_user_username" if not any(table_index.name == index_name for table_index in table.indexes): table.indexes.add(Index(index_name, func.lower(table.c.username), unique=True))

Was this entry helpful?