semantic clean up

This commit is contained in:
2026-03-10 19:38:10 +03:00
parent 31717870e3
commit 542835e0ff
31 changed files with 5392 additions and 6647 deletions

View File

@@ -422,7 +422,7 @@ def test_llm_validation_with_dashboard_ref_requires_confirmation():
assert "cancel" in action_types
# [/DEF:test_llm_validation_missing_dashboard_returns_needs_clarification:Function]
# [/DEF:test_llm_validation_with_dashboard_ref_requires_confirmation:Function]
# [DEF:test_list_conversations_groups_by_conversation_and_marks_archived:Function]
@@ -629,6 +629,7 @@ def test_guarded_operation_confirm_roundtrip():
assert second.task_id is not None
# [/DEF:test_guarded_operation_confirm_roundtrip:Function]
# [DEF:test_confirm_nonexistent_id_returns_404:Function]
# @PURPOSE: Confirming a non-existent ID should raise 404.
# @PRE: user tries to confirm a random/fake UUID.
@@ -649,6 +650,7 @@ def test_confirm_nonexistent_id_returns_404():
assert exc.value.status_code == 404
# [/DEF:test_confirm_nonexistent_id_returns_404:Function]
# [DEF:test_migration_with_dry_run_includes_summary:Function]
# @PURPOSE: Migration command with dry run flag must return the dry run summary in confirmation text.
# @PRE: user specifies a migration with --dry-run flag.

View File

@@ -135,6 +135,8 @@ def test_get_report_success():
finally:
app.dependency_overrides.clear()
# [/DEF:backend.tests.api.routes.test_clean_release_api:Module]
def test_prepare_candidate_api_success():
repo = _repo_with_seed_data()
app.dependency_overrides[get_clean_release_repository] = lambda: repo

View File

@@ -94,4 +94,7 @@ def test_prepare_candidate_blocks_external_source():
assert data["status"] == "blocked"
assert any(v["category"] == "external-source" for v in data["violations"])
finally:
app.dependency_overrides.clear()
app.dependency_overrides.clear()
# [/DEF:backend.tests.api.routes.test_clean_release_source_policy:Module]

View File

