код написан
This commit is contained in:
85
backend/src/services/__tests__/test_health_service.py
Normal file
85
backend/src/services/__tests__/test_health_service.py
Normal file
@@ -0,0 +1,85 @@
|
||||
import pytest
|
||||
from datetime import datetime, timedelta
|
||||
from unittest.mock import MagicMock
|
||||
from src.services.health_service import HealthService
|
||||
from src.models.llm import ValidationRecord
|
||||
|
||||
# [DEF:test_health_service:Module]
|
||||
# @TIER: STANDARD
|
||||
# @PURPOSE: Unit tests for HealthService aggregation logic.
|
||||
|
||||
@pytest.mark.asyncio
async def test_get_health_summary_aggregation():
    """
    @TEST_SCENARIO: Verify that HealthService correctly aggregates the latest record per dashboard.
    """
    # Setup: mocked DB session and a fixed reference time.
    session = MagicMock()
    reference_time = datetime.utcnow()

    def make_record(dashboard_id, status, timestamp, summary):
        # Local factory keeps the fixture data compact; all records share env_1.
        return ValidationRecord(
            dashboard_id=dashboard_id,
            environment_id="env_1",
            status=status,
            timestamp=timestamp,
            summary=summary,
            issues=[],
        )

    # Dashboard 1: an older FAIL superseded by a newer PASS.
    stale_fail = make_record("dash_1", "FAIL", reference_time - timedelta(hours=1), "Old failure")
    fresh_pass = make_record("dash_1", "PASS", reference_time, "New pass")
    # Dashboard 2: a single WARN.
    lone_warn = make_record("dash_2", "WARN", reference_time, "Warning")

    # The service builds:
    #   subquery = db.query(...).filter(...).group_by(...).subquery()
    #   db.query(ValidationRecord).join(subquery, ...).all()
    chained = session.query.return_value
    chained.filter.return_value = chained
    chained.group_by.return_value = chained
    chained.subquery.return_value = MagicMock()
    # Only the latest record per dashboard comes back; stale_fail is deliberately absent.
    session.query.return_value.join.return_value.all.return_value = [fresh_pass, lone_warn]

    summary = await HealthService(session).get_health_summary(environment_id="env_1")

    assert summary.pass_count == 1
    assert summary.warn_count == 1
    assert summary.fail_count == 0
    assert len(summary.items) == 2

    # The aggregated entry for dash_1 must reflect the newest record (PASS).
    latest_for_dash_1 = next(item for item in summary.items if item.dashboard_id == "dash_1")
    assert latest_for_dash_1.status == "PASS"
    assert latest_for_dash_1.summary == "New pass"
|
||||
|
||||
@pytest.mark.asyncio
async def test_get_health_summary_empty():
    """
    @TEST_SCENARIO: Verify behavior with no records.
    """
    session = MagicMock()
    # The joined query yields nothing for an unknown environment.
    session.query.return_value.join.return_value.all.return_value = []

    summary = await HealthService(session).get_health_summary(environment_id="env_none")

    assert summary.pass_count == 0
    assert len(summary.items) == 0
|
||||
150
backend/src/services/__tests__/test_llm_plugin_persistence.py
Normal file
150
backend/src/services/__tests__/test_llm_plugin_persistence.py
Normal file
@@ -0,0 +1,150 @@
|
||||
# [DEF:backend.src.services.__tests__.test_llm_plugin_persistence:Module]
|
||||
# @TIER: STANDARD
|
||||
# @PURPOSE: Regression test for ValidationRecord persistence fields populated from task context.
|
||||
|
||||
import types
|
||||
import pytest
|
||||
|
||||
from src.plugins.llm_analysis import plugin as plugin_module
|
||||
|
||||
|
||||
# [DEF:_DummyLogger:Class]
# @PURPOSE: Minimal logger shim for TaskContext-like objects used in tests.
class _DummyLogger:
    """No-op logger: chaining returns the shim itself, log calls swallow everything."""

    def with_source(self, _source: str):
        # Fluent API — keep returning the same shim.
        return self

    def _swallow(self, *_args, **_kwargs):
        # Shared no-op body for every log level.
        return None

    info = _swallow
    debug = _swallow
    warning = _swallow
    error = _swallow
