Fix LLM validation and dashboard health hot paths

This commit is contained in:
2026-03-15 13:18:51 +03:00
parent 3928455189
commit a8563a8369
24 changed files with 1398 additions and 83 deletions

View File

@@ -4,21 +4,138 @@
# @PURPOSE: Business logic for aggregating dashboard health status from validation records.
# @LAYER: Domain/Service
# @RELATION: DEPENDS_ON -> ValidationRecord
# @RELATION: DEPENDS_ON -> backend.src.core.superset_client.SupersetClient
# @RELATION: DEPENDS_ON -> backend.src.core.task_manager.cleanup.TaskCleanupService
from typing import List, Dict, Any
from typing import List, Dict, Any, Optional, Tuple
from sqlalchemy.orm import Session
from sqlalchemy import func, desc
import os
from ..models.llm import ValidationRecord
from ..schemas.health import DashboardHealthItem, HealthSummaryResponse
from ..core.logger import logger
from ..core.superset_client import SupersetClient
from ..core.task_manager.cleanup import TaskCleanupService
from ..core.task_manager import TaskManager
# [DEF:HealthService:Class]
# @TIER: STANDARD
# @PURPOSE: Aggregate latest dashboard validation state and manage persisted health report lifecycle.
# @RELATION: CALLS -> backend.src.core.superset_client.SupersetClient
# @RELATION: CALLS -> backend.src.core.task_manager.cleanup.TaskCleanupService
class HealthService:
"""
@PURPOSE: Service for managing and querying dashboard health data.
"""
def __init__(self, db: Session):
# [DEF:HealthService.__init__:Function]
# @PURPOSE: Initialize health service with DB session and optional config access for dashboard metadata resolution.
# @PRE: db is a valid SQLAlchemy session.
# @POST: Service is ready to aggregate summaries and delete health reports.
def __init__(self, db: Session, config_manager=None):
    """Bind the service to a DB session and an optional config manager.

    Args:
        db: SQLAlchemy session used for all validation-record queries.
        config_manager: Optional object used to look up environments when
            dashboard metadata is resolved; when omitted, metadata
            enrichment is skipped by the methods that check for it.
    """
    # Keyed by (environment_id, dashboard_id); values hold "slug" and "title".
    self._dashboard_meta_cache: Dict[Tuple[str, str], Dict[str, Optional[str]]] = {}
    self.config_manager = config_manager
    self.db = db
# [/DEF:HealthService.__init__:Function]
# [DEF:HealthService._prime_dashboard_meta_cache:Function]
# @PURPOSE: Warm dashboard slug/title cache with one Superset list fetch per environment.
# @PRE: records may contain mixed numeric and slug dashboard identifiers.
# @POST: Numeric dashboard ids for known environments are cached when discoverable.
# @SIDE_EFFECT: May call Superset dashboard list API once per referenced environment.
def _prime_dashboard_meta_cache(self, records: List[ValidationRecord]) -> None:
    """Preload slug/title metadata for numeric dashboard ids found in ``records``.

    Ids that are not purely numeric, records without an environment id, and
    (environment, dashboard) pairs already cached are ignored. For every
    remaining environment the dashboard list is fetched once through
    ``SupersetClient(env).get_dashboards_summary()``. Ids that cannot be
    resolved (unknown environment, id missing from the list, or a failed
    fetch) are cached with ``slug`` and ``title`` set to ``None``.

    Args:
        records: Validation records whose dashboards should be pre-resolved.
    """
    if not (self.config_manager and records):
        return

    # Group the still-uncached numeric ids by the environment they belong to.
    pending: Dict[str, set] = {}
    for rec in records:
        env_key = str(rec.environment_id or "").strip()
        dash_key = str(rec.dashboard_id or "").strip()
        if not (env_key and dash_key and dash_key.isdigit()):
            continue
        if (env_key, dash_key) not in self._dashboard_meta_cache:
            pending.setdefault(env_key, set()).add(dash_key)
    if not pending:
        return

    # Index configured environments by their stripped string id.
    known_envs = {}
    for candidate in self.config_manager.get_environments():
        candidate_id = str(getattr(candidate, "id", "")).strip()
        if candidate_id:
            known_envs[candidate_id] = candidate

    for env_key, wanted_ids in pending.items():
        env = known_envs.get(env_key)
        # Stays empty for an unknown environment or a failed fetch, so every
        # wanted id then falls through to the empty placeholder below.
        fetched = {}
        if env:
            try:
                listing = SupersetClient(env).get_dashboards_summary()
                fetched = {
                    str(entry.get("id")): {
                        "slug": entry.get("slug"),
                        "title": entry.get("title"),
                    }
                    for entry in listing
                    if str(entry.get("id") or "").strip()
                }
            except Exception as exc:
                logger.warning(
                    "[HealthService][_prime_dashboard_meta_cache] Failed to preload dashboard metadata for env=%s: %s",
                    env_key,
                    exc,
                )
                fetched = {}
        for dash_key in wanted_ids:
            self._dashboard_meta_cache[(env_key, dash_key)] = fetched.get(
                dash_key,
                {"slug": None, "title": None},
            )
