# [DEF:backend.src.api.routes.environments:Module]
#
# @TIER: STANDARD
# @SEMANTICS: api, environments, superset, databases
# @PURPOSE: API endpoints for listing environments and their databases.
# @LAYER: API
# @RELATION: DEPENDS_ON -> backend.src.dependencies
# @RELATION: DEPENDS_ON -> backend.src.core.superset_client
#
# @INVARIANT: Environment IDs must exist in the configuration.

# [SECTION: IMPORTS]
from fastapi import APIRouter, Depends, HTTPException
from typing import List, Optional
from ...dependencies import get_config_manager, get_scheduler_service, has_permission
from ...core.superset_client import SupersetClient
from pydantic import BaseModel, Field
from ...core.logger import belief_scope
# [/SECTION]

router = APIRouter(prefix="/api/environments", tags=["Environments"])


# [DEF:_normalize_superset_env_url:Function]
# @PURPOSE: Canonicalize Superset environment URL to base host/path without trailing /api/v1.
# @PRE: raw_url can be empty.
# @POST: Returns normalized base URL.
# @PARAM: raw_url (str) - URL as stored in the environment config; falsy values are treated as "".
# @RETURN: str - URL without surrounding whitespace, trailing slashes or a trailing "/api/v1".
def _normalize_superset_env_url(raw_url: str) -> str:
    # Falsy input (None, "") collapses to ""; whitespace and trailing slashes go first
    # so that ".../api/v1/" is recognised by the suffix check below.
    normalized = str(raw_url or "").strip().rstrip("/")
    # The suffix test is case-insensitive; slicing by length keeps the casing of the rest.
    if normalized.lower().endswith("/api/v1"):
        normalized = normalized[:-len("/api/v1")]
    # Removing the suffix can expose another trailing slash.
    return normalized.rstrip("/")
# [/DEF:_normalize_superset_env_url:Function]


# [DEF:ScheduleSchema:DataClass]
# @PURPOSE: Request/response payload describing a backup schedule (on/off flag plus cron expression).
class ScheduleSchema(BaseModel):
    # Whether the schedule is active; defaults to disabled.
    enabled: bool = False
    # Required. Accepts an "@keyword" shortcut or 4-6 space-separated cron-like fields.
    # NOTE(review): the top-level "|" splits the pattern so that "^" anchors only the
    # "@keyword" alternative and "$" only the field-list alternative. If the validator
    # uses search semantics, text after a keyword or before the field list is not
    # rejected - confirm this is intended before tightening the pattern.
    cron_expression: str = Field(..., pattern=r'^(@(annually|yearly|monthly|weekly|daily|hourly|reboot))|((((\d+,)*\d+|(\d+(\/|-)\d+)|\d+|\*) ?){4,6})$')
# [/DEF:ScheduleSchema:DataClass]


# [DEF:EnvironmentResponse:DataClass]
# @PURPOSE: Public representation of a configured environment as returned by the list endpoint.
class EnvironmentResponse(BaseModel):
    id: str
    name: str
    # Filled by get_environments with the normalized base URL (no trailing "/api/v1").
    url: str
    # Upper-cased stage label; get_environments derives is_production from stage == "PROD".
    stage: str = "DEV"
    is_production: bool = False
    # None when the environment has no backup schedule.
    backup_schedule: Optional[ScheduleSchema] = None
# [/DEF:EnvironmentResponse:DataClass]


# [DEF:DatabaseResponse:DataClass]
# @PURPOSE: Shape of a database summary entry (not referenced by any route in this module).
class DatabaseResponse(BaseModel):
    uuid: str
    database_name: str
    engine: Optional[str]
# [/DEF:DatabaseResponse:DataClass]