# [/DEF:_DummyLogger:Class]
|
||||
|
||||
|
||||
# [DEF:_FakeDBSession:Class]
# @PURPOSE: Captures persisted records for assertion and mimics SQLAlchemy session methods used by plugin.
class _FakeDBSession:
    """Records the most recent added object and tracks commit/close calls."""

    def __init__(self):
        # Mirrors the subset of Session state the plugin touches.
        self.added = None
        self.committed = False
        self.closed = False

    def add(self, obj):
        # Only the last added object is retained — sufficient for these tests.
        self.added = obj

    def commit(self):
        self.committed = True

    def close(self):
        self.closed = True
# [/DEF:_FakeDBSession:Class]
|
||||
|
||||
|
||||
# [DEF:test_dashboard_validation_plugin_persists_task_and_environment_ids:Function]
# @PURPOSE: Ensure db ValidationRecord includes context.task_id and params.environment_id.
@pytest.mark.asyncio
async def test_dashboard_validation_plugin_persists_task_and_environment_ids(tmp_path, monkeypatch):
    captured_session = _FakeDBSession()

    environment = types.SimpleNamespace(id="env-42")
    llm_provider = types.SimpleNamespace(
        id="provider-1",
        name="Main LLM",
        provider_type="openai",
        base_url="https://example.invalid/v1",
        default_model="gpt-4o",
        is_active=True,
    )

    class _StubProviderService:
        """Always hands back the canned provider and a dummy API key."""

        def __init__(self, _db):
            return None

        def get_provider(self, _provider_id):
            return llm_provider

        def get_decrypted_api_key(self, _provider_id):
            return "a" * 32

    class _StubScreenshotService:
        def __init__(self, _env):
            return None

        async def capture_dashboard(self, _dashboard_id, _screenshot_path):
            return None

    class _StubLLMClient:
        def __init__(self, **_kwargs):
            return None

        async def analyze_dashboard(self, *_args, **_kwargs):
            # Canned healthy verdict so the plugin persists a PASS record.
            return {
                "status": "PASS",
                "summary": "Dashboard healthy",
                "issues": [],
            }

    class _StubNotificationService:
        def __init__(self, *_args, **_kwargs):
            return None

        async def dispatch_report(self, **_kwargs):
            return None

    class _StubConfigManager:
        def get_environment(self, _env_id):
            return environment

        def get_config(self):
            storage = types.SimpleNamespace(root_path=str(tmp_path))
            settings = types.SimpleNamespace(storage=storage, llm={})
            return types.SimpleNamespace(settings=settings)

    class _StubSupersetClient:
        def __init__(self, _env):
            self.network = types.SimpleNamespace(request=lambda **_kwargs: {"result": []})

    # Swap every external collaborator of the plugin module for a stub.
    replacements = {
        "SessionLocal": lambda: captured_session,
        "LLMProviderService": _StubProviderService,
        "ScreenshotService": _StubScreenshotService,
        "LLMClient": _StubLLMClient,
        "NotificationService": _StubNotificationService,
        "SupersetClient": _StubSupersetClient,
    }
    for attribute, replacement in replacements.items():
        monkeypatch.setattr(plugin_module, attribute, replacement)
    monkeypatch.setattr("src.dependencies.get_config_manager", lambda: _StubConfigManager())

    context = types.SimpleNamespace(
        task_id="task-999",
        logger=_DummyLogger(),
        background_tasks=None,
    )

    plugin = plugin_module.DashboardValidationPlugin()
    params = {
        "dashboard_id": "11",
        "environment_id": "env-42",
        "provider_id": "provider-1",
    }
    result = await plugin.execute(params, context=context)

    assert result["environment_id"] == "env-42"
    assert captured_session.committed is True
    assert captured_session.closed is True
    assert captured_session.added is not None
    # The persisted record must carry both identifiers from the task context/params.
    assert captured_session.added.task_id == "task-999"
    assert captured_session.added.environment_id == "env-42"
