код написан

This commit is contained in:
2026-03-10 12:00:18 +03:00
parent a127aa07df
commit 0078c1ae05
57 changed files with 53951 additions and 4909 deletions

View File

@@ -1,189 +0,0 @@
INFO: Will watch for changes in these directories: ['/home/user/ss-tools/backend']
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: Started reloader process [7952] using StatReload
INFO: Started server process [7968]
INFO: Waiting for application startup.
INFO: Application startup complete.
Error loading plugin module backup: No module named 'yaml'
Error loading plugin module migration: No module named 'yaml'
INFO: 127.0.0.1:36934 - "HEAD /docs HTTP/1.1" 200 OK
INFO: 127.0.0.1:55006 - "GET /settings HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:55006 - "GET /settings/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:55010 - "GET /plugins HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:55010 - "GET /plugins/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:55010 - "GET /settings HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:55010 - "GET /settings/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:55010 - "GET /plugins HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:55010 - "GET /plugins/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:55010 - "GET /settings HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:55010 - "GET /settings/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:35508 - "GET /plugins HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:35508 - "GET /plugins/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:49820 - "GET /plugins HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:49820 - "GET /plugins/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:49822 - "GET /settings HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:49822 - "GET /settings/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:49822 - "GET /plugins HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:49822 - "GET /plugins/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:49908 - "GET /settings HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:49908 - "GET /settings/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:49922 - "OPTIONS /settings/environments HTTP/1.1" 200 OK
[2025-12-20 19:14:15,576][INFO][superset_tools_app] [ConfigManager.save_config][Coherence:OK] Configuration saved context={'path': '/home/user/ss-tools/config.json'}
INFO: 127.0.0.1:49922 - "POST /settings/environments HTTP/1.1" 200 OK
INFO: 127.0.0.1:49922 - "GET /settings HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:49922 - "GET /settings/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:49922 - "OPTIONS /settings/environments/7071dab6-881f-49a2-b850-c004b3fc11c0/test HTTP/1.1" 200 OK
INFO: 127.0.0.1:36930 - "POST /settings/environments/7071dab6-881f-49a2-b850-c004b3fc11c0/test HTTP/1.1" 500 Internal Server Error
ERROR: Exception in ASGI application
Traceback (most recent call last):
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/uvicorn/protocols/http/h11_impl.py", line 403, in run_asgi
result = await app( # type: ignore[func-returns-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/uvicorn/middleware/proxy_headers.py", line 60, in __call__
return await self.app(scope, receive, send)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/fastapi/applications.py", line 1135, in __call__
await super().__call__(scope, receive, send)
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/starlette/applications.py", line 107, in __call__
await self.middleware_stack(scope, receive, send)
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/starlette/middleware/errors.py", line 186, in __call__
raise exc
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/starlette/middleware/errors.py", line 164, in __call__
await self.app(scope, receive, _send)
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/starlette/middleware/cors.py", line 93, in __call__
await self.simple_response(scope, receive, send, request_headers=headers)
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/starlette/middleware/cors.py", line 144, in simple_response
await self.app(scope, receive, send)
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/starlette/middleware/exceptions.py", line 63, in __call__
await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
raise exc
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
await app(scope, receive, sender)
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/starlette/routing.py", line 716, in __call__
await self.middleware_stack(scope, receive, send)
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/starlette/routing.py", line 736, in app
await route.handle(scope, receive, send)
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/starlette/routing.py", line 290, in handle
await self.app(scope, receive, send)
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/fastapi/routing.py", line 118, in app
await wrap_app_handling_exceptions(app, request)(scope, receive, send)
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
raise exc
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
await app(scope, receive, sender)
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/fastapi/routing.py", line 104, in app
response = await f(request)
^^^^^^^^^^^^^^^^
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/fastapi/routing.py", line 428, in app
raw_response = await run_endpoint_function(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/ss-tools/backend/venv/lib/python3.12/site-packages/fastapi/routing.py", line 314, in run_endpoint_function
return await dependant.call(**values)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/ss-tools/backend/src/api/routes/settings.py", line 103, in test_connection
import httpx
ModuleNotFoundError: No module named 'httpx'
INFO: 127.0.0.1:45776 - "POST /settings/environments/7071dab6-881f-49a2-b850-c004b3fc11c0/test HTTP/1.1" 200 OK
INFO: 127.0.0.1:45784 - "GET /plugins HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:45784 - "GET /plugins/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:41628 - "GET /settings HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:41628 - "GET /settings/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:41628 - "GET /plugins HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:41628 - "GET /plugins/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:60184 - "GET /settings HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:60184 - "GET /settings/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:60184 - "GET /plugins HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:60184 - "GET /plugins/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:60184 - "GET /settings HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:60184 - "GET /settings/ HTTP/1.1" 200 OK
WARNING: StatReload detected changes in 'src/core/plugin_loader.py'. Reloading...
INFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Finished server process [7968]
INFO: Started server process [12178]
INFO: Waiting for application startup.
INFO: Application startup complete.
WARNING: StatReload detected changes in 'src/dependencies.py'. Reloading...
INFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Finished server process [12178]
INFO: Started server process [12451]
INFO: Waiting for application startup.
INFO: Application startup complete.
Plugin 'Superset Dashboard Backup' (ID: superset-backup) loaded successfully.
Plugin 'Superset Dashboard Migration' (ID: superset-migration) loaded successfully.
INFO: 127.0.0.1:37334 - "GET / HTTP/1.1" 200 OK
INFO: 127.0.0.1:37334 - "GET /favicon.ico HTTP/1.1" 404 Not Found
INFO: 127.0.0.1:39932 - "GET / HTTP/1.1" 200 OK
INFO: 127.0.0.1:39932 - "GET /favicon.ico HTTP/1.1" 404 Not Found
INFO: 127.0.0.1:39932 - "GET / HTTP/1.1" 200 OK
INFO: 127.0.0.1:39932 - "GET / HTTP/1.1" 200 OK
INFO: 127.0.0.1:54900 - "GET /plugins HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:49280 - "GET /plugins HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:49280 - "GET /plugins/ HTTP/1.1" 200 OK
WARNING: StatReload detected changes in 'src/api/routes/plugins.py'. Reloading...
INFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Finished server process [12451]
INFO: Started server process [15016]
INFO: Waiting for application startup.
INFO: Application startup complete.
Plugin 'Superset Dashboard Backup' (ID: superset-backup) loaded successfully.
Plugin 'Superset Dashboard Migration' (ID: superset-migration) loaded successfully.
INFO: 127.0.0.1:59340 - "GET /plugins HTTP/1.1" 307 Temporary Redirect
DEBUG: list_plugins called. Found 0 plugins.
INFO: 127.0.0.1:59340 - "GET /plugins/ HTTP/1.1" 200 OK
WARNING: StatReload detected changes in 'src/dependencies.py'. Reloading...
INFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Finished server process [15016]
INFO: Started server process [15257]
INFO: Waiting for application startup.
INFO: Application startup complete.
Plugin 'Superset Dashboard Backup' (ID: superset-backup) loaded successfully.
Plugin 'Superset Dashboard Migration' (ID: superset-migration) loaded successfully.
DEBUG: dependencies.py initialized. PluginLoader ID: 139922613090976
DEBUG: dependencies.py initialized. PluginLoader ID: 139922627375088
INFO: 127.0.0.1:57464 - "GET /plugins HTTP/1.1" 307 Temporary Redirect
DEBUG: get_plugin_loader called. Returning PluginLoader ID: 139922627375088
DEBUG: list_plugins called. Found 0 plugins.
INFO: 127.0.0.1:57464 - "GET /plugins/ HTTP/1.1" 200 OK
WARNING: StatReload detected changes in 'src/core/plugin_loader.py'. Reloading...
INFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Finished server process [15257]
INFO: Started server process [15533]
INFO: Waiting for application startup.
INFO: Application startup complete.
DEBUG: Loading plugin backup as src.plugins.backup
Plugin 'Superset Dashboard Backup' (ID: superset-backup) loaded successfully.
DEBUG: Loading plugin migration as src.plugins.migration
Plugin 'Superset Dashboard Migration' (ID: superset-migration) loaded successfully.
DEBUG: dependencies.py initialized. PluginLoader ID: 140371031142384
INFO: 127.0.0.1:46470 - "GET /plugins HTTP/1.1" 307 Temporary Redirect
DEBUG: get_plugin_loader called. Returning PluginLoader ID: 140371031142384
DEBUG: list_plugins called. Found 2 plugins.
DEBUG: Plugin: superset-backup
DEBUG: Plugin: superset-migration
INFO: 127.0.0.1:46470 - "GET /plugins/ HTTP/1.1" 200 OK
WARNING: StatReload detected changes in 'src/api/routes/settings.py'. Reloading...
INFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Finished server process [15533]
INFO: Started server process [15827]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Finished server process [15827]
INFO: Stopping reloader process [7952]

View File

@@ -1 +0,0 @@
{"print(f'Length": {"else": "print('Provider not found')\ndb.close()"}}

View File

@@ -120,6 +120,7 @@ INTENT_PERMISSION_CHECKS: Dict[str, List[Tuple[str, str]]] = {
"run_backup": [("plugin:superset-backup", "EXECUTE"), ("plugin:backup", "EXECUTE")],
"run_llm_validation": [("plugin:llm_dashboard_validation", "EXECUTE")],
"run_llm_documentation": [("plugin:llm_documentation", "EXECUTE")],
"get_health_summary": [("plugin:migration", "READ")],
}
@@ -845,6 +846,18 @@ def _parse_command(message: str, config_manager: ConfigManager) -> Dict[str, Any
"requires_confirmation": False,
}
# Health summary
if any(k in lower for k in ["здоровье", "health", "ошибки", "failing", "проблемы"]):
env_match = _extract_id(lower, [r"(?:в|for|env|окружени[ея])\s+([a-z0-9_-]+)"])
return {
"domain": "health",
"operation": "get_health_summary",
"entities": {"environment": env_match},
"confidence": 0.9,
"risk_level": "safe",
"requires_confirmation": False,
}
# LLM validation
if any(k in lower for k in ["валидац", "validate", "провер"]):
env_match = _extract_id(lower, [r"(?:в|for|env|окружени[ея])\s+([a-z0-9_-]+)"])
@@ -1023,6 +1036,15 @@ def _build_tool_catalog(current_user: User, config_manager: ConfigManager, db: S
"risk_level": "guarded",
"requires_confirmation": False,
},
{
"operation": "get_health_summary",
"domain": "health",
"description": "Get summary of dashboard health and failing validations",
"required_entities": [],
"optional_entities": ["environment"],
"risk_level": "safe",
"requires_confirmation": False,
},
]
available: List[Dict[str, Any]] = []
@@ -1056,7 +1078,7 @@ def _coerce_intent_entities(intent: Dict[str, Any]) -> Dict[str, Any]:
# Operations that are read-only and do not require confirmation.
_SAFE_OPS = {"show_capabilities", "get_task_status"}
_SAFE_OPS = {"show_capabilities", "get_task_status", "get_health_summary"}
# [DEF:_confirmation_summary:Function]
@@ -1323,6 +1345,7 @@ async def _dispatch_intent(
"run_llm_validation": "LLM: валидация дашборда",
"run_llm_documentation": "LLM: генерация документации",
"get_task_status": "Статус: проверка задачи",
"get_health_summary": "Здоровье: сводка по дашбордам",
}
available = [labels[t["operation"]] for t in tools_catalog if t["operation"] in labels]
if not available:
@@ -1335,6 +1358,41 @@ async def _dispatch_intent(
)
return text, None, []
if operation == "get_health_summary":
from ...services.health_service import HealthService
env_token = entities.get("environment")
env_id = _resolve_env_id(env_token, config_manager)
service = HealthService(db)
summary = await service.get_health_summary(environment_id=env_id)
env_name = _get_environment_name_by_id(env_id, config_manager) if env_id else "всех окружений"
text = (
f"Сводка здоровья дашбордов для {env_name}:\n"
f"- ✅ Прошли проверку: {summary.pass_count}\n"
f"- ⚠️ С предупреждениями: {summary.warn_count}\n"
f"- ❌ Ошибки валидации: {summary.fail_count}\n"
f"- ❓ Неизвестно: {summary.unknown_count}"
)
actions = [
AssistantAction(type="open_route", label="Открыть Health Center", target="/dashboards/health")
]
if summary.fail_count > 0:
text += "\n\nОбнаружены ошибки в следующих дашбордах:"
for item in summary.items:
if item.status == "FAIL":
text += f"\n- {item.dashboard_id} ({item.environment_id}): {item.summary or 'Нет деталей'}"
actions.append(
AssistantAction(
type="open_route",
label=f"Отчет {item.dashboard_id}",
target=f"/reports/llm/{item.task_id}"
)
)
return text, None, actions[:5] # Limit actions to avoid UI clutter
if operation == "get_task_status":
_check_any_permission(current_user, [("tasks", "READ")])
task_id = entities.get("task_id")

View File

@@ -0,0 +1,31 @@
# [DEF:health_router:Module]
# @TIER: STANDARD
# @SEMANTICS: health, monitoring, dashboards
# @PURPOSE: API endpoints for dashboard health monitoring and status aggregation.
# @LAYER: UI/API
# @RELATION: DEPENDS_ON -> health_service
from fastapi import APIRouter, Depends, Query
from typing import List, Optional
from sqlalchemy.orm import Session
from ...core.database import get_db
from ...services.health_service import HealthService
from ...schemas.health import HealthSummaryResponse
from ...dependencies import has_permission
router = APIRouter(prefix="/api/health", tags=["Health"])
@router.get("/summary", response_model=HealthSummaryResponse)
async def get_health_summary(
    environment_id: Optional[str] = Query(None),
    db: Session = Depends(get_db),
    _ = Depends(has_permission("plugin:migration", "READ"))
):
    """
    @PURPOSE: Get aggregated health status for all dashboards.
    @POST: Returns HealthSummaryResponse
    """
    # Router stays thin: all aggregation lives in the service layer.
    service = HealthService(db)
    summary = await service.get_health_summary(environment_id=environment_id)
    return summary
# [/DEF:health_router:Module]

View File

@@ -16,10 +16,14 @@ from pydantic import BaseModel
from ...core.config_models import AppConfig, Environment, GlobalSettings, LoggingConfig
from ...models.storage import StorageConfig
from ...dependencies import get_config_manager, has_permission
from ...core.config_manager import ConfigManager
from ...core.logger import logger, belief_scope
from ...core.superset_client import SupersetClient
from ...services.llm_prompt_templates import normalize_llm_settings
from ...core.config_manager import ConfigManager
from ...core.logger import logger, belief_scope
from ...core.superset_client import SupersetClient
from ...services.llm_prompt_templates import normalize_llm_settings
from ...models.llm import ValidationPolicy
from ...schemas.settings import ValidationPolicyCreate, ValidationPolicyUpdate, ValidationPolicyResponse
from ...core.database import get_db
from sqlalchemy.orm import Session
# [/SECTION]
# [DEF:LoggingConfigResponse:Class]
@@ -31,38 +35,38 @@ class LoggingConfigResponse(BaseModel):
enable_belief_state: bool
# [/DEF:LoggingConfigResponse:Class]
router = APIRouter()
# [DEF:_normalize_superset_env_url:Function]
# @PURPOSE: Canonicalize Superset environment URL to base host/path without trailing /api/v1.
# @PRE: raw_url can be empty.
# @POST: Returns normalized base URL.
def _normalize_superset_env_url(raw_url: str) -> str:
normalized = str(raw_url or "").strip().rstrip("/")
if normalized.lower().endswith("/api/v1"):
normalized = normalized[:-len("/api/v1")]
return normalized.rstrip("/")
# [/DEF:_normalize_superset_env_url:Function]
# [DEF:_validate_superset_connection_fast:Function]
# @PURPOSE: Run lightweight Superset connectivity validation without full pagination scan.
# @PRE: env contains valid URL and credentials.
# @POST: Raises on auth/API failures; returns None on success.
def _validate_superset_connection_fast(env: Environment) -> None:
    """Run a lightweight Superset connectivity probe.

    Authenticates explicitly, then issues a single minimal dashboard-list
    request to confirm read access. Raises on auth/API failure; returns
    None on success.
    """
    superset = SupersetClient(env)
    # Fail fast on bad credentials before touching any data endpoint.
    superset.authenticate()
    # Smallest possible read: one page, one row, one column.
    probe_query = {"page": 0, "page_size": 1, "columns": ["id"]}
    superset.get_dashboards_page(query=probe_query)
# [/DEF:_validate_superset_connection_fast:Function]
router = APIRouter()
# [DEF:_normalize_superset_env_url:Function]
# @PURPOSE: Canonicalize Superset environment URL to base host/path without trailing /api/v1.
# @PRE: raw_url can be empty.
# @POST: Returns normalized base URL.
def _normalize_superset_env_url(raw_url: str) -> str:
normalized = str(raw_url or "").strip().rstrip("/")
if normalized.lower().endswith("/api/v1"):
normalized = normalized[:-len("/api/v1")]
return normalized.rstrip("/")
# [/DEF:_normalize_superset_env_url:Function]
# [DEF:_validate_superset_connection_fast:Function]
# @PURPOSE: Run lightweight Superset connectivity validation without full pagination scan.
# @PRE: env contains valid URL and credentials.
# @POST: Raises on auth/API failures; returns None on success.
def _validate_superset_connection_fast(env: Environment) -> None:
client = SupersetClient(env)
# 1) Explicit auth check
client.authenticate()
# 2) Single lightweight API call to ensure read access
client.get_dashboards_page(
query={
"page": 0,
"page_size": 1,
"columns": ["id"],
}
)
# [/DEF:_validate_superset_connection_fast:Function]
# [DEF:get_settings:Function]
# @PURPOSE: Retrieves all application settings.
@@ -70,14 +74,14 @@ def _validate_superset_connection_fast(env: Environment) -> None:
# @POST: Returns masked AppConfig.
# @RETURN: AppConfig - The current configuration.
@router.get("", response_model=AppConfig)
async def get_settings(
async def get_settings(
config_manager: ConfigManager = Depends(get_config_manager),
_ = Depends(has_permission("admin:settings", "READ"))
):
with belief_scope("get_settings"):
logger.info("[get_settings][Entry] Fetching all settings")
config = config_manager.get_config().copy(deep=True)
config.settings.llm = normalize_llm_settings(config.settings.llm)
config = config_manager.get_config().copy(deep=True)
config.settings.llm = normalize_llm_settings(config.settings.llm)
# Mask passwords
for env in config.environments:
if env.password:
@@ -143,18 +147,18 @@ async def update_storage_settings(
# @PRE: Config manager is available.
# @POST: Returns list of environments.
# @RETURN: List[Environment] - List of environments.
@router.get("/environments", response_model=List[Environment])
async def get_environments(
@router.get("/environments", response_model=List[Environment])
async def get_environments(
config_manager: ConfigManager = Depends(get_config_manager),
_ = Depends(has_permission("admin:settings", "READ"))
):
with belief_scope("get_environments"):
logger.info("[get_environments][Entry] Fetching environments")
environments = config_manager.get_environments()
return [
env.copy(update={"url": _normalize_superset_env_url(env.url)})
for env in environments
]
):
with belief_scope("get_environments"):
logger.info("[get_environments][Entry] Fetching environments")
environments = config_manager.get_environments()
return [
env.copy(update={"url": _normalize_superset_env_url(env.url)})
for env in environments
]
# [/DEF:get_environments:Function]
# [DEF:add_environment:Function]
@@ -164,21 +168,21 @@ async def get_environments(
# @PARAM: env (Environment) - The environment to add.
# @RETURN: Environment - The added environment.
@router.post("/environments", response_model=Environment)
async def add_environment(
env: Environment,
async def add_environment(
env: Environment,
config_manager: ConfigManager = Depends(get_config_manager),
_ = Depends(has_permission("admin:settings", "WRITE"))
):
with belief_scope("add_environment"):
logger.info(f"[add_environment][Entry] Adding environment {env.id}")
env = env.copy(update={"url": _normalize_superset_env_url(env.url)})
):
with belief_scope("add_environment"):
logger.info(f"[add_environment][Entry] Adding environment {env.id}")
env = env.copy(update={"url": _normalize_superset_env_url(env.url)})
# Validate connection before adding (fast path)
try:
_validate_superset_connection_fast(env)
except Exception as e:
logger.error(f"[add_environment][Coherence:Failed] Connection validation failed: {e}")
raise HTTPException(status_code=400, detail=f"Connection validation failed: {e}")
# Validate connection before adding (fast path)
try:
_validate_superset_connection_fast(env)
except Exception as e:
logger.error(f"[add_environment][Coherence:Failed] Connection validation failed: {e}")
raise HTTPException(status_code=400, detail=f"Connection validation failed: {e}")
config_manager.add_environment(env)
return env
@@ -192,29 +196,29 @@ async def add_environment(
# @PARAM: env (Environment) - The updated environment data.
# @RETURN: Environment - The updated environment.
@router.put("/environments/{id}", response_model=Environment)
async def update_environment(
async def update_environment(
id: str,
env: Environment,
config_manager: ConfigManager = Depends(get_config_manager)
):
):
with belief_scope("update_environment"):
logger.info(f"[update_environment][Entry] Updating environment {id}")
env = env.copy(update={"url": _normalize_superset_env_url(env.url)})
# If password is masked, we need the real one for validation
env_to_validate = env.copy(deep=True)
env = env.copy(update={"url": _normalize_superset_env_url(env.url)})
# If password is masked, we need the real one for validation
env_to_validate = env.copy(deep=True)
if env_to_validate.password == "********":
old_env = next((e for e in config_manager.get_environments() if e.id == id), None)
if old_env:
env_to_validate.password = old_env.password
# Validate connection before updating (fast path)
try:
_validate_superset_connection_fast(env_to_validate)
except Exception as e:
logger.error(f"[update_environment][Coherence:Failed] Connection validation failed: {e}")
raise HTTPException(status_code=400, detail=f"Connection validation failed: {e}")
# Validate connection before updating (fast path)
try:
_validate_superset_connection_fast(env_to_validate)
except Exception as e:
logger.error(f"[update_environment][Coherence:Failed] Connection validation failed: {e}")
raise HTTPException(status_code=400, detail=f"Connection validation failed: {e}")
if config_manager.update_environment(id, env):
return env
@@ -244,7 +248,7 @@ async def delete_environment(
# @PARAM: id (str) - The ID of the environment to test.
# @RETURN: dict - Success message or error.
@router.post("/environments/{id}/test")
async def test_environment_connection(
async def test_environment_connection(
id: str,
config_manager: ConfigManager = Depends(get_config_manager)
):
@@ -256,11 +260,11 @@ async def test_environment_connection(
if not env:
raise HTTPException(status_code=404, detail=f"Environment {id} not found")
try:
_validate_superset_connection_fast(env)
logger.info(f"[test_environment_connection][Coherence:OK] Connection successful for {id}")
return {"status": "success", "message": "Connection successful"}
try:
_validate_superset_connection_fast(env)
logger.info(f"[test_environment_connection][Coherence:OK] Connection successful for {id}")
return {"status": "success", "message": "Connection successful"}
except Exception as e:
logger.error(f"[test_environment_connection][Coherence:Failed] Connection failed for {id}: {e}")
return {"status": "error", "message": str(e)}
@@ -313,13 +317,14 @@ async def update_logging_config(
# [/DEF:update_logging_config:Function]
# [DEF:ConsolidatedSettingsResponse:Class]
class ConsolidatedSettingsResponse(BaseModel):
class ConsolidatedSettingsResponse(BaseModel):
environments: List[dict]
connections: List[dict]
llm: dict
llm_providers: List[dict]
logging: dict
storage: dict
notifications: dict = {}
# [/DEF:ConsolidatedSettingsResponse:Class]
# [DEF:get_consolidated_settings:Function]
@@ -328,7 +333,7 @@ class ConsolidatedSettingsResponse(BaseModel):
# @POST: Returns all consolidated settings.
# @RETURN: ConsolidatedSettingsResponse - All settings categories.
@router.get("/consolidated", response_model=ConsolidatedSettingsResponse)
async def get_consolidated_settings(
async def get_consolidated_settings(
config_manager: ConfigManager = Depends(get_config_manager),
_ = Depends(has_permission("admin:settings", "READ"))
):
@@ -357,16 +362,17 @@ async def get_consolidated_settings(
finally:
db.close()
normalized_llm = normalize_llm_settings(config.settings.llm)
return ConsolidatedSettingsResponse(
environments=[env.dict() for env in config.environments],
connections=config.settings.connections,
llm=normalized_llm,
llm_providers=llm_providers_list,
logging=config.settings.logging.dict(),
storage=config.settings.storage.dict()
)
normalized_llm = normalize_llm_settings(config.settings.llm)
return ConsolidatedSettingsResponse(
environments=[env.dict() for env in config.environments],
connections=config.settings.connections,
llm=normalized_llm,
llm_providers=llm_providers_list,
logging=config.settings.logging.dict(),
storage=config.settings.storage.dict(),
notifications=config.payload.get("notifications", {})
)
# [/DEF:get_consolidated_settings:Function]
# [DEF:update_consolidated_settings:Function]
@@ -389,9 +395,9 @@ async def update_consolidated_settings(
if "connections" in settings_patch:
current_settings.connections = settings_patch["connections"]
# Update LLM if provided
if "llm" in settings_patch:
current_settings.llm = normalize_llm_settings(settings_patch["llm"])
# Update LLM if provided
if "llm" in settings_patch:
current_settings.llm = normalize_llm_settings(settings_patch["llm"])
# Update Logging if provided
if "logging" in settings_patch:
@@ -405,8 +411,88 @@ async def update_consolidated_settings(
raise HTTPException(status_code=400, detail=message)
current_settings.storage = new_storage
if "notifications" in settings_patch:
payload = config_manager.get_payload()
payload["notifications"] = settings_patch["notifications"]
config_manager.save_config(payload)
config_manager.update_global_settings(current_settings)
return {"status": "success", "message": "Settings updated"}
# [/DEF:update_consolidated_settings:Function]
# [DEF:get_validation_policies:Function]
# @PURPOSE: Lists all validation policies.
# @RETURN: List[ValidationPolicyResponse] - List of policies.
@router.get("/automation/policies", response_model=List[ValidationPolicyResponse])
async def get_validation_policies(
    db: Session = Depends(get_db),
    _ = Depends(has_permission("admin:settings", "READ"))
):
    with belief_scope("get_validation_policies"):
        # Return every stored policy; FastAPI serializes via response_model.
        policies = db.query(ValidationPolicy).all()
        return policies
# [/DEF:get_validation_policies:Function]
# [DEF:create_validation_policy:Function]
# @PURPOSE: Creates a new validation policy.
# @PARAM: policy (ValidationPolicyCreate) - The policy data.
# @RETURN: ValidationPolicyResponse - The created policy.
@router.post("/automation/policies", response_model=ValidationPolicyResponse)
async def create_validation_policy(
    policy: ValidationPolicyCreate,
    db: Session = Depends(get_db),
    _ = Depends(has_permission("admin:settings", "WRITE"))
):
    with belief_scope("create_validation_policy"):
        # Materialize the ORM row from the validated request payload.
        new_policy = ValidationPolicy(**policy.dict())
        db.add(new_policy)
        db.commit()
        # Refresh to pick up DB-generated fields (e.g. id, timestamps).
        db.refresh(new_policy)
        return new_policy
# [/DEF:create_validation_policy:Function]
# [DEF:update_validation_policy:Function]
# @PURPOSE: Updates an existing validation policy.
# @PARAM: id (str) - The ID of the policy to update.
# @PARAM: policy (ValidationPolicyUpdate) - The updated policy data.
# @RETURN: ValidationPolicyResponse - The updated policy.
@router.patch("/automation/policies/{id}", response_model=ValidationPolicyResponse)
async def update_validation_policy(
    id: str,
    policy: ValidationPolicyUpdate,
    db: Session = Depends(get_db),
    _ = Depends(has_permission("admin:settings", "WRITE"))
):
    with belief_scope("update_validation_policy"):
        existing = db.query(ValidationPolicy).filter(ValidationPolicy.id == id).first()
        if not existing:
            raise HTTPException(status_code=404, detail="Policy not found")
        # Apply only fields explicitly sent by the client (partial update).
        for field, new_value in policy.dict(exclude_unset=True).items():
            setattr(existing, field, new_value)
        db.commit()
        db.refresh(existing)
        return existing
# [/DEF:update_validation_policy:Function]
# [DEF:delete_validation_policy:Function]
# @PURPOSE: Deletes a validation policy.
# @PARAM: id (str) - The ID of the policy to delete.
@router.delete("/automation/policies/{id}")
async def delete_validation_policy(
    id: str,
    db: Session = Depends(get_db),
    _ = Depends(has_permission("admin:settings", "WRITE"))
):
    with belief_scope("delete_validation_policy"):
        target = db.query(ValidationPolicy).filter(ValidationPolicy.id == id).first()
        if not target:
            raise HTTPException(status_code=404, detail="Policy not found")
        db.delete(target)
        db.commit()
        return {"message": "Policy deleted"}
# [/DEF:delete_validation_policy:Function]
# [/DEF:SettingsRouter:Module]

View File

@@ -21,7 +21,7 @@ import asyncio
from .dependencies import get_task_manager, get_scheduler_service
from .core.utils.network import NetworkError
from .core.logger import logger, belief_scope
from .api.routes import plugins, tasks, settings, environments, mappings, migration, connections, git, storage, admin, llm, dashboards, datasets, reports, assistant, clean_release, clean_release_v2, profile
from .api.routes import plugins, tasks, settings, environments, mappings, migration, connections, git, storage, admin, llm, dashboards, datasets, reports, assistant, clean_release, clean_release_v2, profile, health
from .api import auth
# [DEF:App:Global]
@@ -136,6 +136,7 @@ app.include_router(assistant.router, prefix="/api/assistant", tags=["Assistant"]
app.include_router(clean_release.router)
app.include_router(clean_release_v2.router)
app.include_router(profile.router)
app.include_router(health.router)
# [DEF:api.include_routers:Action]

View File

@@ -0,0 +1,99 @@
import pytest
from datetime import time, date, datetime, timedelta
from src.core.scheduler import ThrottledSchedulerConfigurator
# [DEF:test_throttled_scheduler:Module]
# @TIER: STANDARD
# @PURPOSE: Unit tests for ThrottledSchedulerConfigurator distribution logic.
def test_calculate_schedule_even_distribution():
    """
    @TEST_SCENARIO: 3 tasks in a 2-hour window should be spaced 1 hour apart.
    """
    result = ThrottledSchedulerConfigurator.calculate_schedule(
        time(1, 0), time(3, 0), ["d1", "d2", "d3"], date(2024, 1, 1)
    )
    # First task at window start, last at window end, middle task centered.
    assert result == [
        datetime(2024, 1, 1, 1, 0),
        datetime(2024, 1, 1, 2, 0),
        datetime(2024, 1, 1, 3, 0),
    ]
def test_calculate_schedule_midnight_crossing():
    """
    @TEST_SCENARIO: Window from 23:00 to 01:00 (next day).
    """
    result = ThrottledSchedulerConfigurator.calculate_schedule(
        time(23, 0), time(1, 0), ["d1", "d2", "d3"], date(2024, 1, 1)
    )
    # The window wraps past midnight, so later slots land on the next day.
    expected = [
        datetime(2024, 1, 1, 23, 0),
        datetime(2024, 1, 2, 0, 0),
        datetime(2024, 1, 2, 1, 0),
    ]
    assert result == expected
def test_calculate_schedule_single_task():
    """
    @TEST_SCENARIO: Single task should be scheduled at start time.
    """
    result = ThrottledSchedulerConfigurator.calculate_schedule(
        time(1, 0), time(2, 0), ["d1"], date(2024, 1, 1)
    )
    assert result == [datetime(2024, 1, 1, 1, 0)]
def test_calculate_schedule_empty_list():
    """
    @TEST_SCENARIO: Empty dashboard list returns empty schedule.
    """
    result = ThrottledSchedulerConfigurator.calculate_schedule(
        time(1, 0), time(2, 0), [], date(2024, 1, 1)
    )
    assert result == []
def test_calculate_schedule_zero_window():
    """
    @TEST_SCENARIO: Window start == end. All tasks at start time.
    """
    result = ThrottledSchedulerConfigurator.calculate_schedule(
        time(1, 0), time(1, 0), ["d1", "d2"], date(2024, 1, 1)
    )
    # Degenerate window: both tasks collapse onto the start time.
    assert result == [datetime(2024, 1, 1, 1, 0)] * 2
def test_calculate_schedule_very_small_window():
    """
    @TEST_SCENARIO: Window smaller than number of tasks (in seconds).
    """
    result = ThrottledSchedulerConfigurator.calculate_schedule(
        time(1, 0, 0), time(1, 0, 1), ["d1", "d2", "d3"], date(2024, 1, 1)
    )
    # Three tasks across a 1-second window => 0.5s spacing.
    assert result == [
        datetime(2024, 1, 1, 1, 0, 0),
        datetime(2024, 1, 1, 1, 0, 0, 500000),
        datetime(2024, 1, 1, 1, 0, 1),
    ]
# [/DEF:test_throttled_scheduler:Module]

View File

@@ -157,6 +157,88 @@ def _ensure_user_dashboard_preferences_columns(bind_engine):
# [/DEF:_ensure_user_dashboard_preferences_columns:Function]
# [DEF:_ensure_user_dashboard_preferences_health_columns:Function]
# @PURPOSE: Applies additive schema upgrades for user_dashboard_preferences table (health fields).
def _ensure_user_dashboard_preferences_health_columns(bind_engine):
    with belief_scope("_ensure_user_dashboard_preferences_health_columns"):
        table_name = "user_dashboard_preferences"
        inspector = inspect(bind_engine)
        # Nothing to migrate if the table has not been created yet.
        if table_name not in inspector.get_table_names():
            return
        present = {
            str(col.get("name") or "").strip()
            for col in inspector.get_columns(table_name)
        }
        # Desired columns paired with the DDL that adds each one.
        wanted = (
            ("telegram_id",
             "ALTER TABLE user_dashboard_preferences ADD COLUMN telegram_id VARCHAR"),
            ("email_address",
             "ALTER TABLE user_dashboard_preferences ADD COLUMN email_address VARCHAR"),
            ("notify_on_fail",
             "ALTER TABLE user_dashboard_preferences ADD COLUMN notify_on_fail BOOLEAN NOT NULL DEFAULT TRUE"),
        )
        statements = [ddl for name, ddl in wanted if name not in present]
        if not statements:
            return
        try:
            # Apply all ALTERs in one transaction.
            with bind_engine.begin() as connection:
                for statement in statements:
                    connection.execute(text(statement))
        except Exception as migration_error:
            # Best-effort migration: log and continue rather than block startup.
            logger.warning(
                "[database][EXPLORE] Profile health preference additive migration failed: %s",
                migration_error,
            )
# [/DEF:_ensure_user_dashboard_preferences_health_columns:Function]
# [DEF:_ensure_llm_validation_results_columns:Function]
# @PURPOSE: Applies additive schema upgrades for llm_validation_results table.
def _ensure_llm_validation_results_columns(bind_engine):
    with belief_scope("_ensure_llm_validation_results_columns"):
        table_name = "llm_validation_results"
        inspector = inspect(bind_engine)
        # Skip when the table does not exist (fresh DBs get it via create_all).
        if table_name not in inspector.get_table_names():
            return
        present = {
            str(col.get("name") or "").strip()
            for col in inspector.get_columns(table_name)
        }
        # Desired columns paired with the DDL that adds each one.
        wanted = (
            ("task_id",
             "ALTER TABLE llm_validation_results ADD COLUMN task_id VARCHAR"),
            ("environment_id",
             "ALTER TABLE llm_validation_results ADD COLUMN environment_id VARCHAR"),
        )
        statements = [ddl for name, ddl in wanted if name not in present]
        if not statements:
            return
        try:
            # Apply all ALTERs in one transaction.
            with bind_engine.begin() as connection:
                for statement in statements:
                    connection.execute(text(statement))
        except Exception as migration_error:
            # Best-effort migration: log and continue rather than block startup.
            logger.warning(
                "[database][EXPLORE] ValidationRecord additive migration failed: %s",
                migration_error,
            )
# [/DEF:_ensure_llm_validation_results_columns:Function]
# [DEF:_ensure_git_server_configs_columns:Function]
# @PURPOSE: Applies additive schema upgrades for git_server_configs table.
# @PRE: bind_engine points to application database.
@@ -205,6 +287,8 @@ def init_db():
Base.metadata.create_all(bind=tasks_engine)
Base.metadata.create_all(bind=auth_engine)
_ensure_user_dashboard_preferences_columns(engine)
_ensure_llm_validation_results_columns(engine)
_ensure_user_dashboard_preferences_health_columns(engine)
_ensure_git_server_configs_columns(engine)
# [/DEF:init_db:Function]

View File

@@ -8,9 +8,13 @@
# [SECTION: IMPORTS]
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from .logger import logger, belief_scope
from .config_manager import ConfigManager
from .database import SessionLocal
from ..models.llm import ValidationPolicy
import asyncio
from datetime import datetime, time, timedelta, date
# [/SECTION]
# [DEF:SchedulerService:Class]
@@ -117,4 +121,63 @@ class SchedulerService:
# [/DEF:_trigger_backup:Function]
# [/DEF:SchedulerService:Class]
# [DEF:ThrottledSchedulerConfigurator:Class]
# @TIER: CRITICAL
# @SEMANTICS: scheduler, throttling, distribution
# @PURPOSE: Distributes validation tasks evenly within an execution window.
class ThrottledSchedulerConfigurator:
    # [DEF:calculate_schedule:Function]
    # @PURPOSE: Calculates execution times for N tasks within a window.
    # @PRE: window_start, window_end (time), dashboard_ids (List), current_date (date).
    # @POST: Returns List[datetime] of scheduled times (one per dashboard, in order).
    # @INVARIANT: Tasks are distributed with near-even spacing.
    @staticmethod
    def calculate_schedule(
        window_start: time,
        window_end: time,
        dashboard_ids: list,
        current_date: date
    ) -> list:
        with belief_scope("ThrottledSchedulerConfigurator.calculate_schedule"):
            n = len(dashboard_ids)
            if n == 0:
                return []
            start_dt = datetime.combine(current_date, window_start)
            end_dt = datetime.combine(current_date, window_end)
            # A window whose end is "before" its start crosses midnight:
            # shift the end onto the next calendar day.
            if end_dt < start_dt:
                end_dt += timedelta(days=1)
            total_seconds = (end_dt - start_dt).total_seconds()
            if total_seconds <= 0:
                # Degenerate window (start == end): no spacing possible, so
                # every task falls back to the window start with a warning.
                logger.warning(f"[calculate_schedule] Window size is zero or negative. Falling back to start time for all {n} tasks.")
                return [start_dt] * n
            if n == 1:
                # A single task always runs at the window start.
                return [start_dt]
            # First task at window start, last at window end, the rest spread
            # evenly in between ("near-even spacing").
            # FIX: removed the dead `if n > 1 else 0` guard — the n == 1 case
            # already returned above, so the conditional could never be False.
            interval = total_seconds / (n - 1)
            if interval < 1:
                # Explicit warning for too-small windows: tasks are still
                # distributed, just very close together (sub-second spacing).
                logger.warning(f"[calculate_schedule] Window too small for {n} tasks (interval {interval:.2f}s). Tasks will be highly concentrated.")
            return [start_dt + timedelta(seconds=i * interval) for i in range(n)]
    # [/DEF:calculate_schedule:Function]
# [/DEF:ThrottledSchedulerConfigurator:Class]
# [/DEF:SchedulerModule:Module]

View File

@@ -6,7 +6,7 @@
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> sqlalchemy
from sqlalchemy import Column, String, DateTime, JSON
from sqlalchemy import Column, String, DateTime, JSON, Boolean
from sqlalchemy.sql import func
from .mapping import Base
@@ -23,4 +23,21 @@ class AppConfigRecord(Base):
# [/DEF:AppConfigRecord:Class]
# [DEF:NotificationConfig:Class]
# @PURPOSE: Global settings for external notification providers.
class NotificationConfig(Base):
    __tablename__ = "notification_configs"
    # NOTE(review): `uuid` is imported at the bottom of this module; the lambda
    # defers the lookup to row-creation time so this works, but the import
    # would conventionally live at the top of the file.
    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    type = Column(String, nullable=False)  # SMTP, SLACK, TELEGRAM
    name = Column(String, nullable=False)  # Human-readable provider label
    credentials = Column(JSON, nullable=False)  # Encrypted connection details
    is_active = Column(Boolean, default=True)  # Soft enable/disable flag
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    # Refreshed by the database on every UPDATE via onupdate.
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
# [/DEF:NotificationConfig:Class]
import uuid
# [/DEF:backend.src.models.config:Module]

View File

@@ -5,7 +5,7 @@
# @LAYER: Domain
# @RELATION: INHERITS_FROM -> backend.src.models.mapping.Base
from sqlalchemy import Column, String, Boolean, DateTime, JSON, Text
from sqlalchemy import Column, String, Boolean, DateTime, JSON, Text, Time, ForeignKey
from datetime import datetime
import uuid
from .mapping import Base
@@ -13,6 +13,26 @@ from .mapping import Base
def generate_uuid():
return str(uuid.uuid4())
# [DEF:ValidationPolicy:Class]
# @PURPOSE: Defines a scheduled rule for validating a group of dashboards within an execution window.
class ValidationPolicy(Base):
    __tablename__ = "validation_policies"
    id = Column(String, primary_key=True, default=generate_uuid)
    name = Column(String, nullable=False)  # Display name of the policy
    environment_id = Column(String, nullable=False)  # Target Superset environment
    is_active = Column(Boolean, default=True)  # Inactive policies are not scheduled
    dashboard_ids = Column(JSON, nullable=False)  # Array of dashboard IDs
    schedule_days = Column(JSON, nullable=False)  # Array of integers (0-6)
    window_start = Column(Time, nullable=False)  # Start of the execution window
    window_end = Column(Time, nullable=False)  # End of the window (may be earlier than start, i.e. wrap past midnight)
    notify_owners = Column(Boolean, default=True)  # Route alerts to dashboard owners
    custom_channels = Column(JSON, nullable=True)  # List of external channels
    alert_condition = Column(String, default="FAIL_ONLY")  # FAIL_ONLY, WARN_AND_FAIL, ALWAYS
    # NOTE(review): datetime.utcnow is naive and deprecated since Python 3.12;
    # consider timezone-aware timestamps in a future migration.
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# [/DEF:ValidationPolicy:Class]
# [DEF:LLMProvider:Class]
# @PURPOSE: SQLAlchemy model for LLM provider configuration.
class LLMProvider(Base):
@@ -34,9 +54,11 @@ class ValidationRecord(Base):
__tablename__ = "llm_validation_results"
id = Column(String, primary_key=True, default=generate_uuid)
task_id = Column(String, nullable=True, index=True) # Reference to TaskRecord
dashboard_id = Column(String, nullable=False, index=True)
environment_id = Column(String, nullable=True, index=True)
timestamp = Column(DateTime, default=datetime.utcnow)
status = Column(String, nullable=False) # PASS, WARN, FAIL
status = Column(String, nullable=False) # PASS, WARN, FAIL, UNKNOWN
screenshot_path = Column(String, nullable=True)
issues = Column(JSON, nullable=False)
summary = Column(Text, nullable=False)

View File

@@ -41,6 +41,10 @@ class UserDashboardPreference(Base):
auto_open_task_drawer = Column(Boolean, nullable=False, default=True)
dashboards_table_density = Column(String, nullable=False, default="comfortable")
telegram_id = Column(String, nullable=True)
email_address = Column(String, nullable=True)
notify_on_fail = Column(Boolean, nullable=False, default=True)
created_at = Column(DateTime, nullable=False, default=datetime.utcnow)
updated_at = Column(
DateTime,

View File

@@ -112,6 +112,7 @@ class TaskReport(BaseModel):
updated_at: datetime
summary: str
details: Optional[Dict[str, Any]] = None
validation_record: Optional[Dict[str, Any]] = None # Extended for US2
error_context: Optional[ErrorContext] = None
source_ref: Optional[Dict[str, Any]] = None

View File

@@ -21,8 +21,9 @@ from ...services.llm_provider import LLMProviderService
from ...core.superset_client import SupersetClient
from .service import ScreenshotService, LLMClient
from .models import LLMProviderType, ValidationStatus, ValidationResult, DetectedIssue
from ...models.llm import ValidationRecord
from ...models.llm import ValidationRecord, ValidationPolicy
from ...core.task_manager.context import TaskContext
from ...services.notifications.service import NotificationService
from ...services.llm_prompt_templates import (
DEFAULT_LLM_PROMPTS,
is_multimodal_model,
@@ -283,7 +284,9 @@ class DashboardValidationPlugin(PluginBase):
}
db_record = ValidationRecord(
task_id=context.task_id if context else None,
dashboard_id=validation_result.dashboard_id,
environment_id=env_id,
status=validation_result.status.value,
summary=validation_result.summary,
issues=[issue.dict() for issue in validation_result.issues],
@@ -294,11 +297,20 @@ class DashboardValidationPlugin(PluginBase):
db.commit()
# 7. Notification on failure (US1 / FR-015)
if validation_result.status == ValidationStatus.FAIL:
log.warning(f"Dashboard {dashboard_id} validation FAILED. Summary: {validation_result.summary}")
# Placeholder for Email/Pulse notification dispatch
# In a real implementation, we would call a NotificationService here
# with a payload containing the summary and a link to the report.
try:
policy_id = params.get("policy_id")
policy = None
if policy_id:
policy = db.query(ValidationPolicy).filter(ValidationPolicy.id == policy_id).first()
notification_service = NotificationService(db, config_mgr)
await notification_service.dispatch_report(
record=db_record,
policy=policy,
background_tasks=context.background_tasks if context else None
)
except Exception as e:
log.error(f"Failed to dispatch notifications: {e}")
# Final log to ensure all analysis is visible in task logs
log.info(f"Validation completed for dashboard {dashboard_id}. Status: {validation_result.status.value}")

View File

@@ -0,0 +1,84 @@
# [DEF:backend.src.schemas.__tests__.test_settings_and_health_schemas:Module]
# @TIER: STANDARD
# @PURPOSE: Regression tests for settings and health schema contracts updated in 026 fix batch.
import pytest
from pydantic import ValidationError
from src.schemas.health import DashboardHealthItem
from src.schemas.settings import ValidationPolicyCreate
# [DEF:test_validation_policy_create_accepts_structured_custom_channels:Function]
# @PURPOSE: Ensure policy schema accepts structured custom channel objects with type/target fields.
def test_validation_policy_create_accepts_structured_custom_channels():
    policy = ValidationPolicyCreate(
        name="Daily Health",
        environment_id="env-1",
        dashboard_ids=["10", "11"],
        schedule_days=[0, 1, 2],
        window_start="01:00:00",
        window_end="03:00:00",
        notify_owners=True,
        custom_channels=[{"type": "SLACK", "target": "#alerts"}],
        alert_condition="FAIL_ONLY",
    )
    channels = policy.custom_channels
    assert channels is not None
    assert len(channels) == 1
    assert (channels[0].type, channels[0].target) == ("SLACK", "#alerts")
# [/DEF:test_validation_policy_create_accepts_structured_custom_channels:Function]
# [DEF:test_validation_policy_create_rejects_legacy_string_custom_channels:Function]
# @PURPOSE: Ensure legacy list[str] custom channel payload is rejected by typed channel contract.
def test_validation_policy_create_rejects_legacy_string_custom_channels():
    # Bare "TYPE:target" strings are the legacy shape; typed channels must refuse them.
    with pytest.raises(ValidationError):
        ValidationPolicyCreate(
            name="Daily Health",
            environment_id="env-1",
            dashboard_ids=["10"],
            schedule_days=[0],
            window_start="01:00:00",
            window_end="02:00:00",
            notify_owners=False,
            custom_channels=["SLACK:#alerts"],
        )
# [/DEF:test_validation_policy_create_rejects_legacy_string_custom_channels:Function]
# [DEF:test_dashboard_health_item_status_accepts_only_whitelisted_values:Function]
# @PURPOSE: Verify strict grouped regex only accepts PASS/WARN/FAIL/UNKNOWN exact statuses.
def test_dashboard_health_item_status_accepts_only_whitelisted_values():
    base = {
        "dashboard_id": "dash-1",
        "environment_id": "env-1",
        "last_check": "2026-03-10T10:00:00",
    }
    # Exact whitelisted value passes through unchanged.
    assert DashboardHealthItem(status="PASS", **base).status == "PASS"
    # Superstrings and trailing whitespace must both be rejected.
    for bad_status in ("PASSING", "FAIL "):
        with pytest.raises(ValidationError):
            DashboardHealthItem(status=bad_status, **base)
# [/DEF:test_dashboard_health_item_status_accepts_only_whitelisted_values:Function]
# [/DEF:backend.src.schemas.__tests__.test_settings_and_health_schemas:Module]

View File

@@ -0,0 +1,33 @@
# [DEF:backend.src.schemas.health:Module]
# @TIER: STANDARD
# @SEMANTICS: health, schemas, pydantic
# @PURPOSE: Pydantic schemas for dashboard health summary.
# @LAYER: Domain
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
# [DEF:DashboardHealthItem:Class]
# @PURPOSE: Represents the latest health status of a single dashboard.
class DashboardHealthItem(BaseModel):
    dashboard_id: str  # ID of the dashboard this status belongs to
    dashboard_title: Optional[str] = None  # Human-readable title, when known
    environment_id: str  # Environment the validation ran against
    # Anchored regex: only the exact strings PASS/WARN/FAIL/UNKNOWN validate.
    status: str = Field(..., pattern="^(PASS|WARN|FAIL|UNKNOWN)$")
    last_check: datetime  # Timestamp of the latest validation record
    task_id: Optional[str] = None  # Originating task, if recorded
    summary: Optional[str] = None  # Summary text from the latest record
# [/DEF:DashboardHealthItem:Class]
# [DEF:HealthSummaryResponse:Class]
# @PURPOSE: Aggregated health summary for all dashboards.
class HealthSummaryResponse(BaseModel):
    items: List[DashboardHealthItem]  # One entry per dashboard (latest record each)
    # Counters bucket the items above by normalized status.
    pass_count: int
    warn_count: int
    fail_count: int
    unknown_count: int
# [/DEF:HealthSummaryResponse:Class]
# [/DEF:backend.src.schemas.health:Module]

View File

@@ -55,6 +55,10 @@ class ProfilePreference(BaseModel):
auto_open_task_drawer: bool = True
dashboards_table_density: Literal["compact", "comfortable"] = "comfortable"
telegram_id: Optional[str] = None
email_address: Optional[str] = None
notify_on_fail: bool = True
created_at: datetime
updated_at: datetime
@@ -103,6 +107,18 @@ class ProfilePreferenceUpdateRequest(BaseModel):
default=None,
description="Preferred table density for dashboard listings.",
)
telegram_id: Optional[str] = Field(
default=None,
description="Telegram ID for notifications.",
)
email_address: Optional[str] = Field(
default=None,
description="Email address for notifications (overrides system email).",
)
notify_on_fail: Optional[bool] = Field(
default=None,
description="Whether to send notifications on validation failure.",
)
# [/DEF:ProfilePreferenceUpdateRequest:Class]

View File

@@ -0,0 +1,68 @@
# [DEF:backend.src.schemas.settings:Module]
# @TIER: STANDARD
# @SEMANTICS: settings, schemas, pydantic, validation
# @PURPOSE: Pydantic schemas for application settings and automation policies.
# @LAYER: Domain
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime, time
# [DEF:NotificationChannel:Class]
# @PURPOSE: Structured notification channel definition for policy-level custom routing.
class NotificationChannel(BaseModel):
    # Both fields are free-form strings; validation of supported channel
    # types happens downstream in the notification dispatch layer.
    type: str = Field(..., description="Notification channel type (e.g., SLACK, SMTP, TELEGRAM)")
    target: str = Field(..., description="Notification destination (e.g., #alerts, chat id, email)")
# [/DEF:NotificationChannel:Class]
# [DEF:ValidationPolicyBase:Class]
# @PURPOSE: Base schema for validation policy data (shared by create/response variants).
class ValidationPolicyBase(BaseModel):
    name: str = Field(..., description="Name of the policy")
    environment_id: str = Field(..., description="Target Superset environment ID")
    is_active: bool = Field(True, description="Whether the policy is currently active")
    dashboard_ids: List[str] = Field(..., description="List of dashboard IDs to validate")
    schedule_days: List[int] = Field(..., description="Days of the week (0-6, 0=Sunday) to run")
    window_start: time = Field(..., description="Start of the execution window")
    window_end: time = Field(..., description="End of the execution window")
    notify_owners: bool = Field(True, description="Whether to notify dashboard owners on failure")
    custom_channels: Optional[List[NotificationChannel]] = Field(
        None,
        description="List of additional structured notification channels",
    )
    # NOTE(review): alert_condition is a plain str; a Literal of the three
    # documented values would reject typos at the schema boundary — confirm
    # no callers rely on other values before tightening.
    alert_condition: str = Field("FAIL_ONLY", description="Condition to trigger alerts: FAIL_ONLY, WARN_AND_FAIL, ALWAYS")
# [/DEF:ValidationPolicyBase:Class]
# [DEF:ValidationPolicyCreate:Class]
# @PURPOSE: Schema for creating a new validation policy.
class ValidationPolicyCreate(ValidationPolicyBase):
    # Creation payload carries exactly the base fields; id and timestamps
    # are assigned server-side.
    pass
# [/DEF:ValidationPolicyCreate:Class]
# [DEF:ValidationPolicyUpdate:Class]
# @PURPOSE: Schema for updating an existing validation policy.
class ValidationPolicyUpdate(BaseModel):
    # Every field is optional so callers can send partial updates; the PUT
    # handler applies only the fields the client actually set (exclude_unset).
    name: Optional[str] = None
    environment_id: Optional[str] = None
    is_active: Optional[bool] = None
    dashboard_ids: Optional[List[str]] = None
    schedule_days: Optional[List[int]] = None
    window_start: Optional[time] = None
    window_end: Optional[time] = None
    notify_owners: Optional[bool] = None
    custom_channels: Optional[List[NotificationChannel]] = None
    alert_condition: Optional[str] = None
# [/DEF:ValidationPolicyUpdate:Class]
# [DEF:ValidationPolicyResponse:Class]
# @PURPOSE: Schema for validation policy response data.
class ValidationPolicyResponse(ValidationPolicyBase):
    id: str  # Server-assigned UUID
    created_at: datetime
    updated_at: datetime
    class Config:
        from_attributes = True  # Allow construction directly from ORM objects
# [/DEF:ValidationPolicyResponse:Class]
# [/DEF:backend.src.schemas.settings:Module]

View File

@@ -0,0 +1,85 @@
import pytest
from datetime import datetime, timedelta
from unittest.mock import MagicMock
from src.services.health_service import HealthService
from src.models.llm import ValidationRecord
# [DEF:test_health_service:Module]
# @TIER: STANDARD
# @PURPOSE: Unit tests for HealthService aggregation logic.
@pytest.mark.asyncio
async def test_get_health_summary_aggregation():
    """
    @TEST_SCENARIO: Verify that HealthService correctly aggregates the latest record per dashboard.
    """
    # Setup: Mock DB session
    db = MagicMock()
    now = datetime.utcnow()
    # Dashboard 1: Old FAIL, New PASS
    # NOTE(review): rec1_old is never wired into the mocked query result; it
    # documents the scenario only — the DB-side "latest per dashboard"
    # selection itself is not exercised by this mock.
    rec1_old = ValidationRecord(
        dashboard_id="dash_1",
        environment_id="env_1",
        status="FAIL",
        timestamp=now - timedelta(hours=1),
        summary="Old failure",
        issues=[]
    )
    rec1_new = ValidationRecord(
        dashboard_id="dash_1",
        environment_id="env_1",
        status="PASS",
        timestamp=now,
        summary="New pass",
        issues=[]
    )
    # Dashboard 2: Single WARN
    rec2 = ValidationRecord(
        dashboard_id="dash_2",
        environment_id="env_1",
        status="WARN",
        timestamp=now,
        summary="Warning",
        issues=[]
    )
    # Mock the query chain
    # subquery = self.db.query(...).filter(...).group_by(...).subquery()
    # query = self.db.query(ValidationRecord).join(subquery, ...).all()
    mock_query = db.query.return_value
    mock_query.filter.return_value = mock_query
    mock_query.group_by.return_value = mock_query
    mock_query.subquery.return_value = MagicMock()
    # The joined query yields only the pre-selected "latest" rows.
    db.query.return_value.join.return_value.all.return_value = [rec1_new, rec2]
    service = HealthService(db)
    summary = await service.get_health_summary(environment_id="env_1")
    # One PASS (dash_1's newest) and one WARN (dash_2) expected.
    assert summary.pass_count == 1
    assert summary.warn_count == 1
    assert summary.fail_count == 0
    assert len(summary.items) == 2
    # Verify dash_1 has the latest status (PASS)
    dash_1_item = next(item for item in summary.items if item.dashboard_id == "dash_1")
    assert dash_1_item.status == "PASS"
    assert dash_1_item.summary == "New pass"
@pytest.mark.asyncio
async def test_get_health_summary_empty():
    """
    @TEST_SCENARIO: Verify behavior with no records.
    """
    db = MagicMock()
    # Joined query yields nothing => empty summary with zeroed counters.
    db.query.return_value.join.return_value.all.return_value = []
    summary = await HealthService(db).get_health_summary(environment_id="env_none")
    assert summary.pass_count == 0
    assert summary.items == []

View File

@@ -0,0 +1,150 @@
# [DEF:backend.src.services.__tests__.test_llm_plugin_persistence:Module]
# @TIER: STANDARD
# @PURPOSE: Regression test for ValidationRecord persistence fields populated from task context.
import types
import pytest
from src.plugins.llm_analysis import plugin as plugin_module
# [DEF:_DummyLogger:Class]
# @PURPOSE: Minimal logger shim for TaskContext-like objects used in tests.
class _DummyLogger:
def with_source(self, _source: str):
return self
def info(self, *_args, **_kwargs):
return None
def debug(self, *_args, **_kwargs):
return None
def warning(self, *_args, **_kwargs):
return None
def error(self, *_args, **_kwargs):
return None
# [/DEF:_DummyLogger:Class]
# [DEF:_FakeDBSession:Class]
# @PURPOSE: Captures persisted records for assertion and mimics SQLAlchemy session methods used by plugin.
class _FakeDBSession:
def __init__(self):
self.added = None
self.committed = False
self.closed = False
def add(self, obj):
self.added = obj
def commit(self):
self.committed = True
def close(self):
self.closed = True
# [/DEF:_FakeDBSession:Class]
# [DEF:test_dashboard_validation_plugin_persists_task_and_environment_ids:Function]
# @PURPOSE: Ensure db ValidationRecord includes context.task_id and params.environment_id.
@pytest.mark.asyncio
async def test_dashboard_validation_plugin_persists_task_and_environment_ids(tmp_path, monkeypatch):
    # Fake session records what the plugin persists for later assertions.
    fake_db = _FakeDBSession()
    env = types.SimpleNamespace(id="env-42")
    provider = types.SimpleNamespace(
        id="provider-1",
        name="Main LLM",
        provider_type="openai",
        base_url="https://example.invalid/v1",
        default_model="gpt-4o",
        is_active=True,
    )
    # Stubs for every collaborator the plugin touches, so execute() runs
    # fully offline (no DB, no browser, no network, no LLM).
    class _FakeProviderService:
        def __init__(self, _db):
            return None
        def get_provider(self, _provider_id):
            return provider
        def get_decrypted_api_key(self, _provider_id):
            # 32 chars: long enough to pass any minimum-length key check.
            return "a" * 32
    class _FakeScreenshotService:
        def __init__(self, _env):
            return None
        async def capture_dashboard(self, _dashboard_id, _screenshot_path):
            return None
    class _FakeLLMClient:
        def __init__(self, **_kwargs):
            return None
        async def analyze_dashboard(self, *_args, **_kwargs):
            # Canned PASS verdict so the plugin takes the success path.
            return {
                "status": "PASS",
                "summary": "Dashboard healthy",
                "issues": [],
            }
    class _FakeNotificationService:
        def __init__(self, *_args, **_kwargs):
            return None
        async def dispatch_report(self, **_kwargs):
            return None
    class _FakeConfigManager:
        def get_environment(self, _env_id):
            return env
        def get_config(self):
            # Storage root points at pytest's tmp_path so screenshot paths resolve.
            return types.SimpleNamespace(
                settings=types.SimpleNamespace(
                    storage=types.SimpleNamespace(root_path=str(tmp_path)),
                    llm={},
                )
            )
    class _FakeSupersetClient:
        def __init__(self, _env):
            self.network = types.SimpleNamespace(request=lambda **_kwargs: {"result": []})
    # Patch the plugin module's collaborators with the stubs above.
    monkeypatch.setattr(plugin_module, "SessionLocal", lambda: fake_db)
    monkeypatch.setattr(plugin_module, "LLMProviderService", _FakeProviderService)
    monkeypatch.setattr(plugin_module, "ScreenshotService", _FakeScreenshotService)
    monkeypatch.setattr(plugin_module, "LLMClient", _FakeLLMClient)
    monkeypatch.setattr(plugin_module, "NotificationService", _FakeNotificationService)
    monkeypatch.setattr(plugin_module, "SupersetClient", _FakeSupersetClient)
    monkeypatch.setattr("src.dependencies.get_config_manager", lambda: _FakeConfigManager())
    # Minimal TaskContext stand-in providing the task_id under test.
    context = types.SimpleNamespace(
        task_id="task-999",
        logger=_DummyLogger(),
        background_tasks=None,
    )
    plugin = plugin_module.DashboardValidationPlugin()
    result = await plugin.execute(
        {
            "dashboard_id": "11",
            "environment_id": "env-42",
            "provider_id": "provider-1",
        },
        context=context,
    )
    assert result["environment_id"] == "env-42"
    # Session lifecycle: record added, committed, session closed.
    assert fake_db.committed is True
    assert fake_db.closed is True
    assert fake_db.added is not None
    # Core regression: persisted record carries both correlation IDs.
    assert fake_db.added.task_id == "task-999"
    assert fake_db.added.environment_id == "env-42"
# [/DEF:test_dashboard_validation_plugin_persists_task_and_environment_ids:Function]
# [/DEF:backend.src.services.__tests__.test_llm_plugin_persistence:Module]

View File

@@ -0,0 +1,84 @@
# [DEF:health_service:Module]
# @TIER: STANDARD
# @SEMANTICS: health, aggregation, dashboards
# @PURPOSE: Business logic for aggregating dashboard health status from validation records.
# @LAYER: Domain/Service
# @RELATION: DEPENDS_ON -> ValidationRecord
from typing import Any, Dict, List, Optional

from sqlalchemy import desc, func
from sqlalchemy.orm import Session

from ..core.logger import logger
from ..models.llm import ValidationRecord
from ..schemas.health import DashboardHealthItem, HealthSummaryResponse
class HealthService:
    """
    @PURPOSE: Service for managing and querying dashboard health data.
    """
    def __init__(self, db: Session):
        # @PARAM: db - SQLAlchemy session used for all health queries.
        self.db = db

    async def get_health_summary(self, environment_id: Optional[str] = None) -> HealthSummaryResponse:
        """
        @PURPOSE: Aggregates the latest validation status for all dashboards.
        @PRE: environment_id (optional) to filter by environment.
        @POST: Returns a HealthSummaryResponse with aggregated status counts and items.
        """
        # FIX: parameter annotated Optional[str] (was the implicit-Optional
        # `environment_id: str = None`, invalid under PEP 484).
        # [REASON] We need the latest ValidationRecord for each unique dashboard_id.
        # We use a subquery to find the max timestamp per dashboard_id.
        subquery = self.db.query(
            ValidationRecord.dashboard_id,
            func.max(ValidationRecord.timestamp).label("max_ts")
        )
        if environment_id:
            subquery = subquery.filter(ValidationRecord.environment_id == environment_id)
        subquery = subquery.group_by(ValidationRecord.dashboard_id).subquery()
        query = self.db.query(ValidationRecord).join(
            subquery,
            (ValidationRecord.dashboard_id == subquery.c.dashboard_id) &
            (ValidationRecord.timestamp == subquery.c.max_ts)
        )
        # FIX: also filter the outer query by environment. Previously only the
        # subquery was filtered, so a record for the same dashboard_id from a
        # *different* environment with an identical timestamp could join in
        # and leak into the summary.
        if environment_id:
            query = query.filter(ValidationRecord.environment_id == environment_id)
        records = query.all()
        items = []
        pass_count = 0
        warn_count = 0
        fail_count = 0
        unknown_count = 0
        for rec in records:
            # Normalize; anything outside the known set is bucketed as UNKNOWN
            # (also keeps DashboardHealthItem's status regex satisfied).
            status = rec.status.upper()
            if status == "PASS":
                pass_count += 1
            elif status == "WARN":
                warn_count += 1
            elif status == "FAIL":
                fail_count += 1
            else:
                unknown_count += 1
                status = "UNKNOWN"
            items.append(DashboardHealthItem(
                dashboard_id=rec.dashboard_id,
                environment_id=rec.environment_id or "unknown",
                status=status,
                last_check=rec.timestamp,
                task_id=rec.task_id,
                summary=rec.summary
            ))
        logger.info(f"[HealthService][get_health_summary] Aggregated {len(items)} dashboard health records.")
        return HealthSummaryResponse(
            items=items,
            pass_count=pass_count,
            warn_count=warn_count,
            fail_count=fail_count,
            unknown_count=unknown_count
        )
# [/DEF:health_service:Module]

View File

@@ -0,0 +1,120 @@
# [DEF:backend.src.services.notifications.__tests__.test_notification_service:Module]
# @TIER: STANDARD
# @PURPOSE: Unit tests for NotificationService routing and dispatch logic.
import pytest
from unittest.mock import MagicMock, AsyncMock, patch
from datetime import time
from src.models.llm import ValidationRecord, ValidationPolicy
from src.models.profile import UserDashboardPreference
from src.models.auth import User
from src.services.notifications.service import NotificationService
# Shared fixtures for NotificationService tests.
@pytest.fixture
def mock_db():
    # Bare MagicMock standing in for a SQLAlchemy session.
    return MagicMock()
@pytest.fixture
def mock_config_manager():
    # Config manager stub exposing SMTP and Telegram provider settings
    # via get_payload(), matching what NotificationService reads.
    cm = MagicMock()
    cm.get_payload.return_value = {
        "notifications": {
            "smtp": {"host": "localhost", "port": 25, "from_email": "test@example.com"},
            "telegram": {"bot_token": "test_token"}
        }
    }
    return cm
@pytest.fixture
def service(mock_db, mock_config_manager):
    # System under test, wired with the mocked session and config manager.
    return NotificationService(mock_db, mock_config_manager)
@pytest.mark.asyncio
async def test_should_notify_fail_only(service):
    # FAIL_ONLY policies must fire on FAIL and stay silent on WARN.
    policy = ValidationPolicy(alert_condition="FAIL_ONLY")
    failing = ValidationRecord(status="FAIL")
    assert service._should_notify(failing, policy) is True
    failing.status = "WARN"
    assert service._should_notify(failing, policy) is False
@pytest.mark.asyncio
async def test_should_notify_warn_and_fail(service):
    # WARN_AND_FAIL fires on both WARN and FAIL, never on PASS.
    policy = ValidationPolicy(alert_condition="WARN_AND_FAIL")
    record = ValidationRecord(status="FAIL")
    for status, expected in (("FAIL", True), ("WARN", True), ("PASS", False)):
        record.status = status
        assert service._should_notify(record, policy) is expected
def test_resolve_targets_owner_routing(service, mock_db):
    """Owner routing yields both Telegram and SMTP targets for a bound user.

    Sync test: _resolve_targets does no awaiting, so the original async
    marker was dead weight.
    """
    record = ValidationRecord(dashboard_id="dash-1", environment_id="env-1")
    user = User(email="user@example.com")
    pref = UserDashboardPreference(
        user=user,
        telegram_id="12345",
        notify_on_fail=True,
        superset_username="user1"
    )
    # The DB query chain returns exactly one bound preference.
    mock_db.query.return_value.filter.return_value.all.return_value = [pref]
    targets = service._resolve_targets(record, None)
    assert ("TELEGRAM", "12345") in targets
    assert ("SMTP", "user@example.com") in targets
def test_resolve_targets_custom_channels(service):
    """With owner routing disabled, only the policy's custom channels remain.

    Sync test: _resolve_targets is synchronous; the async marker was removed.
    """
    record = ValidationRecord(status="FAIL")
    policy = ValidationPolicy(
        notify_owners=False,
        custom_channels=[{"type": "SLACK", "target": "#alerts"}]
    )
    targets = service._resolve_targets(record, policy)
    assert targets == [("SLACK", "#alerts")]
@pytest.mark.asyncio
async def test_dispatch_report_skips_if_no_notify(service):
    """A PASS record under FAIL_ONLY must short-circuit before target resolution."""
    policy = ValidationPolicy(alert_condition="FAIL_ONLY")
    record = ValidationRecord(status="PASS")
    with patch.object(service, "_resolve_targets") as mock_resolve:
        await service.dispatch_report(record, policy)
    mock_resolve.assert_not_called()
@pytest.mark.asyncio
async def test_dispatch_report_calls_providers(service, mock_db):
    """A FAIL record is fanned out once to each resolved provider."""
    record = ValidationRecord(id="rec-1", status="FAIL", summary="Bad", issues=[])
    # Replace the real providers with async stubs after initialization.
    service._initialize_providers()
    telegram_stub = AsyncMock()
    smtp_stub = AsyncMock()
    service._providers["TELEGRAM"] = telegram_stub
    service._providers["SMTP"] = smtp_stub
    resolved = [("TELEGRAM", "123"), ("SMTP", "a@b.com")]
    with patch.object(service, "_resolve_targets", return_value=resolved):
        await service.dispatch_report(record, None)
    telegram_stub.send.assert_called_once()
    smtp_stub.send.assert_called_once()
# [/DEF:backend.src.services.notifications.__tests__.test_notification_service:Module]

View File

@@ -0,0 +1,123 @@
# [DEF:backend.src.services.notifications.providers:Module]
#
# @TIER: CRITICAL
# @SEMANTICS: notifications, providers, smtp, slack, telegram, abstraction
# @PURPOSE: Defines abstract base and concrete implementations for external notification delivery.
# @LAYER: Infra
#
# @INVARIANT: Providers must be stateless and resilient to network failures.
# @INVARIANT: Sensitive credentials must be handled via encrypted config.
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
import requests
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from ...core.logger import logger
# [DEF:NotificationProvider:Class]
# @PURPOSE: Abstract base class for all notification providers.
class NotificationProvider(ABC):
    # Contract: implementations deliver one message to one recipient and
    # report success via the boolean return value rather than raising.
    @abstractmethod
    async def send(self, target: str, subject: str, body: str, context: Optional[Dict[str, Any]] = None) -> bool:
        """Deliver a single notification to one recipient.

        :param target: Recipient identifier (email, channel ID, user ID).
        :param subject: Notification subject or title.
        :param body: Main content of the notification.
        :param context: Additional provider-specific metadata.
        :return: True if the message was successfully dispatched.
        """
        ...
# [/DEF:NotificationProvider:Class]
# [DEF:SMTPProvider:Class]
# @PURPOSE: Delivers notifications via SMTP.
class SMTPProvider(NotificationProvider):
    def __init__(self, config: Dict[str, Any]):
        """Read SMTP connection settings from the provider config dict."""
        self.host = config.get("host")
        self.port = int(config.get("port", 587))
        self.username = config.get("username")
        self.password = config.get("password")
        self.from_email = config.get("from_email")
        self.use_tls = config.get("use_tls", True)
    async def send(self, target: str, subject: str, body: str, context: Optional[Dict[str, Any]] = None) -> bool:
        """Send a plain-text email to *target*; returns True on success.

        NOTE(review): smtplib is blocking; this runs inside an async def and
        stalls the event loop for the duration of the SMTP exchange. Consider
        asyncio.to_thread if dispatch latency becomes a concern.
        """
        try:
            msg = MIMEMultipart()
            msg["From"] = self.from_email
            msg["To"] = target
            msg["Subject"] = subject
            msg.attach(MIMEText(body, "plain"))
            # Use SMTP as a context manager so the connection is always
            # closed (QUIT) even if starttls/login/send_message raises.
            # The original only called quit() on the happy path, leaking
            # the socket on any delivery failure.
            with smtplib.SMTP(self.host, self.port) as server:
                if self.use_tls:
                    server.starttls()
                if self.username and self.password:
                    server.login(self.username, self.password)
                server.send_message(msg)
            return True
        except Exception as e:
            logger.error(f"[SMTPProvider][FAILED] Failed to send email to {target}: {e}")
            return False
# [/DEF:SMTPProvider:Class]
# [DEF:TelegramProvider:Class]
# @PURPOSE: Delivers notifications via Telegram Bot API.
class TelegramProvider(NotificationProvider):
    def __init__(self, config: Dict[str, Any]):
        # Bot API token; every sendMessage call is addressed through it.
        self.bot_token = config.get("bot_token")
    async def send(self, target: str, subject: str, body: str, context: Optional[Dict[str, Any]] = None) -> bool:
        """Post a Markdown-formatted message to the chat identified by *target*."""
        if not self.bot_token:
            logger.error("[TelegramProvider][FAILED] Bot token not configured")
            return False
        api_url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
        message = {
            "chat_id": target,
            "text": f"*{subject}*\n\n{body}",
            "parse_mode": "Markdown"
        }
        try:
            resp = requests.post(api_url, json=message, timeout=10)
            resp.raise_for_status()
        except Exception as e:
            logger.error(f"[TelegramProvider][FAILED] Failed to send Telegram message to {target}: {e}")
            return False
        return True
# [/DEF:TelegramProvider:Class]
# [DEF:SlackProvider:Class]
# @PURPOSE: Delivers notifications via Slack Webhooks or API.
class SlackProvider(NotificationProvider):
    def __init__(self, config: Dict[str, Any]):
        # Incoming-webhook URL; the channel is fixed by the webhook itself.
        self.webhook_url = config.get("webhook_url")
    async def send(self, target: str, subject: str, body: str, context: Optional[Dict[str, Any]] = None) -> bool:
        """POST the message to the configured webhook; *target* is unused here."""
        if not self.webhook_url:
            logger.error("[SlackProvider][FAILED] Webhook URL not configured")
            return False
        message = {"text": f"*{subject}*\n{body}"}
        try:
            resp = requests.post(self.webhook_url, json=message, timeout=10)
            resp.raise_for_status()
        except Exception as e:
            logger.error(f"[SlackProvider][FAILED] Failed to send Slack message: {e}")
            return False
        return True
# [/DEF:SlackProvider:Class]
# [/DEF:backend.src.services.notifications.providers:Module]

View File

@@ -0,0 +1,146 @@
# [DEF:backend.src.services.notifications.service:Module]
#
# @TIER: CRITICAL
# @SEMANTICS: notifications, service, routing, dispatch, background-tasks
# @PURPOSE: Orchestrates notification routing based on user preferences and policy context.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> backend.src.services.notifications.providers
# @RELATION: DEPENDS_ON -> backend.src.services.profile_service
# @RELATION: DEPENDS_ON -> backend.src.models.llm
#
# @INVARIANT: Notifications are dispatched asynchronously via BackgroundTasks.
# @INVARIANT: Missing profile or provider config must not crash the pipeline.
from typing import Any, Dict, List, Optional
from fastapi import BackgroundTasks
from sqlalchemy.orm import Session
from ...core.logger import logger, belief_scope
from ...core.config_manager import ConfigManager
from ...models.llm import ValidationRecord, ValidationPolicy
from ...models.profile import UserDashboardPreference
from .providers import SMTPProvider, TelegramProvider, SlackProvider, NotificationProvider
# [DEF:NotificationService:Class]
# @PURPOSE: Routes validation reports to appropriate users and channels.
class NotificationService:
    def __init__(self, db: Session, config_manager: ConfigManager):
        self.db = db
        self.config_manager = config_manager
        # Channel-type -> provider instance; populated lazily on first dispatch.
        self._providers: Dict[str, NotificationProvider] = {}
        self._initialized = False
    def _initialize_providers(self):
        """Lazily build provider instances from the 'notifications' config section."""
        if self._initialized:
            return
        # In a real implementation, we would fetch these from NotificationConfig model
        # For now, we'll use a placeholder initialization logic
        # T033 will implement the UI/API for this.
        configs = self.config_manager.get_payload().get("notifications", {})
        if "smtp" in configs:
            self._providers["SMTP"] = SMTPProvider(configs["smtp"])
        if "telegram" in configs:
            self._providers["TELEGRAM"] = TelegramProvider(configs["telegram"])
        if "slack" in configs:
            self._providers["SLACK"] = SlackProvider(configs["slack"])
        self._initialized = True
    async def dispatch_report(
        self,
        record: ValidationRecord,
        policy: Optional[ValidationPolicy] = None,
        background_tasks: Optional[BackgroundTasks] = None
    ):
        """
        Route a validation record to owners and custom channels.
        @PRE: record is persisted.
        @POST: Dispatches async tasks for each resolved target.
        """
        with belief_scope("NotificationService.dispatch_report", f"record_id={record.id}"):
            self._initialize_providers()
            # 1. Determine if we should notify based on status and policy
            if not self._should_notify(record, policy):
                logger.reason(f"[REASON] Notification skipped for record {record.id} (status={record.status})")
                return
            # 2. Resolve targets (Owners + Custom Channels)
            targets = self._resolve_targets(record, policy)
            # 3. Dispatch
            subject = f"Dashboard Health Alert: {record.status}"
            body = self._build_body(record)
            for channel_type, recipient in targets:
                provider = self._providers.get(channel_type)
                if not provider:
                    logger.warning(f"[NotificationService][EXPLORE] Unsupported or unconfigured channel: {channel_type}")
                    continue
                if background_tasks:
                    # Defer delivery so the calling request/pipeline is not blocked.
                    background_tasks.add_task(provider.send, recipient, subject, body)
                else:
                    # Fallback to sync for tests or if no background_tasks provided
                    await provider.send(recipient, subject, body)
    def _should_notify(self, record: ValidationRecord, policy: Optional[ValidationPolicy]) -> bool:
        """Apply the policy's alert condition; defaults to FAIL_ONLY when no policy."""
        condition = policy.alert_condition if policy else "FAIL_ONLY"
        if condition == "ALWAYS":
            return True
        if condition == "WARN_AND_FAIL":
            return record.status in ("WARN", "FAIL")
        return record.status == "FAIL"
    def _resolve_targets(self, record: ValidationRecord, policy: Optional[ValidationPolicy]) -> List[tuple]:
        """Resolve (channel_type, recipient) pairs for a record.

        Combines owner routing (unless the policy disables it) with any
        custom channels declared on the policy. Duplicates are removed
        while preserving first-seen order (the original could notify the
        same recipient twice), and malformed custom-channel entries with
        a missing type or target are skipped instead of being dispatched.
        """
        targets: List[tuple] = []
        seen = set()
        def _add(channel_type, recipient) -> None:
            # Reject malformed entries and exact duplicates.
            if not channel_type or not recipient:
                return
            key = (channel_type, recipient)
            if key in seen:
                return
            seen.add(key)
            targets.append(key)
        # Owner routing
        if not policy or policy.notify_owners:
            for owner_pref in self._find_dashboard_owners(record):
                if not owner_pref.notify_on_fail:
                    continue
                if owner_pref.telegram_id:
                    _add("TELEGRAM", owner_pref.telegram_id)
                email = owner_pref.email_address or getattr(owner_pref.user, "email", None)
                _add("SMTP", email)
        # Custom channels from policy
        if policy and policy.custom_channels:
            for channel in policy.custom_channels:
                # channel format: {"type": "SLACK", "target": "#alerts"}
                _add(channel.get("type"), channel.get("target"))
        return targets
    def _find_dashboard_owners(self, record: ValidationRecord) -> List[UserDashboardPreference]:
        # This is a simplified owner lookup.
        # In a real scenario, we'd query Superset for owners, then match them to our UserDashboardPreference.
        # For now, we'll return all users who have bound this dashboard's environment and have a username.
        # Placeholder: return all preferences that have a superset_username
        # (In production, we'd filter by actual ownership from Superset metadata)
        # .isnot(None) is the SQLAlchemy-idiomatic NULL test (replaces `!= None`, E711).
        return self.db.query(UserDashboardPreference).filter(
            UserDashboardPreference.superset_username.isnot(None)
        ).all()
    def _build_body(self, record: ValidationRecord) -> str:
        """Render the plain-text notification body from the validation record."""
        # Guard `issues or []` so a record with issues=None cannot crash the
        # dispatch pipeline (see module @INVARIANT on resilience).
        return (
            f"Dashboard ID: {record.dashboard_id}\n"
            f"Environment: {record.environment_id}\n"
            f"Status: {record.status}\n\n"
            f"Summary: {record.summary}\n\n"
            f"Issues found: {len(record.issues or [])}"
        )
# [/DEF:NotificationService:Class]
# [/DEF:backend.src.services.notifications.service:Module]

View File

@@ -173,12 +173,29 @@ class ProfileService:
payload.dashboards_table_density
)
effective_telegram_id = self._sanitize_text(preference.telegram_id)
if "telegram_id" in provided_fields:
effective_telegram_id = self._sanitize_text(payload.telegram_id)
effective_email_address = self._sanitize_text(preference.email_address)
if "email_address" in provided_fields:
effective_email_address = self._sanitize_text(payload.email_address)
effective_notify_on_fail = (
bool(preference.notify_on_fail)
if preference.notify_on_fail is not None
else True
)
if "notify_on_fail" in provided_fields:
effective_notify_on_fail = bool(payload.notify_on_fail)
validation_errors = self._validate_update_payload(
superset_username=effective_superset_username,
show_only_my_dashboards=effective_show_only,
git_email=effective_git_email,
start_page=effective_start_page,
dashboards_table_density=effective_dashboards_table_density,
email_address=effective_email_address,
)
if validation_errors:
logger.reflect("[REFLECT] Validation failed; mutation is denied")
@@ -205,6 +222,9 @@ class ProfileService:
preference.start_page = effective_start_page
preference.auto_open_task_drawer = effective_auto_open_task_drawer
preference.dashboards_table_density = effective_dashboards_table_density
preference.telegram_id = effective_telegram_id
preference.email_address = effective_email_address
preference.notify_on_fail = effective_notify_on_fail
preference.updated_at = datetime.utcnow()
persisted_preference = self.auth_repository.save_user_dashboard_preference(preference)
@@ -453,6 +473,9 @@ class ProfileService:
dashboards_table_density=self._normalize_density(
preference.dashboards_table_density
),
telegram_id=self._sanitize_text(preference.telegram_id),
email_address=self._sanitize_text(preference.email_address),
notify_on_fail=bool(preference.notify_on_fail) if preference.notify_on_fail is not None else True,
created_at=created_at,
updated_at=updated_at,
)
@@ -570,6 +593,9 @@ class ProfileService:
start_page="dashboards",
auto_open_task_drawer=True,
dashboards_table_density="comfortable",
telegram_id=None,
email_address=None,
notify_on_fail=True,
created_at=now,
updated_at=now,
)
@@ -586,6 +612,7 @@ class ProfileService:
git_email: Optional[str],
start_page: str,
dashboards_table_density: str,
email_address: Optional[str] = None,
) -> List[str]:
errors: List[str] = []
sanitized_username = self._sanitize_username(superset_username)
@@ -613,6 +640,16 @@ class ProfileService:
if dashboards_table_density not in SUPPORTED_DENSITIES:
errors.append("Dashboards table density value is not supported.")
sanitized_email = self._sanitize_text(email_address)
if sanitized_email:
if (
" " in sanitized_email
or "@" not in sanitized_email
or sanitized_email.startswith("@")
or sanitized_email.endswith("@")
):
errors.append("Notification email should be a valid email address.")
return errors
# [/DEF:_validate_update_payload:Function]