# [/DEF:HealthService._prime_dashboard_meta_cache:Function]
# [DEF:HealthService._resolve_dashboard_meta:Function]
# @PURPOSE: Resolve slug/title for a dashboard referenced by persisted validation record.
# @PRE: dashboard_id may be numeric or slug-like; environment_id may be empty.
# @POST: Returns dict with `slug` and `title` keys, using cache when possible.
# @SIDE_EFFECT: Never calls Superset itself; on a cache miss for a numeric id it stores an empty slug/title placeholder in the in-memory cache.
def _resolve_dashboard_meta(self, dashboard_id: str, environment_id: Optional[str]) -> Dict[str, Optional[str]]:
    """Return slug/title for a dashboard id stored on a validation record.

    A non-numeric id is treated as the slug itself and returned with no
    title. A numeric id is answered from ``self._dashboard_meta_cache``
    only; a miss stores and returns a placeholder with both values ``None``.

    Args:
        dashboard_id: Numeric or slug-like identifier; may be empty or None.
        environment_id: Environment the dashboard belongs to; may be empty.

    Returns:
        Dict with ``slug`` and ``title`` keys, either of which may be None.
    """
    dash_key = str(dashboard_id or "").strip()
    env_key = str(environment_id or "").strip()

    # Slug-like identifiers need no lookup at all.
    if dash_key and not dash_key.isdigit():
        return {"slug": dash_key, "title": None}
    # Nothing to resolve without an id, a config manager, or an environment.
    if not dash_key or not self.config_manager or not env_key:
        return {"slug": None, "title": None}

    key = (env_key, dash_key)
    hit = self._dashboard_meta_cache.get(key)
    if hit is None:
        # Remember the miss so later calls return the same placeholder.
        hit = {"slug": None, "title": None}
        self._dashboard_meta_cache[key] = hit
    return hit