# [/DEF:test_dashboard_validation_plugin_persists_task_and_environment_ids:Function]
|
||||
|
||||
|
||||
# [/DEF:backend.src.services.__tests__.test_llm_plugin_persistence:Module]
|
||||
84
backend/src/services/health_service.py
Normal file
84
backend/src/services/health_service.py
Normal file
@@ -0,0 +1,84 @@
|
||||
# [DEF:health_service:Module]
|
||||
# @TIER: STANDARD
|
||||
# @SEMANTICS: health, aggregation, dashboards
|
||||
# @PURPOSE: Business logic for aggregating dashboard health status from validation records.
|
||||
# @LAYER: Domain/Service
|
||||
# @RELATION: DEPENDS_ON -> ValidationRecord
|
||||
|
||||
from typing import Any, Dict, List, Optional

from sqlalchemy import desc, func
from sqlalchemy.orm import Session

from ..core.logger import logger
from ..models.llm import ValidationRecord
from ..schemas.health import DashboardHealthItem, HealthSummaryResponse
|
||||
|
||||
class HealthService:
    """
    @PURPOSE: Service for managing and querying dashboard health data.
    """

    def __init__(self, db: Session):
        # SQLAlchemy session used for all read queries.
        self.db = db

    async def get_health_summary(self, environment_id: Optional[str] = None) -> HealthSummaryResponse:
        """
        @PURPOSE: Aggregates the latest validation status for all dashboards.
        @PRE: environment_id (optional) to filter by environment.
        @POST: Returns a HealthSummaryResponse with aggregated status counts and items.
        """
        # [REASON] We need the latest ValidationRecord for each unique dashboard_id.
        # We use a subquery to find the max timestamp per dashboard_id.
        subquery = self.db.query(
            ValidationRecord.dashboard_id,
            func.max(ValidationRecord.timestamp).label("max_ts")
        )
        if environment_id:
            subquery = subquery.filter(ValidationRecord.environment_id == environment_id)
        subquery = subquery.group_by(ValidationRecord.dashboard_id).subquery()

        join_condition = (
            (ValidationRecord.dashboard_id == subquery.c.dashboard_id) &
            (ValidationRecord.timestamp == subquery.c.max_ts)
        )
        # [FIX] Constrain the join by environment as well: matching on
        # (dashboard_id, timestamp) alone could pull in a record from a DIFFERENT
        # environment that shares the dashboard_id and an equal timestamp with the
        # per-environment maximum found by the subquery.
        if environment_id:
            join_condition = join_condition & (ValidationRecord.environment_id == environment_id)

        query = self.db.query(ValidationRecord).join(subquery, join_condition)

        records = query.all()

        items = []
        pass_count = 0
        warn_count = 0
        fail_count = 0
        unknown_count = 0

        for rec in records:
            # [FIX] Guard against NULL status; rec.status.upper() would raise
            # AttributeError on None — such records now count as UNKNOWN.
            status = (rec.status or "").upper()
            if status == "PASS":
                pass_count += 1
            elif status == "WARN":
                warn_count += 1
            elif status == "FAIL":
                fail_count += 1
            else:
                unknown_count += 1
                status = "UNKNOWN"

            items.append(DashboardHealthItem(
                dashboard_id=rec.dashboard_id,
                environment_id=rec.environment_id or "unknown",
                status=status,
                last_check=rec.timestamp,
                task_id=rec.task_id,
                summary=rec.summary
            ))

        logger.info(f"[HealthService][get_health_summary] Aggregated {len(items)} dashboard health records.")

        return HealthSummaryResponse(
            items=items,
            pass_count=pass_count,
            warn_count=warn_count,
            fail_count=fail_count,
            unknown_count=unknown_count
        )
