semantic clean up

This commit is contained in:
2026-03-10 19:38:10 +03:00
parent 0078c1ae05
commit 4f1c33e02b
31 changed files with 5392 additions and 6647 deletions

View File

@@ -1,70 +1,79 @@
# [DEF:backend.src.core.auth.repository:Module]
#
# @SEMANTICS: auth, repository, database, user, role
# @PURPOSE: Data access layer for authentication-related entities.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> sqlalchemy
# @RELATION: USES -> backend.src.models.auth
# @TIER: CRITICAL
# @SEMANTICS: auth, repository, database, user, role, permission
# @PURPOSE: Data access layer for authentication and user preference entities.
# @LAYER: Domain
# @RELATION: [DEPENDS_ON] ->[sqlalchemy.orm.Session]
# @RELATION: [DEPENDS_ON] ->[backend.src.models.auth]
# @RELATION: [DEPENDS_ON] ->[backend.src.models.profile]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.logger.belief_scope]
# @INVARIANT: All database read/write operations must execute via the injected SQLAlchemy session boundary.
#
# @INVARIANT: All database operations must be performed within a session.
# [SECTION: IMPORTS]
from typing import Optional, List
from typing import List, Optional
from sqlalchemy.orm import Session
from ...models.auth import User, Role, Permission
from ...models.auth import Permission, Role, User
from ...models.profile import UserDashboardPreference
from ..logger import belief_scope
# [/SECTION]
# [DEF:AuthRepository:Class]
# @PURPOSE: Encapsulates database operations for authentication.
# @PURPOSE: Encapsulates database operations for authentication-related entities.
# @RELATION: [DEPENDS_ON] ->[sqlalchemy.orm.Session]
class AuthRepository:
# [DEF:__init__:Function]
# @PURPOSE: Initializes the repository with a database session.
# @PARAM: db (Session) - SQLAlchemy session.
# @PURPOSE: Bind repository instance to an existing SQLAlchemy session.
# @PRE: db is an initialized sqlalchemy.orm.Session instance.
# @POST: self.db points to the provided session and is used by all repository methods.
# @SIDE_EFFECT: Stores session reference on repository instance state.
# @DATA_CONTRACT: Input[Session] -> Output[None]
def __init__(self, db: Session):
self.db = db
with belief_scope("AuthRepository.__init__"):
self.db = db
# [/DEF:__init__:Function]
# [DEF:get_user_by_username:Function]
# @PURPOSE: Retrieves a user by their username.
# @PRE: username is a string.
# @POST: Returns User object if found, else None.
# @PARAM: username (str) - The username to search for.
# @RETURN: Optional[User] - The found user or None.
# @PURPOSE: Retrieve a user entity by unique username.
# @PRE: username is a non-empty str and self.db is a valid open Session.
# @POST: Returns matching User entity when present, otherwise None.
# @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
# @DATA_CONTRACT: Input[str] -> Output[Optional[User]]
def get_user_by_username(self, username: str) -> Optional[User]:
with belief_scope("AuthRepository.get_user_by_username"):
return self.db.query(User).filter(User.username == username).first()
# [/DEF:get_user_by_username:Function]
# [DEF:get_user_by_id:Function]
# @PURPOSE: Retrieves a user by their unique ID.
# @PRE: user_id is a valid UUID string.
# @POST: Returns User object if found, else None.
# @PARAM: user_id (str) - The user's unique identifier.
# @RETURN: Optional[User] - The found user or None.
# @PURPOSE: Retrieve a user entity by identifier.
# @PRE: user_id is a non-empty str and self.db is a valid open Session.
# @POST: Returns matching User entity when present, otherwise None.
# @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
# @DATA_CONTRACT: Input[str] -> Output[Optional[User]]
def get_user_by_id(self, user_id: str) -> Optional[User]:
with belief_scope("AuthRepository.get_user_by_id"):
return self.db.query(User).filter(User.id == user_id).first()
# [/DEF:get_user_by_id:Function]
# [DEF:get_role_by_name:Function]
# @PURPOSE: Retrieves a role by its name.
# @PRE: name is a string.
# @POST: Returns Role object if found, else None.
# @PARAM: name (str) - The role name to search for.
# @RETURN: Optional[Role] - The found role or None.
# @PURPOSE: Retrieve a role entity by role name.
# @PRE: name is a non-empty str and self.db is a valid open Session.
# @POST: Returns matching Role entity when present, otherwise None.
# @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
# @DATA_CONTRACT: Input[str] -> Output[Optional[Role]]
def get_role_by_name(self, name: str) -> Optional[Role]:
with belief_scope("AuthRepository.get_role_by_name"):
return self.db.query(Role).filter(Role.name == name).first()
# [/DEF:get_role_by_name:Function]
# [DEF:update_last_login:Function]
# @PURPOSE: Updates the last_login timestamp for a user.
# @PRE: user object is a valid User instance.
# @POST: User's last_login is updated in the database.
# @SIDE_EFFECT: Commits the transaction.
# @PARAM: user (User) - The user to update.
# @PURPOSE: Update last_login timestamp for the provided user entity.
# @PRE: user is a managed User instance and self.db is a valid open Session.
# @POST: user.last_login is set to current UTC timestamp and transaction is committed.
# @SIDE_EFFECT: Mutates user entity state and commits database transaction.
# @DATA_CONTRACT: Input[User] -> Output[None]
def update_last_login(self, user: User):
with belief_scope("AuthRepository.update_last_login"):
from datetime import datetime
@@ -74,34 +83,33 @@ class AuthRepository:
# [/DEF:update_last_login:Function]
# [DEF:get_role_by_id:Function]
# @PURPOSE: Retrieves a role by its unique ID.
# @PRE: role_id is a string.
# @POST: Returns Role object if found, else None.
# @PARAM: role_id (str) - The role's unique identifier.
# @RETURN: Optional[Role] - The found role or None.
# @PURPOSE: Retrieve a role entity by identifier.
# @PRE: role_id is a non-empty str and self.db is a valid open Session.
# @POST: Returns matching Role entity when present, otherwise None.
# @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
# @DATA_CONTRACT: Input[str] -> Output[Optional[Role]]
def get_role_by_id(self, role_id: str) -> Optional[Role]:
with belief_scope("AuthRepository.get_role_by_id"):
return self.db.query(Role).filter(Role.id == role_id).first()
# [/DEF:get_role_by_id:Function]
# [DEF:get_permission_by_id:Function]
# @PURPOSE: Retrieves a permission by its unique ID.
# @PRE: perm_id is a string.
# @POST: Returns Permission object if found, else None.
# @PARAM: perm_id (str) - The permission's unique identifier.
# @RETURN: Optional[Permission] - The found permission or None.
# @PURPOSE: Retrieve a permission entity by identifier.
# @PRE: perm_id is a non-empty str and self.db is a valid open Session.
# @POST: Returns matching Permission entity when present, otherwise None.
# @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
# @DATA_CONTRACT: Input[str] -> Output[Optional[Permission]]
def get_permission_by_id(self, perm_id: str) -> Optional[Permission]:
with belief_scope("AuthRepository.get_permission_by_id"):
return self.db.query(Permission).filter(Permission.id == perm_id).first()
# [/DEF:get_permission_by_id:Function]
# [DEF:get_permission_by_resource_action:Function]
# @PURPOSE: Retrieves a permission by resource and action.
# @PRE: resource and action are strings.
# @POST: Returns Permission object if found, else None.
# @PARAM: resource (str) - The resource name.
# @PARAM: action (str) - The action name.
# @RETURN: Optional[Permission] - The found permission or None.
# @PURPOSE: Retrieve a permission entity by resource and action pair.
# @PRE: resource and action are non-empty str values; self.db is a valid open Session.
# @POST: Returns matching Permission entity when present, otherwise None.
# @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
# @DATA_CONTRACT: Input[str, str] -> Output[Optional[Permission]]
def get_permission_by_resource_action(self, resource: str, action: str) -> Optional[Permission]:
with belief_scope("AuthRepository.get_permission_by_resource_action"):
return self.db.query(Permission).filter(
@@ -111,11 +119,11 @@ class AuthRepository:
# [/DEF:get_permission_by_resource_action:Function]
# [DEF:get_user_dashboard_preference:Function]
# @PURPOSE: Retrieves dashboard preference by owner user ID.
# @PRE: user_id is a string.
# @POST: Returns UserDashboardPreference if found, else None.
# @PARAM: user_id (str) - Preference owner identifier.
# @RETURN: Optional[UserDashboardPreference] - Found preference or None.
# @PURPOSE: Retrieve dashboard preference entity owned by specified user.
# @PRE: user_id is a non-empty str and self.db is a valid open Session.
# @POST: Returns matching UserDashboardPreference entity when present, otherwise None.
# @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
# @DATA_CONTRACT: Input[str] -> Output[Optional[UserDashboardPreference]]
def get_user_dashboard_preference(self, user_id: str) -> Optional[UserDashboardPreference]:
with belief_scope("AuthRepository.get_user_dashboard_preference"):
return (
@@ -126,11 +134,11 @@ class AuthRepository:
# [/DEF:get_user_dashboard_preference:Function]
# [DEF:save_user_dashboard_preference:Function]
# @PURPOSE: Persists dashboard preference entity and returns refreshed row.
# @PRE: preference is a valid UserDashboardPreference entity.
# @POST: Preference is committed and refreshed in database.
# @PARAM: preference (UserDashboardPreference) - Preference entity to persist.
# @RETURN: UserDashboardPreference - Persisted preference row.
# @PURPOSE: Persist dashboard preference entity and return refreshed persistent row.
# @PRE: preference is a valid UserDashboardPreference entity and self.db is a valid open Session.
# @POST: preference is committed to DB, refreshed from DB state, and returned.
# @SIDE_EFFECT: Performs INSERT/UPDATE commit and refresh via active DB session.
# @DATA_CONTRACT: Input[UserDashboardPreference] -> Output[UserDashboardPreference]
def save_user_dashboard_preference(
self,
preference: UserDashboardPreference,
@@ -143,14 +151,16 @@ class AuthRepository:
# [/DEF:save_user_dashboard_preference:Function]
# [DEF:list_permissions:Function]
# @PURPOSE: Lists all available permissions.
# @POST: Returns a list of all Permission objects.
# @RETURN: List[Permission] - List of permissions.
# @PURPOSE: List all permission entities available in storage.
# @PRE: self.db is a valid open Session.
# @POST: Returns list containing all Permission entities visible to the session.
# @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
# @DATA_CONTRACT: Input[None] -> Output[List[Permission]]
def list_permissions(self) -> List[Permission]:
with belief_scope("AuthRepository.list_permissions"):
return self.db.query(Permission).all()
# [/DEF:list_permissions:Function]
# [/DEF:AuthRepository:Class]
# [/DEF:AuthRepository:Class]
# [/DEF:backend.src.core.auth.repository:Module]