# [DEF:health_service:Module]
# @COMPLEXITY: 3
# @SEMANTICS: health, aggregation, dashboards
# @PURPOSE: Business logic for aggregating dashboard health status from validation records.
# @LAYER: Domain/Service
# @RELATION: [DEPENDS_ON] ->[backend.src.models.llm.ValidationRecord]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.superset_client.SupersetClient]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.task_manager.cleanup.TaskCleanupService]

import os
import time
from typing import List, Dict, Any, Optional, Tuple

from sqlalchemy import func, desc
from sqlalchemy.orm import Session

from ..models.llm import ValidationRecord
from ..schemas.health import DashboardHealthItem, HealthSummaryResponse
from ..core.logger import logger
from ..core.superset_client import SupersetClient
from ..core.task_manager.cleanup import TaskCleanupService
from ..core.task_manager import TaskManager


# [DEF:HealthService:Class]
# @COMPLEXITY: 4
# @PURPOSE: Aggregate latest dashboard validation state and manage persisted health report lifecycle.
# @RELATION: [DEPENDS_ON] ->[backend.src.models.llm.ValidationRecord]
# @RELATION: [DEPENDS_ON] ->[backend.src.schemas.health.DashboardHealthItem]
# @RELATION: [DEPENDS_ON] ->[backend.src.schemas.health.HealthSummaryResponse]
# @RELATION: [CALLS] ->[backend.src.core.superset_client.SupersetClient]
# @RELATION: [CALLS] ->[backend.src.core.task_manager.cleanup.TaskCleanupService]
class HealthService:
    """
    @PURPOSE: Service for managing and querying dashboard health data.
    """

    # Class-level cache shared by every HealthService instance:
    # environment id -> (time.monotonic() at fetch, {dashboard id -> {"slug", "title"}}).
    _dashboard_summary_cache: Dict[str, Tuple[float, Dict[str, Dict[str, Optional[str]]]]] = {}
    # Entries older than this many seconds are refetched from Superset.
    _dashboard_summary_cache_ttl_seconds = 60.0

    # [DEF:HealthService.__init__:Function]
    # @COMPLEXITY: 3
    # @PURPOSE: Initialize health service with DB session and optional config access for dashboard metadata resolution.
    # @PRE: db is a valid SQLAlchemy session.
    # @POST: Service is ready to aggregate summaries and delete health reports.
    def __init__(self, db: Session, config_manager=None):
        """Store the DB session and the optional config manager.

        Without a config manager no Superset lookups are made and numeric
        dashboard ids resolve to empty slug/title metadata.
        """
        self.db = db
        self.config_manager = config_manager
        # Per-instance cache: (environment id, dashboard id) -> {"slug", "title"}.
        self._dashboard_meta_cache: Dict[Tuple[str, str], Dict[str, Optional[str]]] = {}
    # [/DEF:HealthService.__init__:Function]

    # [DEF:HealthService._prime_dashboard_meta_cache:Function]
    # @COMPLEXITY: 3
    # @PURPOSE: Warm dashboard slug/title cache with one Superset list fetch per environment.
    # @PRE: records may contain mixed numeric and slug dashboard identifiers.
    # @POST: Numeric dashboard ids for known environments are cached when discoverable.
    # @SIDE_EFFECT: May call Superset dashboard list API once per referenced environment.
    # @RELATION: [CALLS] ->[backend.src.core.superset_client.SupersetClient.get_dashboards_summary]
    def _prime_dashboard_meta_cache(self, records: List[ValidationRecord]) -> None:
        """Fill the per-instance metadata cache for numeric dashboard ids in records.

        Every (environment, numeric dashboard id) pair found in ``records``
        ends up with a cache entry. Pairs whose environment is unknown, or
        whose lookup fails, get an entry with ``slug`` and ``title`` set to
        ``None``, so ``_resolve_dashboard_meta`` never has to fetch.
        """
        if not self.config_manager or not records:
            return

        # Collect only ids that are numeric, scoped to an environment and not
        # cached yet; slug-like ids need no lookup.
        numeric_ids_by_env: Dict[str, set] = {}
        for record in records:
            environment_id = str(record.environment_id or "").strip()
            dashboard_id = str(record.dashboard_id or "").strip()
            if not environment_id or not dashboard_id or not dashboard_id.isdigit():
                continue
            if (environment_id, dashboard_id) in self._dashboard_meta_cache:
                continue
            numeric_ids_by_env.setdefault(environment_id, set()).add(dashboard_id)

        if not numeric_ids_by_env:
            return

        environments = {
            str(getattr(env, "id", "")).strip(): env
            for env in self.config_manager.get_environments()
            if str(getattr(env, "id", "")).strip()
        }

        for environment_id, dashboard_ids in numeric_ids_by_env.items():
            env = environments.get(environment_id)
            if not env:
                # Unknown environment: remember the miss so it is not retried.
                for dashboard_id in dashboard_ids:
                    self._dashboard_meta_cache[(environment_id, dashboard_id)] = {
                        "slug": None,
                        "title": None,
                    }
                continue

            try:
                cached_meta = self.__class__._dashboard_summary_cache.get(environment_id)
                cache_is_fresh = (
                    cached_meta is not None
                    and (time.monotonic() - cached_meta[0])
                    < self.__class__._dashboard_summary_cache_ttl_seconds
                )
                if cache_is_fresh:
                    dashboard_meta_map = cached_meta[1]
                else:
                    dashboards = SupersetClient(env).get_dashboards_summary()
                    dashboard_meta_map = {
                        str(item.get("id")): {
                            "slug": item.get("slug"),
                            "title": item.get("title"),
                        }
                        for item in dashboards
                        if str(item.get("id") or "").strip()
                    }
                    self.__class__._dashboard_summary_cache[environment_id] = (
                        time.monotonic(),
                        dashboard_meta_map,
                    )
                for dashboard_id in dashboard_ids:
                    self._dashboard_meta_cache[(environment_id, dashboard_id)] = dashboard_meta_map.get(
                        dashboard_id,
                        {"slug": None, "title": None},
                    )
            except Exception as exc:
                # Metadata is an enrichment only: degrade to empty slug/title
                # rather than failing the whole health summary.
                logger.warning(
                    "[HealthService][_prime_dashboard_meta_cache] Failed to preload dashboard metadata for env=%s: %s",
                    environment_id,
                    exc,
                )
                for dashboard_id in dashboard_ids:
                    self._dashboard_meta_cache[(environment_id, dashboard_id)] = {
                        "slug": None,
                        "title": None,
                    }
    # [/DEF:HealthService._prime_dashboard_meta_cache:Function]

    # [DEF:HealthService._resolve_dashboard_meta:Function]
    # @COMPLEXITY: 1
    # @PURPOSE: Resolve slug/title for a dashboard referenced by persisted validation record.
    # @PRE: dashboard_id may be numeric or slug-like; environment_id may be empty.
    # @POST: Returns dict with `slug` and `title` keys, using cache when possible.
    def _resolve_dashboard_meta(self, dashboard_id: str, environment_id: Optional[str]) -> Dict[str, Optional[str]]:
        """Return ``{"slug", "title"}`` for a dashboard without any remote call.

        A non-numeric id is returned as the slug itself. A numeric id is
        looked up in the per-instance cache; a miss is stored and returned as
        empty metadata.
        """
        normalized_dashboard_id = str(dashboard_id or "").strip()
        normalized_environment_id = str(environment_id or "").strip()
        if not normalized_dashboard_id:
            return {"slug": None, "title": None}

        if not normalized_dashboard_id.isdigit():
            return {"slug": normalized_dashboard_id, "title": None}

        if not self.config_manager or not normalized_environment_id:
            return {"slug": None, "title": None}

        cache_key = (normalized_environment_id, normalized_dashboard_id)
        cached = self._dashboard_meta_cache.get(cache_key)
        if cached is not None:
            return cached

        meta = {"slug": None, "title": None}
        self._dashboard_meta_cache[cache_key] = meta
        return meta
    # [/DEF:HealthService._resolve_dashboard_meta:Function]

    # [DEF:HealthService.get_health_summary:Function]
    # @COMPLEXITY: 3
    # @PURPOSE: Aggregate latest validation status per dashboard and enrich rows with dashboard slug/title.
    # @PRE: environment_id may be omitted to aggregate across all environments.
    # @POST: Returns HealthSummaryResponse with counts and latest record row per dashboard/environment pair.
    # @SIDE_EFFECT: May call Superset API to resolve dashboard metadata.
    # @DATA_CONTRACT: Input[environment_id: Optional[str]] -> Output[HealthSummaryResponse]
    # @RELATION: [CALLS] ->[self._prime_dashboard_meta_cache]
    # @RELATION: [CALLS] ->[self._resolve_dashboard_meta]
    async def get_health_summary(self, environment_id: Optional[str] = None) -> HealthSummaryResponse:
        """
        @PURPOSE: Aggregates the latest validation status for all dashboards.
        @PRE: environment_id (optional) to filter by environment.
        @POST: Returns a HealthSummaryResponse with aggregated status counts and items.
        """
        # NOTE(review): the DB query and the Superset metadata preload below are
        # invoked synchronously inside this coroutine - confirm callers accept that.

        # [REASON] We need the latest ValidationRecord for each dashboard *within
        # its environment*. The same dashboard id can exist in several
        # environments, and delete_validation_report already treats
        # (dashboard_id, environment_id) as the record identity, so grouping by
        # dashboard_id alone would hide all but one environment's latest result.
        latest = self.db.query(
            ValidationRecord.dashboard_id,
            ValidationRecord.environment_id,
            func.max(ValidationRecord.timestamp).label("max_ts"),
        )
        if environment_id:
            latest = latest.filter(ValidationRecord.environment_id == environment_id)
        latest = latest.group_by(
            ValidationRecord.dashboard_id,
            ValidationRecord.environment_id,
        ).subquery()

        # NULL never equals NULL in SQL, so records without an environment need
        # an explicit IS NULL match on both sides of the join.
        same_environment = (
            ValidationRecord.environment_id == latest.c.environment_id
        ) | (
            ValidationRecord.environment_id.is_(None)
            & latest.c.environment_id.is_(None)
        )
        records = (
            self.db.query(ValidationRecord)
            .join(
                latest,
                (ValidationRecord.dashboard_id == latest.c.dashboard_id)
                & (ValidationRecord.timestamp == latest.c.max_ts)
                & same_environment,
            )
            .all()
        )

        self._prime_dashboard_meta_cache(records)

        items = []
        counts = {"PASS": 0, "WARN": 0, "FAIL": 0, "UNKNOWN": 0}
        for rec in records:
            # A missing status is reported as UNKNOWN instead of raising.
            status = str(rec.status or "").upper()
            if status not in counts:
                status = "UNKNOWN"
            counts[status] += 1

            meta = self._resolve_dashboard_meta(rec.dashboard_id, rec.environment_id)
            items.append(DashboardHealthItem(
                record_id=rec.id,
                dashboard_id=rec.dashboard_id,
                dashboard_slug=meta.get("slug"),
                dashboard_title=meta.get("title"),
                environment_id=rec.environment_id or "unknown",
                status=status,
                last_check=rec.timestamp,
                task_id=rec.task_id,
                summary=rec.summary
            ))

        logger.info(
            "[HealthService][get_health_summary] Aggregated %s dashboard health records.",
            len(items),
        )

        return HealthSummaryResponse(
            items=items,
            pass_count=counts["PASS"],
            warn_count=counts["WARN"],
            fail_count=counts["FAIL"],
            unknown_count=counts["UNKNOWN"]
        )
    # [/DEF:HealthService.get_health_summary:Function]

    # [DEF:HealthService.delete_validation_report:Function]
    # @COMPLEXITY: 3
    # @PURPOSE: Delete one persisted health report and optionally clean linked task/log artifacts.
    # @PRE: record_id is a validation record identifier.
    # @POST: Returns True only when a matching record was deleted.
    # @SIDE_EFFECT: Deletes DB rows, optional screenshot file, and optional task/log persistence.
    # @DATA_CONTRACT: Input[record_id: str, task_manager: Optional[TaskManager]] -> Output[bool]
    # @RELATION: [CALLS] ->[backend.src.core.task_manager.cleanup.TaskCleanupService.delete_task_with_logs]
    def delete_validation_report(self, record_id: str, task_manager: Optional[TaskManager] = None) -> bool:
        """Delete a validation record together with its dashboard/environment peers.

        All records sharing the triggering record's dashboard id and
        environment id are removed. Screenshot files and linked tasks are
        cleaned up afterwards on a best-effort basis: those failures are
        logged and do not change the return value.

        Returns False when ``record_id`` matches no record, True otherwise.
        Re-raises whatever ``commit()`` raises, after rolling the session back.
        """
        record = self.db.query(ValidationRecord).filter(ValidationRecord.id == record_id).first()
        if not record:
            return False

        # Read these before the row is deleted; they are still needed for
        # logging after the commit.
        dashboard_id = record.dashboard_id
        environment_id = record.environment_id

        peer_query = self.db.query(ValidationRecord).filter(
            ValidationRecord.dashboard_id == dashboard_id
        )
        if environment_id is None:
            peer_query = peer_query.filter(ValidationRecord.environment_id.is_(None))
        else:
            peer_query = peer_query.filter(ValidationRecord.environment_id == environment_id)

        records_to_delete = peer_query.all()
        screenshot_paths = [
            str(item.screenshot_path or "").strip()
            for item in records_to_delete
            if str(item.screenshot_path or "").strip()
        ]
        task_ids = {
            str(item.task_id or "").strip()
            for item in records_to_delete
            if str(item.task_id or "").strip()
        }

        logger.info(
            "[HealthService][delete_validation_report] Removing %s validation record(s) for dashboard=%s environment=%s triggered_by_record=%s",
            len(records_to_delete),
            dashboard_id,
            environment_id,
            record_id,
        )

        for item in records_to_delete:
            self.db.delete(item)
        try:
            self.db.commit()
        except Exception:
            # Leave the session usable for the caller, then surface the error.
            self.db.rollback()
            raise

        for screenshot_path in screenshot_paths:
            try:
                os.remove(screenshot_path)
            except FileNotFoundError:
                # Already gone - nothing to clean up.
                continue
            except OSError as exc:
                logger.warning(
                    "[HealthService][delete_validation_report] Failed to remove screenshot %s: %s",
                    screenshot_path,
                    exc,
                )

        if task_ids and task_manager and self.config_manager:
            cleanup_failed_message = (
                "[HealthService][delete_validation_report] Failed to cleanup linked task/logs for dashboard=%s environment=%s: %s"
            )
            try:
                cleanup_service = TaskCleanupService(
                    task_manager.persistence_service,
                    task_manager.log_persistence_service,
                    self.config_manager,
                )
            except Exception as exc:
                logger.warning(cleanup_failed_message, dashboard_id, environment_id, exc)
                return True

            for task_id in task_ids:
                # Isolate each task so one failure does not skip the others.
                try:
                    task_manager.tasks.pop(task_id, None)
                    cleanup_service.delete_task_with_logs(task_id)
                except Exception as exc:
                    logger.warning(cleanup_failed_message, dashboard_id, environment_id, exc)

        return True
    # [/DEF:HealthService.delete_validation_report:Function]

# [/DEF:HealthService:Class]

# [/DEF:health_service:Module]