|
||||
|
||||
# [/DEF:health_service:Module]
|
||||
@@ -0,0 +1,120 @@
|
||||
# [DEF:backend.src.services.notifications.__tests__.test_notification_service:Module]
|
||||
# @TIER: STANDARD
|
||||
# @PURPOSE: Unit tests for NotificationService routing and dispatch logic.
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, AsyncMock, patch
|
||||
from datetime import time
|
||||
|
||||
from src.models.llm import ValidationRecord, ValidationPolicy
|
||||
from src.models.profile import UserDashboardPreference
|
||||
from src.models.auth import User
|
||||
from src.services.notifications.service import NotificationService
|
||||
|
||||
|
||||
@pytest.fixture
def mock_db():
    """Bare mocked SQLAlchemy session."""
    session = MagicMock()
    return session
|
||||
|
||||
|
||||
@pytest.fixture
def mock_config_manager():
    """Config-manager stub exposing SMTP and Telegram notification settings."""
    manager = MagicMock()
    notification_settings = {
        "smtp": {"host": "localhost", "port": 25, "from_email": "test@example.com"},
        "telegram": {"bot_token": "test_token"},
    }
    manager.get_payload.return_value = {"notifications": notification_settings}
    return manager
|
||||
|
||||
|
||||
@pytest.fixture
def service(mock_db, mock_config_manager):
    """NotificationService wired to the mocked session and config manager."""
    svc = NotificationService(mock_db, mock_config_manager)
    return svc
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_should_notify_fail_only(service):
    """FAIL_ONLY policy alerts on FAIL and stays silent on WARN."""
    policy = ValidationPolicy(alert_condition="FAIL_ONLY")
    record = ValidationRecord(status="FAIL")

    assert service._should_notify(record, policy) is True

    record.status = "WARN"
    assert service._should_notify(record, policy) is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_should_notify_warn_and_fail(service):
    """WARN_AND_FAIL policy alerts on WARN and FAIL but not on PASS."""
    policy = ValidationPolicy(alert_condition="WARN_AND_FAIL")
    record = ValidationRecord(status="FAIL")

    # Drive the same record through each status and check the verdict.
    expectations = {"FAIL": True, "WARN": True, "PASS": False}
    for status, expected in expectations.items():
        record.status = status
        assert service._should_notify(record, policy) is expected
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_resolve_targets_owner_routing(service, mock_db):
    """An owner preference yields both a Telegram and an SMTP target."""
    record = ValidationRecord(dashboard_id="dash-1", environment_id="env-1")

    owner = User(email="user@example.com")
    preference = UserDashboardPreference(
        user=owner,
        telegram_id="12345",
        notify_on_fail=True,
        superset_username="user1",
    )
    # Owner lookup returns the single bound preference.
    mock_db.query.return_value.filter.return_value.all.return_value = [preference]

    resolved = service._resolve_targets(record, None)

    assert ("TELEGRAM", "12345") in resolved
    assert ("SMTP", "user@example.com") in resolved
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_resolve_targets_custom_channels(service):
    """With owner routing disabled, only the policy's custom channels remain."""
    policy = ValidationPolicy(
        notify_owners=False,
        custom_channels=[{"type": "SLACK", "target": "#alerts"}],
    )
    record = ValidationRecord(status="FAIL")

    resolved = service._resolve_targets(record, policy)
    assert resolved == [("SLACK", "#alerts")]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_dispatch_report_skips_if_no_notify(service):
    """A PASS record under a FAIL_ONLY policy never reaches target resolution."""
    record = ValidationRecord(status="PASS")
    policy = ValidationPolicy(alert_condition="FAIL_ONLY")

    with patch.object(service, "_resolve_targets") as resolve_spy:
        await service.dispatch_report(record, policy)
        resolve_spy.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_dispatch_report_calls_providers(service, mock_db):
    """Each resolved target is dispatched through its channel provider exactly once."""
    record = ValidationRecord(id="rec-1", status="FAIL", summary="Bad", issues=[])

    # Swap the real providers for async mocks after lazy initialization.
    service._initialize_providers()
    service._providers["TELEGRAM"] = AsyncMock()
    service._providers["SMTP"] = AsyncMock()

    with patch.object(service, "_resolve_targets") as resolve_spy:
        resolve_spy.return_value = [("TELEGRAM", "123"), ("SMTP", "a@b.com")]
        await service.dispatch_report(record, None)

    service._providers["TELEGRAM"].send.assert_called_once()
    service._providers["SMTP"].send.assert_called_once()
