# Source file: ss-tools/backend/src/core/auth/repository.py
# (Python; listed in the repository viewer as 224 lines, 11 KiB)

# [DEF:backend.src.core.auth.repository:Module]
#
# @TIER: CRITICAL
# @SEMANTICS: auth, repository, database, user, role, permission
# @PURPOSE: Data access layer for authentication and user preference entities.
# @LAYER: Domain
# @PRE: SQLAlchemy session manager and auth models are available.
# @POST: Provides transactional access to Auth-related database entities.
# @SIDE_EFFECT: Performs database I/O via SQLAlchemy sessions.
# @DATA_CONTRACT: Input[Session] -> Model[User, Role, Permission, UserDashboardPreference]
# @RELATION: [DEPENDS_ON] ->[sqlalchemy.orm.Session]
# @RELATION: [DEPENDS_ON] ->[backend.src.models.auth]
# @RELATION: [DEPENDS_ON] ->[backend.src.models.profile]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.logger.belief_scope]
# @INVARIANT: All database read/write operations must execute via the injected SQLAlchemy session boundary.
#
# [SECTION: IMPORTS]
from typing import List, Optional
from sqlalchemy.orm import Session, selectinload
from ...models.auth import Permission, Role, User
from ...models.profile import UserDashboardPreference
from ..logger import belief_scope, logger
# [/SECTION]
# [DEF:AuthRepository:Class]
# @TIER: CRITICAL
# @PURPOSE: Encapsulates database operations for authentication-related entities.
# @RELATION: [DEPENDS_ON] ->[sqlalchemy.orm.Session]
class AuthRepository:
    """Repository for User, Role, Permission and UserDashboardPreference rows.

    Every method runs its query through the SQLAlchemy session passed to
    ``__init__`` and wraps its work in a ``belief_scope`` context named after
    the method.
    """

    # [DEF:__init__:Function]
    # @TIER: CRITICAL
    # @PURPOSE: Bind repository instance to an existing SQLAlchemy session.
    # @PRE: db is an initialized sqlalchemy.orm.Session instance.
    # @POST: self.db points to the provided session and is used by all repository methods.
    # @SIDE_EFFECT: Stores session reference on repository instance state.
    # @DATA_CONTRACT: Input[Session] -> Output[None]
    def __init__(self, db: Session):
        """Store ``db`` as the session used by all repository methods.

        Raises:
            TypeError: if ``db`` is not a ``sqlalchemy.orm.Session`` instance.
        """
        with belief_scope("AuthRepository.__init__"):
            if not isinstance(db, Session):
                logger.explore("Invalid session provided to AuthRepository", extra={"type": type(db)})
                raise TypeError("db must be an instance of sqlalchemy.orm.Session")
            logger.reason("Binding AuthRepository to database session")
            self.db = db
            logger.reflect("AuthRepository initialized")
    # [/DEF:__init__:Function]

    # [DEF:_commit_or_rollback:Function]
    # @TIER: CRITICAL
    # @PURPOSE: Commit the current transaction, rolling the session back if the commit fails.
    # @PRE: self.db is a valid open Session.
    # @POST: Transaction is committed, or rolled back with the original exception re-raised.
    # @SIDE_EFFECT: Commits or rolls back the database transaction.
    # @DATA_CONTRACT: Input[None] -> Output[None]
    def _commit_or_rollback(self) -> None:
        """Commit ``self.db``; on any exception roll back and re-raise it."""
        try:
            self.db.commit()
        except Exception:
            # Without a rollback the session stays in a failed transaction and
            # every later call on this repository would fail as well.
            logger.explore("Commit failed; rolling back database session")
            self.db.rollback()
            raise
    # [/DEF:_commit_or_rollback:Function]

    # [DEF:get_user_by_username:Function]
    # @TIER: CRITICAL
    # @PURPOSE: Retrieve a user entity by unique username.
    # @PRE: username is a non-empty str and self.db is a valid open Session.
    # @POST: Returns matching User entity when present, otherwise None.
    # @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
    # @DATA_CONTRACT: Input[str] -> Output[Optional[User]]
    def get_user_by_username(self, username: str) -> Optional[User]:
        """Return the first User whose ``username`` matches, or ``None``.

        The query eager-loads ``User.roles`` and each role's ``permissions``
        with ``selectinload``.

        Raises:
            ValueError: if ``username`` is empty or not a ``str``.
        """
        with belief_scope("AuthRepository.get_user_by_username"):
            if not username or not isinstance(username, str):
                raise ValueError("username must be a non-empty string")
            logger.reason(f"Querying user by username: {username}")
            user = (
                self.db.query(User)
                .options(selectinload(User.roles).selectinload(Role.permissions))
                .filter(User.username == username)
                .first()
            )
            if user:
                logger.reflect(f"User found: {username}")
            else:
                logger.explore(f"User not found: {username}")
            return user
    # [/DEF:get_user_by_username:Function]

    # [DEF:get_user_by_id:Function]
    # @TIER: CRITICAL
    # @PURPOSE: Retrieve a user entity by identifier.
    # @PRE: user_id is a non-empty str and self.db is a valid open Session.
    # @POST: Returns matching User entity when present, otherwise None.
    # @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
    # @DATA_CONTRACT: Input[str] -> Output[Optional[User]]
    def get_user_by_id(self, user_id: str) -> Optional[User]:
        """Return the first User whose ``id`` matches, or ``None``.

        Unlike ``get_user_by_username``, no eager-loading options are applied
        to this query.

        Raises:
            ValueError: if ``user_id`` is empty or not a ``str``.
        """
        with belief_scope("AuthRepository.get_user_by_id"):
            if not user_id or not isinstance(user_id, str):
                raise ValueError("user_id must be a non-empty string")
            logger.reason(f"Querying user by ID: {user_id}")
            user = self.db.query(User).filter(User.id == user_id).first()
            if user:
                logger.reflect(f"User found by ID: {user_id}")
            else:
                logger.explore(f"User not found by ID: {user_id}")
            return user
    # [/DEF:get_user_by_id:Function]

    # [DEF:get_role_by_name:Function]
    # @TIER: CRITICAL
    # @PURPOSE: Retrieve a role entity by role name.
    # @PRE: name is a non-empty str and self.db is a valid open Session.
    # @POST: Returns matching Role entity when present, otherwise None.
    # @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
    # @DATA_CONTRACT: Input[str] -> Output[Optional[Role]]
    def get_role_by_name(self, name: str) -> Optional[Role]:
        """Return the first Role whose ``name`` matches, or ``None``.

        The @PRE condition is not checked here; ``name`` is passed straight
        into the query filter.
        """
        with belief_scope("AuthRepository.get_role_by_name"):
            return self.db.query(Role).filter(Role.name == name).first()
    # [/DEF:get_role_by_name:Function]

    # [DEF:update_last_login:Function]
    # @TIER: CRITICAL
    # @PURPOSE: Update last_login timestamp for the provided user entity.
    # @PRE: user is a managed User instance and self.db is a valid open Session.
    # @POST: user.last_login is set to current UTC timestamp and transaction is committed;
    #        on commit failure the session is rolled back and the exception re-raised.
    # @SIDE_EFFECT: Mutates user entity state and commits database transaction.
    # @DATA_CONTRACT: Input[User] -> Output[None]
    def update_last_login(self, user: User) -> None:
        """Set ``user.last_login`` to the current UTC time and commit.

        Raises:
            TypeError: if ``user`` is not a ``User`` instance.
            Exception: whatever the commit raises, after the session has been
                rolled back.
        """
        with belief_scope("AuthRepository.update_last_login"):
            if not isinstance(user, User):
                raise TypeError("user must be an instance of User")
            from datetime import datetime, timezone
            logger.reason(f"Updating last login for user: {user.username}")
            # datetime.utcnow() is deprecated; this produces the same naive
            # UTC value, so the stored timestamp keeps its previous shape.
            user.last_login = datetime.now(timezone.utc).replace(tzinfo=None)
            self.db.add(user)
            self._commit_or_rollback()
            logger.reflect(f"Last login updated and committed for user: {user.username}")
    # [/DEF:update_last_login:Function]

    # [DEF:get_role_by_id:Function]
    # @TIER: CRITICAL
    # @PURPOSE: Retrieve a role entity by identifier.
    # @PRE: role_id is a non-empty str and self.db is a valid open Session.
    # @POST: Returns matching Role entity when present, otherwise None.
    # @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
    # @DATA_CONTRACT: Input[str] -> Output[Optional[Role]]
    def get_role_by_id(self, role_id: str) -> Optional[Role]:
        """Return the first Role whose ``id`` matches, or ``None``.

        The @PRE condition is not checked here; ``role_id`` is passed straight
        into the query filter.
        """
        with belief_scope("AuthRepository.get_role_by_id"):
            return self.db.query(Role).filter(Role.id == role_id).first()
    # [/DEF:get_role_by_id:Function]

    # [DEF:get_permission_by_id:Function]
    # @TIER: CRITICAL
    # @PURPOSE: Retrieve a permission entity by identifier.
    # @PRE: perm_id is a non-empty str and self.db is a valid open Session.
    # @POST: Returns matching Permission entity when present, otherwise None.
    # @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
    # @DATA_CONTRACT: Input[str] -> Output[Optional[Permission]]
    def get_permission_by_id(self, perm_id: str) -> Optional[Permission]:
        """Return the first Permission whose ``id`` matches, or ``None``.

        The @PRE condition is not checked here; ``perm_id`` is passed straight
        into the query filter.
        """
        with belief_scope("AuthRepository.get_permission_by_id"):
            return self.db.query(Permission).filter(Permission.id == perm_id).first()
    # [/DEF:get_permission_by_id:Function]

    # [DEF:get_permission_by_resource_action:Function]
    # @TIER: CRITICAL
    # @PURPOSE: Retrieve a permission entity by resource and action pair.
    # @PRE: resource and action are non-empty str values; self.db is a valid open Session.
    # @POST: Returns matching Permission entity when present, otherwise None.
    # @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
    # @DATA_CONTRACT: Input[str, str] -> Output[Optional[Permission]]
    def get_permission_by_resource_action(self, resource: str, action: str) -> Optional[Permission]:
        """Return the first Permission matching both ``resource`` and ``action``.

        Returns ``None`` when no row matches. The @PRE condition is not checked
        here; both arguments are passed straight into the query filter.
        """
        with belief_scope("AuthRepository.get_permission_by_resource_action"):
            return (
                self.db.query(Permission)
                .filter(
                    Permission.resource == resource,
                    Permission.action == action,
                )
                .first()
            )
    # [/DEF:get_permission_by_resource_action:Function]

    # [DEF:get_user_dashboard_preference:Function]
    # @TIER: CRITICAL
    # @PURPOSE: Retrieve dashboard preference entity owned by specified user.
    # @PRE: user_id is a non-empty str and self.db is a valid open Session.
    # @POST: Returns matching UserDashboardPreference entity when present, otherwise None.
    # @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
    # @DATA_CONTRACT: Input[str] -> Output[Optional[UserDashboardPreference]]
    def get_user_dashboard_preference(self, user_id: str) -> Optional[UserDashboardPreference]:
        """Return the first UserDashboardPreference with this ``user_id``, or ``None``.

        The @PRE condition is not checked here; ``user_id`` is passed straight
        into the query filter.
        """
        with belief_scope("AuthRepository.get_user_dashboard_preference"):
            return (
                self.db.query(UserDashboardPreference)
                .filter(UserDashboardPreference.user_id == user_id)
                .first()
            )
    # [/DEF:get_user_dashboard_preference:Function]

    # [DEF:save_user_dashboard_preference:Function]
    # @TIER: CRITICAL
    # @PURPOSE: Persist dashboard preference entity and return refreshed persistent row.
    # @PRE: preference is a valid UserDashboardPreference entity and self.db is a valid open Session.
    # @POST: preference is committed to DB, refreshed from DB state, and returned;
    #        on commit failure the session is rolled back and the exception re-raised.
    # @SIDE_EFFECT: Performs INSERT/UPDATE commit and refresh via active DB session.
    # @DATA_CONTRACT: Input[UserDashboardPreference] -> Output[UserDashboardPreference]
    def save_user_dashboard_preference(
        self,
        preference: UserDashboardPreference,
    ) -> UserDashboardPreference:
        """Add ``preference`` to the session, commit, refresh and return it.

        Raises:
            TypeError: if ``preference`` is not a ``UserDashboardPreference``.
            Exception: whatever the commit raises, after the session has been
                rolled back.
        """
        with belief_scope("AuthRepository.save_user_dashboard_preference"):
            if not isinstance(preference, UserDashboardPreference):
                raise TypeError("preference must be an instance of UserDashboardPreference")
            logger.reason(f"Saving dashboard preference for user: {preference.user_id}")
            self.db.add(preference)
            self._commit_or_rollback()
            # Reload the row so the returned object reflects the committed state.
            self.db.refresh(preference)
            logger.reflect(f"Dashboard preference saved and refreshed for user: {preference.user_id}")
            return preference
    # [/DEF:save_user_dashboard_preference:Function]

    # [DEF:list_permissions:Function]
    # @TIER: CRITICAL
    # @PURPOSE: List all permission entities available in storage.
    # @PRE: self.db is a valid open Session.
    # @POST: Returns list containing all Permission entities visible to the session.
    # @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
    # @DATA_CONTRACT: Input[None] -> Output[List[Permission]]
    def list_permissions(self) -> List[Permission]:
        """Return every Permission row returned by an unfiltered query."""
        with belief_scope("AuthRepository.list_permissions"):
            return self.db.query(Permission).all()
    # [/DEF:list_permissions:Function]
# [/DEF:AuthRepository:Class]
# [/DEF:backend.src.core.auth.repository:Module]