feat: add connections management and health summary improvements

This commit is contained in:
2026-03-15 16:40:43 +03:00
parent eba0fab091
commit 027d17f193
17 changed files with 8521 additions and 10819 deletions

View File

@@ -0,0 +1,72 @@
# [DEF:backend.src.api.routes.__tests__.test_connections_routes:Module]
# @TIER: STANDARD
# @PURPOSE: Verifies connection routes bootstrap their table before CRUD access.
# @LAYER: API
# @RELATION: VERIFIES -> backend.src.api.routes.connections
import os
import sys
import asyncio
from pathlib import Path
import pytest
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
# Force SQLite in-memory for database module imports.
os.environ["DATABASE_URL"] = "sqlite:///:memory:"
os.environ["TASKS_DATABASE_URL"] = "sqlite:///:memory:"
os.environ["AUTH_DATABASE_URL"] = "sqlite:///:memory:"
os.environ["ENVIRONMENT"] = "testing"
backend_dir = str(Path(__file__).parent.parent.parent.parent.resolve())
if backend_dir not in sys.path:
sys.path.insert(0, backend_dir)
@pytest.fixture
def db_session():
    """Yield a throwaway SQLAlchemy session bound to a fresh in-memory SQLite engine."""
    memory_engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    session_factory = sessionmaker(bind=memory_engine)
    db = session_factory()
    try:
        yield db
    finally:
        # Always release the connection, even if the test body raised.
        db.close()
def test_list_connections_bootstraps_missing_table(db_session):
    """Listing connections on an empty bind must create the table and return no rows."""
    from src.api.routes.connections import list_connections

    rows = asyncio.run(list_connections(db=db_session))

    assert rows == []
    assert "connection_configs" in inspect(db_session.get_bind()).get_table_names()
def test_create_connection_bootstraps_missing_table(db_session):
    """Creating a connection on an empty bind must create the table and persist the row."""
    from src.api.routes.connections import ConnectionCreate, create_connection

    request = ConnectionCreate(
        name="Analytics Warehouse",
        type="postgres",
        host="warehouse.internal",
        port=5432,
        database="analytics",
        username="reporter",
        password="secret",
    )
    saved = asyncio.run(create_connection(connection=request, db=db_session))

    assert saved.name == "Analytics Warehouse"
    assert saved.host == "warehouse.internal"
    assert "connection_configs" in inspect(db_session.get_bind()).get_table_names()
# [/DEF:backend.src.api.routes.__tests__.test_connections_routes:Module]

View File

@@ -9,7 +9,7 @@
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from ...core.database import get_db
from ...core.database import get_db, ensure_connection_configs_table
from ...models.connection import ConnectionConfig
from pydantic import BaseModel
from datetime import datetime
@@ -18,6 +18,16 @@ from ...core.logger import logger, belief_scope
router = APIRouter()
# [DEF:_ensure_connections_schema:Function]
# @PURPOSE: Ensures the connection_configs table exists before CRUD access.
# @PRE: db is an active SQLAlchemy session.
# @POST: The current bind can safely query ConnectionConfig.
def _ensure_connections_schema(db: Session) -> None:
    with belief_scope("ConnectionsRouter.ensure_schema"):
        # Idempotent: delegates table creation to the session's current bind,
        # so each request is safe even on a database that was never migrated.
        ensure_connection_configs_table(db.get_bind())
# [/DEF:_ensure_connections_schema:Function]
# [DEF:ConnectionSchema:Class]
# @PURPOSE: Pydantic model for connection response.
class ConnectionSchema(BaseModel):
@@ -55,6 +65,7 @@ class ConnectionCreate(BaseModel):
@router.get("", response_model=List[ConnectionSchema])
async def list_connections(db: Session = Depends(get_db)):
    """Return every stored connection config, bootstrapping the table on first access."""
    with belief_scope("ConnectionsRouter.list_connections"):
        _ensure_connections_schema(db)
        return db.query(ConnectionConfig).all()