|
||||
|
||||
# [/DEF:backend.src.services.notifications.__tests__.test_notification_service:Module]
|
||||
123
backend/src/services/notifications/providers.py
Normal file
123
backend/src/services/notifications/providers.py
Normal file
@@ -0,0 +1,123 @@
|
||||
# [DEF:backend.src.services.notifications.providers:Module]
|
||||
#
|
||||
# @TIER: CRITICAL
|
||||
# @SEMANTICS: notifications, providers, smtp, slack, telegram, abstraction
|
||||
# @PURPOSE: Defines abstract base and concrete implementations for external notification delivery.
|
||||
# @LAYER: Infra
|
||||
#
|
||||
# @INVARIANT: Providers must be stateless and resilient to network failures.
|
||||
# @INVARIANT: Sensitive credentials must be handled via encrypted config.
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Dict, Optional
|
||||
import requests
|
||||
import smtplib
|
||||
from email.mime.text import MIMEText
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
|
||||
from ...core.logger import logger
|
||||
|
||||
|
||||
# [DEF:NotificationProvider:Class]
# @PURPOSE: Abstract base class for all notification providers.
class NotificationProvider(ABC):
    """Common async delivery contract implemented by each channel provider."""

    @abstractmethod
    async def send(self, target: str, subject: str, body: str, context: Optional[Dict[str, Any]] = None) -> bool:
        """
        Send a notification to a specific target.
        :param target: Recipient identifier (email, channel ID, user ID).
        :param subject: Notification subject or title.
        :param body: Main content of the notification.
        :param context: Additional metadata for the provider.
        :return: True if successfully dispatched.
        """
        ...
# [/DEF:NotificationProvider:Class]
|
||||
|
||||
|
||||
# [DEF:SMTPProvider:Class]
# @PURPOSE: Delivers notifications via SMTP.
class SMTPProvider(NotificationProvider):
    def __init__(self, config: Dict[str, Any]):
        """Read connection settings from the notifications config payload."""
        self.host = config.get("host")
        self.port = int(config.get("port", 587))
        self.username = config.get("username")
        self.password = config.get("password")
        self.from_email = config.get("from_email")
        self.use_tls = config.get("use_tls", True)

    async def send(self, target: str, subject: str, body: str, context: Optional[Dict[str, Any]] = None) -> bool:
        """
        Build a MIME message and deliver it over SMTP.
        :param target: Recipient email address.
        :return: True on success; False (never raises) on any failure.
        """
        try:
            msg = MIMEMultipart()
            msg["From"] = self.from_email
            msg["To"] = target
            msg["Subject"] = subject
            msg.attach(MIMEText(body, "plain"))

            # [FIX] Use the SMTP context manager so the connection is always
            # closed; the original leaked the socket when starttls/login/
            # send_message raised (quit() was only reached on success).
            with smtplib.SMTP(self.host, self.port) as server:
                if self.use_tls:
                    server.starttls()
                if self.username and self.password:
                    server.login(self.username, self.password)
                server.send_message(msg)
            return True
        except Exception as e:
            logger.error(f"[SMTPProvider][FAILED] Failed to send email to {target}: {e}")
            return False
