Files
ss-tools/backend/src/services/profile_service.py

675 lines
30 KiB
Python

# [DEF:backend.src.services.profile_service:Module]
#
# @TIER: CRITICAL
# @SEMANTICS: profile, service, validation, ownership, filtering, superset, preferences
# @PURPOSE: Orchestrates profile preference persistence, Superset account lookup, and deterministic actor matching.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> backend.src.models.profile
# @RELATION: DEPENDS_ON -> backend.src.schemas.profile
# @RELATION: DEPENDS_ON -> backend.src.core.superset_client
# @RELATION: DEPENDS_ON -> backend.src.core.auth.repository
# @RELATION: DEPENDS_ON -> backend.src.models.auth
# @RELATION: DEPENDS_ON -> sqlalchemy.orm.Session
#
# @INVARIANT: Preference mutations are always scoped to authenticated user identity.
# @INVARIANT: Username normalization is trim+lower and shared by save and matching paths.
#
# @TEST_CONTRACT: ProfilePreferenceUpdateRequest -> ProfilePreferenceResponse
# @TEST_FIXTURE: valid_profile_update -> {"user_id":"u-1","superset_username":"John_Doe","show_only_my_dashboards":true}
# @TEST_EDGE: enable_without_username -> toggle=true with empty username returns validation error
# @TEST_EDGE: cross_user_mutation -> attempt to update another user preference returns forbidden
# @TEST_EDGE: lookup_env_not_found -> unknown environment_id returns not found
# @TEST_INVARIANT: normalization_consistency -> VERIFIED_BY: [valid_profile_update, enable_without_username]
# [SECTION: IMPORTS]
from datetime import datetime
from typing import Any, Iterable, List, Optional, Sequence, Set, Tuple
from sqlalchemy.orm import Session
from ..core.auth.repository import AuthRepository
from ..core.logger import logger, belief_scope
from ..core.superset_client import SupersetClient
from ..core.superset_profile_lookup import SupersetAccountLookupAdapter
from ..models.auth import User
from ..models.profile import UserDashboardPreference
from .llm_provider import EncryptionManager
from .rbac_permission_catalog import discover_declared_permissions
from ..schemas.profile import (
ProfilePermissionState,
ProfilePreference,
ProfilePreferenceResponse,
ProfilePreferenceUpdateRequest,
ProfileSecuritySummary,
SupersetAccountLookupRequest,
SupersetAccountLookupResponse,
SupersetAccountCandidate,
)
# [/SECTION]
SUPPORTED_START_PAGES = {"dashboards", "datasets", "reports"}
SUPPORTED_DENSITIES = {"compact", "comfortable"}
# [DEF:ProfileValidationError:Class]
# @TIER: STANDARD
# @PURPOSE: Domain validation error for profile preference update requests.
class ProfileValidationError(Exception):
def __init__(self, errors: Sequence[str]):
self.errors = list(errors)
super().__init__("Profile preference validation failed")
# [/DEF:ProfileValidationError:Class]
# [DEF:EnvironmentNotFoundError:Class]
# @TIER: STANDARD
# @PURPOSE: Raised when environment_id from lookup request is unknown in app configuration.
class EnvironmentNotFoundError(Exception):
pass
# [/DEF:EnvironmentNotFoundError:Class]
# [DEF:ProfileAuthorizationError:Class]
# @TIER: STANDARD
# @PURPOSE: Raised when caller attempts cross-user preference mutation.
class ProfileAuthorizationError(Exception):
pass
# [/DEF:ProfileAuthorizationError:Class]
# [DEF:ProfileService:Class]
# @TIER: CRITICAL
# @PURPOSE: Implements profile preference read/update flow and Superset account lookup degradation strategy.
class ProfileService:
# [DEF:__init__:Function]
# @PURPOSE: Initialize service with DB session and config manager.
# @PRE: db session is active and config_manager supports get_environments().
# @POST: Service is ready for preference persistence and lookup operations.
def __init__(self, db: Session, config_manager: Any, plugin_loader: Any = None):
self.db = db
self.config_manager = config_manager
self.plugin_loader = plugin_loader
self.auth_repository = AuthRepository(db)
self.encryption = EncryptionManager()
# [/DEF:__init__:Function]
# [DEF:get_my_preference:Function]
# @PURPOSE: Return current user's persisted preference or default non-configured view.
# @PRE: current_user is authenticated.
# @POST: Returned payload belongs to current_user only.
def get_my_preference(self, current_user: User) -> ProfilePreferenceResponse:
with belief_scope("ProfileService.get_my_preference", f"user_id={current_user.id}"):
logger.reflect("[REFLECT] Loading current user's dashboard preference")
preference = self._get_preference_row(current_user.id)
security_summary = self._build_security_summary(current_user)
if preference is None:
return ProfilePreferenceResponse(
status="success",
message="Preference not configured yet",
preference=self._build_default_preference(current_user.id),
security=security_summary,
)
return ProfilePreferenceResponse(
status="success",
message="Preference loaded",
preference=self._to_preference_payload(preference, str(current_user.id)),
security=security_summary,
)
# [/DEF:get_my_preference:Function]
# [DEF:update_my_preference:Function]
# @PURPOSE: Validate and persist current user's profile preference in self-scoped mode.
# @PRE: current_user is authenticated and payload is provided.
# @POST: Preference row for current_user is created/updated when validation passes.
def update_my_preference(
self,
current_user: User,
payload: ProfilePreferenceUpdateRequest,
target_user_id: Optional[str] = None,
) -> ProfilePreferenceResponse:
with belief_scope("ProfileService.update_my_preference", f"user_id={current_user.id}"):
logger.reason("[REASON] Evaluating self-scope guard before preference mutation")
requested_user_id = str(target_user_id or current_user.id)
if requested_user_id != str(current_user.id):
logger.explore("[EXPLORE] Cross-user mutation attempt blocked")
raise ProfileAuthorizationError("Cross-user preference mutation is forbidden")
preference = self._get_or_create_preference_row(current_user.id)
provided_fields = set(getattr(payload, "model_fields_set", set()))
effective_superset_username = self._sanitize_username(preference.superset_username)
if "superset_username" in provided_fields:
effective_superset_username = self._sanitize_username(payload.superset_username)
effective_show_only = bool(preference.show_only_my_dashboards)
if "show_only_my_dashboards" in provided_fields:
effective_show_only = bool(payload.show_only_my_dashboards)
effective_git_username = self._sanitize_text(preference.git_username)
if "git_username" in provided_fields:
effective_git_username = self._sanitize_text(payload.git_username)
effective_git_email = self._sanitize_text(preference.git_email)
if "git_email" in provided_fields:
effective_git_email = self._sanitize_text(payload.git_email)
effective_start_page = self._normalize_start_page(preference.start_page)
if "start_page" in provided_fields:
effective_start_page = self._normalize_start_page(payload.start_page)
effective_auto_open_task_drawer = (
bool(preference.auto_open_task_drawer)
if preference.auto_open_task_drawer is not None
else True
)
if "auto_open_task_drawer" in provided_fields:
effective_auto_open_task_drawer = bool(payload.auto_open_task_drawer)
effective_dashboards_table_density = self._normalize_density(
preference.dashboards_table_density
)
if "dashboards_table_density" in provided_fields:
effective_dashboards_table_density = self._normalize_density(
payload.dashboards_table_density
)
validation_errors = self._validate_update_payload(
superset_username=effective_superset_username,
show_only_my_dashboards=effective_show_only,
git_email=effective_git_email,
start_page=effective_start_page,
dashboards_table_density=effective_dashboards_table_density,
)
if validation_errors:
logger.reflect("[REFLECT] Validation failed; mutation is denied")
raise ProfileValidationError(validation_errors)
preference.superset_username = effective_superset_username
preference.superset_username_normalized = self._normalize_username(
effective_superset_username
)
preference.show_only_my_dashboards = effective_show_only
preference.git_username = effective_git_username
preference.git_email = effective_git_email
if "git_personal_access_token" in provided_fields:
sanitized_token = self._sanitize_secret(payload.git_personal_access_token)
if sanitized_token is None:
preference.git_personal_access_token_encrypted = None
else:
preference.git_personal_access_token_encrypted = self.encryption.encrypt(
sanitized_token
)
preference.start_page = effective_start_page
preference.auto_open_task_drawer = effective_auto_open_task_drawer
preference.dashboards_table_density = effective_dashboards_table_density
preference.updated_at = datetime.utcnow()
persisted_preference = self.auth_repository.save_user_dashboard_preference(preference)
logger.reason("[REASON] Preference persisted successfully")
return ProfilePreferenceResponse(
status="success",
message="Preference saved",
preference=self._to_preference_payload(
persisted_preference,
str(current_user.id),
),
security=self._build_security_summary(current_user),
)
# [/DEF:update_my_preference:Function]
# [DEF:lookup_superset_accounts:Function]
# @PURPOSE: Query Superset users in selected environment and project canonical account candidates.
# @PRE: current_user is authenticated and environment_id exists.
# @POST: Returns success payload or degraded payload with warning while preserving manual fallback.
def lookup_superset_accounts(
self,
current_user: User,
request: SupersetAccountLookupRequest,
) -> SupersetAccountLookupResponse:
with belief_scope(
"ProfileService.lookup_superset_accounts",
f"user_id={current_user.id}, environment_id={request.environment_id}",
):
environment = self._resolve_environment(request.environment_id)
if environment is None:
logger.explore("[EXPLORE] Lookup aborted: environment not found")
raise EnvironmentNotFoundError(f"Environment '{request.environment_id}' not found")
sort_column = str(request.sort_column or "username").strip().lower()
sort_order = str(request.sort_order or "desc").strip().lower()
allowed_columns = {"username", "first_name", "last_name", "email"}
if sort_column not in allowed_columns:
sort_column = "username"
if sort_order not in {"asc", "desc"}:
sort_order = "desc"
logger.reflect(
"[REFLECT] Normalized lookup request "
f"(env={request.environment_id}, sort_column={sort_column}, sort_order={sort_order}, "
f"page_index={request.page_index}, page_size={request.page_size}, "
f"search={(request.search or '').strip()!r})"
)
try:
logger.reason("[REASON] Performing Superset account lookup")
superset_client = SupersetClient(environment)
adapter = SupersetAccountLookupAdapter(
network_client=superset_client.network,
environment_id=request.environment_id,
)
lookup_result = adapter.get_users_page(
search=request.search,
page_index=request.page_index,
page_size=request.page_size,
sort_column=sort_column,
sort_order=sort_order,
)
items = [
SupersetAccountCandidate.model_validate(item)
for item in lookup_result.get("items", [])
]
return SupersetAccountLookupResponse(
status="success",
environment_id=request.environment_id,
page_index=request.page_index,
page_size=request.page_size,
total=max(int(lookup_result.get("total", len(items))), 0),
warning=None,
items=items,
)
except Exception as exc:
logger.explore(f"[EXPLORE] Lookup degraded due to upstream error: {exc}")
return SupersetAccountLookupResponse(
status="degraded",
environment_id=request.environment_id,
page_index=request.page_index,
page_size=request.page_size,
total=0,
warning=(
"Cannot load Superset accounts for this environment right now. "
"You can enter username manually."
),
items=[],
)
# [/DEF:lookup_superset_accounts:Function]
# [DEF:matches_dashboard_actor:Function]
# @PURPOSE: Apply trim+case-insensitive actor match across owners OR modified_by.
# @PRE: bound_username can be empty; owners may contain mixed payload.
# @POST: Returns True when normalized username matches owners or modified_by.
def matches_dashboard_actor(
self,
bound_username: Optional[str],
owners: Optional[Iterable[Any]],
modified_by: Optional[str],
) -> bool:
normalized_actor = self._normalize_username(bound_username)
if not normalized_actor:
return False
owner_tokens = self._normalize_owner_tokens(owners)
modified_token = self._normalize_username(modified_by)
if normalized_actor in owner_tokens:
return True
if modified_token and normalized_actor == modified_token:
return True
return False
# [/DEF:matches_dashboard_actor:Function]
# [DEF:_build_security_summary:Function]
# @PURPOSE: Build read-only security snapshot with role and permission badges.
# @PRE: current_user is authenticated.
# @POST: Returns deterministic security projection for profile UI.
def _build_security_summary(self, current_user: User) -> ProfileSecuritySummary:
role_names_set: Set[str] = set()
roles = getattr(current_user, "roles", []) or []
for role in roles:
normalized_role_name = self._sanitize_text(getattr(role, "name", None))
if normalized_role_name:
role_names_set.add(normalized_role_name)
role_names = sorted(role_names_set)
is_admin = any(str(role_name).lower() == "admin" for role_name in role_names)
user_permission_pairs = self._collect_user_permission_pairs(current_user)
declared_permission_pairs: Set[Tuple[str, str]] = set()
try:
discovered_permissions = discover_declared_permissions(
plugin_loader=self.plugin_loader
)
for resource, action in discovered_permissions:
normalized_resource = self._sanitize_text(resource)
normalized_action = str(action or "").strip().upper()
if normalized_resource and normalized_action:
declared_permission_pairs.add((normalized_resource, normalized_action))
except Exception as discovery_error:
logger.warning(
"[ProfileService][EXPLORE] Failed to build declared permission catalog: %s",
discovery_error,
)
if not declared_permission_pairs:
declared_permission_pairs = set(user_permission_pairs)
sorted_permission_pairs = sorted(
declared_permission_pairs,
key=lambda pair: (pair[0], pair[1]),
)
permission_states = [
ProfilePermissionState(
key=self._format_permission_key(resource, action),
allowed=bool(is_admin or (resource, action) in user_permission_pairs),
)
for resource, action in sorted_permission_pairs
]
auth_source = self._sanitize_text(getattr(current_user, "auth_source", None))
current_role = "Admin" if is_admin else (role_names[0] if role_names else None)
return ProfileSecuritySummary(
read_only=True,
auth_source=auth_source,
current_role=current_role,
role_source=auth_source,
roles=role_names,
permissions=permission_states,
)
# [/DEF:_build_security_summary:Function]
# [DEF:_collect_user_permission_pairs:Function]
# @PURPOSE: Collect effective permission tuples from current user's roles.
# @PRE: current_user can include role/permission graph.
# @POST: Returns unique normalized (resource, ACTION) tuples.
def _collect_user_permission_pairs(self, current_user: User) -> Set[Tuple[str, str]]:
collected: Set[Tuple[str, str]] = set()
roles = getattr(current_user, "roles", []) or []
for role in roles:
permissions = getattr(role, "permissions", []) or []
for permission in permissions:
resource = self._sanitize_text(getattr(permission, "resource", None))
action = str(getattr(permission, "action", "") or "").strip().upper()
if resource and action:
collected.add((resource, action))
return collected
# [/DEF:_collect_user_permission_pairs:Function]
# [DEF:_format_permission_key:Function]
# @PURPOSE: Convert normalized permission pair to compact UI key.
# @PRE: resource and action are normalized.
# @POST: Returns user-facing badge key.
def _format_permission_key(self, resource: str, action: str) -> str:
normalized_resource = self._sanitize_text(resource) or ""
normalized_action = str(action or "").strip().upper()
if normalized_action == "READ":
return normalized_resource
return f"{normalized_resource}:{normalized_action.lower()}"
# [/DEF:_format_permission_key:Function]
# [DEF:_to_preference_payload:Function]
# @PURPOSE: Map ORM preference row to API DTO with token metadata.
# @PRE: preference row can contain nullable optional fields.
# @POST: Returns normalized ProfilePreference object.
def _to_preference_payload(
self,
preference: UserDashboardPreference,
user_id: str,
) -> ProfilePreference:
encrypted_token = self._sanitize_text(
preference.git_personal_access_token_encrypted
)
token_masked = None
if encrypted_token:
try:
decrypted_token = self.encryption.decrypt(encrypted_token)
token_masked = self._mask_secret_value(decrypted_token)
except Exception:
token_masked = "***"
created_at = getattr(preference, "created_at", None) or datetime.utcnow()
updated_at = getattr(preference, "updated_at", None) or created_at
return ProfilePreference(
user_id=str(user_id),
superset_username=self._sanitize_username(preference.superset_username),
superset_username_normalized=self._normalize_username(
preference.superset_username_normalized
),
show_only_my_dashboards=bool(preference.show_only_my_dashboards),
git_username=self._sanitize_text(preference.git_username),
git_email=self._sanitize_text(preference.git_email),
has_git_personal_access_token=bool(encrypted_token),
git_personal_access_token_masked=token_masked,
start_page=self._normalize_start_page(preference.start_page),
auto_open_task_drawer=(
bool(preference.auto_open_task_drawer)
if preference.auto_open_task_drawer is not None
else True
),
dashboards_table_density=self._normalize_density(
preference.dashboards_table_density
),
created_at=created_at,
updated_at=updated_at,
)
# [/DEF:_to_preference_payload:Function]
# [DEF:_mask_secret_value:Function]
# @PURPOSE: Build a safe display value for sensitive secrets.
# @PRE: secret may be None or plaintext.
# @POST: Returns masked representation or None.
def _mask_secret_value(self, secret: Optional[str]) -> Optional[str]:
sanitized_secret = self._sanitize_secret(secret)
if sanitized_secret is None:
return None
if len(sanitized_secret) <= 4:
return "***"
return f"{sanitized_secret[:2]}***{sanitized_secret[-2:]}"
# [/DEF:_mask_secret_value:Function]
# [DEF:_sanitize_text:Function]
# @PURPOSE: Normalize optional text into trimmed form or None.
# @PRE: value may be empty or None.
# @POST: Returns trimmed value or None.
def _sanitize_text(self, value: Optional[str]) -> Optional[str]:
normalized = str(value or "").strip()
if not normalized:
return None
return normalized
# [/DEF:_sanitize_text:Function]
# [DEF:_sanitize_secret:Function]
# @PURPOSE: Normalize secret input into trimmed form or None.
# @PRE: value may be None or blank.
# @POST: Returns trimmed secret or None.
def _sanitize_secret(self, value: Optional[str]) -> Optional[str]:
if value is None:
return None
normalized = str(value).strip()
if not normalized:
return None
return normalized
# [/DEF:_sanitize_secret:Function]
# [DEF:_normalize_start_page:Function]
# @PURPOSE: Normalize supported start page aliases to canonical values.
# @PRE: value may be None or alias.
# @POST: Returns one of SUPPORTED_START_PAGES.
def _normalize_start_page(self, value: Optional[str]) -> str:
normalized = str(value or "").strip().lower()
if normalized == "reports-logs":
return "reports"
if normalized in SUPPORTED_START_PAGES:
return normalized
return "dashboards"
# [/DEF:_normalize_start_page:Function]
# [DEF:_normalize_density:Function]
# @PURPOSE: Normalize supported density aliases to canonical values.
# @PRE: value may be None or alias.
# @POST: Returns one of SUPPORTED_DENSITIES.
def _normalize_density(self, value: Optional[str]) -> str:
normalized = str(value or "").strip().lower()
if normalized == "free":
return "comfortable"
if normalized in SUPPORTED_DENSITIES:
return normalized
return "comfortable"
# [/DEF:_normalize_density:Function]
# [DEF:_resolve_environment:Function]
# @PURPOSE: Resolve environment model from configured environments by id.
# @PRE: environment_id is provided.
# @POST: Returns environment object when found else None.
def _resolve_environment(self, environment_id: str):
environments = self.config_manager.get_environments()
for env in environments:
if str(getattr(env, "id", "")) == str(environment_id):
return env
return None
# [/DEF:_resolve_environment:Function]
# [DEF:_get_preference_row:Function]
# @PURPOSE: Return persisted preference row for user or None.
# @PRE: user_id is provided.
# @POST: Returns matching row or None.
def _get_preference_row(self, user_id: str) -> Optional[UserDashboardPreference]:
return self.auth_repository.get_user_dashboard_preference(str(user_id))
# [/DEF:_get_preference_row:Function]
# [DEF:_get_or_create_preference_row:Function]
# @PURPOSE: Return existing preference row or create new unsaved row.
# @PRE: user_id is provided.
# @POST: Returned row always contains user_id.
def _get_or_create_preference_row(self, user_id: str) -> UserDashboardPreference:
existing = self._get_preference_row(user_id)
if existing is not None:
return existing
return UserDashboardPreference(user_id=str(user_id))
# [/DEF:_get_or_create_preference_row:Function]
# [DEF:_build_default_preference:Function]
# @PURPOSE: Build non-persisted default preference DTO for unconfigured users.
# @PRE: user_id is provided.
# @POST: Returns ProfilePreference with disabled toggle and empty username.
def _build_default_preference(self, user_id: str) -> ProfilePreference:
now = datetime.utcnow()
return ProfilePreference(
user_id=str(user_id),
superset_username=None,
superset_username_normalized=None,
show_only_my_dashboards=False,
git_username=None,
git_email=None,
has_git_personal_access_token=False,
git_personal_access_token_masked=None,
start_page="dashboards",
auto_open_task_drawer=True,
dashboards_table_density="comfortable",
created_at=now,
updated_at=now,
)
# [/DEF:_build_default_preference:Function]
# [DEF:_validate_update_payload:Function]
# @PURPOSE: Validate username/toggle constraints for preference mutation.
# @PRE: payload is provided.
# @POST: Returns validation errors list; empty list means valid.
def _validate_update_payload(
self,
superset_username: Optional[str],
show_only_my_dashboards: bool,
git_email: Optional[str],
start_page: str,
dashboards_table_density: str,
) -> List[str]:
errors: List[str] = []
sanitized_username = self._sanitize_username(superset_username)
if sanitized_username and any(ch.isspace() for ch in sanitized_username):
errors.append(
"Username should not contain spaces. Please enter a valid Apache Superset username."
)
if show_only_my_dashboards and not sanitized_username:
errors.append("Superset username is required when default filter is enabled.")
sanitized_git_email = self._sanitize_text(git_email)
if sanitized_git_email:
if (
" " in sanitized_git_email
or "@" not in sanitized_git_email
or sanitized_git_email.startswith("@")
or sanitized_git_email.endswith("@")
):
errors.append("Git email should be a valid email address.")
if start_page not in SUPPORTED_START_PAGES:
errors.append("Start page value is not supported.")
if dashboards_table_density not in SUPPORTED_DENSITIES:
errors.append("Dashboards table density value is not supported.")
return errors
# [/DEF:_validate_update_payload:Function]
# [DEF:_sanitize_username:Function]
# @PURPOSE: Normalize raw username into trimmed form or None for empty input.
# @PRE: value can be empty or None.
# @POST: Returns trimmed username or None.
def _sanitize_username(self, value: Optional[str]) -> Optional[str]:
return self._sanitize_text(value)
# [/DEF:_sanitize_username:Function]
# [DEF:_normalize_username:Function]
# @PURPOSE: Apply deterministic trim+lower normalization for actor matching.
# @PRE: value can be empty or None.
# @POST: Returns lowercase normalized token or None.
def _normalize_username(self, value: Optional[str]) -> Optional[str]:
sanitized = self._sanitize_username(value)
if sanitized is None:
return None
return sanitized.lower()
# [/DEF:_normalize_username:Function]
# [DEF:_normalize_owner_tokens:Function]
# @PURPOSE: Normalize owners payload into deduplicated lower-cased tokens.
# @PRE: owners can be iterable of scalars or dict-like values.
# @POST: Returns list of unique normalized owner tokens.
def _normalize_owner_tokens(self, owners: Optional[Iterable[Any]]) -> List[str]:
if owners is None:
return []
normalized: List[str] = []
for owner in owners:
owner_candidates: List[Any]
if isinstance(owner, dict):
first_name = self._sanitize_username(str(owner.get("first_name") or ""))
last_name = self._sanitize_username(str(owner.get("last_name") or ""))
full_name = " ".join(part for part in [first_name, last_name] if part).strip()
snake_name = "_".join(part for part in [first_name, last_name] if part).strip("_")
owner_candidates = [
owner.get("username"),
owner.get("user_name"),
owner.get("name"),
owner.get("full_name"),
first_name,
last_name,
full_name or None,
snake_name or None,
owner.get("email"),
]
else:
owner_candidates = [owner]
for candidate in owner_candidates:
token = self._normalize_username(str(candidate or ""))
if token and token not in normalized:
normalized.append(token)
return normalized
# [/DEF:_normalize_owner_tokens:Function]
# [/DEF:ProfileService:Class]
# [/DEF:backend.src.services.profile_service:Module]