feat(rbac): hide unauthorized menu sections and enforce route guards
This commit is contained in:
353
backend/src/services/profile_service.py
Normal file
353
backend/src/services/profile_service.py
Normal file
@@ -0,0 +1,353 @@
|
||||
# [DEF:backend.src.services.profile_service:Module]
|
||||
#
|
||||
# @TIER: CRITICAL
|
||||
# @SEMANTICS: profile, service, validation, ownership, filtering, superset, preferences
|
||||
# @PURPOSE: Orchestrates profile preference persistence, Superset account lookup, and deterministic actor matching.
|
||||
# @LAYER: Domain
|
||||
# @RELATION: DEPENDS_ON -> backend.src.models.profile
|
||||
# @RELATION: DEPENDS_ON -> backend.src.schemas.profile
|
||||
# @RELATION: DEPENDS_ON -> backend.src.core.superset_client
|
||||
# @RELATION: DEPENDS_ON -> backend.src.core.auth.repository
|
||||
# @RELATION: DEPENDS_ON -> backend.src.models.auth
|
||||
# @RELATION: DEPENDS_ON -> sqlalchemy.orm.Session
|
||||
#
|
||||
# @INVARIANT: Preference mutations are always scoped to authenticated user identity.
|
||||
# @INVARIANT: Username normalization is trim+lower and shared by save and matching paths.
|
||||
#
|
||||
# @TEST_CONTRACT: ProfilePreferenceUpdateRequest -> ProfilePreferenceResponse
|
||||
# @TEST_FIXTURE: valid_profile_update -> {"user_id":"u-1","superset_username":"John_Doe","show_only_my_dashboards":true}
|
||||
# @TEST_EDGE: enable_without_username -> toggle=true with empty username returns validation error
|
||||
# @TEST_EDGE: cross_user_mutation -> attempt to update another user preference returns forbidden
|
||||
# @TEST_EDGE: lookup_env_not_found -> unknown environment_id returns not found
|
||||
# @TEST_INVARIANT: normalization_consistency -> VERIFIED_BY: [valid_profile_update, enable_without_username]
|
||||
|
||||
# [SECTION: IMPORTS]
|
||||
from datetime import datetime
|
||||
from typing import Any, Iterable, List, Optional, Sequence
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from ..core.auth.repository import AuthRepository
|
||||
from ..core.logger import logger, belief_scope
|
||||
from ..core.superset_client import SupersetClient
|
||||
from ..core.superset_profile_lookup import SupersetAccountLookupAdapter
|
||||
from ..models.auth import User
|
||||
from ..models.profile import UserDashboardPreference
|
||||
from ..schemas.profile import (
|
||||
ProfilePreference,
|
||||
ProfilePreferenceResponse,
|
||||
ProfilePreferenceUpdateRequest,
|
||||
SupersetAccountLookupRequest,
|
||||
SupersetAccountLookupResponse,
|
||||
SupersetAccountCandidate,
|
||||
)
|
||||
# [/SECTION]
|
||||
|
||||
|
||||
# [DEF:ProfileValidationError:Class]
|
||||
# @TIER: STANDARD
|
||||
# @PURPOSE: Domain validation error for profile preference update requests.
|
||||
class ProfileValidationError(Exception):
|
||||
def __init__(self, errors: Sequence[str]):
|
||||
self.errors = list(errors)
|
||||
super().__init__("Profile preference validation failed")
|
||||
# [/DEF:ProfileValidationError:Class]
|
||||
|
||||
|
||||
# [DEF:EnvironmentNotFoundError:Class]
|
||||
# @TIER: STANDARD
|
||||
# @PURPOSE: Raised when environment_id from lookup request is unknown in app configuration.
|
||||
class EnvironmentNotFoundError(Exception):
|
||||
pass
|
||||
# [/DEF:EnvironmentNotFoundError:Class]
|
||||
|
||||
|
||||
# [DEF:ProfileAuthorizationError:Class]
|
||||
# @TIER: STANDARD
|
||||
# @PURPOSE: Raised when caller attempts cross-user preference mutation.
|
||||
class ProfileAuthorizationError(Exception):
|
||||
pass
|
||||
# [/DEF:ProfileAuthorizationError:Class]
|
||||
|
||||
|
||||
# [DEF:ProfileService:Class]
|
||||
# @TIER: CRITICAL
|
||||
# @PURPOSE: Implements profile preference read/update flow and Superset account lookup degradation strategy.
|
||||
class ProfileService:
|
||||
# [DEF:__init__:Function]
|
||||
# @PURPOSE: Initialize service with DB session and config manager.
|
||||
# @PRE: db session is active and config_manager supports get_environments().
|
||||
# @POST: Service is ready for preference persistence and lookup operations.
|
||||
def __init__(self, db: Session, config_manager: Any):
|
||||
self.db = db
|
||||
self.config_manager = config_manager
|
||||
self.auth_repository = AuthRepository(db)
|
||||
# [/DEF:__init__:Function]
|
||||
|
||||
# [DEF:get_my_preference:Function]
|
||||
# @PURPOSE: Return current user's persisted preference or default non-configured view.
|
||||
# @PRE: current_user is authenticated.
|
||||
# @POST: Returned payload belongs to current_user only.
|
||||
def get_my_preference(self, current_user: User) -> ProfilePreferenceResponse:
|
||||
with belief_scope("ProfileService.get_my_preference", f"user_id={current_user.id}"):
|
||||
logger.reflect("[REFLECT] Loading current user's dashboard preference")
|
||||
preference = self._get_preference_row(current_user.id)
|
||||
if preference is None:
|
||||
return ProfilePreferenceResponse(
|
||||
status="success",
|
||||
message="Preference not configured yet",
|
||||
preference=self._build_default_preference(current_user.id),
|
||||
)
|
||||
return ProfilePreferenceResponse(
|
||||
status="success",
|
||||
message="Preference loaded",
|
||||
preference=ProfilePreference.model_validate(preference, from_attributes=True),
|
||||
)
|
||||
# [/DEF:get_my_preference:Function]
|
||||
|
||||
# [DEF:update_my_preference:Function]
|
||||
# @PURPOSE: Validate and persist current user's profile preference in self-scoped mode.
|
||||
# @PRE: current_user is authenticated and payload is provided.
|
||||
# @POST: Preference row for current_user is created/updated when validation passes.
|
||||
def update_my_preference(
|
||||
self,
|
||||
current_user: User,
|
||||
payload: ProfilePreferenceUpdateRequest,
|
||||
target_user_id: Optional[str] = None,
|
||||
) -> ProfilePreferenceResponse:
|
||||
with belief_scope("ProfileService.update_my_preference", f"user_id={current_user.id}"):
|
||||
logger.reason("[REASON] Evaluating self-scope guard before preference mutation")
|
||||
requested_user_id = str(target_user_id or current_user.id)
|
||||
if requested_user_id != str(current_user.id):
|
||||
logger.explore("[EXPLORE] Cross-user mutation attempt blocked")
|
||||
raise ProfileAuthorizationError("Cross-user preference mutation is forbidden")
|
||||
|
||||
validation_errors = self._validate_update_payload(payload)
|
||||
if validation_errors:
|
||||
logger.reflect("[REFLECT] Validation failed; mutation is denied")
|
||||
raise ProfileValidationError(validation_errors)
|
||||
|
||||
normalized_username = self._normalize_username(payload.superset_username)
|
||||
raw_username = self._sanitize_username(payload.superset_username)
|
||||
|
||||
preference = self._get_or_create_preference_row(current_user.id)
|
||||
preference.superset_username = raw_username
|
||||
preference.superset_username_normalized = normalized_username
|
||||
preference.show_only_my_dashboards = bool(payload.show_only_my_dashboards)
|
||||
preference.updated_at = datetime.utcnow()
|
||||
|
||||
self.auth_repository.save_user_dashboard_preference(preference)
|
||||
|
||||
logger.reason("[REASON] Preference persisted successfully")
|
||||
return ProfilePreferenceResponse(
|
||||
status="success",
|
||||
message="Preference saved",
|
||||
preference=ProfilePreference.model_validate(preference, from_attributes=True),
|
||||
)
|
||||
# [/DEF:update_my_preference:Function]
|
||||
|
||||
# [DEF:lookup_superset_accounts:Function]
|
||||
# @PURPOSE: Query Superset users in selected environment and project canonical account candidates.
|
||||
# @PRE: current_user is authenticated and environment_id exists.
|
||||
# @POST: Returns success payload or degraded payload with warning while preserving manual fallback.
|
||||
def lookup_superset_accounts(
|
||||
self,
|
||||
current_user: User,
|
||||
request: SupersetAccountLookupRequest,
|
||||
) -> SupersetAccountLookupResponse:
|
||||
with belief_scope(
|
||||
"ProfileService.lookup_superset_accounts",
|
||||
f"user_id={current_user.id}, environment_id={request.environment_id}",
|
||||
):
|
||||
environment = self._resolve_environment(request.environment_id)
|
||||
if environment is None:
|
||||
logger.explore("[EXPLORE] Lookup aborted: environment not found")
|
||||
raise EnvironmentNotFoundError(f"Environment '{request.environment_id}' not found")
|
||||
|
||||
sort_column = str(request.sort_column or "username").strip().lower()
|
||||
sort_order = str(request.sort_order or "desc").strip().lower()
|
||||
allowed_columns = {"username", "first_name", "last_name", "email"}
|
||||
if sort_column not in allowed_columns:
|
||||
sort_column = "username"
|
||||
if sort_order not in {"asc", "desc"}:
|
||||
sort_order = "desc"
|
||||
|
||||
logger.reflect(
|
||||
"[REFLECT] Normalized lookup request "
|
||||
f"(env={request.environment_id}, sort_column={sort_column}, sort_order={sort_order}, "
|
||||
f"page_index={request.page_index}, page_size={request.page_size}, "
|
||||
f"search={(request.search or '').strip()!r})"
|
||||
)
|
||||
|
||||
try:
|
||||
logger.reason("[REASON] Performing Superset account lookup")
|
||||
superset_client = SupersetClient(environment)
|
||||
adapter = SupersetAccountLookupAdapter(
|
||||
network_client=superset_client.network,
|
||||
environment_id=request.environment_id,
|
||||
)
|
||||
lookup_result = adapter.get_users_page(
|
||||
search=request.search,
|
||||
page_index=request.page_index,
|
||||
page_size=request.page_size,
|
||||
sort_column=sort_column,
|
||||
sort_order=sort_order,
|
||||
)
|
||||
items = [
|
||||
SupersetAccountCandidate.model_validate(item)
|
||||
for item in lookup_result.get("items", [])
|
||||
]
|
||||
return SupersetAccountLookupResponse(
|
||||
status="success",
|
||||
environment_id=request.environment_id,
|
||||
page_index=request.page_index,
|
||||
page_size=request.page_size,
|
||||
total=max(int(lookup_result.get("total", len(items))), 0),
|
||||
warning=None,
|
||||
items=items,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.explore(f"[EXPLORE] Lookup degraded due to upstream error: {exc}")
|
||||
return SupersetAccountLookupResponse(
|
||||
status="degraded",
|
||||
environment_id=request.environment_id,
|
||||
page_index=request.page_index,
|
||||
page_size=request.page_size,
|
||||
total=0,
|
||||
warning=(
|
||||
"Cannot load Superset accounts for this environment right now. "
|
||||
"You can enter username manually."
|
||||
),
|
||||
items=[],
|
||||
)
|
||||
# [/DEF:lookup_superset_accounts:Function]
|
||||
|
||||
# [DEF:matches_dashboard_actor:Function]
|
||||
# @PURPOSE: Apply trim+case-insensitive actor match across owners OR modified_by.
|
||||
# @PRE: bound_username can be empty; owners may contain mixed payload.
|
||||
# @POST: Returns True when normalized username matches owners or modified_by.
|
||||
def matches_dashboard_actor(
|
||||
self,
|
||||
bound_username: Optional[str],
|
||||
owners: Optional[Iterable[Any]],
|
||||
modified_by: Optional[str],
|
||||
) -> bool:
|
||||
normalized_actor = self._normalize_username(bound_username)
|
||||
if not normalized_actor:
|
||||
return False
|
||||
|
||||
owner_tokens = self._normalize_owner_tokens(owners)
|
||||
modified_token = self._normalize_username(modified_by)
|
||||
|
||||
if normalized_actor in owner_tokens:
|
||||
return True
|
||||
if modified_token and normalized_actor == modified_token:
|
||||
return True
|
||||
return False
|
||||
# [/DEF:matches_dashboard_actor:Function]
|
||||
|
||||
# [DEF:_resolve_environment:Function]
|
||||
# @PURPOSE: Resolve environment model from configured environments by id.
|
||||
# @PRE: environment_id is provided.
|
||||
# @POST: Returns environment object when found else None.
|
||||
def _resolve_environment(self, environment_id: str):
|
||||
environments = self.config_manager.get_environments()
|
||||
for env in environments:
|
||||
if str(getattr(env, "id", "")) == str(environment_id):
|
||||
return env
|
||||
return None
|
||||
# [/DEF:_resolve_environment:Function]
|
||||
|
||||
# [DEF:_get_preference_row:Function]
|
||||
# @PURPOSE: Return persisted preference row for user or None.
|
||||
# @PRE: user_id is provided.
|
||||
# @POST: Returns matching row or None.
|
||||
def _get_preference_row(self, user_id: str) -> Optional[UserDashboardPreference]:
|
||||
return self.auth_repository.get_user_dashboard_preference(str(user_id))
|
||||
# [/DEF:_get_preference_row:Function]
|
||||
|
||||
# [DEF:_get_or_create_preference_row:Function]
|
||||
# @PURPOSE: Return existing preference row or create new unsaved row.
|
||||
# @PRE: user_id is provided.
|
||||
# @POST: Returned row always contains user_id.
|
||||
def _get_or_create_preference_row(self, user_id: str) -> UserDashboardPreference:
|
||||
existing = self._get_preference_row(user_id)
|
||||
if existing is not None:
|
||||
return existing
|
||||
return UserDashboardPreference(user_id=str(user_id))
|
||||
# [/DEF:_get_or_create_preference_row:Function]
|
||||
|
||||
# [DEF:_build_default_preference:Function]
|
||||
# @PURPOSE: Build non-persisted default preference DTO for unconfigured users.
|
||||
# @PRE: user_id is provided.
|
||||
# @POST: Returns ProfilePreference with disabled toggle and empty username.
|
||||
def _build_default_preference(self, user_id: str) -> ProfilePreference:
|
||||
now = datetime.utcnow()
|
||||
return ProfilePreference(
|
||||
user_id=str(user_id),
|
||||
superset_username=None,
|
||||
superset_username_normalized=None,
|
||||
show_only_my_dashboards=False,
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
# [/DEF:_build_default_preference:Function]
|
||||
|
||||
# [DEF:_validate_update_payload:Function]
|
||||
# @PURPOSE: Validate username/toggle constraints for preference mutation.
|
||||
# @PRE: payload is provided.
|
||||
# @POST: Returns validation errors list; empty list means valid.
|
||||
def _validate_update_payload(
|
||||
self,
|
||||
payload: ProfilePreferenceUpdateRequest,
|
||||
) -> List[str]:
|
||||
errors: List[str] = []
|
||||
sanitized_username = self._sanitize_username(payload.superset_username)
|
||||
|
||||
if sanitized_username and any(ch.isspace() for ch in sanitized_username):
|
||||
errors.append(
|
||||
"Username should not contain spaces. Please enter a valid Apache Superset username."
|
||||
)
|
||||
if payload.show_only_my_dashboards and not sanitized_username:
|
||||
errors.append("Superset username is required when default filter is enabled.")
|
||||
return errors
|
||||
# [/DEF:_validate_update_payload:Function]
|
||||
|
||||
# [DEF:_sanitize_username:Function]
|
||||
# @PURPOSE: Normalize raw username into trimmed form or None for empty input.
|
||||
# @PRE: value can be empty or None.
|
||||
# @POST: Returns trimmed username or None.
|
||||
def _sanitize_username(self, value: Optional[str]) -> Optional[str]:
|
||||
normalized = str(value or "").strip()
|
||||
if not normalized:
|
||||
return None
|
||||
return normalized
|
||||
# [/DEF:_sanitize_username:Function]
|
||||
|
||||
# [DEF:_normalize_username:Function]
|
||||
# @PURPOSE: Apply deterministic trim+lower normalization for actor matching.
|
||||
# @PRE: value can be empty or None.
|
||||
# @POST: Returns lowercase normalized token or None.
|
||||
def _normalize_username(self, value: Optional[str]) -> Optional[str]:
|
||||
sanitized = self._sanitize_username(value)
|
||||
if sanitized is None:
|
||||
return None
|
||||
return sanitized.lower()
|
||||
# [/DEF:_normalize_username:Function]
|
||||
|
||||
# [DEF:_normalize_owner_tokens:Function]
|
||||
# @PURPOSE: Normalize owners payload into deduplicated lower-cased tokens.
|
||||
# @PRE: owners can be iterable of scalars or dict-like values.
|
||||
# @POST: Returns list of unique normalized owner tokens.
|
||||
def _normalize_owner_tokens(self, owners: Optional[Iterable[Any]]) -> List[str]:
|
||||
if owners is None:
|
||||
return []
|
||||
normalized: List[str] = []
|
||||
for owner in owners:
|
||||
token = self._normalize_username(str(owner or ""))
|
||||
if token and token not in normalized:
|
||||
normalized.append(token)
|
||||
return normalized
|
||||
# [/DEF:_normalize_owner_tokens:Function]
|
||||
# [/DEF:ProfileService:Class]
|
||||
|
||||
# [/DEF:backend.src.services.profile_service:Module]
|
||||
Reference in New Issue
Block a user