Files
ss-tools/backend/src/core/config_manager.py

569 lines
23 KiB
Python

# [DEF:ConfigManager:Module]
#
# @COMPLEXITY: 5
# @SEMANTICS: config, manager, persistence, migration, postgresql
# @PURPOSE: Manages application configuration persistence in DB with one-time migration from legacy JSON.
# @LAYER: Domain
# @PRE: Database schema for AppConfigRecord must be initialized.
# @POST: Configuration is loaded into memory and logger is configured.
# @SIDE_EFFECT: Performs DB I/O and may update global logging level.
# @DATA_CONTRACT: Input[json, record] -> Model[AppConfig]
# @INVARIANT: Configuration must always be representable by AppConfig and persisted under global record id.
# @RELATION: [DEPENDS_ON] ->[AppConfig]
# @RELATION: [DEPENDS_ON] ->[SessionLocal]
# @RELATION: [DEPENDS_ON] ->[AppConfigRecord]
# @RELATION: [CALLS] ->[logger]
# @RELATION: [CALLS] ->[configure_logger]
#
import json
import os
from pathlib import Path
from typing import Any, Optional, List
from sqlalchemy.orm import Session
from .config_models import AppConfig, Environment, GlobalSettings
from .database import SessionLocal
from ..models.config import AppConfigRecord
from ..models.mapping import Environment as EnvironmentRecord
from .logger import logger, configure_logger, belief_scope
# [DEF:ConfigManager:Class]
# @COMPLEXITY: 5
# @PURPOSE: Handles application configuration load, validation, mutation, and persistence lifecycle.
# @PRE: Database is accessible and AppConfigRecord schema is loaded.
# @POST: Configuration state is synchronized between memory and database.
# @SIDE_EFFECT: Performs DB I/O, OS path validation, and logger reconfiguration.
class ConfigManager:
# [DEF:__init__:Function]
# @PURPOSE: Initialize manager state from persisted or migrated configuration.
# @PRE: config_path is a non-empty string path.
# @POST: self.config is initialized as AppConfig and logger is configured.
# @SIDE_EFFECT: Reads config sources and updates logging configuration.
# @DATA_CONTRACT: Input(str config_path) -> Output(None; self.config: AppConfig)
def __init__(self, config_path: str = "config.json"):
with belief_scope("ConfigManager.__init__"):
if not isinstance(config_path, str) or not config_path:
logger.explore(
"Invalid config_path provided", extra={"path": config_path}
)
raise ValueError("config_path must be a non-empty string")
logger.reason(f"Initializing ConfigManager with legacy path: {config_path}")
self.config_path = Path(config_path)
self.raw_payload: dict[str, Any] = {}
self.config: AppConfig = self._load_config()
configure_logger(self.config.settings.logging)
if not isinstance(self.config, AppConfig):
logger.explore(
"Config loading resulted in invalid type",
extra={"type": type(self.config)},
)
raise TypeError("self.config must be an instance of AppConfig")
logger.reflect("ConfigManager initialization complete")
# [/DEF:__init__:Function]
# [DEF:_default_config:Function]
# @PURPOSE: Build default application configuration fallback.
def _default_config(self) -> AppConfig:
with belief_scope("ConfigManager._default_config"):
logger.reason("Building default AppConfig fallback")
return AppConfig(environments=[], settings=GlobalSettings())
# [/DEF:_default_config:Function]
# [DEF:_sync_raw_payload_from_config:Function]
# @PURPOSE: Merge typed AppConfig state into raw payload while preserving unsupported legacy sections.
def _sync_raw_payload_from_config(self) -> dict[str, Any]:
with belief_scope("ConfigManager._sync_raw_payload_from_config"):
typed_payload = self.config.model_dump()
merged_payload = dict(self.raw_payload or {})
merged_payload["environments"] = typed_payload.get("environments", [])
merged_payload["settings"] = typed_payload.get("settings", {})
self.raw_payload = merged_payload
logger.reason(
"Synchronized raw payload from typed config",
extra={
"environments_count": len(
merged_payload.get("environments", []) or []
),
"has_settings": "settings" in merged_payload,
"extra_sections": sorted(
key
for key in merged_payload.keys()
if key not in {"environments", "settings"}
),
},
)
return merged_payload
# [/DEF:_sync_raw_payload_from_config:Function]
# [DEF:_load_from_legacy_file:Function]
# @PURPOSE: Load legacy JSON configuration for migration fallback path.
def _load_from_legacy_file(self) -> dict[str, Any]:
with belief_scope("ConfigManager._load_from_legacy_file"):
if not self.config_path.exists():
logger.reason(
"Legacy config file not found; using default payload",
extra={"path": str(self.config_path)},
)
return {}
logger.reason(
"Loading legacy config file", extra={"path": str(self.config_path)}
)
with self.config_path.open("r", encoding="utf-8") as fh:
payload = json.load(fh)
if not isinstance(payload, dict):
logger.explore(
"Legacy config payload is not a JSON object",
extra={
"path": str(self.config_path),
"type": type(payload).__name__,
},
)
raise ValueError("Legacy config payload must be a JSON object")
logger.reason(
"Legacy config file loaded successfully",
extra={"path": str(self.config_path), "keys": sorted(payload.keys())},
)
return payload
# [/DEF:_load_from_legacy_file:Function]
# [DEF:_get_record:Function]
# @PURPOSE: Resolve global configuration record from DB.
def _get_record(self, session: Session) -> Optional[AppConfigRecord]:
with belief_scope("ConfigManager._get_record"):
record = (
session.query(AppConfigRecord)
.filter(AppConfigRecord.id == "global")
.first()
)
logger.reason(
"Resolved app config record", extra={"exists": record is not None}
)
return record
# [/DEF:_get_record:Function]
# [DEF:_load_config:Function]
# @PURPOSE: Load configuration from DB or perform one-time migration from legacy JSON.
def _load_config(self) -> AppConfig:
with belief_scope("ConfigManager._load_config"):
session = SessionLocal()
try:
record = self._get_record(session)
if record and isinstance(record.payload, dict):
logger.reason(
"Loading configuration from database",
extra={"record_id": record.id},
)
self.raw_payload = dict(record.payload)
config = AppConfig.model_validate(
{
"environments": self.raw_payload.get("environments", []),
"settings": self.raw_payload.get("settings", {}),
}
)
self._sync_environment_records(session, config)
session.commit()
logger.reason(
"Database configuration validated successfully",
extra={
"environments_count": len(config.environments),
"payload_keys": sorted(self.raw_payload.keys()),
},
)
return config
logger.reason(
"Database configuration record missing; attempting legacy file migration",
extra={"legacy_path": str(self.config_path)},
)
legacy_payload = self._load_from_legacy_file()
if legacy_payload:
self.raw_payload = dict(legacy_payload)
config = AppConfig.model_validate(
{
"environments": self.raw_payload.get("environments", []),
"settings": self.raw_payload.get("settings", {}),
}
)
logger.reason(
"Legacy payload validated; persisting migrated configuration to database",
extra={
"environments_count": len(config.environments),
"payload_keys": sorted(self.raw_payload.keys()),
},
)
self._save_config_to_db(config, session=session)
return config
logger.reason(
"No persisted config found; falling back to default configuration"
)
config = self._default_config()
self.raw_payload = config.model_dump()
self._save_config_to_db(config, session=session)
return config
except (json.JSONDecodeError, TypeError, ValueError) as exc:
logger.explore(
"Recoverable config load failure; falling back to default configuration",
extra={"error": str(exc), "legacy_path": str(self.config_path)},
)
config = self._default_config()
self.raw_payload = config.model_dump()
return config
except Exception as exc:
logger.explore(
"Critical config load failure; re-raising persistence or validation error",
extra={"error": str(exc)},
)
raise
finally:
session.close()
# [/DEF:_load_config:Function]
# [DEF:_sync_environment_records:Function]
# @PURPOSE: Mirror configured environments into the relational environments table used by FK-backed domain models.
def _sync_environment_records(self, session: Session, config: AppConfig) -> None:
with belief_scope("ConfigManager._sync_environment_records"):
configured_envs = list(config.environments or [])
persisted_records = session.query(EnvironmentRecord).all()
persisted_by_id = {
str(record.id or "").strip(): record for record in persisted_records
}
for environment in configured_envs:
normalized_id = str(environment.id or "").strip()
if not normalized_id:
continue
display_name = (
str(environment.name or normalized_id).strip() or normalized_id
)
normalized_url = str(environment.url or "").strip()
credentials_id = (
str(environment.username or "").strip() or normalized_id
)
record = persisted_by_id.get(normalized_id)
if record is None:
logger.reason(
"Creating relational environment record from typed config",
extra={
"environment_id": normalized_id,
"environment_name": display_name,
},
)
session.add(
EnvironmentRecord(
id=normalized_id,
name=display_name,
url=normalized_url,
credentials_id=credentials_id,
)
)
continue
record.name = display_name
record.url = normalized_url
record.credentials_id = credentials_id
# [/DEF:_sync_environment_records:Function]
# [DEF:_save_config_to_db:Function]
# @PURPOSE: Persist provided AppConfig into the global DB configuration record.
def _save_config_to_db(
self, config: AppConfig, session: Optional[Session] = None
) -> None:
with belief_scope("ConfigManager._save_config_to_db"):
owns_session = session is None
db = session or SessionLocal()
try:
self.config = config
payload = self._sync_raw_payload_from_config()
record = self._get_record(db)
if record is None:
logger.reason("Creating new global app config record")
record = AppConfigRecord(id="global", payload=payload)
db.add(record)
else:
logger.reason(
"Updating existing global app config record",
extra={"record_id": record.id},
)
record.payload = payload
self._sync_environment_records(db, config)
db.commit()
logger.reason(
"Configuration persisted to database",
extra={
"environments_count": len(
payload.get("environments", []) or []
),
"payload_keys": sorted(payload.keys()),
},
)
except Exception:
db.rollback()
logger.explore("Database save failed; transaction rolled back")
raise
finally:
if owns_session:
db.close()
# [/DEF:_save_config_to_db:Function]
# [DEF:save:Function]
# @PURPOSE: Persist current in-memory configuration state.
def save(self) -> None:
with belief_scope("ConfigManager.save"):
logger.reason("Persisting current in-memory configuration")
self._save_config_to_db(self.config)
# [/DEF:save:Function]
# [DEF:get_config:Function]
# @PURPOSE: Return current in-memory configuration snapshot.
def get_config(self) -> AppConfig:
with belief_scope("ConfigManager.get_config"):
return self.config
# [/DEF:get_config:Function]
# [DEF:get_payload:Function]
# @PURPOSE: Return full persisted payload including sections outside typed AppConfig schema.
def get_payload(self) -> dict[str, Any]:
with belief_scope("ConfigManager.get_payload"):
return self._sync_raw_payload_from_config()
# [/DEF:get_payload:Function]
# [DEF:save_config:Function]
# @PURPOSE: Persist configuration provided either as typed AppConfig or raw payload dict.
def save_config(self, config: Any) -> AppConfig:
with belief_scope("ConfigManager.save_config"):
if isinstance(config, AppConfig):
logger.reason("Saving typed AppConfig payload")
self.config = config
self.raw_payload = config.model_dump()
self._save_config_to_db(config)
return self.config
if isinstance(config, dict):
logger.reason(
"Saving raw config payload",
extra={"keys": sorted(config.keys())},
)
self.raw_payload = dict(config)
typed_config = AppConfig.model_validate(
{
"environments": self.raw_payload.get("environments", []),
"settings": self.raw_payload.get("settings", {}),
}
)
self.config = typed_config
self._save_config_to_db(typed_config)
return self.config
logger.explore(
"Unsupported config type supplied to save_config",
extra={"type": type(config).__name__},
)
raise TypeError("config must be AppConfig or dict")
# [/DEF:save_config:Function]
# [DEF:update_global_settings:Function]
# @PURPOSE: Replace global settings and persist the resulting configuration.
def update_global_settings(self, settings: GlobalSettings) -> AppConfig:
with belief_scope("ConfigManager.update_global_settings"):
logger.reason("Updating global settings")
self.config.settings = settings
self.save()
return self.config
# [/DEF:update_global_settings:Function]
# [DEF:validate_path:Function]
# @PURPOSE: Validate that path exists and is writable, creating it when absent.
def validate_path(self, path: str) -> tuple[bool, str]:
with belief_scope("ConfigManager.validate_path", f"path={path}"):
try:
target = Path(path).expanduser()
target.mkdir(parents=True, exist_ok=True)
if not target.exists():
return False, f"Path does not exist: {target}"
if not target.is_dir():
return False, f"Path is not a directory: {target}"
test_file = target / ".write_test"
with test_file.open("w", encoding="utf-8") as fh:
fh.write("ok")
test_file.unlink(missing_ok=True)
logger.reason("Path validation succeeded", extra={"path": str(target)})
return True, "OK"
except Exception as exc:
logger.explore(
"Path validation failed", extra={"path": path, "error": str(exc)}
)
return False, str(exc)
# [/DEF:validate_path:Function]
# [DEF:get_environments:Function]
# @PURPOSE: Return all configured environments.
def get_environments(self) -> List[Environment]:
with belief_scope("ConfigManager.get_environments"):
return list(self.config.environments)
# [/DEF:get_environments:Function]
# [DEF:has_environments:Function]
# @PURPOSE: Check whether at least one environment exists in configuration.
def has_environments(self) -> bool:
with belief_scope("ConfigManager.has_environments"):
return len(self.config.environments) > 0
# [/DEF:has_environments:Function]
# [DEF:get_environment:Function]
# @PURPOSE: Resolve a configured environment by identifier.
def get_environment(self, env_id: str) -> Optional[Environment]:
with belief_scope("ConfigManager.get_environment", f"env_id={env_id}"):
normalized = str(env_id or "").strip()
if not normalized:
return None
for env in self.config.environments:
if env.id == normalized or env.name == normalized:
return env
return None
# [/DEF:get_environment:Function]
# [DEF:add_environment:Function]
# @PURPOSE: Upsert environment by id into configuration and persist.
def add_environment(self, env: Environment) -> AppConfig:
with belief_scope("ConfigManager.add_environment", f"env_id={env.id}"):
existing_index = next(
(
i
for i, item in enumerate(self.config.environments)
if item.id == env.id
),
None,
)
if env.is_default:
for item in self.config.environments:
item.is_default = False
if existing_index is None:
logger.reason("Appending new environment", extra={"env_id": env.id})
self.config.environments.append(env)
else:
logger.reason(
"Replacing existing environment during add",
extra={"env_id": env.id},
)
self.config.environments[existing_index] = env
if len(self.config.environments) == 1 and not any(
item.is_default for item in self.config.environments
):
self.config.environments[0].is_default = True
self.save()
return self.config
# [/DEF:add_environment:Function]
# [DEF:update_environment:Function]
# @PURPOSE: Update existing environment by id and preserve masked password placeholder behavior.
def update_environment(self, env_id: str, env: Environment) -> bool:
with belief_scope("ConfigManager.update_environment", f"env_id={env_id}"):
for index, existing in enumerate(self.config.environments):
if existing.id != env_id:
continue
update_data = env.model_dump()
if update_data.get("password") == "********":
update_data["password"] = existing.password
updated = Environment.model_validate(update_data)
if updated.is_default:
for item in self.config.environments:
item.is_default = False
elif existing.is_default and not updated.is_default:
updated.is_default = True
self.config.environments[index] = updated
logger.reason("Environment updated", extra={"env_id": env_id})
self.save()
return True
logger.explore(
"Environment update skipped; env not found", extra={"env_id": env_id}
)
return False
# [/DEF:update_environment:Function]
# [DEF:delete_environment:Function]
# @PURPOSE: Delete environment by id and persist when deletion occurs.
def delete_environment(self, env_id: str) -> bool:
with belief_scope("ConfigManager.delete_environment", f"env_id={env_id}"):
before = len(self.config.environments)
removed = [env for env in self.config.environments if env.id == env_id]
self.config.environments = [
env for env in self.config.environments if env.id != env_id
]
if len(self.config.environments) == before:
logger.explore(
"Environment delete skipped; env not found",
extra={"env_id": env_id},
)
return False
if removed and removed[0].is_default and self.config.environments:
self.config.environments[0].is_default = True
if self.config.settings.default_environment_id == env_id:
replacement = next(
(env.id for env in self.config.environments if env.is_default), None
)
self.config.settings.default_environment_id = replacement
logger.reason(
"Environment deleted",
extra={"env_id": env_id, "remaining": len(self.config.environments)},
)
self.save()
return True
# [/DEF:delete_environment:Function]
# [/DEF:ConfigManager:Class]
# [/DEF:ConfigManager:Module]