# [/DEF:HealthService._resolve_dashboard_meta:Function]
# [DEF:HealthService.get_health_summary:Function]
# @PURPOSE: Aggregate latest validation status per dashboard and enrich rows with dashboard slug/title.
# @PRE: environment_id may be omitted to aggregate across all environments.
# @POST: Returns HealthSummaryResponse with counts and latest record row per dashboard.
# @SIDE_EFFECT: May call Superset API to resolve dashboard metadata.
async def get_health_summary(self, environment_id: str = None) -> HealthSummaryResponse:
"""
@PURPOSE: Aggregates the latest validation status for all dashboards.
@@ -44,6 +161,8 @@ class HealthService:
records = query.all()
self._prime_dashboard_meta_cache(records)
items = []
pass_count = 0
warn_count = 0
@@ -62,8 +181,12 @@ class HealthService:
unknown_count += 1
status = "UNKNOWN"
meta = self._resolve_dashboard_meta(rec.dashboard_id, rec.environment_id)
items.append(DashboardHealthItem(
record_id=rec.id,
dashboard_id=rec.dashboard_id,
dashboard_slug=meta.get("slug"),
dashboard_title=meta.get("title"),
environment_id=rec.environment_id or "unknown",
status=status,
last_check=rec.timestamp,
@@ -80,5 +203,82 @@ class HealthService:
fail_count=fail_count,
unknown_count=unknown_count
)
# [/DEF:HealthService.get_health_summary:Function]
# [/DEF:health_service:Module]
# [DEF:HealthService.delete_validation_report:Function]
# @PURPOSE: Delete the targeted health report together with every peer record for the same dashboard/environment, and optionally clean linked task/log artifacts.
# @PRE: record_id is a validation record identifier.
# @POST: Returns True only when a matching record was deleted.
# @SIDE_EFFECT: Deletes the matching DB rows, removes their screenshot files, and (only when both a task manager and a config manager are available) deletes linked task/log persistence.
def delete_validation_report(self, record_id: str, task_manager: Optional[TaskManager] = None) -> bool:
    """Delete a health report together with its dashboard/environment peers.

    The record identified by ``record_id`` selects a dashboard/environment
    pair; every ``ValidationRecord`` for that pair is deleted in a single
    commit. Screenshot files and linked task/log data are then removed on a
    best-effort basis: failures there are logged as warnings and do not
    change the return value.

    Args:
        record_id: Identifier of the validation record that triggers deletion.
        task_manager: Optional task manager. Linked tasks are cleaned up only
            when this and ``self.config_manager`` are both set.

    Returns:
        False when no record matches ``record_id``; True once the rows have
        been committed.

    Raises:
        Exception: Whatever the session raises while deleting or committing.
            The session is rolled back before the error propagates.
    """
    record = self.db.query(ValidationRecord).filter(ValidationRecord.id == record_id).first()
    if not record:
        return False

    peer_query = self.db.query(ValidationRecord).filter(
        ValidationRecord.dashboard_id == record.dashboard_id
    )
    # A NULL environment must be matched with IS NULL, not "= NULL".
    if record.environment_id is None:
        peer_query = peer_query.filter(ValidationRecord.environment_id.is_(None))
    else:
        peer_query = peer_query.filter(ValidationRecord.environment_id == record.environment_id)
    records_to_delete = peer_query.all()

    # Snapshot everything needed after the commit while the rows still exist,
    # so later cleanup and logging never read attributes of deleted instances.
    dashboard_id = record.dashboard_id
    environment_id = record.environment_id
    screenshot_paths = [
        str(item.screenshot_path or "").strip()
        for item in records_to_delete
        if str(item.screenshot_path or "").strip()
    ]
    task_ids = {
        str(item.task_id or "").strip()
        for item in records_to_delete
        if str(item.task_id or "").strip()
    }

    logger.info(
        "[HealthService][delete_validation_report] Removing %s validation record(s) for dashboard=%s environment=%s triggered_by_record=%s",
        len(records_to_delete),
        dashboard_id,
        environment_id,
        record_id,
    )

    try:
        for item in records_to_delete:
            self.db.delete(item)
        self.db.commit()
    except Exception:
        # Leave the session usable for the caller; the original error still propagates.
        self.db.rollback()
        raise

    for screenshot_path in screenshot_paths:
        try:
            # Attempt the removal directly instead of checking existence first,
            # which would race with anything else deleting the file.
            os.remove(screenshot_path)
        except FileNotFoundError:
            # Already gone: nothing left to clean up for this path.
            continue
        except OSError as exc:
            logger.warning(
                "[HealthService][delete_validation_report] Failed to remove screenshot %s: %s",
                screenshot_path,
                exc,
            )

    if task_ids and task_manager and self.config_manager:
        cleanup_failure_message = (
            "[HealthService][delete_validation_report] Failed to cleanup linked task/logs for dashboard=%s environment=%s: %s"
        )
        try:
            cleanup_service = TaskCleanupService(
                task_manager.persistence_service,
                task_manager.log_persistence_service,
                self.config_manager,
            )
        except Exception as exc:
            logger.warning(cleanup_failure_message, dashboard_id, environment_id, exc)
        else:
            for task_id in task_ids:
                # Each task is handled on its own so one failure does not
                # leave the remaining linked tasks behind.
                try:
                    task_manager.tasks.pop(task_id, None)
                    cleanup_service.delete_task_with_logs(task_id)
                except Exception as exc:
                    logger.warning(cleanup_failure_message, dashboard_id, environment_id, exc)
    return True
# [/DEF:HealthService.delete_validation_report:Function]
# [/DEF:HealthService:Class]
# [/DEF:health_service:Module]