# [/DEF:SMTPProvider:Class]
|
||||
|
||||
|
||||
# [DEF:TelegramProvider:Class]
# @PURPOSE: Delivers notifications via Telegram Bot API.
class TelegramProvider(NotificationProvider):
    def __init__(self, config: Dict[str, Any]):
        # Bot token is the only credential Telegram delivery needs.
        self.bot_token = config.get("bot_token")

    async def send(self, target: str, subject: str, body: str, context: Optional[Dict[str, Any]] = None) -> bool:
        """Post a Markdown-formatted message to a chat; returns False on any failure."""
        if not self.bot_token:
            logger.error("[TelegramProvider][FAILED] Bot token not configured")
            return False

        endpoint = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
        message = {
            "chat_id": target,
            "text": f"*{subject}*\n\n{body}",
            "parse_mode": "Markdown",
        }

        try:
            reply = requests.post(endpoint, json=message, timeout=10)
            reply.raise_for_status()
        except Exception as e:
            logger.error(f"[TelegramProvider][FAILED] Failed to send Telegram message to {target}: {e}")
            return False
        return True
# [/DEF:TelegramProvider:Class]
|
||||
|
||||
|
||||
# [DEF:SlackProvider:Class]
# @PURPOSE: Delivers notifications via Slack Webhooks or API.
class SlackProvider(NotificationProvider):
    def __init__(self, config: Dict[str, Any]):
        # Incoming-webhook URL; the `target` argument is unused for webhooks.
        self.webhook_url = config.get("webhook_url")

    async def send(self, target: str, subject: str, body: str, context: Optional[Dict[str, Any]] = None) -> bool:
        """Post subject+body to the configured webhook; returns False on any failure."""
        if not self.webhook_url:
            logger.error("[SlackProvider][FAILED] Webhook URL not configured")
            return False

        message = {"text": f"*{subject}*\n{body}"}

        try:
            reply = requests.post(self.webhook_url, json=message, timeout=10)
            reply.raise_for_status()
        except Exception as e:
            logger.error(f"[SlackProvider][FAILED] Failed to send Slack message: {e}")
            return False
        return True