# [DEF:get_environments:Function]
# @PURPOSE: List all configured environments.
# @LAYER: API
# @SEMANTICS: list, environments, config
# @PRE: config_manager is injected via Depends.
# @POST: Returns a list of EnvironmentResponse objects.
# @RETURN: List[EnvironmentResponse]
@router.get("", response_model=List[EnvironmentResponse])
async def get_environments(
    config_manager=Depends(get_config_manager),
    _ = Depends(has_permission("environments", "READ"))
):
    with belief_scope("get_environments"):
        envs = config_manager.get_environments()
        # Ensure envs is a list
        if not isinstance(envs, list):
            envs = []
        response_items = []
        for e in envs:
            # An explicit stage wins; otherwise fall back to the is_production flag.
            resolved_stage = str(
                getattr(e, "stage", "")
                or ("PROD" if bool(getattr(e, "is_production", False)) else "DEV")
            ).upper()
            response_items.append(
                EnvironmentResponse(
                    id=e.id,
                    name=e.name,
                    url=_normalize_superset_env_url(e.url),
                    stage=resolved_stage,
                    # Derived from the resolved stage so both fields always agree.
                    is_production=(resolved_stage == "PROD"),
                    # A missing or falsy schedule is reported as None.
                    backup_schedule=ScheduleSchema(
                        enabled=e.backup_schedule.enabled,
                        cron_expression=e.backup_schedule.cron_expression
                    ) if getattr(e, 'backup_schedule', None) else None
                )
            )
        return response_items
# [/DEF:get_environments:Function]


# [DEF:update_environment_schedule:Function]
# @PURPOSE: Update backup schedule for an environment.
# @LAYER: API
# @SEMANTICS: update, schedule, backup, environment
# @PRE: Environment id exists, schedule is valid ScheduleSchema.
# @POST: Backup schedule updated and scheduler reloaded.
# @PARAM: id (str) - The environment ID.
# @PARAM: schedule (ScheduleSchema) - The new schedule.
# @RAISES: HTTPException(404) - No configured environment has the given id.
@router.put("/{id}/schedule")
async def update_environment_schedule(
    id: str,
    schedule: ScheduleSchema,
    config_manager=Depends(get_config_manager),
    scheduler_service=Depends(get_scheduler_service),
    _ = Depends(has_permission("admin:settings", "WRITE"))
):
    with belief_scope("update_environment_schedule", f"id={id}"):
        envs = config_manager.get_environments()
        env = next((e for e in envs if e.id == id), None)
        if not env:
            raise HTTPException(status_code=404, detail="Environment not found")

        # Update environment config
        # NOTE(review): env.backup_schedule is dereferenced unconditionally here, while
        # get_environments treats it as possibly missing - confirm the config model
        # always provides a schedule object.
        env.backup_schedule.enabled = schedule.enabled
        env.backup_schedule.cron_expression = schedule.cron_expression

        config_manager.update_environment(id, env)

        # Refresh scheduler
        scheduler_service.load_schedules()

        return {"message": "Schedule updated successfully"}
# [/DEF:update_environment_schedule:Function]


# [DEF:get_environment_databases:Function]
# @PURPOSE: Fetch the list of databases from a specific environment.
# @LAYER: API
# @SEMANTICS: fetch, databases, superset, environment
# @PRE: Environment id exists.
# @POST: Returns a list of database summaries from the environment.
# @PARAM: id (str) - The environment ID.
# @RETURN: List[Dict] - List of databases.
# @RAISES: HTTPException(404) - No configured environment has the given id.
# @RAISES: HTTPException(500) - Creating the client or fetching the summary failed.
@router.get("/{id}/databases")
async def get_environment_databases(
    id: str,
    config_manager=Depends(get_config_manager),
    _ = Depends(has_permission("admin:settings", "READ"))
):
    with belief_scope("get_environment_databases", f"id={id}"):
        envs = config_manager.get_environments()
        env = next((e for e in envs if e.id == id), None)
        if not env:
            raise HTTPException(status_code=404, detail="Environment not found")

        try:
            # Initialize SupersetClient from environment config
            client = SupersetClient(env)
            return client.get_databases_summary()
        except Exception as e:
            # Chain the original error so its traceback stays attached as __cause__.
            raise HTTPException(status_code=500, detail=f"Failed to fetch databases: {str(e)}") from e
# [/DEF:get_environment_databases:Function]

# [/DEF:backend.src.api.routes.environments:Module]