@@ -1173,7 +1173,7 @@ async def _async_confirmation_summary(intent: Dict[str, Any], config_manager: Co
text += f"\n\n(Не удалось загрузить отчет dry-run: {e})."
return f"Выполнить: {text}. Подтвердите или отмените."
# [/DEF:_async_confirmation_summary:Function]
# [/DEF:_confirmation_summary:Function]
# [DEF:_clarification_text_for_intent:Function]

View File

@@ -1,10 +1,23 @@
# [DEF:backend.src.api.routes.migration:Module]
# @TIER: STANDARD
# @SEMANTICS: api, migration, dashboards
# @PURPOSE: API endpoints for migration operations.
# @LAYER: API
# @RELATION: DEPENDS_ON -> backend.src.dependencies
# @RELATION: DEPENDS_ON -> backend.src.models.dashboard
# @TIER: CRITICAL
# @SEMANTICS: api, migration, dashboards, sync, dry-run
# @PURPOSE: HTTP contract layer for migration orchestration, settings, dry-run, and mapping sync endpoints.
# @LAYER: Infra
# @RELATION: [DEPENDS_ON] ->[backend.src.dependencies]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.database]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.superset_client]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.migration.dry_run_orchestrator]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.mapping_service]
# @RELATION: [DEPENDS_ON] ->[backend.src.models.dashboard]
# @RELATION: [DEPENDS_ON] ->[backend.src.models.mapping]
# @INVARIANT: Migration endpoints never execute with invalid environment references and always return explicit HTTP errors on guard failures.
# @TEST_CONTRACT: [DashboardSelection + configured envs] -> [task_id | dry-run result | sync summary]
# @TEST_SCENARIO: [invalid_environment] -> [HTTP_400_or_404]
# @TEST_SCENARIO: [valid_execution] -> [success_payload_with_required_fields]
# @TEST_EDGE: [missing_field] ->[HTTP_400]
# @TEST_EDGE: [invalid_type] ->[validation_error]
# @TEST_EDGE: [external_fail] ->[HTTP_500]
# @TEST_INVARIANT: [EnvironmentValidationBeforeAction] -> VERIFIED_BY: [invalid_environment, valid_execution]
from fastapi import APIRouter, Depends, HTTPException, Query
from typing import List, Dict, Any, Optional
@@ -21,11 +34,11 @@ from ...models.mapping import ResourceMapping
router = APIRouter(prefix="/api", tags=["migration"])
# [DEF:get_dashboards:Function]
# @PURPOSE: Fetch all dashboards from the specified environment for the grid.
# @PRE: Environment ID must be valid.
# @POST: Returns a list of dashboard metadata.
# @PARAM: env_id (str) - The ID of the environment to fetch from.
# @RETURN: List[DashboardMetadata]
# @PURPOSE: Fetch dashboard metadata from a requested environment for migration selection UI.
# @PRE: env_id is provided and exists in configured environments.
# @POST: Returns List[DashboardMetadata] for the resolved environment; emits HTTP_404 when environment is absent.
# @SIDE_EFFECT: Reads environment configuration and performs remote Superset metadata retrieval over network.
# @DATA_CONTRACT: Input[str env_id] -> Output[List[DashboardMetadata]]
@router.get("/environments/{env_id}/dashboards", response_model=List[DashboardMetadata])
async def get_dashboards(
env_id: str,
@@ -44,11 +57,11 @@ async def get_dashboards(
# [/DEF:get_dashboards:Function]
# [DEF:execute_migration:Function]
# @PURPOSE: Execute the migration of selected dashboards.
# @PRE: Selection must be valid and environments must exist.
# @POST: Starts the migration task and returns the task ID.
# @PARAM: selection (DashboardSelection) - The dashboards to migrate.
# @RETURN: Dict - {"task_id": str, "message": str}
# @PURPOSE: Validate migration selection and enqueue asynchronous migration task execution.
# @PRE: DashboardSelection payload is valid and both source/target environments exist.
# @POST: Returns {"task_id": str, "message": str} when task creation succeeds; emits HTTP_400/HTTP_500 on failure.
# @SIDE_EFFECT: Reads configuration, writes task record through task manager, and writes operational logs.
# @DATA_CONTRACT: Input[DashboardSelection] -> Output[Dict[str, str]]
@router.post("/migration/execute")
async def execute_migration(
selection: DashboardSelection,
@@ -86,9 +99,11 @@ async def execute_migration(
# [DEF:dry_run_migration:Function]
# @PURPOSE: Build pre-flight diff and risk summary without applying migration.
# @PRE: Selection and environments are valid.
# @POST: Returns deterministic JSON diff and risk scoring.
# @PURPOSE: Build pre-flight migration diff and risk summary without mutating target systems.
# @PRE: DashboardSelection is valid, source and target environments exist, differ, and selected_ids is non-empty.
# @POST: Returns deterministic dry-run payload; emits HTTP_400 for guard violations and HTTP_500 for orchestrator value errors.
# @SIDE_EFFECT: Reads local mappings from DB and fetches source/target metadata via Superset API.
# @DATA_CONTRACT: Input[DashboardSelection] -> Output[Dict[str, Any]]
@router.post("/migration/dry-run", response_model=Dict[str, Any])
async def dry_run_migration(
selection: DashboardSelection,
@@ -123,7 +138,11 @@ async def dry_run_migration(
# [/DEF:dry_run_migration:Function]
# [DEF:get_migration_settings:Function]
# @PURPOSE: Get current migration Cron string explicitly.
# @PURPOSE: Read and return configured migration synchronization cron expression.
# @PRE: Configuration store is available and requester has READ permission.
# @POST: Returns {"cron": str} reflecting current persisted settings value.
# @SIDE_EFFECT: Reads configuration from config manager.
# @DATA_CONTRACT: Input[None] -> Output[Dict[str, str]]
@router.get("/migration/settings", response_model=Dict[str, str])
async def get_migration_settings(
config_manager=Depends(get_config_manager),
@@ -136,7 +155,11 @@ async def get_migration_settings(
# [/DEF:get_migration_settings:Function]
# [DEF:update_migration_settings:Function]
# @PURPOSE: Update migration Cron string.
# @PURPOSE: Validate and persist migration synchronization cron expression update.
# @PRE: Payload includes "cron" key and requester has WRITE permission.
# @POST: Returns {"cron": str, "status": "updated"} and persists updated cron value.
# @SIDE_EFFECT: Mutates configuration and writes persisted config through config manager.
# @DATA_CONTRACT: Input[Dict[str, str]] -> Output[Dict[str, str]]
@router.put("/migration/settings", response_model=Dict[str, str])
async def update_migration_settings(
payload: Dict[str, str],
@@ -157,7 +180,11 @@ async def update_migration_settings(
# [/DEF:update_migration_settings:Function]
# [DEF:get_resource_mappings:Function]
# @PURPOSE: Fetch synchronized object mappings with search, filtering, and pagination.
# @PURPOSE: Fetch synchronized resource mappings with optional filters and pagination for migration mappings view.
# @PRE: skip>=0, 1<=limit<=500, DB session is active, requester has READ permission.
# @POST: Returns {"items": [...], "total": int} where items reflect applied filters and pagination.
# @SIDE_EFFECT: Executes database read queries against ResourceMapping table.
# @DATA_CONTRACT: Input[QueryParams] -> Output[Dict[str, Any]]
@router.get("/migration/mappings-data", response_model=Dict[str, Any])
async def get_resource_mappings(
skip: int = Query(0, ge=0),
@@ -203,9 +230,11 @@ async def get_resource_mappings(
# [/DEF:get_resource_mappings:Function]
# [DEF:trigger_sync_now:Function]
# @PURPOSE: Triggers an immediate ID synchronization for all environments.
# @PRE: At least one environment must be configured.
# @POST: Environment rows are ensured in DB; sync_environment is called for each.
# @PURPOSE: Trigger immediate ID synchronization for every configured environment.
# @PRE: At least one environment is configured and requester has EXECUTE permission.
# @POST: Returns sync summary with synced/failed counts after attempting all environments.
# @SIDE_EFFECT: Upserts Environment rows, commits DB transaction, performs network sync calls, and writes logs.
# @DATA_CONTRACT: Input[None] -> Output[Dict[str, Any]]
@router.post("/migration/sync-now", response_model=Dict[str, Any])
async def trigger_sync_now(
config_manager=Depends(get_config_manager),

View File

@@ -1,70 +1,79 @@
# [DEF:backend.src.core.auth.repository:Module]
#
# @SEMANTICS: auth, repository, database, user, role
# @PURPOSE: Data access layer for authentication-related entities.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> sqlalchemy
# @RELATION: USES -> backend.src.models.auth
# @TIER: CRITICAL
# @SEMANTICS: auth, repository, database, user, role, permission
# @PURPOSE: Data access layer for authentication and user preference entities.
# @LAYER: Domain
# @RELATION: [DEPENDS_ON] ->[sqlalchemy.orm.Session]
# @RELATION: [DEPENDS_ON] ->[backend.src.models.auth]
# @RELATION: [DEPENDS_ON] ->[backend.src.models.profile]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.logger.belief_scope]
# @INVARIANT: All database read/write operations must execute via the injected SQLAlchemy session boundary.
#
# @INVARIANT: All database operations must be performed within a session.
# [SECTION: IMPORTS]
from typing import Optional, List
from typing import List, Optional
from sqlalchemy.orm import Session
from ...models.auth import User, Role, Permission
from ...models.auth import Permission, Role, User
from ...models.profile import UserDashboardPreference
from ..logger import belief_scope
# [/SECTION]
# [DEF:AuthRepository:Class]
# @PURPOSE: Encapsulates database operations for authentication.
# @PURPOSE: Encapsulates database operations for authentication-related entities.
# @RELATION: [DEPENDS_ON] ->[sqlalchemy.orm.Session]
class AuthRepository:
# [DEF:__init__:Function]
# @PURPOSE: Initializes the repository with a database session.
# @PARAM: db (Session) - SQLAlchemy session.
# @PURPOSE: Bind repository instance to an existing SQLAlchemy session.
# @PRE: db is an initialized sqlalchemy.orm.Session instance.
# @POST: self.db points to the provided session and is used by all repository methods.
# @SIDE_EFFECT: Stores session reference on repository instance state.
# @DATA_CONTRACT: Input[Session] -> Output[None]
def __init__(self, db: Session):
self.db = db
with belief_scope("AuthRepository.__init__"):
self.db = db
# [/DEF:__init__:Function]
# [DEF:get_user_by_username:Function]
# @PURPOSE: Retrieves a user by their username.
# @PRE: username is a string.
# @POST: Returns User object if found, else None.
# @PARAM: username (str) - The username to search for.
# @RETURN: Optional[User] - The found user or None.
# @PURPOSE: Retrieve a user entity by unique username.
# @PRE: username is a non-empty str and self.db is a valid open Session.
# @POST: Returns matching User entity when present, otherwise None.
# @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
# @DATA_CONTRACT: Input[str] -> Output[Optional[User]]
def get_user_by_username(self, username: str) -> Optional[User]:
with belief_scope("AuthRepository.get_user_by_username"):
return self.db.query(User).filter(User.username == username).first()
# [/DEF:get_user_by_username:Function]
# [DEF:get_user_by_id:Function]
# @PURPOSE: Retrieves a user by their unique ID.
# @PRE: user_id is a valid UUID string.
# @POST: Returns User object if found, else None.
# @PARAM: user_id (str) - The user's unique identifier.
# @RETURN: Optional[User] - The found user or None.
# @PURPOSE: Retrieve a user entity by identifier.
# @PRE: user_id is a non-empty str and self.db is a valid open Session.
# @POST: Returns matching User entity when present, otherwise None.
# @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
# @DATA_CONTRACT: Input[str] -> Output[Optional[User]]
def get_user_by_id(self, user_id: str) -> Optional[User]:
with belief_scope("AuthRepository.get_user_by_id"):
return self.db.query(User).filter(User.id == user_id).first()
# [/DEF:get_user_by_id:Function]
# [DEF:get_role_by_name:Function]
# @PURPOSE: Retrieves a role by its name.
# @PRE: name is a string.
# @POST: Returns Role object if found, else None.
# @PARAM: name (str) - The role name to search for.
# @RETURN: Optional[Role] - The found role or None.
# @PURPOSE: Retrieve a role entity by role name.
# @PRE: name is a non-empty str and self.db is a valid open Session.
# @POST: Returns matching Role entity when present, otherwise None.
# @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
# @DATA_CONTRACT: Input[str] -> Output[Optional[Role]]
def get_role_by_name(self, name: str) -> Optional[Role]:
with belief_scope("AuthRepository.get_role_by_name"):
return self.db.query(Role).filter(Role.name == name).first()
# [/DEF:get_role_by_name:Function]
# [DEF:update_last_login:Function]
# @PURPOSE: Updates the last_login timestamp for a user.
# @PRE: user object is a valid User instance.
# @POST: User's last_login is updated in the database.
# @SIDE_EFFECT: Commits the transaction.
# @PARAM: user (User) - The user to update.
# @PURPOSE: Update last_login timestamp for the provided user entity.
# @PRE: user is a managed User instance and self.db is a valid open Session.
# @POST: user.last_login is set to current UTC timestamp and transaction is committed.
# @SIDE_EFFECT: Mutates user entity state and commits database transaction.
# @DATA_CONTRACT: Input[User] -> Output[None]
def update_last_login(self, user: User):
with belief_scope("AuthRepository.update_last_login"):
from datetime import datetime
@@ -74,34 +83,33 @@ class AuthRepository:
# [/DEF:update_last_login:Function]
# [DEF:get_role_by_id:Function]
# @PURPOSE: Retrieves a role by its unique ID.
# @PRE: role_id is a string.
# @POST: Returns Role object if found, else None.
# @PARAM: role_id (str) - The role's unique identifier.
# @RETURN: Optional[Role] - The found role or None.
# @PURPOSE: Retrieve a role entity by identifier.
# @PRE: role_id is a non-empty str and self.db is a valid open Session.
# @POST: Returns matching Role entity when present, otherwise None.
# @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
# @DATA_CONTRACT: Input[str] -> Output[Optional[Role]]
def get_role_by_id(self, role_id: str) -> Optional[Role]:
with belief_scope("AuthRepository.get_role_by_id"):
return self.db.query(Role).filter(Role.id == role_id).first()
# [/DEF:get_role_by_id:Function]
# [DEF:get_permission_by_id:Function]
# @PURPOSE: Retrieves a permission by its unique ID.
# @PRE: perm_id is a string.
# @POST: Returns Permission object if found, else None.
# @PARAM: perm_id (str) - The permission's unique identifier.
# @RETURN: Optional[Permission] - The found permission or None.
# @PURPOSE: Retrieve a permission entity by identifier.
# @PRE: perm_id is a non-empty str and self.db is a valid open Session.
# @POST: Returns matching Permission entity when present, otherwise None.
# @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
# @DATA_CONTRACT: Input[str] -> Output[Optional[Permission]]
def get_permission_by_id(self, perm_id: str) -> Optional[Permission]:
with belief_scope("AuthRepository.get_permission_by_id"):
return self.db.query(Permission).filter(Permission.id == perm_id).first()
# [/DEF:get_permission_by_id:Function]
# [DEF:get_permission_by_resource_action:Function]
# @PURPOSE: Retrieves a permission by resource and action.
# @PRE: resource and action are strings.
# @POST: Returns Permission object if found, else None.
# @PARAM: resource (str) - The resource name.
# @PARAM: action (str) - The action name.
# @RETURN: Optional[Permission] - The found permission or None.
# @PURPOSE: Retrieve a permission entity by resource and action pair.
# @PRE: resource and action are non-empty str values; self.db is a valid open Session.
# @POST: Returns matching Permission entity when present, otherwise None.
# @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
# @DATA_CONTRACT: Input[str, str] -> Output[Optional[Permission]]
def get_permission_by_resource_action(self, resource: str, action: str) -> Optional[Permission]:
with belief_scope("AuthRepository.get_permission_by_resource_action"):
return self.db.query(Permission).filter(
@@ -111,11 +119,11 @@ class AuthRepository:
# [/DEF:get_permission_by_resource_action:Function]
# [DEF:get_user_dashboard_preference:Function]
# @PURPOSE: Retrieves dashboard preference by owner user ID.
# @PRE: user_id is a string.
# @POST: Returns UserDashboardPreference if found, else None.
# @PARAM: user_id (str) - Preference owner identifier.
# @RETURN: Optional[UserDashboardPreference] - Found preference or None.
# @PURPOSE: Retrieve dashboard preference entity owned by specified user.
# @PRE: user_id is a non-empty str and self.db is a valid open Session.
# @POST: Returns matching UserDashboardPreference entity when present, otherwise None.
# @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
# @DATA_CONTRACT: Input[str] -> Output[Optional[UserDashboardPreference]]
def get_user_dashboard_preference(self, user_id: str) -> Optional[UserDashboardPreference]:
with belief_scope("AuthRepository.get_user_dashboard_preference"):
return (
@@ -126,11 +134,11 @@ class AuthRepository:
# [/DEF:get_user_dashboard_preference:Function]
# [DEF:save_user_dashboard_preference:Function]
# @PURPOSE: Persists dashboard preference entity and returns refreshed row.
# @PRE: preference is a valid UserDashboardPreference entity.
# @POST: Preference is committed and refreshed in database.
# @PARAM: preference (UserDashboardPreference) - Preference entity to persist.
# @RETURN: UserDashboardPreference - Persisted preference row.
# @PURPOSE: Persist dashboard preference entity and return refreshed persistent row.
# @PRE: preference is a valid UserDashboardPreference entity and self.db is a valid open Session.
# @POST: preference is committed to DB, refreshed from DB state, and returned.
# @SIDE_EFFECT: Performs INSERT/UPDATE commit and refresh via active DB session.
# @DATA_CONTRACT: Input[UserDashboardPreference] -> Output[UserDashboardPreference]
def save_user_dashboard_preference(
self,
preference: UserDashboardPreference,
@@ -143,14 +151,16 @@ class AuthRepository:
# [/DEF:save_user_dashboard_preference:Function]
# [DEF:list_permissions:Function]
# @PURPOSE: Lists all available permissions.
# @POST: Returns a list of all Permission objects.
# @RETURN: List[Permission] - List of permissions.
# @PURPOSE: List all permission entities available in storage.
# @PRE: self.db is a valid open Session.
# @POST: Returns list containing all Permission entities visible to the session.
# @SIDE_EFFECT: Executes read-only SELECT query through active DB session.
# @DATA_CONTRACT: Input[None] -> Output[List[Permission]]
def list_permissions(self) -> List[Permission]:
with belief_scope("AuthRepository.list_permissions"):
return self.db.query(Permission).all()
# [/DEF:list_permissions:Function]
# [/DEF:AuthRepository:Class]
# [/DEF:AuthRepository:Class]
# [/DEF:backend.src.core.auth.repository:Module]

View File

@@ -1,17 +1,17 @@
# [DEF:ConfigManagerModule:Module]
#
# @TIER: STANDARD
# @SEMANTICS: config, manager, persistence, postgresql
# @PURPOSE: Manages application configuration persisted in database with one-time migration from JSON.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> ConfigModels
# @RELATION: DEPENDS_ON -> AppConfigRecord
# @RELATION: CALLS -> logger
# @TIER: CRITICAL
# @SEMANTICS: config, manager, persistence, migration, postgresql
# @PURPOSE: Manages application configuration persistence in DB with one-time migration from legacy JSON.
# @LAYER: Domain
# @RELATION: [DEPENDS_ON] ->[ConfigModels]
# @RELATION: [DEPENDS_ON] ->[SessionLocal]
# @RELATION: [DEPENDS_ON] ->[AppConfigRecord]
# @RELATION: [CALLS] ->[logger]
# @RELATION: [CALLS] ->[configure_logger]
# @RELATION: [BINDS_TO] ->[ConfigManager]
# @INVARIANT: Configuration must always be representable by AppConfig and persisted under global record id.
#
# @INVARIANT: Configuration must always be valid according to AppConfig model.
# @PUBLIC_API: ConfigManager
# [SECTION: IMPORTS]
import json
import os
from pathlib import Path
@@ -23,19 +23,18 @@ from .config_models import AppConfig, Environment, GlobalSettings, StorageConfig
from .database import SessionLocal
from ..models.config import AppConfigRecord
from .logger import logger, configure_logger, belief_scope
# [/SECTION]
# [DEF:ConfigManager:Class]
# @TIER: STANDARD
# @PURPOSE: A class to handle application configuration persistence and management.
# @TIER: CRITICAL
# @PURPOSE: Handles application configuration load, validation, mutation, and persistence lifecycle.
class ConfigManager:
# [DEF:__init__:Function]
# @TIER: STANDARD
# @PURPOSE: Initializes the ConfigManager.
# @PRE: isinstance(config_path, str) and len(config_path) > 0
# @POST: self.config is an instance of AppConfig
# @PARAM: config_path (str) - Path to legacy JSON config (used only for initial migration fallback).
# @PURPOSE: Initialize manager state from persisted or migrated configuration.
# @PRE: config_path is a non-empty string path.
# @POST: self.config is initialized as AppConfig and logger is configured.
# @SIDE_EFFECT: Reads config sources and updates logging configuration.
# @DATA_CONTRACT: Input(str config_path) -> Output(None; self.config: AppConfig)
def __init__(self, config_path: str = "config.json"):
with belief_scope("__init__"):
assert isinstance(config_path, str) and config_path, "config_path must be a non-empty string"
@@ -52,18 +51,25 @@ class ConfigManager:
# [/DEF:__init__:Function]
# [DEF:_default_config:Function]
# @PURPOSE: Returns default application configuration.
# @RETURN: AppConfig - Default configuration.
# @PURPOSE: Build default application configuration fallback.
# @PRE: None.
# @POST: Returns valid AppConfig with empty environments and default storage settings.
# @SIDE_EFFECT: None.
# @DATA_CONTRACT: Input(None) -> Output(AppConfig)
def _default_config(self) -> AppConfig:
return AppConfig(
environments=[],
settings=GlobalSettings(storage=StorageConfig()),
)
with belief_scope("_default_config"):
return AppConfig(
environments=[],
settings=GlobalSettings(storage=StorageConfig()),
)
# [/DEF:_default_config:Function]
# [DEF:_load_from_legacy_file:Function]
# @PURPOSE: Loads legacy configuration from config.json for migration fallback.
# @RETURN: AppConfig - Loaded or default configuration.
# @PURPOSE: Load legacy JSON configuration for migration fallback path.
# @PRE: self.config_path is initialized.
# @POST: Returns AppConfig from file payload or safe default.
# @SIDE_EFFECT: Filesystem read and error logging.
# @DATA_CONTRACT: Input(Path self.config_path) -> Output(AppConfig)
def _load_from_legacy_file(self) -> AppConfig:
with belief_scope("_load_from_legacy_file"):
if not self.config_path.exists():
@@ -81,18 +87,22 @@ class ConfigManager:
# [/DEF:_load_from_legacy_file:Function]
# [DEF:_get_record:Function]
# @PURPOSE: Loads config record from DB.
# @PARAM: session (Session) - DB session.
# @RETURN: Optional[AppConfigRecord] - Existing record or None.
# @PURPOSE: Resolve global configuration record from DB.
# @PRE: session is an active SQLAlchemy Session.
# @POST: Returns record when present, otherwise None.
# @SIDE_EFFECT: Database read query.
# @DATA_CONTRACT: Input(Session) -> Output(Optional[AppConfigRecord])
def _get_record(self, session: Session) -> Optional[AppConfigRecord]:
return session.query(AppConfigRecord).filter(AppConfigRecord.id == "global").first()
with belief_scope("_get_record"):
return session.query(AppConfigRecord).filter(AppConfigRecord.id == "global").first()
# [/DEF:_get_record:Function]
# [DEF:_load_config:Function]
# @PURPOSE: Loads the configuration from DB or performs one-time migration from JSON file.
# @PRE: DB session factory is available.
# @POST: isinstance(return, AppConfig)
# @RETURN: AppConfig - Loaded configuration.
# @PURPOSE: Load configuration from DB or perform one-time migration from legacy JSON.
# @PRE: SessionLocal factory is available and AppConfigRecord schema is accessible.
# @POST: Returns valid AppConfig and closes opened DB session.
# @SIDE_EFFECT: Database read/write, possible migration write, logging.
# @DATA_CONTRACT: Input(None) -> Output(AppConfig)
def _load_config(self) -> AppConfig:
with belief_scope("_load_config"):
session: Session = SessionLocal()
@@ -114,11 +124,11 @@ class ConfigManager:
# [/DEF:_load_config:Function]
# [DEF:_save_config_to_db:Function]
# @PURPOSE: Saves the provided configuration object to DB.
# @PRE: isinstance(config, AppConfig)
# @POST: Configuration saved to database.
# @PARAM: config (AppConfig) - The configuration to save.
# @PARAM: session (Optional[Session]) - Existing DB session for transactional reuse.
# @PURPOSE: Persist provided AppConfig into the global DB configuration record.
# @PRE: config is AppConfig; session is either None or an active Session.
# @POST: Global DB record payload equals config.model_dump() when commit succeeds.
# @SIDE_EFFECT: Database insert/update, commit/rollback, logging.
# @DATA_CONTRACT: Input(AppConfig, Optional[Session]) -> Output(None)
def _save_config_to_db(self, config: AppConfig, session: Optional[Session] = None):
with belief_scope("_save_config_to_db"):
assert isinstance(config, AppConfig), "config must be an instance of AppConfig"
@@ -145,27 +155,33 @@ class ConfigManager:
# [/DEF:_save_config_to_db:Function]
# [DEF:save:Function]
# @PURPOSE: Saves the current configuration state to DB.
# @PRE: self.config is set.
# @POST: self._save_config_to_db called.
# @PURPOSE: Persist current in-memory configuration state.
# @PRE: self.config is initialized.
# @POST: Current self.config is written to DB global record.
# @SIDE_EFFECT: Database write and logging via delegated persistence call.
# @DATA_CONTRACT: Input(None; self.config: AppConfig) -> Output(None)
def save(self):
with belief_scope("save"):
self._save_config_to_db(self.config)
# [/DEF:save:Function]
# [DEF:get_config:Function]
# @PURPOSE: Returns the current configuration.
# @RETURN: AppConfig - The current configuration.
# @PURPOSE: Return current in-memory configuration snapshot.
# @PRE: self.config is initialized.
# @POST: Returns AppConfig reference stored in manager.
# @SIDE_EFFECT: None.
# @DATA_CONTRACT: Input(None) -> Output(AppConfig)
def get_config(self) -> AppConfig:
with belief_scope("get_config"):
return self.config
# [/DEF:get_config:Function]
# [DEF:update_global_settings:Function]
# @PURPOSE: Updates the global settings and persists the change.
# @PRE: isinstance(settings, GlobalSettings)
# @POST: self.config.settings updated and saved.
# @PARAM: settings (GlobalSettings) - The new global settings.
# @PURPOSE: Replace global settings and persist the resulting configuration.
# @PRE: settings is GlobalSettings.
# @POST: self.config.settings equals provided settings and DB state is updated.
# @SIDE_EFFECT: Mutates self.config, DB write, logger reconfiguration, logging.
# @DATA_CONTRACT: Input(GlobalSettings) -> Output(None)
def update_global_settings(self, settings: GlobalSettings):
with belief_scope("update_global_settings"):
logger.info("[update_global_settings][Entry] Updating settings")
@@ -178,9 +194,11 @@ class ConfigManager:
# [/DEF:update_global_settings:Function]
# [DEF:validate_path:Function]
# @PURPOSE: Validates if a path exists and is writable.
# @PARAM: path (str) - The path to validate.
# @RETURN: tuple (bool, str) - (is_valid, message)
# @PURPOSE: Validate that path exists and is writable, creating it when absent.
# @PRE: path is a string path candidate.
# @POST: Returns (True, msg) for writable path, else (False, reason).
# @SIDE_EFFECT: Filesystem directory creation attempt and OS permission checks.
# @DATA_CONTRACT: Input(str path) -> Output(tuple[bool, str])
def validate_path(self, path: str) -> tuple[bool, str]:
with belief_scope("validate_path"):
p = os.path.abspath(path)
@@ -197,25 +215,33 @@ class ConfigManager:
# [/DEF:validate_path:Function]
# [DEF:get_environments:Function]
# @PURPOSE: Returns the list of configured environments.
# @RETURN: List[Environment] - List of environments.
# @PURPOSE: Return all configured environments.
# @PRE: self.config is initialized.
# @POST: Returns list of Environment models from current configuration.
# @SIDE_EFFECT: None.
# @DATA_CONTRACT: Input(None) -> Output(List[Environment])
def get_environments(self) -> List[Environment]:
with belief_scope("get_environments"):
return self.config.environments
# [/DEF:get_environments:Function]
# [DEF:has_environments:Function]
# @PURPOSE: Checks if at least one environment is configured.
# @RETURN: bool - True if at least one environment exists.
# @PURPOSE: Check whether at least one environment exists in configuration.
# @PRE: self.config is initialized.
# @POST: Returns True iff environment list length is greater than zero.
# @SIDE_EFFECT: None.
# @DATA_CONTRACT: Input(None) -> Output(bool)
def has_environments(self) -> bool:
with belief_scope("has_environments"):
return len(self.config.environments) > 0
# [/DEF:has_environments:Function]
# [DEF:get_environment:Function]
# @PURPOSE: Returns a single environment by ID.
# @PARAM: env_id (str) - The ID of the environment to retrieve.
# @RETURN: Optional[Environment] - The environment with the given ID, or None.
# @PURPOSE: Resolve a configured environment by identifier.
# @PRE: env_id is string identifier.
# @POST: Returns matching Environment when found; otherwise None.
# @SIDE_EFFECT: None.
# @DATA_CONTRACT: Input(str env_id) -> Output(Optional[Environment])
def get_environment(self, env_id: str) -> Optional[Environment]:
with belief_scope("get_environment"):
for env in self.config.environments:
@@ -225,8 +251,11 @@ class ConfigManager:
# [/DEF:get_environment:Function]
# [DEF:add_environment:Function]
# @PURPOSE: Adds a new environment to the configuration.
# @PARAM: env (Environment) - The environment to add.
# @PURPOSE: Upsert environment by id into configuration and persist.
# @PRE: env is Environment.
# @POST: Configuration contains provided env id with new payload persisted.
# @SIDE_EFFECT: Mutates environment list, DB write, logging.
# @DATA_CONTRACT: Input(Environment) -> Output(None)
def add_environment(self, env: Environment):
with belief_scope("add_environment"):
logger.info(f"[add_environment][Entry] Adding environment {env.id}")
@@ -239,10 +268,11 @@ class ConfigManager:
# [/DEF:add_environment:Function]
# [DEF:update_environment:Function]
# @PURPOSE: Updates an existing environment.
# @PARAM: env_id (str) - The ID of the environment to update.
# @PARAM: updated_env (Environment) - The updated environment data.
# @RETURN: bool - True if updated, False otherwise.
# @PURPOSE: Update existing environment by id and preserve masked password placeholder behavior.
# @PRE: env_id is non-empty string and updated_env is Environment.
# @POST: Returns True and persists update when target exists; else returns False.
# @SIDE_EFFECT: May mutate environment list, DB write, logging.
# @DATA_CONTRACT: Input(str env_id, Environment updated_env) -> Output(bool)
def update_environment(self, env_id: str, updated_env: Environment) -> bool:
with belief_scope("update_environment"):
logger.info(f"[update_environment][Entry] Updating {env_id}")
@@ -264,8 +294,11 @@ class ConfigManager:
# [/DEF:update_environment:Function]
# [DEF:delete_environment:Function]
# @PURPOSE: Deletes an environment by ID.
# @PARAM: env_id (str) - The ID of the environment to delete.
# @PURPOSE: Delete environment by id and persist when deletion occurs.
# @PRE: env_id is non-empty string.
# @POST: Environment is removed when present; otherwise configuration is unchanged.
# @SIDE_EFFECT: May mutate environment list, conditional DB write, logging.
# @DATA_CONTRACT: Input(str env_id) -> Output(None)
def delete_environment(self, env_id: str):
with belief_scope("delete_environment"):
logger.info(f"[delete_environment][Entry] Deleting {env_id}")

View File

@@ -225,7 +225,7 @@ def test_enable_belief_state_flag(caplog):
assert not any("[DisabledFunction][Exit]" in msg for msg in log_messages), "Exit should not be logged when disabled"
# Coherence:OK should still be logged (internal tracking)
assert any("[DisabledFunction][COHERENCE:OK]" in msg for msg in log_messages), "Coherence should still be logged"
# [/DEF:test_enable_belief_state_flag:Function]
# [DEF:test_belief_scope_missing_anchor:Function]

View File

@@ -1,118 +1,170 @@
# [DEF:backend.src.core.migration.risk_assessor:Module]
# @TIER: STANDARD
# @SEMANTICS: migration, dry_run, risk, scoring
# @PURPOSE: Risk evaluation helpers for migration pre-flight reporting.
# @LAYER: Core
# @RELATION: USED_BY -> backend.src.core.migration.dry_run_orchestrator
# @TIER: CRITICAL
# @SEMANTICS: migration, dry_run, risk, scoring, preflight
# @PURPOSE: Compute deterministic migration risk items and aggregate score for dry-run reporting.
# @LAYER: Domain
# @RELATION: [DEPENDS_ON] ->[backend.src.core.superset_client.SupersetClient]
# @RELATION: [DISPATCHES] ->[backend.src.core.migration.dry_run_orchestrator.MigrationDryRunService.run]
# @INVARIANT: Risk scoring must remain bounded to [0,100] and preserve severity-to-weight mapping.
# @TEST_CONTRACT: [source_objects,target_objects,diff,target_client] -> [List[RiskItem]]
# @TEST_SCENARIO: [overwrite_update_objects] -> [medium overwrite_existing risk is emitted for each update diff item]
# @TEST_SCENARIO: [missing_datasource_dataset] -> [high missing_datasource risk is emitted]
# @TEST_SCENARIO: [owner_mismatch_dashboard] -> [low owner_mismatch risk is emitted]
# @TEST_EDGE: [missing_field] -> [object without uuid is ignored by indexer]
# @TEST_EDGE: [invalid_type] -> [non-list owners input normalizes to empty identifiers]
# @TEST_EDGE: [external_fail] -> [target_client get_databases exception propagates to caller]
# @TEST_INVARIANT: [score_upper_bound_100] -> VERIFIED_BY: [severity_weight_aggregation]
# @UX_STATE: [Idle] -> [N/A backend domain module]
# @UX_FEEDBACK: [N/A] -> [No direct UI side effects in this module]
# @UX_RECOVERY: [N/A] -> [Caller-level retry/recovery]
# @UX_REACTIVITY: [N/A] -> [Backend synchronous function contracts]
from typing import Any, Dict, List
from ..logger import logger, belief_scope
from ..superset_client import SupersetClient
# [DEF:index_by_uuid:Function]
# @PURPOSE: Build UUID-index from normalized objects.
# @PRE: Input list items are dict-like payloads potentially containing "uuid".
# @POST: Returns mapping keyed by string uuid; only truthy uuid values are included.
# @SIDE_EFFECT: Emits reasoning/reflective logs only.
# @DATA_CONTRACT: List[Dict[str, Any]] -> Dict[str, Dict[str, Any]]
def index_by_uuid(objects: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
indexed: Dict[str, Dict[str, Any]] = {}
for obj in objects:
uuid = obj.get("uuid")
if uuid:
indexed[str(uuid)] = obj
return indexed
with belief_scope("risk_assessor.index_by_uuid"):
logger.reason("Building UUID index", extra={"objects_count": len(objects)})
indexed: Dict[str, Dict[str, Any]] = {}
for obj in objects:
uuid = obj.get("uuid")
if uuid:
indexed[str(uuid)] = obj
logger.reflect("UUID index built", extra={"indexed_count": len(indexed)})
return indexed
# [/DEF:index_by_uuid:Function]
# [DEF:extract_owner_identifiers:Function]
# @PURPOSE: Normalize owner payloads for stable comparison.
# @PRE: Owners may be list payload, scalar values, or None.
# @POST: Returns sorted unique owner identifiers as strings.
# @SIDE_EFFECT: Emits reasoning/reflective logs only.
# @DATA_CONTRACT: Any -> List[str]
def extract_owner_identifiers(owners: Any) -> List[str]:
if not isinstance(owners, list):
return []
ids: List[str] = []
for owner in owners:
if isinstance(owner, dict):
if owner.get("username"):
ids.append(str(owner["username"]))
elif owner.get("id") is not None:
ids.append(str(owner["id"]))
elif owner is not None:
ids.append(str(owner))
return sorted(set(ids))
with belief_scope("risk_assessor.extract_owner_identifiers"):
logger.reason("Normalizing owner identifiers")
if not isinstance(owners, list):
logger.reflect("Owners payload is not list; returning empty identifiers")
return []
ids: List[str] = []
for owner in owners:
if isinstance(owner, dict):
if owner.get("username"):
ids.append(str(owner["username"]))
elif owner.get("id") is not None:
ids.append(str(owner["id"]))
elif owner is not None:
ids.append(str(owner))
normalized_ids = sorted(set(ids))
logger.reflect("Owner identifiers normalized", extra={"owner_count": len(normalized_ids)})
return normalized_ids
# [/DEF:extract_owner_identifiers:Function]
# [DEF:build_risks:Function]
# @PURPOSE: Build risk list from computed diffs and target catalog state.
# @PRE: source_objects/target_objects/diff contain dashboards/charts/datasets keys with expected list structures.
# @PRE: target_client is authenticated/usable for database list retrieval.
# @POST: Returns list of deterministic risk items derived from overwrite, missing datasource, reference, and owner mismatch checks.
# @SIDE_EFFECT: Calls target Superset API for databases metadata and emits logs.
# @DATA_CONTRACT: (
# @DATA_CONTRACT: Dict[str, List[Dict[str, Any]]],
# @DATA_CONTRACT: Dict[str, List[Dict[str, Any]]],
# @DATA_CONTRACT: Dict[str, Dict[str, List[Dict[str, Any]]]],
# @DATA_CONTRACT: SupersetClient
# @DATA_CONTRACT: ) -> List[Dict[str, Any]]
def build_risks(
source_objects: Dict[str, List[Dict[str, Any]]],
target_objects: Dict[str, List[Dict[str, Any]]],
diff: Dict[str, Dict[str, List[Dict[str, Any]]]],
target_client: SupersetClient,
) -> List[Dict[str, Any]]:
risks: List[Dict[str, Any]] = []
for object_type in ("dashboards", "charts", "datasets"):
for item in diff[object_type]["update"]:
risks.append({
"code": "overwrite_existing",
"severity": "medium",
"object_type": object_type[:-1],
"object_uuid": item["uuid"],
"message": f"Object will be updated in target: {item.get('title') or item['uuid']}",
})
with belief_scope("risk_assessor.build_risks"):
logger.reason("Building migration risks from diff payload")
risks: List[Dict[str, Any]] = []
for object_type in ("dashboards", "charts", "datasets"):
for item in diff[object_type]["update"]:
risks.append({
"code": "overwrite_existing",
"severity": "medium",
"object_type": object_type[:-1],
"object_uuid": item["uuid"],
"message": f"Object will be updated in target: {item.get('title') or item['uuid']}",
})
target_dataset_uuids = set(index_by_uuid(target_objects["datasets"]).keys())
_, target_databases = target_client.get_databases(query={"columns": ["uuid"]})
target_database_uuids = {str(item.get("uuid")) for item in target_databases if item.get("uuid")}
target_dataset_uuids = set(index_by_uuid(target_objects["datasets"]).keys())
_, target_databases = target_client.get_databases(query={"columns": ["uuid"]})
target_database_uuids = {str(item.get("uuid")) for item in target_databases if item.get("uuid")}
for dataset in source_objects["datasets"]:
db_uuid = dataset.get("database_uuid")
if db_uuid and str(db_uuid) not in target_database_uuids:
risks.append({
"code": "missing_datasource",
"severity": "high",
"object_type": "dataset",
"object_uuid": dataset.get("uuid"),
"message": f"Target datasource is missing for dataset {dataset.get('title') or dataset.get('uuid')}",
})
for dataset in source_objects["datasets"]:
db_uuid = dataset.get("database_uuid")
if db_uuid and str(db_uuid) not in target_database_uuids:
risks.append({
"code": "missing_datasource",
"severity": "high",
"object_type": "dataset",
"object_uuid": dataset.get("uuid"),
"message": f"Target datasource is missing for dataset {dataset.get('title') or dataset.get('uuid')}",
})
for chart in source_objects["charts"]:
ds_uuid = chart.get("dataset_uuid")
if ds_uuid and str(ds_uuid) not in target_dataset_uuids:
risks.append({
"code": "breaking_reference",
"severity": "high",
"object_type": "chart",
"object_uuid": chart.get("uuid"),
"message": f"Chart references dataset not found on target: {ds_uuid}",
})
for chart in source_objects["charts"]:
ds_uuid = chart.get("dataset_uuid")
if ds_uuid and str(ds_uuid) not in target_dataset_uuids:
risks.append({
"code": "breaking_reference",
"severity": "high",
"object_type": "chart",
"object_uuid": chart.get("uuid"),
"message": f"Chart references dataset not found on target: {ds_uuid}",
})
source_dash = index_by_uuid(source_objects["dashboards"])
target_dash = index_by_uuid(target_objects["dashboards"])
for item in diff["dashboards"]["update"]:
source_obj = source_dash.get(item["uuid"])
target_obj = target_dash.get(item["uuid"])
if not source_obj or not target_obj:
continue
source_owners = extract_owner_identifiers(source_obj.get("owners"))
target_owners = extract_owner_identifiers(target_obj.get("owners"))
if source_owners and target_owners and source_owners != target_owners:
risks.append({
"code": "owner_mismatch",
"severity": "low",
"object_type": "dashboard",
"object_uuid": item["uuid"],
"message": f"Owner mismatch for dashboard {item.get('title') or item['uuid']}",
})
return risks
source_dash = index_by_uuid(source_objects["dashboards"])
target_dash = index_by_uuid(target_objects["dashboards"])
for item in diff["dashboards"]["update"]:
source_obj = source_dash.get(item["uuid"])
target_obj = target_dash.get(item["uuid"])
if not source_obj or not target_obj:
continue
source_owners = extract_owner_identifiers(source_obj.get("owners"))
target_owners = extract_owner_identifiers(target_obj.get("owners"))
if source_owners and target_owners and source_owners != target_owners:
risks.append({
"code": "owner_mismatch",
"severity": "low",
"object_type": "dashboard",
"object_uuid": item["uuid"],
"message": f"Owner mismatch for dashboard {item.get('title') or item['uuid']}",
})
logger.reflect("Risk list assembled", extra={"risk_count": len(risks)})
return risks
# [/DEF:build_risks:Function]
# [DEF:score_risks:Function]
# @PURPOSE: Aggregate risk list into score and level.
# @PRE: risk_items contains optional severity fields expected in {high,medium,low} or defaults to low weight.
# @POST: Returns dict with score in [0,100], derived level, and original items.
# @SIDE_EFFECT: Emits reasoning/reflective logs only.
# @DATA_CONTRACT: List[Dict[str, Any]] -> Dict[str, Any]
def score_risks(risk_items: List[Dict[str, Any]]) -> Dict[str, Any]:
weights = {"high": 25, "medium": 10, "low": 5}
score = min(100, sum(weights.get(item.get("severity", "low"), 5) for item in risk_items))
level = "low" if score < 25 else "medium" if score < 60 else "high"
return {"score": score, "level": level, "items": risk_items}
with belief_scope("risk_assessor.score_risks"):
logger.reason("Scoring risk items", extra={"risk_items_count": len(risk_items)})
weights = {"high": 25, "medium": 10, "low": 5}
score = min(100, sum(weights.get(item.get("severity", "low"), 5) for item in risk_items))
level = "low" if score < 25 else "medium" if score < 60 else "high"
result = {"score": score, "level": level, "items": risk_items}
logger.reflect("Risk score computed", extra={"score": score, "level": level})
return result
# [/DEF:score_risks:Function]

View File

@@ -1,11 +1,15 @@
# [DEF:backend.src.core.migration_engine:Module]
#
# @SEMANTICS: migration, engine, zip, yaml, transformation
# @PURPOSE: Handles the interception and transformation of Superset asset ZIP archives.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> PyYAML
# @TIER: CRITICAL
# @SEMANTICS: migration, engine, zip, yaml, transformation, cross-filter, id-mapping
# @PURPOSE: Transforms Superset export ZIP archives while preserving archive integrity and patching mapped identifiers.
# @LAYER: Domain
# @RELATION: [DEPENDS_ON] ->[src.core.logger]
# @RELATION: [DEPENDS_ON] ->[src.core.mapping_service.IdMappingService]
# @RELATION: [DEPENDS_ON] ->[src.models.mapping.ResourceType]
# @RELATION: [DEPENDS_ON] ->[yaml]
#
# @INVARIANT: ZIP structure must be preserved after transformation.
# @INVARIANT: ZIP structure and non-targeted metadata must remain valid after transformation.
# [SECTION: IMPORTS]
import zipfile
@@ -26,10 +30,15 @@ from src.models.mapping import ResourceType
class MigrationEngine:
# [DEF:__init__:Function]
# @PURPOSE: Initializes the migration engine with optional ID mapping service.
# @PURPOSE: Initializes migration orchestration dependencies for ZIP/YAML metadata transformations.
# @PRE: mapping_service is None or implements batch remote ID lookup for ResourceType.CHART.
# @POST: self.mapping_service is assigned and available for optional cross-filter patching flows.
# @SIDE_EFFECT: Mutates in-memory engine state by storing dependency reference.
# @DATA_CONTRACT: Input[Optional[IdMappingService]] -> Output[MigrationEngine]
# @PARAM: mapping_service (Optional[IdMappingService]) - Used for resolving target environment integer IDs.
def __init__(self, mapping_service: Optional[IdMappingService] = None):
self.mapping_service = mapping_service
with belief_scope("MigrationEngine.__init__"):
self.mapping_service = mapping_service
# [/DEF:__init__:Function]
# [DEF:transform_zip:Function]
@@ -40,9 +49,11 @@ class MigrationEngine:
# @PARAM: strip_databases (bool) - Whether to remove the databases directory from the archive.
# @PARAM: target_env_id (Optional[str]) - Used if fix_cross_filters is True to know which environment map to use.
# @PARAM: fix_cross_filters (bool) - Whether to patch dashboard json_metadata.
# @PRE: zip_path must point to a valid Superset export archive.
# @POST: Transformed archive is saved to output_path.
# @RETURN: bool - True if successful.
# @PRE: zip_path points to a readable ZIP; output_path parent is writable; db_mapping keys/values are UUID strings.
# @POST: Returns True only when extraction, transformation, and packaging complete without exception.
# @SIDE_EFFECT: Reads/writes filesystem archives, creates temporary directory, emits structured logs.
# @DATA_CONTRACT: Input[(str zip_path, str output_path, Dict[str,str] db_mapping, bool strip_databases, Optional[str] target_env_id, bool fix_cross_filters)] -> Output[bool]
# @RETURN: bool - True if successful.
def transform_zip(self, zip_path: str, output_path: str, db_mapping: Dict[str, str], strip_databases: bool = True, target_env_id: Optional[str] = None, fix_cross_filters: bool = False) -> bool:
"""
Transform a Superset export ZIP by replacing database UUIDs and optionally fixing cross-filters.
@@ -105,48 +116,60 @@ class MigrationEngine:
# @PURPOSE: Replaces database_uuid in a single YAML file.
# @PARAM: file_path (Path) - Path to the YAML file.
# @PARAM: db_mapping (Dict[str, str]) - UUID mapping dictionary.
# @PRE: file_path must exist and be readable.
# @POST: File is modified in-place if source UUID matches mapping.
# @PRE: file_path exists, is readable YAML, and db_mapping contains source->target UUID pairs.
# @POST: database_uuid is replaced in-place only when source UUID is present in db_mapping.
# @SIDE_EFFECT: Reads and conditionally rewrites YAML file on disk.
# @DATA_CONTRACT: Input[(Path file_path, Dict[str,str] db_mapping)] -> Output[None]
def _transform_yaml(self, file_path: Path, db_mapping: Dict[str, str]):
with open(file_path, 'r') as f:
data = yaml.safe_load(f)
with belief_scope("MigrationEngine._transform_yaml"):
with open(file_path, 'r') as f:
data = yaml.safe_load(f)
if not data:
return
if not data:
return
# Superset dataset YAML structure:
# database_uuid: ...
source_uuid = data.get('database_uuid')
if source_uuid in db_mapping:
data['database_uuid'] = db_mapping[source_uuid]
with open(file_path, 'w') as f:
yaml.dump(data, f)
# Superset dataset YAML structure:
# database_uuid: ...
source_uuid = data.get('database_uuid')
if source_uuid in db_mapping:
data['database_uuid'] = db_mapping[source_uuid]
with open(file_path, 'w') as f:
yaml.dump(data, f)
# [/DEF:_transform_yaml:Function]
# [DEF:_extract_chart_uuids_from_archive:Function]
# @PURPOSE: Scans the unpacked ZIP to map local exported integer IDs back to their UUIDs.
# @PARAM: temp_dir (Path) - Root dir of unpacked archive
# @PURPOSE: Scans extracted chart YAML files and builds a source chart ID to UUID lookup map.
# @PRE: temp_dir exists and points to extracted archive root with optional chart YAML resources.
# @POST: Returns a best-effort Dict[int, str] containing only parseable chart id/uuid pairs.
# @SIDE_EFFECT: Reads chart YAML files from filesystem; suppresses per-file parsing failures.
# @DATA_CONTRACT: Input[Path] -> Output[Dict[int,str]]
# @PARAM: temp_dir (Path) - Root dir of unpacked archive.
# @RETURN: Dict[int, str] - Mapping of source Integer ID to UUID.
def _extract_chart_uuids_from_archive(self, temp_dir: Path) -> Dict[int, str]:
# Implementation Note: This is a placeholder for the logic that extracts
# actual Source IDs. In a real scenario, this involves parsing chart YAMLs
# or manifesting the export metadata structure where source IDs are stored.
# For simplicity in US1 MVP, we assume it's read from chart files if present.
mapping = {}
chart_files = list(temp_dir.glob("**/charts/**/*.yaml")) + list(temp_dir.glob("**/charts/*.yaml"))
for cf in set(chart_files):
try:
with open(cf, 'r') as f:
cdata = yaml.safe_load(f)
if cdata and 'id' in cdata and 'uuid' in cdata:
mapping[cdata['id']] = cdata['uuid']
except Exception:
pass
return mapping
with belief_scope("MigrationEngine._extract_chart_uuids_from_archive"):
# Implementation Note: This is a placeholder for the logic that extracts
# actual Source IDs. In a real scenario, this involves parsing chart YAMLs
# or manifesting the export metadata structure where source IDs are stored.
# For simplicity in US1 MVP, we assume it's read from chart files if present.
mapping = {}
chart_files = list(temp_dir.glob("**/charts/**/*.yaml")) + list(temp_dir.glob("**/charts/*.yaml"))
for cf in set(chart_files):
try:
with open(cf, 'r') as f:
cdata = yaml.safe_load(f)
if cdata and 'id' in cdata and 'uuid' in cdata:
mapping[cdata['id']] = cdata['uuid']
except Exception:
pass
return mapping
# [/DEF:_extract_chart_uuids_from_archive:Function]
# [DEF:_patch_dashboard_metadata:Function]
# @PURPOSE: Replaces integer IDs in json_metadata.
# @PURPOSE: Rewrites dashboard json_metadata chart/dataset integer identifiers using target environment mappings.
# @PRE: file_path points to dashboard YAML with json_metadata; target_env_id is non-empty; source_map contains source id->uuid.
# @POST: json_metadata is re-serialized with mapped integer IDs when remote mappings are available; otherwise file remains unchanged.
# @SIDE_EFFECT: Reads/writes YAML file, performs mapping lookup via mapping_service, emits logs for recoverable/terminal failures.
# @DATA_CONTRACT: Input[(Path file_path, str target_env_id, Dict[int,str] source_map)] -> Output[None]
# @PARAM: file_path (Path)
# @PARAM: target_env_id (str)
# @PARAM: source_map (Dict[int, str])

View File

@@ -1,10 +1,12 @@
# [DEF:backend.src.models.config:Module]
#
# @TIER: STANDARD
# @SEMANTICS: database, config, settings, sqlalchemy
# @PURPOSE: Defines database schema for persisted application configuration.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> sqlalchemy
# @TIER: CRITICAL
# @SEMANTICS: database, config, settings, sqlalchemy, notification
# @PURPOSE: Defines SQLAlchemy persistence models for application and notification configuration records.
# @LAYER: Domain
# @RELATION: [DEPENDS_ON] ->[sqlalchemy]
# @RELATION: [DEPENDS_ON] ->[backend.src.models.mapping:Base]
# @INVARIANT: Configuration payload and notification credentials must remain persisted as non-null JSON documents.
from sqlalchemy import Column, String, DateTime, JSON, Boolean
from sqlalchemy.sql import func
@@ -13,7 +15,11 @@ from .mapping import Base
# [DEF:AppConfigRecord:Class]
# @PURPOSE: Stores the single source of truth for application configuration.
# @PURPOSE: Stores persisted application configuration as a single authoritative record model.
# @PRE: SQLAlchemy declarative Base is initialized and table metadata registration is active.
# @POST: ORM table 'app_configurations' exposes id, payload, and updated_at fields with declared nullability/default semantics.
# @SIDE_EFFECT: Registers ORM mapping metadata during module import.
# @DATA_CONTRACT: Input -> persistence row {id:str, payload:json, updated_at:datetime}; Output -> AppConfigRecord ORM entity.
class AppConfigRecord(Base):
__tablename__ = "app_configurations"
@@ -25,7 +31,11 @@ class AppConfigRecord(Base):
# [/DEF:AppConfigRecord:Class]
# [DEF:NotificationConfig:Class]
# @PURPOSE: Global settings for external notification providers.
# @PURPOSE: Stores persisted provider-level notification configuration and encrypted credentials metadata.
# @PRE: SQLAlchemy declarative Base is initialized and uuid generation is available at instance creation time.
# @POST: ORM table 'notification_configs' exposes id, type, name, credentials, is_active, created_at, updated_at fields with declared constraints/defaults.
# @SIDE_EFFECT: Registers ORM mapping metadata during module import; may generate UUID values for new entity instances.
# @DATA_CONTRACT: Input -> persistence row {id:str, type:str, name:str, credentials:json, is_active:bool, created_at:datetime, updated_at:datetime}; Output -> NotificationConfig ORM entity.
class NotificationConfig(Base):
__tablename__ = "notification_configs"

View File

@@ -80,6 +80,8 @@ class MigrationJob(Base):
status = Column(SQLEnum(MigrationStatus), default=MigrationStatus.PENDING)
replace_db = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
# [/DEF:MigrationJob:Class]
# [DEF:ResourceMapping:Class]
# @TIER: STANDARD
# @PURPOSE: Maps a universal UUID for a resource to its actual ID on a specific environment.

View File

@@ -462,6 +462,7 @@ class CleanReleaseTUI:
self.status = CheckFinalStatus.FAILED
self.refresh_overview()
self.refresh_screen()
# [/DEF:run_checks:Function]
def build_manifest(self):
try:

View File

@@ -291,6 +291,9 @@ def main() -> None:
logger.info(f"[COHERENCE:OK] Result summary: {json.dumps(result, ensure_ascii=True)}")
# [/DEF:main:Function]
if __name__ == "__main__":
main()

View File

@@ -82,4 +82,6 @@ async def test_get_health_summary_empty():
summary = await service.get_health_summary(environment_id="env_none")
assert summary.pass_count == 0
assert len(summary.items) == 0
assert len(summary.items) == 0
# [/DEF:test_health_service:Module]

View File

@@ -1,13 +1,16 @@
# [DEF:backend.src.services.auth_service:Module]
#
# @SEMANTICS: auth, service, business-logic, login, jwt
# @PURPOSE: Orchestrates authentication business logic.
# @LAYER: Service
# @RELATION: USES -> backend.src.core.auth.repository.AuthRepository
# @RELATION: USES -> backend.src.core.auth.security
# @RELATION: USES -> backend.src.core.auth.jwt
# @TIER: CRITICAL
# @SEMANTICS: auth, service, business-logic, login, jwt, adfs, jit-provisioning
# @PURPOSE: Orchestrates credential authentication and ADFS JIT user provisioning.
# @LAYER: Domain
# @RELATION: [DEPENDS_ON] ->[backend.src.core.auth.repository.AuthRepository]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.auth.security.verify_password]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.auth.jwt.create_access_token]
# @RELATION: [DEPENDS_ON] ->[backend.src.models.auth.User]
# @RELATION: [DEPENDS_ON] ->[backend.src.models.auth.Role]
#
# @INVARIANT: Authentication must verify both credentials and account status.
# @INVARIANT: Authentication succeeds only for active users with valid credentials; issued sessions encode subject and scopes from assigned roles.
# [SECTION: IMPORTS]
from typing import Dict, Any
@@ -23,20 +26,25 @@ from ..core.logger import belief_scope
# @PURPOSE: Provides high-level authentication services.
class AuthService:
# [DEF:__init__:Function]
# @PURPOSE: Initializes the service with a database session.
# @PARAM: db (Session) - SQLAlchemy session.
# @PURPOSE: Initializes the authentication service with repository access over an active DB session.
# @PRE: db is a valid SQLAlchemy Session instance bound to the auth persistence context.
# @POST: self.repo is initialized and ready for auth user/role CRUD operations.
# @SIDE_EFFECT: Allocates AuthRepository and binds it to the provided Session.
# @DATA_CONTRACT: Input(Session) -> Model(AuthRepository)
# @PARAM: db (Session) - SQLAlchemy session.
def __init__(self, db: Session):
self.repo = AuthRepository(db)
# [/DEF:__init__:Function]
# [DEF:authenticate_user:Function]
# @PURPOSE: Authenticates a user with username and password.
# @PRE: username and password are provided.
# @POST: Returns User object if authentication succeeds, else None.
# @SIDE_EFFECT: Updates last_login timestamp on success.
# @PARAM: username (str) - The username.
# @PARAM: password (str) - The plain password.
# @RETURN: Optional[User] - The authenticated user or None.
# @PURPOSE: Validates credentials and account state for local username/password authentication.
# @PRE: username and password are non-empty credential inputs.
# @POST: Returns User only when user exists, is active, and password hash verification succeeds; otherwise returns None.
# @SIDE_EFFECT: Persists last_login update for successful authentications via repository.
# @DATA_CONTRACT: Input(str username, str password) -> Output(User | None)
# @PARAM: username (str) - The username.
# @PARAM: password (str) - The plain password.
# @RETURN: Optional[User] - The authenticated user or None.
def authenticate_user(self, username: str, password: str):
with belief_scope("AuthService.authenticate_user"):
user = self.repo.get_user_by_username(username)
@@ -54,11 +62,13 @@ class AuthService:
# [/DEF:authenticate_user:Function]
# [DEF:create_session:Function]
# @PURPOSE: Creates a JWT session for an authenticated user.
# @PRE: user is a valid User object.
# @POST: Returns a dictionary with access_token and token_type.
# @PARAM: user (User) - The authenticated user.
# @RETURN: Dict[str, str] - Session data.
# @PURPOSE: Issues an access token payload for an already authenticated user.
# @PRE: user is a valid User entity containing username and iterable roles with role.name values.
# @POST: Returns session dict with non-empty access_token and token_type='bearer'.
# @SIDE_EFFECT: Generates signed JWT via auth JWT provider.
# @DATA_CONTRACT: Input(User) -> Output(Dict[str, str]{access_token, token_type})
# @PARAM: user (User) - The authenticated user.
# @RETURN: Dict[str, str] - Session data.
def create_session(self, user) -> Dict[str, str]:
with belief_scope("AuthService.create_session"):
# Collect role names for scopes
@@ -77,11 +87,13 @@ class AuthService:
# [/DEF:create_session:Function]
# [DEF:provision_adfs_user:Function]
# @PURPOSE: Just-In-Time (JIT) provisioning for ADFS users based on group mappings.
# @PRE: user_info contains 'upn' (username), 'email', and 'groups'.
# @POST: User is created/updated and assigned roles based on groups.
# @PARAM: user_info (Dict[str, Any]) - Claims from ADFS token.
# @RETURN: User - The provisioned user.
# @PURPOSE: Performs ADFS Just-In-Time provisioning and role synchronization from AD group mappings.
# @PRE: user_info contains identity claims where at least one of 'upn' or 'email' is present; 'groups' may be absent.
# @POST: Returns persisted user entity with roles synchronized to mapped AD groups and refreshed state.
# @SIDE_EFFECT: May insert new User, mutate user.roles, commit transaction, and refresh ORM state.
# @DATA_CONTRACT: Input(Dict[str, Any]{upn|email, email, groups[]}) -> Output(User persisted)
# @PARAM: user_info (Dict[str, Any]) - Claims from ADFS token.
# @RETURN: User - The provisioned user.
def provision_adfs_user(self, user_info: Dict[str, Any]) -> User:
with belief_scope("AuthService.provision_adfs_user"):
username = user_info.get("upn") or user_info.get("email")

View File

@@ -22,3 +22,6 @@ def test_audit_check_run(mock_logger):
def test_audit_report(mock_logger):
audit_report("rep-1", "cand-1")
mock_logger.info.assert_called_with("[EXPLORE] clean-release report_id=rep-1 candidate=cand-1")
# [/DEF:backend.tests.services.clean_release.test_audit_service:Module]

View File

@@ -3,7 +3,7 @@
# @SEMANTICS: tests, clean-release, preparation, flow
# @PURPOSE: Validate release candidate preparation flow, including policy evaluation and manifest persisting.
# @LAYER: Domain
# @RELATION: TESTS -> backend.src.services.clean_release.preparation_service
# @RELATION: [DEPENDS_ON] ->[backend.src.services.clean_release.preparation_service:Module]
# @INVARIANT: Candidate preparation always persists manifest and candidate status deterministically.
import pytest
@@ -21,6 +21,8 @@ from src.models.clean_release import (
)
from src.services.clean_release.preparation_service import prepare_candidate
# [DEF:backend.tests.services.clean_release.test_preparation_service._mock_policy:Function]
# @PURPOSE: Build a valid clean profile policy fixture for preparation tests.
def _mock_policy() -> CleanProfilePolicy:
return CleanProfilePolicy(
policy_id="pol-1",
@@ -33,7 +35,10 @@ def _mock_policy() -> CleanProfilePolicy:
effective_from=datetime.now(timezone.utc),
profile=ProfileType.ENTERPRISE_CLEAN,
)
# [/DEF:backend.tests.services.clean_release.test_preparation_service._mock_policy:Function]
# [DEF:backend.tests.services.clean_release.test_preparation_service._mock_registry:Function]
# @PURPOSE: Build an internal-only source registry fixture for preparation tests.
def _mock_registry() -> ResourceSourceRegistry:
return ResourceSourceRegistry(
registry_id="reg-1",
@@ -42,7 +47,10 @@ def _mock_registry() -> ResourceSourceRegistry:
updated_at=datetime.now(timezone.utc),
updated_by="tester"
)
# [/DEF:backend.tests.services.clean_release.test_preparation_service._mock_registry:Function]
# [DEF:backend.tests.services.clean_release.test_preparation_service._mock_candidate:Function]
# @PURPOSE: Build a draft release candidate fixture with provided identifier.
def _mock_candidate(candidate_id: str) -> ReleaseCandidate:
return ReleaseCandidate(
candidate_id=candidate_id,
@@ -53,7 +61,15 @@ def _mock_candidate(candidate_id: str) -> ReleaseCandidate:
created_by="tester",
source_snapshot_ref="v1.0.0-snapshot"
)
# [/DEF:backend.tests.services.clean_release.test_preparation_service._mock_candidate:Function]
# [DEF:backend.tests.services.clean_release.test_preparation_service.test_prepare_candidate_success:Function]
# @PURPOSE: Verify candidate transitions to PREPARED when evaluation returns no violations.
# @TEST_CONTRACT: [valid_candidate + active_policy + internal_sources + no_violations] -> [status=PREPARED, manifest_persisted, candidate_saved]
# @TEST_SCENARIO: [prepare_success] -> [prepared status and persistence side effects are produced]
# @TEST_FIXTURE: [INLINE_MOCKS] -> INLINE_JSON
# @TEST_EDGE: [external_fail] -> [none; dependency interactions mocked and successful]
# @TEST_INVARIANT: [prepared_flow_persists_state] -> VERIFIED_BY: [prepare_success]
def test_prepare_candidate_success():
# Setup
repository = MagicMock()
@@ -82,7 +98,15 @@ def test_prepare_candidate_success():
assert candidate.status == ReleaseCandidateStatus.PREPARED
repository.save_manifest.assert_called_once()
repository.save_candidate.assert_called_with(candidate)
# [/DEF:backend.tests.services.clean_release.test_preparation_service.test_prepare_candidate_success:Function]
# [DEF:backend.tests.services.clean_release.test_preparation_service.test_prepare_candidate_with_violations:Function]
# @PURPOSE: Verify candidate transitions to BLOCKED when evaluation returns blocking violations.
# @TEST_CONTRACT: [valid_candidate + active_policy + evaluation_with_violations] -> [status=BLOCKED, violations_exposed]
# @TEST_SCENARIO: [prepare_blocked_due_to_policy] -> [blocked status and violation list are produced]
# @TEST_FIXTURE: [INLINE_MOCKS] -> INLINE_JSON
# @TEST_EDGE: [external_fail] -> [none; dependency interactions mocked and successful]
# @TEST_INVARIANT: [blocked_flow_reports_violations] -> VERIFIED_BY: [prepare_blocked_due_to_policy]
def test_prepare_candidate_with_violations():
# Setup
repository = MagicMock()
@@ -110,14 +134,30 @@ def test_prepare_candidate_with_violations():
assert result["status"] == ReleaseCandidateStatus.BLOCKED.value
assert candidate.status == ReleaseCandidateStatus.BLOCKED
assert len(result["violations"]) == 1
# [/DEF:backend.tests.services.clean_release.test_preparation_service.test_prepare_candidate_with_violations:Function]
# [DEF:backend.tests.services.clean_release.test_preparation_service.test_prepare_candidate_not_found:Function]
# @PURPOSE: Verify preparation raises ValueError when candidate does not exist.
# @TEST_CONTRACT: [missing_candidate] -> [ValueError('Candidate not found')]
# @TEST_SCENARIO: [prepare_missing_candidate] -> [raises candidate not found error]
# @TEST_FIXTURE: [INLINE_MOCKS] -> INLINE_JSON
# @TEST_EDGE: [missing_field] -> [candidate lookup returns None]
# @TEST_INVARIANT: [missing_candidate_is_rejected] -> VERIFIED_BY: [prepare_missing_candidate]
def test_prepare_candidate_not_found():
repository = MagicMock()
repository.get_candidate.return_value = None
with pytest.raises(ValueError, match="Candidate not found"):
prepare_candidate(repository, "non-existent", [], [], "op")
# [/DEF:backend.tests.services.clean_release.test_preparation_service.test_prepare_candidate_not_found:Function]
# [DEF:backend.tests.services.clean_release.test_preparation_service.test_prepare_candidate_no_active_policy:Function]
# @PURPOSE: Verify preparation raises ValueError when no active policy is available.
# @TEST_CONTRACT: [candidate_present + missing_active_policy] -> [ValueError('Active clean policy not found')]
# @TEST_SCENARIO: [prepare_missing_policy] -> [raises active policy missing error]
# @TEST_FIXTURE: [INLINE_MOCKS] -> INLINE_JSON
# @TEST_EDGE: [invalid_type] -> [policy dependency resolves to None]
# @TEST_INVARIANT: [active_policy_required] -> VERIFIED_BY: [prepare_missing_policy]
def test_prepare_candidate_no_active_policy():
repository = MagicMock()
repository.get_candidate.return_value = _mock_candidate("cand-1")
@@ -125,3 +165,7 @@ def test_prepare_candidate_no_active_policy():
with pytest.raises(ValueError, match="Active clean policy not found"):
prepare_candidate(repository, "cand-1", [], [], "op")
# [/DEF:backend.tests.services.clean_release.test_preparation_service.test_prepare_candidate_no_active_policy:Function]
# [/DEF:backend.tests.services.clean_release.test_preparation_service:Module]

View File

@@ -55,4 +55,6 @@ def test_validate_internal_sources_external_blocked():
assert result["ok"] is False
assert len(result["violations"]) == 1
assert result["violations"][0]["category"] == "external-source"
assert result["violations"][0]["blocked_release"] is True
assert result["violations"][0]["blocked_release"] is True
# [/DEF:backend.tests.services.clean_release.test_source_isolation:Module]

View File

@@ -25,3 +25,6 @@ def test_derive_final_status_failed_skipped():
results = [CheckStageResult(stage=s, status=CheckStageStatus.PASS, details="ok") for s in MANDATORY_STAGE_ORDER]
results[2].status = CheckStageStatus.SKIPPED
assert derive_final_status(results) == CheckFinalStatus.FAILED
# [/DEF:backend.tests.services.clean_release.test_stages:Module]

View File

@@ -35,89 +35,117 @@ from ...models.clean_release import (
from .policy_engine import CleanPolicyEngine
from .repository import CleanReleaseRepository
from .stages import derive_final_status
from ...core.logger import belief_scope
# [DEF:CleanComplianceOrchestrator:Class]
# @PURPOSE: Coordinate clean-release compliance verification stages.
class CleanComplianceOrchestrator:
# [DEF:CleanComplianceOrchestrator.__init__:Function]
# @PURPOSE: Bind repository dependency used for orchestrator persistence and lookups.
# @PRE: repository is a valid CleanReleaseRepository instance with required methods.
# @POST: self.repository is assigned and used by all orchestration steps.
# @SIDE_EFFECT: Stores repository reference on orchestrator instance.
# @DATA_CONTRACT: Input -> CleanReleaseRepository, Output -> None
def __init__(self, repository: CleanReleaseRepository):
self.repository = repository
with belief_scope("CleanComplianceOrchestrator.__init__"):
self.repository = repository
# [/DEF:CleanComplianceOrchestrator.__init__:Function]
# [DEF:start_check_run:Function]
# @PURPOSE: Initiate a new compliance run session.
# @PRE: candidate_id and policy_id must exist in repository.
# @POST: Returns initialized ComplianceRun in RUNNING state.
# @PRE: candidate_id/policy_id/manifest_id identify existing records in repository.
# @POST: Returns initialized ComplianceRun in RUNNING state persisted in repository.
# @SIDE_EFFECT: Reads manifest/policy and writes new ComplianceRun via repository.save_check_run.
# @DATA_CONTRACT: Input -> (candidate_id:str, policy_id:str, requested_by:str, manifest_id:str), Output -> ComplianceRun
def start_check_run(self, candidate_id: str, policy_id: str, requested_by: str, manifest_id: str) -> ComplianceRun:
manifest = self.repository.get_manifest(manifest_id)
policy = self.repository.get_policy(policy_id)
if not manifest or not policy:
raise ValueError("Manifest or Policy not found")
with belief_scope("start_check_run"):
manifest = self.repository.get_manifest(manifest_id)
policy = self.repository.get_policy(policy_id)
if not manifest or not policy:
raise ValueError("Manifest or Policy not found")
check_run = ComplianceRun(
id=f"check-{uuid4()}",
candidate_id=candidate_id,
manifest_id=manifest_id,
manifest_digest=manifest.manifest_digest,
policy_snapshot_id=policy_id,
registry_snapshot_id=policy.registry_snapshot_id,
requested_by=requested_by,
requested_at=datetime.now(timezone.utc),
status=RunStatus.RUNNING,
)
return self.repository.save_check_run(check_run)
check_run = ComplianceRun(
id=f"check-{uuid4()}",
candidate_id=candidate_id,
manifest_id=manifest_id,
manifest_digest=manifest.manifest_digest,
policy_snapshot_id=policy_id,
registry_snapshot_id=policy.registry_snapshot_id,
requested_by=requested_by,
requested_at=datetime.now(timezone.utc),
status=RunStatus.RUNNING,
)
return self.repository.save_check_run(check_run)
# [/DEF:start_check_run:Function]
# [DEF:execute_stages:Function]
# @PURPOSE: Execute or accept compliance stage outcomes and set intermediate/final check-run status fields.
# @PRE: check_run exists and references candidate/policy/registry/manifest identifiers resolvable by repository.
# @POST: Returns persisted ComplianceRun with status FAILED on missing dependencies, otherwise SUCCEEDED with final_status set.
# @SIDE_EFFECT: Reads candidate/policy/registry/manifest and persists updated check_run.
# @DATA_CONTRACT: Input -> (check_run:ComplianceRun, forced_results:Optional[List[ComplianceStageRun]]), Output -> ComplianceRun
def execute_stages(self, check_run: ComplianceRun, forced_results: Optional[List[ComplianceStageRun]] = None) -> ComplianceRun:
if forced_results is not None:
# In a real scenario, we'd persist these stages.
with belief_scope("execute_stages"):
if forced_results is not None:
# In a real scenario, we'd persist these stages.
return self.repository.save_check_run(check_run)
# Real Logic Integration
candidate = self.repository.get_candidate(check_run.candidate_id)
policy = self.repository.get_policy(check_run.policy_snapshot_id)
if not candidate or not policy:
check_run.status = RunStatus.FAILED
return self.repository.save_check_run(check_run)
registry = self.repository.get_registry(check_run.registry_snapshot_id)
manifest = self.repository.get_manifest(check_run.manifest_id)
if not registry or not manifest:
check_run.status = RunStatus.FAILED
return self.repository.save_check_run(check_run)
# Simulate stage execution and violation detection
# 1. DATA_PURITY
summary = manifest.content_json.get("summary", {})
purity_ok = summary.get("prohibited_detected_count", 0) == 0
if not purity_ok:
check_run.final_status = ComplianceDecision.BLOCKED
else:
check_run.final_status = ComplianceDecision.PASSED
check_run.status = RunStatus.SUCCEEDED
check_run.finished_at = datetime.now(timezone.utc)
return self.repository.save_check_run(check_run)
# Real Logic Integration
candidate = self.repository.get_candidate(check_run.candidate_id)
policy = self.repository.get_policy(check_run.policy_snapshot_id)
if not candidate or not policy:
check_run.status = RunStatus.FAILED
return self.repository.save_check_run(check_run)
registry = self.repository.get_registry(check_run.registry_snapshot_id)
manifest = self.repository.get_manifest(check_run.manifest_id)
if not registry or not manifest:
check_run.status = RunStatus.FAILED
return self.repository.save_check_run(check_run)
# Simulate stage execution and violation detection
# 1. DATA_PURITY
summary = manifest.content_json.get("summary", {})
purity_ok = summary.get("prohibited_detected_count", 0) == 0
if not purity_ok:
check_run.final_status = ComplianceDecision.BLOCKED
else:
check_run.final_status = ComplianceDecision.PASSED
check_run.status = RunStatus.SUCCEEDED
check_run.finished_at = datetime.now(timezone.utc)
return self.repository.save_check_run(check_run)
# [/DEF:execute_stages:Function]
# [DEF:finalize_run:Function]
# @PURPOSE: Finalize run status based on cumulative stage results.
# @POST: Status derivation follows strict MANDATORY_STAGE_ORDER.
# @PRE: check_run was started and may already contain a derived final_status from stage execution.
# @POST: Returns persisted ComplianceRun in SUCCEEDED status with final_status guaranteed non-empty.
# @SIDE_EFFECT: Mutates check_run terminal fields and persists via repository.save_check_run.
# @DATA_CONTRACT: Input -> ComplianceRun, Output -> ComplianceRun
def finalize_run(self, check_run: ComplianceRun) -> ComplianceRun:
# If not already set by execute_stages
if not check_run.final_status:
check_run.final_status = ComplianceDecision.PASSED
check_run.status = RunStatus.SUCCEEDED
check_run.finished_at = datetime.now(timezone.utc)
return self.repository.save_check_run(check_run)
with belief_scope("finalize_run"):
# If not already set by execute_stages
if not check_run.final_status:
check_run.final_status = ComplianceDecision.PASSED
check_run.status = RunStatus.SUCCEEDED
check_run.finished_at = datetime.now(timezone.utc)
return self.repository.save_check_run(check_run)
# [/DEF:finalize_run:Function]
# [/DEF:CleanComplianceOrchestrator:Class]
# [DEF:run_check_legacy:Function]
# @PURPOSE: Legacy wrapper for compatibility with previous orchestrator call style.
# @PRE: Candidate/policy/manifest identifiers are valid for repository.
# @POST: Returns finalized ComplianceRun produced by orchestrator.
# @PRE: repository and identifiers are valid and resolvable by orchestrator dependencies.
# @POST: Returns finalized ComplianceRun produced by orchestrator start->execute->finalize sequence.
# @SIDE_EFFECT: Reads/writes compliance entities through repository during orchestrator calls.
# @DATA_CONTRACT: Input -> (repository:CleanReleaseRepository, candidate_id:str, policy_id:str, requested_by:str, manifest_id:str), Output -> ComplianceRun
def run_check_legacy(
repository: CleanReleaseRepository,
candidate_id: str,
@@ -125,14 +153,15 @@ def run_check_legacy(
requested_by: str,
manifest_id: str,
) -> ComplianceRun:
orchestrator = CleanComplianceOrchestrator(repository)
run = orchestrator.start_check_run(
candidate_id=candidate_id,
policy_id=policy_id,
requested_by=requested_by,
manifest_id=manifest_id,
)
run = orchestrator.execute_stages(run)
return orchestrator.finalize_run(run)
with belief_scope("run_check_legacy"):
orchestrator = CleanComplianceOrchestrator(repository)
run = orchestrator.start_check_run(
candidate_id=candidate_id,
policy_id=policy_id,
requested_by=requested_by,
manifest_id=manifest_id,
)
run = orchestrator.execute_stages(run)
return orchestrator.finalize_run(run)
# [/DEF:run_check_legacy:Function]
# [/DEF:backend.src.services.clean_release.compliance_orchestrator:Module]