# [/DEF:SlackProvider:Class]
|
||||
|
||||
# [/DEF:backend.src.services.notifications.providers:Module]
|
||||
146
backend/src/services/notifications/service.py
Normal file
146
backend/src/services/notifications/service.py
Normal file
@@ -0,0 +1,146 @@
|
||||
# [DEF:backend.src.services.notifications.service:Module]
|
||||
#
|
||||
# @TIER: CRITICAL
|
||||
# @SEMANTICS: notifications, service, routing, dispatch, background-tasks
|
||||
# @PURPOSE: Orchestrates notification routing based on user preferences and policy context.
|
||||
# @LAYER: Domain
|
||||
# @RELATION: DEPENDS_ON -> backend.src.services.notifications.providers
|
||||
# @RELATION: DEPENDS_ON -> backend.src.services.profile_service
|
||||
# @RELATION: DEPENDS_ON -> backend.src.models.llm
|
||||
#
|
||||
# @INVARIANT: Notifications are dispatched asynchronously via BackgroundTasks.
|
||||
# @INVARIANT: Missing profile or provider config must not crash the pipeline.
|
||||
|
||||
from typing import Any, Dict, List, Optional
|
||||
from fastapi import BackgroundTasks
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from ...core.logger import logger, belief_scope
|
||||
from ...core.config_manager import ConfigManager
|
||||
from ...models.llm import ValidationRecord, ValidationPolicy
|
||||
from ...models.profile import UserDashboardPreference
|
||||
from .providers import SMTPProvider, TelegramProvider, SlackProvider, NotificationProvider
|
||||
|
||||
|
||||
# [DEF:NotificationService:Class]
# @PURPOSE: Routes validation reports to appropriate users and channels.
class NotificationService:
    def __init__(self, db: Session, config_manager: ConfigManager):
        self.db = db
        self.config_manager = config_manager
        # channel type (e.g. "SMTP") -> provider instance; built lazily.
        self._providers: Dict[str, NotificationProvider] = {}
        self._initialized = False

    def _initialize_providers(self):
        """Lazily build channel providers from the notifications config section."""
        if self._initialized:
            return

        # In a real implementation, we would fetch these from NotificationConfig model.
        # For now this is placeholder initialization; T033 will implement the UI/API.
        configs = self.config_manager.get_payload().get("notifications", {})

        # channel type -> (config key, provider class)
        registry = {
            "SMTP": ("smtp", SMTPProvider),
            "TELEGRAM": ("telegram", TelegramProvider),
            "SLACK": ("slack", SlackProvider),
        }
        for channel, (config_key, provider_cls) in registry.items():
            if config_key in configs:
                self._providers[channel] = provider_cls(configs[config_key])

        self._initialized = True

    async def dispatch_report(
        self,
        record: ValidationRecord,
        policy: Optional[ValidationPolicy] = None,
        background_tasks: Optional[BackgroundTasks] = None
    ):
        """
        Route a validation record to owners and custom channels.
        @PRE: record is persisted.
        @POST: Dispatches async tasks for each resolved target.
        """
        with belief_scope("NotificationService.dispatch_report", f"record_id={record.id}"):
            self._initialize_providers()

            # 1. Bail out early when status/policy say this record is not alert-worthy.
            if not self._should_notify(record, policy):
                logger.reason(f"[REASON] Notification skipped for record {record.id} (status={record.status})")
                return

            # 2. Resolve targets (owners + custom channels).
            targets = self._resolve_targets(record, policy)

            # 3. Dispatch each target through its channel provider.
            subject = f"Dashboard Health Alert: {record.status}"
            body = self._build_body(record)

            for channel_type, recipient in targets:
                provider = self._providers.get(channel_type)
                if provider is None:
                    logger.warning(f"[NotificationService][EXPLORE] Unsupported or unconfigured channel: {channel_type}")
                    continue

                if background_tasks:
                    background_tasks.add_task(provider.send, recipient, subject, body)
                else:
                    # Fallback to sync for tests or if no background_tasks provided.
                    await provider.send(recipient, subject, body)

    def _should_notify(self, record: ValidationRecord, policy: Optional[ValidationPolicy]) -> bool:
        """Map the policy's alert_condition onto the record status (default: FAIL_ONLY)."""
        condition = policy.alert_condition if policy else "FAIL_ONLY"
        if condition == "ALWAYS":
            return True
        alerting = ("WARN", "FAIL") if condition == "WARN_AND_FAIL" else ("FAIL",)
        return record.status in alerting

    def _resolve_targets(self, record: ValidationRecord, policy: Optional[ValidationPolicy]) -> List[tuple]:
        """Collect (channel_type, recipient) pairs from owners and policy channels."""
        targets = []

        # Owner routing (enabled by default when no policy is supplied).
        if not policy or policy.notify_owners:
            for owner_pref in self._find_dashboard_owners(record):
                if not owner_pref.notify_on_fail:
                    continue
                if owner_pref.telegram_id:
                    targets.append(("TELEGRAM", owner_pref.telegram_id))
                email = owner_pref.email_address or getattr(owner_pref.user, "email", None)
                if email:
                    targets.append(("SMTP", email))

        # Custom channels from policy; channel format: {"type": "SLACK", "target": "#alerts"}.
        if policy and policy.custom_channels:
            for channel in policy.custom_channels:
                targets.append((channel.get("type"), channel.get("target")))

        return targets

    def _find_dashboard_owners(self, record: ValidationRecord) -> List[UserDashboardPreference]:
        # Simplified owner lookup: a real implementation would query Superset for
        # owners and match them to UserDashboardPreference. Placeholder behavior:
        # every preference with a superset_username counts as an owner.
        owners_query = self.db.query(UserDashboardPreference)
        return owners_query.filter(
            UserDashboardPreference.superset_username != None
        ).all()

    def _build_body(self, record: ValidationRecord) -> str:
        """Render the plain-text notification body for a record."""
        parts = [
            f"Dashboard ID: {record.dashboard_id}",
            f"Environment: {record.environment_id}",
            f"Status: {record.status}",
            "",
            f"Summary: {record.summary}",
            "",
            f"Issues found: {len(record.issues)}",
        ]
        return "\n".join(parts)