# [/DEF:list_connections:Function]
@@ -69,6 +80,7 @@ async def list_connections(db: Session = Depends(get_db)):
@router.post("", response_model=ConnectionSchema, status_code=status.HTTP_201_CREATED)
async def create_connection(connection: ConnectionCreate, db: Session = Depends(get_db)):
with belief_scope("ConnectionsRouter.create_connection", f"name={connection.name}"):
_ensure_connections_schema(db)
db_connection = ConnectionConfig(**connection.dict())
db.add(db_connection)
db.commit()
@@ -87,6 +99,7 @@ async def create_connection(connection: ConnectionCreate, db: Session = Depends(
@router.delete("/{connection_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_connection(connection_id: str, db: Session = Depends(get_db)):
with belief_scope("ConnectionsRouter.delete_connection", f"id={connection_id}"):
_ensure_connections_schema(db)
db_connection = db.query(ConnectionConfig).filter(ConnectionConfig.id == connection_id).first()
if not db_connection:
logger.error(f"[ConnectionsRouter.delete_connection][State] Connection {connection_id} not found")
@@ -97,4 +110,4 @@ async def delete_connection(connection_id: str, db: Session = Depends(get_db)):
return
# [/DEF:delete_connection:Function]
# [/DEF:ConnectionsRouter:Module]
# [/DEF:ConnectionsRouter:Module]

View File

@@ -16,13 +16,14 @@ from starlette.middleware.sessions import SessionMiddleware
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
import asyncio
from .dependencies import get_task_manager, get_scheduler_service
from .core.utils.network import NetworkError
from .core.logger import logger, belief_scope
from .api.routes import plugins, tasks, settings, environments, mappings, migration, connections, git, storage, admin, llm, dashboards, datasets, reports, assistant, clean_release, clean_release_v2, profile, health
from .api import auth
import asyncio
from .dependencies import get_task_manager, get_scheduler_service
from .core.encryption_key import ensure_encryption_key
from .core.utils.network import NetworkError
from .core.logger import logger, belief_scope
from .api.routes import plugins, tasks, settings, environments, mappings, migration, connections, git, storage, admin, llm, dashboards, datasets, reports, assistant, clean_release, clean_release_v2, profile, health
from .api import auth
# [DEF:App:Global]
# @SEMANTICS: app, fastapi, instance
@@ -39,12 +40,13 @@ app = FastAPI(
# @PRE: None.
# @POST: Scheduler is started.
# Startup event
@app.on_event("startup")
async def startup_event():
with belief_scope("startup_event"):
scheduler = get_scheduler_service()
scheduler.start()
# [/DEF:startup_event:Function]
@app.on_event("startup")
async def startup_event():
    # Startup hook: make sure an encryption key is available before any
    # dependent service runs, then bring the scheduler online.
    with belief_scope("startup_event"):
        ensure_encryption_key()
        scheduler = get_scheduler_service()
        scheduler.start()
# [/DEF:startup_event:Function]
# [DEF:shutdown_event:Function]
# @PURPOSE: Handles application shutdown tasks, such as stopping the scheduler.

View File

@@ -14,6 +14,7 @@
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.orm import sessionmaker
from ..models.mapping import Base
from ..models.connection import ConnectionConfig
# Import models to ensure they're registered with Base
from ..models import task as _task_models # noqa: F401
from ..models import auth as _auth_models # noqa: F401
@@ -22,6 +23,7 @@ from ..models import llm as _llm_models # noqa: F401
from ..models import assistant as _assistant_models # noqa: F401
from ..models import profile as _profile_models # noqa: F401
from ..models import clean_release as _clean_release_models # noqa: F401
from ..models import connection as _connection_models # noqa: F401
from .logger import belief_scope, logger
from .auth.config import auth_config
import os
@@ -281,6 +283,23 @@ def _ensure_git_server_configs_columns(bind_engine):
# [/DEF:_ensure_git_server_configs_columns:Function]
# [DEF:ensure_connection_configs_table:Function]
# @PURPOSE: Ensures the external connection registry table exists in the main database.
# @PRE: bind_engine points to the application database.
# @POST: connection_configs table exists without dropping existing data.
def ensure_connection_configs_table(bind_engine):
    with belief_scope("ensure_connection_configs_table"):
        try:
            # checkfirst=True makes the call idempotent: a no-op when the
            # table already exists, so existing rows are never touched.
            ConnectionConfig.__table__.create(bind=bind_engine, checkfirst=True)
        except Exception as migration_error:
            # Log for diagnostics, then re-raise so callers see the bootstrap failure.
            logger.warning(
                "[database][EXPLORE] ConnectionConfig table ensure failed: %s",
                migration_error,
            )
            raise
# [/DEF:ensure_connection_configs_table:Function]
# [DEF:init_db:Function]
# @PURPOSE: Initializes the database by creating all tables.
# @PRE: engine, tasks_engine and auth_engine are initialized.
@@ -295,6 +314,7 @@ def init_db():
_ensure_llm_validation_results_columns(engine)
_ensure_user_dashboard_preferences_health_columns(engine)
_ensure_git_server_configs_columns(engine)
ensure_connection_configs_table(engine)
# [/DEF:init_db:Function]
# [DEF:get_db:Function]

View File

@@ -0,0 +1,56 @@
# [DEF:backend.src.core.encryption_key:Module]
# @TIER: CRITICAL
# @SEMANTICS: encryption, key, bootstrap, environment, startup
# @PURPOSE: Resolve and persist the Fernet encryption key required by runtime services.
# @LAYER: Infra
# @RELATION: DEPENDS_ON -> backend.src.core.logger
# @INVARIANT: Runtime key resolution never falls back to an ephemeral secret.
from __future__ import annotations
import os
from pathlib import Path
from cryptography.fernet import Fernet
from .logger import logger, belief_scope
DEFAULT_ENV_FILE_PATH = Path(__file__).resolve().parents[2] / ".env"
# [DEF:ensure_encryption_key:Function]
# @PURPOSE: Ensure backend runtime has a persistent valid Fernet key.
# @PRE: env_file_path points to a writable backend .env file or ENCRYPTION_KEY exists in process environment.
# @POST: Returns a valid Fernet key and guarantees it is present in process environment.
# @SIDE_EFFECT: May create or append backend/.env when key is missing.
def ensure_encryption_key(env_file_path: Path = DEFAULT_ENV_FILE_PATH) -> str:
    with belief_scope("ensure_encryption_key", f"env_file_path={env_file_path}"):
        # 1) An explicit key in the process environment always wins.
        existing_key = os.getenv("ENCRYPTION_KEY", "").strip()
        if existing_key:
            Fernet(existing_key.encode())  # validate; raises on malformed key
            logger.reason("Using ENCRYPTION_KEY from process environment.")
            return existing_key
        # 2) Fall back to a key persisted in backend/.env.
        if env_file_path.exists():
            for raw_line in env_file_path.read_text(encoding="utf-8").splitlines():
                # Tolerate indentation and an optional "export " prefix, both
                # common in hand-edited .env files (exact-prefix matching
                # previously missed such lines and regenerated a new key).
                line = raw_line.strip()
                if line.startswith("export "):
                    line = line[len("export "):].lstrip()
                if line.startswith("ENCRYPTION_KEY="):
                    persisted_key = line.partition("=")[2].strip()
                    if persisted_key:
                        Fernet(persisted_key.encode())  # validate before adopting
                        os.environ["ENCRYPTION_KEY"] = persisted_key
                        logger.reason(f"Loaded ENCRYPTION_KEY from {env_file_path}.")
                        return persisted_key
        # 3) No key anywhere: generate one and persist it so restarts reuse it.
        generated_key = Fernet.generate_key().decode()
        with env_file_path.open("a", encoding="utf-8") as env_file:
            # Separate from any existing content so the key sits on its own line.
            if env_file.tell() > 0:
                env_file.write("\n")
            env_file.write(f"ENCRYPTION_KEY={generated_key}\n")
        os.environ["ENCRYPTION_KEY"] = generated_key
        logger.reason(f"Generated ENCRYPTION_KEY and persisted it to {env_file_path}.")
        logger.reflect("Encryption key is available for runtime services.")
        return generated_key
# [/DEF:ensure_encryption_key:Function]
# [/DEF:backend.src.core.encryption_key:Module]

View File

@@ -8,54 +8,18 @@
# @INVARIANT: Safe to run multiple times (idempotent).
# [SECTION: IMPORTS]
import os
import sys
from pathlib import Path
from cryptography.fernet import Fernet
# Add src to path
sys.path.append(str(Path(__file__).parent.parent.parent))
from src.core.database import init_db
from src.core.encryption_key import ensure_encryption_key
from src.core.logger import logger, belief_scope
from src.scripts.seed_permissions import seed_permissions
# [/SECTION]
ENV_FILE_PATH = Path(__file__).resolve().parents[2] / ".env"
# [DEF:ensure_encryption_key:Function]
# @PURPOSE: Ensure backend runtime has a persistent Fernet encryption key during first-time installation.
# @PRE: Backend root is writable or ENCRYPTION_KEY is already provided via environment.
# @POST: ENCRYPTION_KEY exists in process environment or backend/.env.
def ensure_encryption_key(env_file_path: Path = ENV_FILE_PATH) -> str:
    # 1) An environment-provided key wins: validate and return without touching disk.
    existing_key = os.getenv("ENCRYPTION_KEY", "").strip()
    if existing_key:
        Fernet(existing_key.encode())  # raises if the value is not valid Fernet material
        logger.info("ENCRYPTION_KEY already provided via environment; skipping generation.")
        return existing_key
    # 2) Otherwise look for a previously persisted key in backend/.env.
    if env_file_path.exists():
        for raw_line in env_file_path.read_text(encoding="utf-8").splitlines():
            # NOTE(review): exact-prefix match — indented or "export "-prefixed lines are skipped.
            if raw_line.startswith("ENCRYPTION_KEY="):
                persisted_key = raw_line.partition("=")[2].strip()
                if persisted_key:
                    Fernet(persisted_key.encode())  # validate before adopting
                    os.environ["ENCRYPTION_KEY"] = persisted_key
                    logger.info(f"Loaded existing ENCRYPTION_KEY from {env_file_path}.")
                    return persisted_key
    # 3) First run: generate a fresh key and append it to .env for future startups.
    generated_key = Fernet.generate_key().decode()
    with env_file_path.open("a", encoding="utf-8") as env_file:
        # Separate from any existing content so the key sits on its own line.
        if env_file.tell() > 0:
            env_file.write("\n")
        env_file.write(f"ENCRYPTION_KEY={generated_key}\n")
    os.environ["ENCRYPTION_KEY"] = generated_key
    logger.info(f"Generated ENCRYPTION_KEY and persisted it to {env_file_path}.")
    return generated_key
# [/DEF:ensure_encryption_key:Function]
# [DEF:run_init:Function]
# @PURPOSE: Main entry point for the initialization script.
# @POST: auth.db is initialized with the correct schema and seeded permissions.

View File

@@ -122,6 +122,43 @@ async def test_get_health_summary_resolves_slug_and_title_from_superset():
mock_client.get_dashboards_summary.assert_called_once_with()
@pytest.mark.anyio
async def test_get_health_summary_reuses_dashboard_metadata_cache_across_service_instances():
    # The dashboard-summary cache is class-level state shared by all
    # HealthService instances; clear it so this test starts cold.
    HealthService._dashboard_summary_cache.clear()
    db = MagicMock()
    config_manager = MagicMock()
    config_manager.get_environments.return_value = [MagicMock(id="env_1")]
    # One passing validation record for dashboard "42" in env_1.
    record = ValidationRecord(
        id="rec-1",
        dashboard_id="42",
        environment_id="env_1",
        status="PASS",
        timestamp=datetime.utcnow(),
        summary="Healthy",
        issues=[],
    )
    # Stub the query chain used by get_health_summary to yield that single record.
    db.query.return_value.join.return_value.all.return_value = [record]
    with patch("src.services.health_service.SupersetClient") as mock_client_cls:
        mock_client = MagicMock()
        mock_client.get_dashboards_summary.return_value = [
            {"id": 42, "slug": "ops-overview", "title": "Ops Overview"}
        ]
        mock_client_cls.return_value = mock_client
        # Two distinct service instances over the same environment.
        first_service = HealthService(db, config_manager=config_manager)
        second_service = HealthService(db, config_manager=config_manager)
        first_summary = await first_service.get_health_summary(environment_id="env_1")
        second_summary = await second_service.get_health_summary(environment_id="env_1")
        assert first_summary.items[0].dashboard_slug == "ops-overview"
        assert second_summary.items[0].dashboard_slug == "ops-overview"
        # Superset was queried exactly once — the second instance served
        # its metadata from the shared class-level cache.
        mock_client.get_dashboards_summary.assert_called_once_with()
    # Leave no cached metadata behind for other tests.
    HealthService._dashboard_summary_cache.clear()
def test_delete_validation_report_deletes_dashboard_scope_and_linked_tasks():
db = MagicMock()
config_manager = MagicMock()

View File

@@ -8,6 +8,7 @@
# @RELATION: DEPENDS_ON -> backend.src.core.task_manager.cleanup.TaskCleanupService
from typing import List, Dict, Any, Optional, Tuple
import time
from sqlalchemy.orm import Session
from sqlalchemy import func, desc
import os
@@ -24,6 +25,9 @@ from ..core.task_manager import TaskManager
# @RELATION: CALLS -> backend.src.core.superset_client.SupersetClient
# @RELATION: CALLS -> backend.src.core.task_manager.cleanup.TaskCleanupService
class HealthService:
_dashboard_summary_cache: Dict[str, Tuple[float, Dict[str, Dict[str, Optional[str]]]]] = {}
_dashboard_summary_cache_ttl_seconds = 60.0
"""
@PURPOSE: Service for managing and querying dashboard health data.
"""
@@ -77,15 +81,27 @@ class HealthService:
continue
try:
dashboards = SupersetClient(env).get_dashboards_summary()
dashboard_meta_map = {
str(item.get("id")): {
"slug": item.get("slug"),
"title": item.get("title"),
cached_meta = self.__class__._dashboard_summary_cache.get(environment_id)
cache_is_fresh = (
cached_meta is not None
and (time.monotonic() - cached_meta[0]) < self.__class__._dashboard_summary_cache_ttl_seconds
)
if cache_is_fresh:
dashboard_meta_map = cached_meta[1]
else:
dashboards = SupersetClient(env).get_dashboards_summary()
dashboard_meta_map = {
str(item.get("id")): {
"slug": item.get("slug"),
"title": item.get("title"),
}
for item in dashboards
if str(item.get("id") or "").strip()
}
for item in dashboards
if str(item.get("id") or "").strip()
}
self.__class__._dashboard_summary_cache[environment_id] = (
time.monotonic(),
dashboard_meta_map,
)
for dashboard_id in dashboard_ids:
self._dashboard_meta_cache[(environment_id, dashboard_id)] = dashboard_meta_map.get(
dashboard_id,