Files
ss-tools/backend/src/services/health_service.py
2026-03-10 12:00:18 +03:00

84 lines
2.9 KiB
Python

# [DEF:health_service:Module]
# @TIER: STANDARD
# @SEMANTICS: health, aggregation, dashboards
# @PURPOSE: Business logic for aggregating dashboard health status from validation records.
# @LAYER: Domain/Service
# @RELATION: DEPENDS_ON -> ValidationRecord
from typing import List, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy import func, desc
from ..models.llm import ValidationRecord
from ..schemas.health import DashboardHealthItem, HealthSummaryResponse
from ..core.logger import logger
class HealthService:
    """
    @PURPOSE: Service for managing and querying dashboard health data.
    """

    # Statuses that get their own counter; every other value is reported as UNKNOWN.
    _KNOWN_STATUSES = ("PASS", "WARN", "FAIL")

    def __init__(self, db: Session):
        """
        @PURPOSE: Bind the service to the session used for all of its queries.
        @PRE: db is an open SQLAlchemy Session.
        """
        self.db = db

    @staticmethod
    def _normalize_status(raw_status) -> str:
        """
        @PURPOSE: Map a stored status value onto PASS / WARN / FAIL / UNKNOWN.
        @PRE: raw_status may be any value, including None.
        @POST: Returns the upper-cased status when it is a known one, else "UNKNOWN".
        """
        # A NULL (or otherwise non-string) status must not crash the whole summary.
        if not isinstance(raw_status, str):
            return "UNKNOWN"
        status = raw_status.upper()
        return status if status in HealthService._KNOWN_STATUSES else "UNKNOWN"

    async def get_health_summary(self, environment_id: str = None) -> HealthSummaryResponse:
        """
        @PURPOSE: Aggregates the latest validation status for all dashboards.
        @PRE: environment_id (optional) to filter by environment; a falsy value
              (None or "") disables the filter.
        @POST: Returns a HealthSummaryResponse with aggregated status counts and
               one item per dashboard_id.
        @NOTE: Declared async for its callers, but the Session calls below are
               synchronous and are not awaited.
        """
        # [REASON] We need the latest ValidationRecord for each unique dashboard_id.
        # A subquery finds the max timestamp per dashboard_id, and the outer query
        # joins back to fetch the full record carrying that timestamp.
        # NOTE(review): grouping is by dashboard_id only, so without an environment
        # filter a dashboard id shared by several environments yields one item.
        # Confirm this is intended.
        latest = self.db.query(
            ValidationRecord.dashboard_id,
            func.max(ValidationRecord.timestamp).label("max_ts"),
        )
        if environment_id:
            latest = latest.filter(ValidationRecord.environment_id == environment_id)
        latest = latest.group_by(ValidationRecord.dashboard_id).subquery()

        query = self.db.query(ValidationRecord).join(
            latest,
            (ValidationRecord.dashboard_id == latest.c.dashboard_id)
            & (ValidationRecord.timestamp == latest.c.max_ts),
        )
        if environment_id:
            # The join matches on (dashboard_id, timestamp) only. Without repeating
            # the filter here, a record from another environment that shares both
            # values would leak into the filtered summary.
            query = query.filter(ValidationRecord.environment_id == environment_id)

        counts = {"PASS": 0, "WARN": 0, "FAIL": 0, "UNKNOWN": 0}
        items = []
        seen_dashboards = set()
        for rec in query.all():
            # Records tying on the max timestamp all survive the join; keep one
            # per dashboard so it is neither listed nor counted twice.
            if rec.dashboard_id in seen_dashboards:
                continue
            seen_dashboards.add(rec.dashboard_id)

            status = self._normalize_status(rec.status)
            counts[status] += 1
            items.append(DashboardHealthItem(
                dashboard_id=rec.dashboard_id,
                environment_id=rec.environment_id or "unknown",
                status=status,
                last_check=rec.timestamp,
                task_id=rec.task_id,
                summary=rec.summary,
            ))

        logger.info(f"[HealthService][get_health_summary] Aggregated {len(items)} dashboard health records.")
        return HealthSummaryResponse(
            items=items,
            pass_count=counts["PASS"],
            warn_count=counts["WARN"],
            fail_count=counts["FAIL"],
            unknown_count=counts["UNKNOWN"],
        )
# [/DEF:health_service:Module]