# [/DEF:NotificationService:Class]
|
||||
|
||||
# [/DEF:backend.src.services.notifications.service:Module]
|
||||
@@ -173,12 +173,29 @@ class ProfileService:
|
||||
payload.dashboards_table_density
|
||||
)
|
||||
|
||||
effective_telegram_id = self._sanitize_text(preference.telegram_id)
|
||||
if "telegram_id" in provided_fields:
|
||||
effective_telegram_id = self._sanitize_text(payload.telegram_id)
|
||||
|
||||
effective_email_address = self._sanitize_text(preference.email_address)
|
||||
if "email_address" in provided_fields:
|
||||
effective_email_address = self._sanitize_text(payload.email_address)
|
||||
|
||||
effective_notify_on_fail = (
|
||||
bool(preference.notify_on_fail)
|
||||
if preference.notify_on_fail is not None
|
||||
else True
|
||||
)
|
||||
if "notify_on_fail" in provided_fields:
|
||||
effective_notify_on_fail = bool(payload.notify_on_fail)
|
||||
|
||||
validation_errors = self._validate_update_payload(
|
||||
superset_username=effective_superset_username,
|
||||
show_only_my_dashboards=effective_show_only,
|
||||
git_email=effective_git_email,
|
||||
start_page=effective_start_page,
|
||||
dashboards_table_density=effective_dashboards_table_density,
|
||||
email_address=effective_email_address,
|
||||
)
|
||||
if validation_errors:
|
||||
logger.reflect("[REFLECT] Validation failed; mutation is denied")
|
||||
@@ -205,6 +222,9 @@ class ProfileService:
|
||||
preference.start_page = effective_start_page
|
||||
preference.auto_open_task_drawer = effective_auto_open_task_drawer
|
||||
preference.dashboards_table_density = effective_dashboards_table_density
|
||||
preference.telegram_id = effective_telegram_id
|
||||
preference.email_address = effective_email_address
|
||||
preference.notify_on_fail = effective_notify_on_fail
|
||||
preference.updated_at = datetime.utcnow()
|
||||
|
||||
persisted_preference = self.auth_repository.save_user_dashboard_preference(preference)
|
||||
@@ -453,6 +473,9 @@ class ProfileService:
|
||||
dashboards_table_density=self._normalize_density(
|
||||
preference.dashboards_table_density
|
||||
),
|
||||
telegram_id=self._sanitize_text(preference.telegram_id),
|
||||
email_address=self._sanitize_text(preference.email_address),
|
||||
notify_on_fail=bool(preference.notify_on_fail) if preference.notify_on_fail is not None else True,
|
||||
created_at=created_at,
|
||||
updated_at=updated_at,
|
||||
)
|
||||
@@ -570,6 +593,9 @@ class ProfileService:
|
||||
start_page="dashboards",
|
||||
auto_open_task_drawer=True,
|
||||
dashboards_table_density="comfortable",
|
||||
telegram_id=None,
|
||||
email_address=None,
|
||||
notify_on_fail=True,
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
@@ -586,6 +612,7 @@ class ProfileService:
|
||||
git_email: Optional[str],
|
||||
start_page: str,
|
||||
dashboards_table_density: str,
|
||||
email_address: Optional[str] = None,
|
||||
) -> List[str]:
|
||||
errors: List[str] = []
|
||||
sanitized_username = self._sanitize_username(superset_username)
|
||||
@@ -613,6 +640,16 @@ class ProfileService:
|
||||
if dashboards_table_density not in SUPPORTED_DENSITIES:
|
||||
errors.append("Dashboards table density value is not supported.")
|
||||
|
||||
sanitized_email = self._sanitize_text(email_address)
|
||||
if sanitized_email:
|
||||
if (
|
||||
" " in sanitized_email
|
||||
or "@" not in sanitized_email
|
||||
or sanitized_email.startswith("@")
|
||||
or sanitized_email.endswith("@")
|
||||
):
|
||||
errors.append("Notification email should be a valid email address.")
|
||||
|
||||
return errors
|
||||
# [/DEF:_validate_update_payload:Function]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user