chore(semantic): checkpoint remediation progress

This commit is contained in:
2026-03-15 21:08:00 +03:00
parent 15d3141aef
commit 84a2cd5429
25 changed files with 1935 additions and 1559 deletions

View File

@@ -4,8 +4,8 @@
# @SEMANTICS: api, admin, users, roles, permissions
# @PURPOSE: Admin API endpoints for user and role management.
# @LAYER: API
# @RELATION: USES -> backend.src.core.auth.repository.AuthRepository
# @RELATION: USES -> backend.src.dependencies.has_permission
# @RELATION: [USES] ->[backend.src.core.auth.repository.AuthRepository]
# @RELATION: [USES] ->[backend.src.dependencies.has_permission]
#
# @INVARIANT: All endpoints in this module require 'Admin' role or 'admin' scope.
@@ -36,6 +36,7 @@ router = APIRouter(prefix="/api/admin", tags=["admin"])
# [/DEF:router:Variable]
# [DEF:list_users:Function]
# @TIER: STANDARD
# @PURPOSE: Lists all registered users.
# @PRE: Current user has 'Admin' role.
# @POST: Returns a list of UserSchema objects.
@@ -52,6 +53,7 @@ async def list_users(
# [/DEF:list_users:Function]
# [DEF:create_user:Function]
# @TIER: STANDARD
# @PURPOSE: Creates a new local user.
# @PRE: Current user has 'Admin' role.
# @POST: New user is created in the database.
@@ -89,6 +91,7 @@ async def create_user(
# [/DEF:create_user:Function]
# [DEF:update_user:Function]
# @TIER: STANDARD
# @PURPOSE: Updates an existing user.
@router.put("/users/{user_id}", response_model=UserSchema)
async def update_user(
@@ -123,6 +126,7 @@ async def update_user(
# [/DEF:update_user:Function]
# [DEF:delete_user:Function]
# @TIER: STANDARD
# @PURPOSE: Deletes a user.
@router.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
@@ -146,6 +150,7 @@ async def delete_user(
# [/DEF:delete_user:Function]
# [DEF:list_roles:Function]
# @TIER: STANDARD
# @PURPOSE: Lists all available roles.
# @RETURN: List[RoleSchema] - List of roles.
# @RELATION: CALLS -> backend.src.models.auth.Role
@@ -159,6 +164,7 @@ async def list_roles(
# [/DEF:list_roles:Function]
# [DEF:create_role:Function]
# @TIER: STANDARD
# @PURPOSE: Creates a new system role with associated permissions.
# @PRE: Role name must be unique.
# @POST: New Role record is created in auth.db.
@@ -196,6 +202,7 @@ async def create_role(
# [/DEF:create_role:Function]
# [DEF:update_role:Function]
# @TIER: STANDARD
# @PURPOSE: Updates an existing role's metadata and permissions.
# @PRE: role_id must be a valid existing role UUID.
# @POST: Role record is updated in auth.db.
@@ -240,6 +247,7 @@ async def update_role(
# [/DEF:update_role:Function]
# [DEF:delete_role:Function]
# @TIER: STANDARD
# @PURPOSE: Removes a role from the system.
# @PRE: role_id must be a valid existing role UUID.
# @POST: Role record is removed from auth.db.
@@ -266,6 +274,7 @@ async def delete_role(
# [/DEF:delete_role:Function]
# [DEF:list_permissions:Function]
# @TIER: STANDARD
# @PURPOSE: Lists all available system permissions for assignment.
# @POST: Returns a list of all PermissionSchema objects.
# @PARAM: db (Session) - Auth database session.
@@ -291,6 +300,7 @@ async def list_permissions(
# [/DEF:list_permissions:Function]
# [DEF:list_ad_mappings:Function]
# @TIER: STANDARD
# @PURPOSE: Lists all AD Group to Role mappings.
@router.get("/ad-mappings", response_model=List[ADGroupMappingSchema])
async def list_ad_mappings(
@@ -302,6 +312,7 @@ async def list_ad_mappings(
# [/DEF:list_ad_mappings:Function]
# [DEF:create_ad_mapping:Function]
# @TIER: STANDARD
# @PURPOSE: Creates a new AD Group mapping.
@router.post("/ad-mappings", response_model=ADGroupMappingSchema)
async def create_ad_mapping(

View File

@@ -3,8 +3,8 @@
# @SEMANTICS: api, assistant, chat, command, confirmation
# @PURPOSE: API routes for LLM assistant command parsing and safe execution orchestration.
# @LAYER: API
# @RELATION: DEPENDS_ON -> backend.src.core.task_manager
# @RELATION: DEPENDS_ON -> backend.src.models.assistant
# @RELATION: [DEPENDS_ON] ->[backend.src.core.task_manager.manager.TaskManager]
# @RELATION: [DEPENDS_ON] ->[backend.src.models.assistant]
# @INVARIANT: Risky operations are never executed without valid confirmation token.
from __future__ import annotations
@@ -125,6 +125,7 @@ INTENT_PERMISSION_CHECKS: Dict[str, List[Tuple[str, str]]] = {
# [DEF:_append_history:Function]
# @TIER: STANDARD
# @PURPOSE: Append conversation message to in-memory history buffer.
# @PRE: user_id and conversation_id identify target conversation bucket.
# @POST: Message entry is appended to CONVERSATIONS key list.
@@ -156,6 +157,7 @@ def _append_history(
# [DEF:_persist_message:Function]
# @TIER: STANDARD
# @PURPOSE: Persist assistant/user message record to database.
# @PRE: db session is writable and message payload is serializable.
# @POST: Message row is committed or persistence failure is logged.
@@ -191,6 +193,7 @@ def _persist_message(
# [DEF:_audit:Function]
# @TIER: STANDARD
# @PURPOSE: Append in-memory audit record for assistant decision trace.
# @PRE: payload describes decision/outcome fields.
# @POST: ASSISTANT_AUDIT list for user contains new timestamped entry.
@@ -203,6 +206,7 @@ def _audit(user_id: str, payload: Dict[str, Any]):
# [DEF:_persist_audit:Function]
# @TIER: STANDARD
# @PURPOSE: Persist structured assistant audit payload in database.
# @PRE: db session is writable and payload is JSON-serializable.
# @POST: Audit row is committed or failure is logged with rollback.
@@ -226,6 +230,7 @@ def _persist_audit(db: Session, user_id: str, payload: Dict[str, Any], conversat
# [DEF:_persist_confirmation:Function]
# @TIER: STANDARD
# @PURPOSE: Persist confirmation token record to database.
# @PRE: record contains id/user/intent/dispatch/expiry fields.
# @POST: Confirmation row exists in persistent storage.
@@ -251,6 +256,7 @@ def _persist_confirmation(db: Session, record: ConfirmationRecord):
# [DEF:_update_confirmation_state:Function]
# @TIER: STANDARD
# @PURPOSE: Update persistent confirmation token lifecycle state.
# @PRE: confirmation_id references existing row.
# @POST: State and consumed_at fields are updated when applicable.
@@ -270,6 +276,7 @@ def _update_confirmation_state(db: Session, confirmation_id: str, state: str):
# [DEF:_load_confirmation_from_db:Function]
# @TIER: STANDARD
# @PURPOSE: Load confirmation token from database into in-memory model.
# @PRE: confirmation_id may or may not exist in storage.
# @POST: Returns ConfirmationRecord when found, otherwise None.
@@ -295,6 +302,7 @@ def _load_confirmation_from_db(db: Session, confirmation_id: str) -> Optional[Co
# [DEF:_ensure_conversation:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve active conversation id in memory or create a new one.
# @PRE: user_id identifies current actor.
# @POST: Returns stable conversation id and updates USER_ACTIVE_CONVERSATION.
@@ -314,6 +322,7 @@ def _ensure_conversation(user_id: str, conversation_id: Optional[str]) -> str:
# [DEF:_resolve_or_create_conversation:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve active conversation using explicit id, memory cache, or persisted history.
# @PRE: user_id and db session are available.
# @POST: Returns conversation id and updates USER_ACTIVE_CONVERSATION cache.
@@ -343,6 +352,7 @@ def _resolve_or_create_conversation(user_id: str, conversation_id: Optional[str]
# [DEF:_cleanup_history_ttl:Function]
# @TIER: STANDARD
# @PURPOSE: Enforce assistant message retention window by deleting expired rows and in-memory records.
# @PRE: db session is available and user_id references current actor scope.
# @POST: Messages older than ASSISTANT_MESSAGE_TTL_DAYS are removed from persistence and memory mirrors.
@@ -380,6 +390,7 @@ def _cleanup_history_ttl(db: Session, user_id: str):
# [DEF:_is_conversation_archived:Function]
# @TIER: STANDARD
# @PURPOSE: Determine archived state for a conversation based on last update timestamp.
# @PRE: updated_at can be null for empty conversations.
# @POST: Returns True when conversation inactivity exceeds archive threshold.
@@ -392,6 +403,7 @@ def _is_conversation_archived(updated_at: Optional[datetime]) -> bool:
# [DEF:_coerce_query_bool:Function]
# @TIER: STANDARD
# @PURPOSE: Normalize bool-like query values for compatibility in direct handler invocations/tests.
# @PRE: value may be bool, string, or FastAPI Query metadata object.
# @POST: Returns deterministic boolean flag.
@@ -405,6 +417,7 @@ def _coerce_query_bool(value: Any) -> bool:
# [DEF:_extract_id:Function]
# @TIER: STANDARD
# @PURPOSE: Extract first regex match group from text by ordered pattern list.
# @PRE: patterns contain at least one capture group.
# @POST: Returns first matched token or None.
@@ -418,6 +431,7 @@ def _extract_id(text: str, patterns: List[str]) -> Optional[str]:
# [DEF:_resolve_env_id:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve environment identifier/name token to canonical environment id.
# @PRE: config_manager provides environment list.
# @POST: Returns matched environment id or None.
@@ -435,6 +449,7 @@ def _resolve_env_id(token: Optional[str], config_manager: ConfigManager) -> Opti
# [DEF:_is_production_env:Function]
# @TIER: STANDARD
# @PURPOSE: Determine whether environment token resolves to production-like target.
# @PRE: config_manager provides environments or token text is provided.
# @POST: Returns True for production/prod synonyms, else False.
@@ -452,6 +467,7 @@ def _is_production_env(token: Optional[str], config_manager: ConfigManager) -> b
# [DEF:_resolve_provider_id:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve provider token to provider id with active/default fallback.
# @PRE: db session can load provider list through LLMProviderService.
# @POST: Returns provider id or None when no providers configured.
@@ -487,6 +503,7 @@ def _resolve_provider_id(
# [DEF:_get_default_environment_id:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve default environment id from settings or first configured environment.
# @PRE: config_manager returns environments list.
# @POST: Returns default environment id or None when environment list is empty.
@@ -508,6 +525,7 @@ def _get_default_environment_id(config_manager: ConfigManager) -> Optional[str]:
# [DEF:_resolve_dashboard_id_by_ref:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve dashboard id by title or slug reference in selected environment.
# @PRE: dashboard_ref is a non-empty string-like token.
# @POST: Returns dashboard id when uniquely matched, otherwise None.
@@ -550,6 +568,7 @@ def _resolve_dashboard_id_by_ref(
# [DEF:_resolve_dashboard_id_entity:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve dashboard id from intent entities using numeric id or dashboard_ref fallback.
# @PRE: entities may contain dashboard_id as int/str and optional dashboard_ref.
# @POST: Returns resolved dashboard id or None when ambiguous/unresolvable.
@@ -581,6 +600,7 @@ def _resolve_dashboard_id_entity(
# [DEF:_get_environment_name_by_id:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve human-readable environment name by id.
# @PRE: environment id may be None.
# @POST: Returns matching environment name or fallback id.
@@ -593,6 +613,7 @@ def _get_environment_name_by_id(env_id: Optional[str], config_manager: ConfigMan
# [DEF:_extract_result_deep_links:Function]
# @TIER: STANDARD
# @PURPOSE: Build deep-link actions to verify task result from assistant chat.
# @PRE: task object is available.
# @POST: Returns zero or more assistant actions for dashboard open/diff.
@@ -649,6 +670,7 @@ def _extract_result_deep_links(task: Any, config_manager: ConfigManager) -> List
# [DEF:_build_task_observability_summary:Function]
# @TIER: STANDARD
# @PURPOSE: Build compact textual summary for completed tasks to reduce "black box" effect.
# @PRE: task may contain plugin-specific result payload.
# @POST: Returns non-empty summary line for known task types or empty string fallback.
@@ -712,6 +734,7 @@ def _build_task_observability_summary(task: Any, config_manager: ConfigManager)
# [DEF:_parse_command:Function]
# @TIER: STANDARD
# @PURPOSE: Deterministically parse RU/EN command text into intent payload.
# @PRE: message contains raw user text and config manager resolves environments.
# @POST: Returns intent dict with domain/operation/entities/confidence/risk fields.
@@ -905,6 +928,7 @@ def _parse_command(message: str, config_manager: ConfigManager) -> Dict[str, Any
# [DEF:_check_any_permission:Function]
# @TIER: STANDARD
# @PURPOSE: Validate user against alternative permission checks (logical OR).
# @PRE: checks list contains resource-action tuples.
# @POST: Returns on first successful permission; raises 403-like HTTPException otherwise.
@@ -922,6 +946,7 @@ def _check_any_permission(current_user: User, checks: List[Tuple[str, str]]):
# [DEF:_has_any_permission:Function]
# @TIER: STANDARD
# @PURPOSE: Check whether user has at least one permission tuple from the provided list.
# @PRE: current_user and checks list are valid.
# @POST: Returns True when at least one permission check passes.
@@ -935,6 +960,7 @@ def _has_any_permission(current_user: User, checks: List[Tuple[str, str]]) -> bo
# [DEF:_build_tool_catalog:Function]
# @TIER: STANDARD
# @PURPOSE: Build current-user tool catalog for LLM planner with operation contracts and defaults.
# @PRE: current_user is authenticated; config/db are available.
# @POST: Returns list of executable tools filtered by permission and runtime availability.
@@ -1058,6 +1084,7 @@ def _build_tool_catalog(current_user: User, config_manager: ConfigManager, db: S
# [DEF:_coerce_intent_entities:Function]
# @TIER: STANDARD
# @PURPOSE: Normalize intent entity value types from LLM output to route-compatible values.
# @PRE: intent contains entities dict or missing entities.
# @POST: Returned intent has numeric ids coerced where possible and string values stripped.
@@ -1082,6 +1109,7 @@ _SAFE_OPS = {"show_capabilities", "get_task_status", "get_health_summary"}
# [DEF:_confirmation_summary:Function]
# @TIER: STANDARD
# @PURPOSE: Build human-readable confirmation prompt for an intent before execution.
# @PRE: intent contains operation and entities fields.
# @POST: Returns descriptive Russian-language text ending with confirmation prompt.
@@ -1177,6 +1205,7 @@ async def _async_confirmation_summary(intent: Dict[str, Any], config_manager: Co
# [DEF:_clarification_text_for_intent:Function]
# @TIER: STANDARD
# @PURPOSE: Convert technical missing-parameter errors into user-facing clarification prompts.
# @PRE: state was classified as needs_clarification for current intent/error combination.
# @POST: Returned text is human-readable and actionable for target operation.
@@ -1200,6 +1229,7 @@ def _clarification_text_for_intent(intent: Optional[Dict[str, Any]], detail_text
# [DEF:_plan_intent_with_llm:Function]
# @TIER: STANDARD
# @PURPOSE: Use active LLM provider to select best tool/operation from dynamic catalog.
# @PRE: tools list contains allowed operations for current user.
# @POST: Returns normalized intent dict when planning succeeds; otherwise None.
@@ -1310,6 +1340,7 @@ async def _plan_intent_with_llm(
# [DEF:_authorize_intent:Function]
# @TIER: STANDARD
# @PURPOSE: Validate user permissions for parsed intent before confirmation/dispatch.
# @PRE: intent.operation is present for known assistant command domains.
# @POST: Returns if authorized; raises HTTPException(403) when denied.
@@ -1321,6 +1352,7 @@ def _authorize_intent(intent: Dict[str, Any], current_user: User):
# [DEF:_dispatch_intent:Function]
# @TIER: STANDARD
# @PURPOSE: Execute parsed assistant intent via existing task/plugin/git services.
# @PRE: intent operation is known and actor permissions are validated per operation.
# @POST: Returns response text, optional task id, and UI actions for follow-up.
@@ -1642,6 +1674,7 @@ async def _dispatch_intent(
@router.post("/messages", response_model=AssistantMessageResponse)
# [DEF:send_message:Function]
# @TIER: STANDARD
# @PURPOSE: Parse assistant command, enforce safety gates, and dispatch executable intent.
# @PRE: Authenticated user is available and message text is non-empty.
# @POST: Response state is one of clarification/confirmation/started/success/denied/failed.
@@ -1811,6 +1844,7 @@ async def send_message(
@router.post("/confirmations/{confirmation_id}/confirm", response_model=AssistantMessageResponse)
# [DEF:confirm_operation:Function]
# @TIER: STANDARD
# @PURPOSE: Execute previously requested risky operation after explicit user confirmation.
# @PRE: confirmation_id exists, belongs to current user, is pending, and not expired.
# @POST: Confirmation state becomes consumed and operation result is persisted in history.
@@ -1877,6 +1911,7 @@ async def confirm_operation(
@router.post("/confirmations/{confirmation_id}/cancel", response_model=AssistantMessageResponse)
# [DEF:cancel_operation:Function]
# @TIER: STANDARD
# @PURPOSE: Cancel pending risky operation and mark confirmation token as cancelled.
# @PRE: confirmation_id exists, belongs to current user, and is still pending.
# @POST: Confirmation becomes cancelled and cannot be executed anymore.
@@ -1933,6 +1968,7 @@ async def cancel_operation(
# [DEF:list_conversations:Function]
# @TIER: STANDARD
# @PURPOSE: Return paginated conversation list for current user with archived flag and last message preview.
# @PRE: Authenticated user context and valid pagination params.
# @POST: Conversations are grouped by conversation_id sorted by latest activity descending.
@@ -2020,6 +2056,7 @@ async def list_conversations(
# [DEF:delete_conversation:Function]
# @TIER: STANDARD
# @PURPOSE: Soft-delete or hard-delete a conversation and clear its in-memory trace.
# @PRE: conversation_id belongs to current_user.
# @POST: Conversation records are removed from DB and CONVERSATIONS cache.

View File

@@ -4,12 +4,17 @@
# @SEMANTICS: api, dashboards, resources, hub
# @PURPOSE: API endpoints for the Dashboard Hub - listing dashboards with Git and task status
# @LAYER: API
# @RELATION: DEPENDS_ON -> backend.src.dependencies
# @RELATION: DEPENDS_ON -> backend.src.services.resource_service
# @RELATION: DEPENDS_ON -> backend.src.core.superset_client
# @RELATION: DEPENDS_ON ->[backend.src.dependencies:Dependencies]
# @RELATION: DEPENDS_ON ->[backend.src.services.resource_service:ResourceService]
# @RELATION: DEPENDS_ON ->[backend.src.core.superset_client:SupersetClient]
#
# @INVARIANT: All dashboard responses include git_status and last_task metadata
#
# @PRE: Valid environment configurations exist in ConfigManager.
# @POST: Dashboard responses are projected into DashboardsResponse DTO.
# @SIDE_EFFECT: Performs external calls to Superset API and potentially Git providers.
# @DATA_CONTRACT: Input(env_id, filters) -> Output(DashboardsResponse)
#
# @TEST_CONTRACT: DashboardsAPI -> {
# required_fields: {env_id: string, page: integer, page_size: integer},
# optional_fields: {search: string},
@@ -61,6 +66,8 @@ from ...services.resource_service import ResourceService
router = APIRouter(prefix="/api/dashboards", tags=["Dashboards"])
# [DEF:GitStatus:DataClass]
# @TIER: STANDARD
# @PURPOSE: DTO for dashboard Git synchronization status.
class GitStatus(BaseModel):
branch: Optional[str] = None
sync_status: Optional[str] = Field(None, pattern="^OK|DIFF|NO_REPO|ERROR$")
@@ -69,6 +76,8 @@ class GitStatus(BaseModel):
# [/DEF:GitStatus:DataClass]
# [DEF:LastTask:DataClass]
# @TIER: STANDARD
# @PURPOSE: DTO for the most recent background task associated with a dashboard.
class LastTask(BaseModel):
task_id: Optional[str] = None
status: Optional[str] = Field(
@@ -79,6 +88,8 @@ class LastTask(BaseModel):
# [/DEF:LastTask:DataClass]
# [DEF:DashboardItem:DataClass]
# @TIER: STANDARD
# @PURPOSE: DTO representing a single dashboard with projected metadata.
class DashboardItem(BaseModel):
id: int
title: str
@@ -93,6 +104,8 @@ class DashboardItem(BaseModel):
# [/DEF:DashboardItem:DataClass]
# [DEF:EffectiveProfileFilter:DataClass]
# @TIER: STANDARD
# @PURPOSE: Metadata about applied profile filters for UI context.
class EffectiveProfileFilter(BaseModel):
applied: bool
source_page: Literal["dashboards_main", "other"] = "dashboards_main"
@@ -104,6 +117,8 @@ class EffectiveProfileFilter(BaseModel):
# [/DEF:EffectiveProfileFilter:DataClass]
# [DEF:DashboardsResponse:DataClass]
# @TIER: STANDARD
# @PURPOSE: Envelope DTO for paginated dashboards list.
class DashboardsResponse(BaseModel):
dashboards: List[DashboardItem]
total: int
@@ -114,6 +129,8 @@ class DashboardsResponse(BaseModel):
# [/DEF:DashboardsResponse:DataClass]
# [DEF:DashboardChartItem:DataClass]
# @TIER: STANDARD
# @PURPOSE: DTO for a chart linked to a dashboard.
class DashboardChartItem(BaseModel):
id: int
title: str
@@ -124,6 +141,8 @@ class DashboardChartItem(BaseModel):
# [/DEF:DashboardChartItem:DataClass]
# [DEF:DashboardDatasetItem:DataClass]
# @TIER: STANDARD
# @PURPOSE: DTO for a dataset associated with a dashboard.
class DashboardDatasetItem(BaseModel):
id: int
table_name: str
@@ -134,6 +153,8 @@ class DashboardDatasetItem(BaseModel):
# [/DEF:DashboardDatasetItem:DataClass]
# [DEF:DashboardDetailResponse:DataClass]
# @TIER: STANDARD
# @PURPOSE: Detailed dashboard metadata including children.
class DashboardDetailResponse(BaseModel):
id: int
title: str
@@ -149,6 +170,8 @@ class DashboardDetailResponse(BaseModel):
# [/DEF:DashboardDetailResponse:DataClass]
# [DEF:DashboardTaskHistoryItem:DataClass]
# @TIER: STANDARD
# @PURPOSE: Individual history record entry.
class DashboardTaskHistoryItem(BaseModel):
id: str
plugin_id: str
@@ -161,12 +184,16 @@ class DashboardTaskHistoryItem(BaseModel):
# [/DEF:DashboardTaskHistoryItem:DataClass]
# [DEF:DashboardTaskHistoryResponse:DataClass]
# @TIER: STANDARD
# @PURPOSE: Collection DTO for task history.
class DashboardTaskHistoryResponse(BaseModel):
dashboard_id: int
items: List[DashboardTaskHistoryItem]
# [/DEF:DashboardTaskHistoryResponse:DataClass]
# [DEF:DatabaseMapping:DataClass]
# @TIER: STANDARD
# @PURPOSE: DTO for cross-environment database ID mapping.
class DatabaseMapping(BaseModel):
source_db: str
target_db: str
@@ -176,12 +203,15 @@ class DatabaseMapping(BaseModel):
# [/DEF:DatabaseMapping:DataClass]
# [DEF:DatabaseMappingsResponse:DataClass]
# @TIER: STANDARD
# @PURPOSE: Wrapper for database mappings.
class DatabaseMappingsResponse(BaseModel):
mappings: List[DatabaseMapping]
# [/DEF:DatabaseMappingsResponse:DataClass]
# [DEF:_find_dashboard_id_by_slug:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve dashboard numeric ID by slug using Superset list endpoint.
# @PRE: `dashboard_slug` is non-empty.
# @POST: Returns dashboard ID when found, otherwise None.
@@ -209,6 +239,7 @@ def _find_dashboard_id_by_slug(
# [DEF:_resolve_dashboard_id_from_ref:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve dashboard ID from slug-first reference with numeric fallback.
# @PRE: `dashboard_ref` is provided in route path.
# @POST: Returns a valid dashboard ID or raises HTTPException(404).
@@ -233,6 +264,7 @@ def _resolve_dashboard_id_from_ref(
# [DEF:_find_dashboard_id_by_slug_async:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve dashboard numeric ID by slug using async Superset list endpoint.
# @PRE: dashboard_slug is non-empty.
# @POST: Returns dashboard ID when found, otherwise None.
@@ -260,6 +292,7 @@ async def _find_dashboard_id_by_slug_async(
# [DEF:_resolve_dashboard_id_from_ref_async:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve dashboard ID from slug-first reference using async Superset client.
# @PRE: dashboard_ref is provided in route path.
# @POST: Returns valid dashboard ID or raises HTTPException(404).
@@ -283,6 +316,7 @@ async def _resolve_dashboard_id_from_ref_async(
# [DEF:_normalize_filter_values:Function]
# @TIER: STANDARD
# @PURPOSE: Normalize query filter values to lower-cased non-empty tokens.
# @PRE: values may be None or list of strings.
# @POST: Returns trimmed normalized list preserving input order.
@@ -299,6 +333,7 @@ def _normalize_filter_values(values: Optional[List[str]]) -> List[str]:
# [DEF:_dashboard_git_filter_value:Function]
# @TIER: STANDARD
# @PURPOSE: Build comparable git status token for dashboards filtering.
# @PRE: dashboard payload may contain git_status or None.
# @POST: Returns one of ok|diff|no_repo|error|pending.
@@ -318,6 +353,7 @@ def _dashboard_git_filter_value(dashboard: Dict[str, Any]) -> str:
# [/DEF:_dashboard_git_filter_value:Function]
# [DEF:_normalize_actor_alias_token:Function]
# @TIER: STANDARD
# @PURPOSE: Normalize actor alias token to comparable trim+lower text.
# @PRE: value can be scalar/None.
# @POST: Returns normalized token or None.
@@ -328,6 +364,7 @@ def _normalize_actor_alias_token(value: Any) -> Optional[str]:
# [DEF:_normalize_owner_display_token:Function]
# @TIER: STANDARD
# @PURPOSE: Project owner payload value into stable display string for API response contracts.
# @PRE: owner can be scalar, dict or None.
# @POST: Returns trimmed non-empty owner display token or None.
@@ -354,6 +391,7 @@ def _normalize_owner_display_token(owner: Any) -> Optional[str]:
# [DEF:_normalize_dashboard_owner_values:Function]
# @TIER: STANDARD
# @PURPOSE: Normalize dashboard owners payload to optional list of display strings.
# @PRE: owners payload can be None, scalar, or list with mixed values.
# @POST: Returns deduplicated owner labels preserving order, or None when absent.
@@ -378,6 +416,7 @@ def _normalize_dashboard_owner_values(owners: Any) -> Optional[List[str]]:
# [DEF:_project_dashboard_response_items:Function]
# @TIER: STANDARD
# @PURPOSE: Project dashboard payloads to response-contract-safe shape.
# @PRE: dashboards is a list of dict-like dashboard payloads.
# @POST: Returned items satisfy DashboardItem owners=list[str]|None contract.
@@ -394,6 +433,7 @@ def _project_dashboard_response_items(dashboards: List[Dict[str, Any]]) -> List[
# [DEF:_resolve_profile_actor_aliases:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve stable actor aliases for profile filtering without per-dashboard detail fan-out.
# @PRE: bound username is available and env is valid.
# @POST: Returns at least normalized username; may include Superset display-name alias.
@@ -458,6 +498,7 @@ def _resolve_profile_actor_aliases(env: Any, bound_username: str) -> List[str]:
# [DEF:_matches_dashboard_actor_aliases:Function]
# @TIER: STANDARD
# @PURPOSE: Apply profile actor matching against multiple aliases (username + optional display name).
# @PRE: actor_aliases contains normalized non-empty tokens.
# @POST: Returns True when any alias matches owners OR modified_by.
@@ -479,6 +520,7 @@ def _matches_dashboard_actor_aliases(
# [DEF:get_dashboards:Function]
# @TIER: STANDARD
# @PURPOSE: Fetch list of dashboards from a specific environment with Git status and last task status
# @PRE: env_id must be a valid environment ID
# @PRE: page must be >= 1 if provided
@@ -491,7 +533,7 @@ def _matches_dashboard_actor_aliases(
# @PARAM: page (Optional[int]) - Page number (default: 1)
# @PARAM: page_size (Optional[int]) - Items per page (default: 10, max: 100)
# @RETURN: DashboardsResponse - List of dashboards with status metadata
# @RELATION: CALLS -> ResourceService.get_dashboards_with_status
# @RELATION: CALLS ->[ResourceService:get_dashboards_with_status]
@router.get("", response_model=DashboardsResponse)
async def get_dashboards(
env_id: str,
@@ -781,6 +823,7 @@ async def get_dashboards(
# [/DEF:get_dashboards:Function]
# [DEF:get_database_mappings:Function]
# @TIER: STANDARD
# @PURPOSE: Get database mapping suggestions between source and target environments
# @PRE: User has permission plugin:migration:read
# @PRE: source_env_id and target_env_id are valid environment IDs
@@ -788,7 +831,7 @@ async def get_dashboards(
# @PARAM: source_env_id (str) - Source environment ID
# @PARAM: target_env_id (str) - Target environment ID
# @RETURN: DatabaseMappingsResponse - List of suggested mappings
# @RELATION: CALLS -> MappingService.get_suggestions
# @RELATION: CALLS ->[MappingService:get_suggestions]
@router.get("/db-mappings", response_model=DatabaseMappingsResponse)
async def get_database_mappings(
source_env_id: str,
@@ -836,10 +879,11 @@ async def get_database_mappings(
# [/DEF:get_database_mappings:Function]
# [DEF:get_dashboard_detail:Function]
# @TIER: STANDARD
# @PURPOSE: Fetch detailed dashboard info with related charts and datasets
# @PRE: env_id must be valid and dashboard ref (slug or id) must exist
# @POST: Returns dashboard detail payload for overview page
# @RELATION: CALLS -> SupersetClient.get_dashboard_detail
# @RELATION: CALLS ->[AsyncSupersetClient:get_dashboard_detail_async]
@router.get("/{dashboard_ref}", response_model=DashboardDetailResponse)
async def get_dashboard_detail(
dashboard_ref: str,
@@ -873,6 +917,7 @@ async def get_dashboard_detail(
# [DEF:_task_matches_dashboard:Function]
# @TIER: STANDARD
# @PURPOSE: Checks whether task params are tied to a specific dashboard and environment.
# @PRE: task-like object exposes plugin_id and params fields.
# @POST: Returns True only for supported task plugins tied to dashboard_id (+optional env_id).
@@ -906,6 +951,7 @@ def _task_matches_dashboard(task: Any, dashboard_id: int, env_id: Optional[str])
# [DEF:get_dashboard_tasks_history:Function]
# @TIER: STANDARD
# @PURPOSE: Returns history of backup and LLM validation tasks for a dashboard.
# @PRE: dashboard ref (slug or id) is valid.
# @POST: Response contains sorted task history (newest first).
@@ -992,6 +1038,7 @@ async def get_dashboard_tasks_history(
# [DEF:get_dashboard_thumbnail:Function]
# @TIER: STANDARD
# @PURPOSE: Proxies Superset dashboard thumbnail with cache support.
# @PRE: env_id must exist.
# @POST: Returns image bytes or 202 when thumbnail is being prepared by Superset.
@@ -1072,7 +1119,7 @@ async def get_dashboard_thumbnail(
content_type = thumb_response.headers.get("Content-Type", "image/png")
return Response(content=thumb_response.content, media_type=content_type)
except DashboardNotFoundError as e:
except DashboardNotFoundError as e:
logger.error(f"[get_dashboard_thumbnail][Coherence:Failed] Dashboard not found for thumbnail: {e}")
raise HTTPException(status_code=404, detail="Dashboard thumbnail not found")
except HTTPException:
@@ -1085,6 +1132,8 @@ async def get_dashboard_thumbnail(
# [/DEF:get_dashboard_thumbnail:Function]
# [DEF:MigrateRequest:DataClass]
# @TIER: STANDARD
# @PURPOSE: DTO for dashboard migration requests.
class MigrateRequest(BaseModel):
source_env_id: str = Field(..., description="Source environment ID")
target_env_id: str = Field(..., description="Target environment ID")
@@ -1094,11 +1143,14 @@ class MigrateRequest(BaseModel):
# [/DEF:MigrateRequest:DataClass]
# [DEF:TaskResponse:DataClass]
# @TIER: STANDARD
# @PURPOSE: DTO for async task ID return.
class TaskResponse(BaseModel):
task_id: str
# [/DEF:TaskResponse:DataClass]
# [DEF:migrate_dashboards:Function]
# @TIER: STANDARD
# @PURPOSE: Trigger bulk migration of dashboards from source to target environment
# @PRE: User has permission plugin:migration:execute
# @PRE: source_env_id and target_env_id are valid environment IDs
@@ -1107,8 +1159,8 @@ class TaskResponse(BaseModel):
# @POST: Task is created and queued for execution
# @PARAM: request (MigrateRequest) - Migration request with source, target, and dashboard IDs
# @RETURN: TaskResponse - Task ID for tracking
# @RELATION: DISPATCHES -> MigrationPlugin
# @RELATION: CALLS -> task_manager.create_task
# @RELATION: DISPATCHES ->[MigrationPlugin:execute]
# @RELATION: CALLS ->[task_manager:create_task]
@router.post("/migrate", response_model=TaskResponse)
async def migrate_dashboards(
request: MigrateRequest,
@@ -1159,6 +1211,8 @@ async def migrate_dashboards(
# [/DEF:migrate_dashboards:Function]
# [DEF:BackupRequest:DataClass]
# @TIER: STANDARD
# @PURPOSE: DTO for dashboard backup requests.
class BackupRequest(BaseModel):
env_id: str = Field(..., description="Environment ID")
dashboard_ids: List[int] = Field(..., description="List of dashboard IDs to backup")
@@ -1166,6 +1220,7 @@ class BackupRequest(BaseModel):
# [/DEF:BackupRequest:DataClass]
# [DEF:backup_dashboards:Function]
# @TIER: STANDARD
# @PURPOSE: Trigger bulk backup of dashboards with optional cron schedule
# @PRE: User has permission plugin:backup:execute
# @PRE: env_id is a valid environment ID
@@ -1175,8 +1230,8 @@ class BackupRequest(BaseModel):
# @POST: If schedule is provided, a scheduled task is created
# @PARAM: request (BackupRequest) - Backup request with environment and dashboard IDs
# @RETURN: TaskResponse - Task ID for tracking
# @RELATION: DISPATCHES -> BackupPlugin
# @RELATION: CALLS -> task_manager.create_task
# @RELATION: DISPATCHES ->[BackupPlugin:execute]
# @RELATION: CALLS ->[task_manager:create_task]
@router.post("/backup", response_model=TaskResponse)
async def backup_dashboards(
request: BackupRequest,

View File

@@ -4,9 +4,9 @@
# @SEMANTICS: api, datasets, resources, hub
# @PURPOSE: API endpoints for the Dataset Hub - listing datasets with mapping progress
# @LAYER: API
# @RELATION: DEPENDS_ON -> backend.src.dependencies
# @RELATION: DEPENDS_ON -> backend.src.services.resource_service
# @RELATION: DEPENDS_ON -> backend.src.core.superset_client
# @RELATION: DEPENDS_ON ->[backend.src.dependencies]
# @RELATION: DEPENDS_ON ->[backend.src.services.resource_service]
# @RELATION: DEPENDS_ON ->[backend.src.core.superset_client]
#
# @INVARIANT: All dataset responses include last_task metadata
@@ -22,28 +22,39 @@ from ...core.superset_client import SupersetClient
router = APIRouter(prefix="/api/datasets", tags=["Datasets"])
# [DEF:MappedFields:DataClass]
# @TIER: TRIVIAL
# @PURPOSE: DTO for dataset mapping progress statistics
class MappedFields(BaseModel):
total: int
mapped: int
# [/DEF:MappedFields:DataClass]
# [DEF:LastTask:DataClass]
# @TIER: TRIVIAL
# @PURPOSE: DTO for the most recent task associated with a dataset
class LastTask(BaseModel):
task_id: Optional[str] = None
status: Optional[str] = Field(None, pattern="^RUNNING|SUCCESS|ERROR|WAITING_INPUT$")
# [/DEF:LastTask:DataClass]
# [DEF:DatasetItem:DataClass]
# @TIER: TRIVIAL
# @PURPOSE: Summary DTO for a dataset in the hub listing
class DatasetItem(BaseModel):
id: int
table_name: str
schema: str
schema_name: str = Field(..., alias="schema")
database: str
mapped_fields: Optional[MappedFields] = None
last_task: Optional[LastTask] = None
class Config:
allow_population_by_field_name = True
# [/DEF:DatasetItem:DataClass]
# [DEF:LinkedDashboard:DataClass]
# @TIER: TRIVIAL
# @PURPOSE: DTO for a dashboard linked to a dataset
class LinkedDashboard(BaseModel):
id: int
title: str
@@ -51,6 +62,8 @@ class LinkedDashboard(BaseModel):
# [/DEF:LinkedDashboard:DataClass]
# [DEF:DatasetColumn:DataClass]
# @TIER: TRIVIAL
# @PURPOSE: DTO for a single dataset column's metadata
class DatasetColumn(BaseModel):
id: int
name: str
@@ -61,10 +74,12 @@ class DatasetColumn(BaseModel):
# [/DEF:DatasetColumn:DataClass]
# [DEF:DatasetDetailResponse:DataClass]
# @TIER: TRIVIAL
# @PURPOSE: Detailed DTO for a dataset including columns and links
class DatasetDetailResponse(BaseModel):
id: int
table_name: Optional[str] = None
schema: Optional[str] = None
schema_name: Optional[str] = Field(None, alias="schema")
database: str
description: Optional[str] = None
columns: List[DatasetColumn]
@@ -75,9 +90,14 @@ class DatasetDetailResponse(BaseModel):
is_sqllab_view: bool = False
created_on: Optional[str] = None
changed_on: Optional[str] = None
class Config:
allow_population_by_field_name = True
# [/DEF:DatasetDetailResponse:DataClass]
# [DEF:DatasetsResponse:DataClass]
# @TIER: TRIVIAL
# @PURPOSE: Paginated response DTO for dataset listings
class DatasetsResponse(BaseModel):
datasets: List[DatasetItem]
total: int
@@ -87,18 +107,21 @@ class DatasetsResponse(BaseModel):
# [/DEF:DatasetsResponse:DataClass]
# [DEF:TaskResponse:DataClass]
# @TIER: TRIVIAL
# @PURPOSE: Response DTO containing a task ID for tracking
class TaskResponse(BaseModel):
task_id: str
# [/DEF:TaskResponse:DataClass]
# [DEF:get_dataset_ids:Function]
# @TIER: STANDARD
# @PURPOSE: Fetch list of all dataset IDs from a specific environment (without pagination)
# @PRE: env_id must be a valid environment ID
# @POST: Returns a list of all dataset IDs
# @PARAM: env_id (str) - The environment ID to fetch datasets from
# @PARAM: search (Optional[str]) - Filter by table name
# @RETURN: List[int] - List of dataset IDs
# @RELATION: CALLS -> ResourceService.get_datasets_with_status
# @RELATION: CALLS ->[backend.src.services.resource_service.ResourceService:get_datasets_with_status]
@router.get("/ids")
async def get_dataset_ids(
env_id: str,
@@ -143,6 +166,7 @@ async def get_dataset_ids(
# [/DEF:get_dataset_ids:Function]
# [DEF:get_datasets:Function]
# @TIER: STANDARD
# @PURPOSE: Fetch list of datasets from a specific environment with mapping progress
# @PRE: env_id must be a valid environment ID
# @PRE: page must be >= 1 if provided
@@ -154,7 +178,7 @@ async def get_dataset_ids(
# @PARAM: page (Optional[int]) - Page number (default: 1)
# @PARAM: page_size (Optional[int]) - Items per page (default: 10, max: 100)
# @RETURN: DatasetsResponse - List of datasets with status metadata
# @RELATION: CALLS -> ResourceService.get_datasets_with_status
# @RELATION: CALLS ->[backend.src.services.resource_service.ResourceService:get_datasets_with_status]
@router.get("", response_model=DatasetsResponse)
async def get_datasets(
env_id: str,
@@ -222,6 +246,8 @@ async def get_datasets(
# [/DEF:get_datasets:Function]
# [DEF:MapColumnsRequest:DataClass]
# @TIER: TRIVIAL
# @PURPOSE: Request DTO for initiating column mapping
class MapColumnsRequest(BaseModel):
env_id: str = Field(..., description="Environment ID")
dataset_ids: List[int] = Field(..., description="List of dataset IDs to map")
@@ -231,6 +257,7 @@ class MapColumnsRequest(BaseModel):
# [/DEF:MapColumnsRequest:DataClass]
# [DEF:map_columns:Function]
# @TIER: STANDARD
# @PURPOSE: Trigger bulk column mapping for datasets
# @PRE: User has permission plugin:mapper:execute
# @PRE: env_id is a valid environment ID
@@ -239,8 +266,8 @@ class MapColumnsRequest(BaseModel):
# @POST: Task is created and queued for execution
# @PARAM: request (MapColumnsRequest) - Mapping request with environment and dataset IDs
# @RETURN: TaskResponse - Task ID for tracking
# @RELATION: DISPATCHES -> MapperPlugin
# @RELATION: CALLS -> task_manager.create_task
# @RELATION: DISPATCHES ->[backend.src.plugins.mapper.MapperPlugin]
# @RELATION: CALLS ->[backend.src.core.task_manager.manager.TaskManager:create_task]
@router.post("/map-columns", response_model=TaskResponse)
async def map_columns(
request: MapColumnsRequest,
@@ -292,6 +319,8 @@ async def map_columns(
# [/DEF:map_columns:Function]
# [DEF:GenerateDocsRequest:DataClass]
# @TIER: TRIVIAL
# @PURPOSE: Request DTO for initiating documentation generation
class GenerateDocsRequest(BaseModel):
env_id: str = Field(..., description="Environment ID")
dataset_ids: List[int] = Field(..., description="List of dataset IDs to generate docs for")
@@ -300,6 +329,7 @@ class GenerateDocsRequest(BaseModel):
# [/DEF:GenerateDocsRequest:DataClass]
# [DEF:generate_docs:Function]
# @TIER: STANDARD
# @PURPOSE: Trigger bulk documentation generation for datasets
# @PRE: User has permission plugin:llm_analysis:execute
# @PRE: env_id is a valid environment ID
@@ -308,8 +338,8 @@ class GenerateDocsRequest(BaseModel):
# @POST: Task is created and queued for execution
# @PARAM: request (GenerateDocsRequest) - Documentation generation request
# @RETURN: TaskResponse - Task ID for tracking
# @RELATION: DISPATCHES -> LLMAnalysisPlugin
# @RELATION: CALLS -> task_manager.create_task
# @RELATION: DISPATCHES ->[backend.src.plugins.llm_analysis.plugin.DocumentationPlugin]
# @RELATION: CALLS ->[backend.src.core.task_manager.manager.TaskManager:create_task]
@router.post("/generate-docs", response_model=TaskResponse)
async def generate_docs(
request: GenerateDocsRequest,
@@ -355,6 +385,7 @@ async def generate_docs(
# [/DEF:generate_docs:Function]
# [DEF:get_dataset_detail:Function]
# @TIER: STANDARD
# @PURPOSE: Get detailed dataset information including columns and linked dashboards
# @PRE: env_id is a valid environment ID
# @PRE: dataset_id is a valid dataset ID
@@ -362,7 +393,7 @@ async def generate_docs(
# @PARAM: env_id (str) - The environment ID
# @PARAM: dataset_id (int) - The dataset ID
# @RETURN: DatasetDetailResponse - Detailed dataset information
# @RELATION: CALLS -> SupersetClient.get_dataset_detail
# @RELATION: CALLS ->[backend.src.core.superset_client.SupersetClient:get_dataset_detail]
@router.get("/{dataset_id}", response_model=DatasetDetailResponse)
async def get_dataset_detail(
env_id: str,

View File

@@ -4,9 +4,9 @@
# @SEMANTICS: git, routes, api, fastapi, repository, deployment
# @PURPOSE: Provides FastAPI endpoints for Git integration operations.
# @LAYER: API
# @RELATION: USES -> src.services.git_service.GitService
# @RELATION: USES -> src.api.routes.git_schemas
# @RELATION: USES -> src.models.git
# @RELATION: USES -> [backend.src.services.git_service.GitService]
# @RELATION: USES -> [backend.src.api.routes.git_schemas]
# @RELATION: USES -> [backend.src.models.git]
#
# @INVARIANT: All Git operations must be routed through GitService.
@@ -48,6 +48,7 @@ MAX_REPOSITORY_STATUS_BATCH = 50
# [DEF:_build_no_repo_status_payload:Function]
# @TIER: TRIVIAL
# @PURPOSE: Build a consistent status payload for dashboards without initialized repositories.
# @PRE: None.
# @POST: Returns a stable payload compatible with frontend repository status parsing.
@@ -72,6 +73,7 @@ def _build_no_repo_status_payload() -> dict:
# [DEF:_handle_unexpected_git_route_error:Function]
# @TIER: TRIVIAL
# @PURPOSE: Convert unexpected route-level exceptions to stable 500 API responses.
# @PRE: `error` is a non-HTTPException instance.
# @POST: Raises HTTPException(500) with route-specific context.
@@ -84,6 +86,7 @@ def _handle_unexpected_git_route_error(route_name: str, error: Exception) -> Non
# [DEF:_resolve_repository_status:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve repository status for one dashboard with graceful NO_REPO semantics.
# @PRE: `dashboard_id` is a valid integer.
# @POST: Returns standard status payload or `NO_REPO` payload when repository path is absent.
@@ -110,6 +113,7 @@ def _resolve_repository_status(dashboard_id: int) -> dict:
# [DEF:_get_git_config_or_404:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve GitServerConfig by id or raise 404.
# @PRE: db session is available.
# @POST: Returns GitServerConfig model.
@@ -122,6 +126,7 @@ def _get_git_config_or_404(db: Session, config_id: str) -> GitServerConfig:
# [DEF:_find_dashboard_id_by_slug:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve dashboard numeric ID by slug in a specific environment.
# @PRE: dashboard_slug is non-empty.
# @POST: Returns dashboard ID or None when not found.
@@ -148,6 +153,7 @@ def _find_dashboard_id_by_slug(
# [DEF:_resolve_dashboard_id_from_ref:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve dashboard ID from slug-or-id reference for Git routes.
# @PRE: dashboard_ref is provided; env_id is required for slug values.
# @POST: Returns numeric dashboard ID or raises HTTPException.
@@ -182,6 +188,7 @@ def _resolve_dashboard_id_from_ref(
# [DEF:_find_dashboard_id_by_slug_async:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve dashboard numeric ID by slug asynchronously for hot-path Git routes.
# @PRE: dashboard_slug is non-empty.
# @POST: Returns dashboard ID or None when not found.
@@ -208,6 +215,7 @@ async def _find_dashboard_id_by_slug_async(
# [DEF:_resolve_dashboard_id_from_ref_async:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve dashboard ID asynchronously from slug-or-id reference for hot Git routes.
# @PRE: dashboard_ref is provided; env_id is required for slug values.
# @POST: Returns numeric dashboard ID or raises HTTPException.
@@ -246,6 +254,7 @@ async def _resolve_dashboard_id_from_ref_async(
# [DEF:_resolve_repo_key_from_ref:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve repository folder key with slug-first strategy and deterministic fallback.
# @PRE: dashboard_id is resolved and valid.
# @POST: Returns safe key to be used in local repository path.
@@ -278,6 +287,7 @@ def _resolve_repo_key_from_ref(
# [DEF:_sanitize_optional_identity_value:Function]
# @TIER: TRIVIAL
# @PURPOSE: Normalize optional identity value into trimmed string or None.
# @PRE: value may be None or blank.
# @POST: Returns sanitized value suitable for git identity configuration.
@@ -291,6 +301,7 @@ def _sanitize_optional_identity_value(value: Optional[str]) -> Optional[str]:
# [DEF:_resolve_current_user_git_identity:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve configured Git username/email from current user's profile preferences.
# @PRE: `db` may be stubbed in tests; `current_user` may be absent for direct handler invocations.
# @POST: Returns tuple(username, email) only when both values are configured.
@@ -332,6 +343,7 @@ def _resolve_current_user_git_identity(
# [DEF:_apply_git_identity_from_profile:Function]
# @TIER: STANDARD
# @PURPOSE: Apply user-scoped Git identity to repository-local config before write/pull operations.
# @PRE: dashboard_id is resolved; db/current_user may be missing in direct test invocation context.
# @POST: git_service.configure_identity is called only when identity and method are available.
@@ -355,6 +367,7 @@ def _apply_git_identity_from_profile(
# [DEF:get_git_configs:Function]
# @TIER: STANDARD
# @PURPOSE: List all configured Git servers.
# @PRE: Database session `db` is available.
# @POST: Returns a list of all GitServerConfig objects from the database.
@@ -375,6 +388,7 @@ async def get_git_configs(
# [/DEF:get_git_configs:Function]
# [DEF:create_git_config:Function]
# @TIER: STANDARD
# @PURPOSE: Register a new Git server configuration.
# @PRE: `config` contains valid GitServerConfigCreate data.
# @POST: A new GitServerConfig record is created in the database.
@@ -396,6 +410,7 @@ async def create_git_config(
# [/DEF:create_git_config:Function]
# [DEF:update_git_config:Function]
# @TIER: STANDARD
# @PURPOSE: Update an existing Git server configuration.
# @PRE: `config_id` corresponds to an existing configuration.
# @POST: The configuration record is updated in the database.
@@ -430,6 +445,7 @@ async def update_git_config(
# [/DEF:update_git_config:Function]
# [DEF:delete_git_config:Function]
# @TIER: STANDARD
# @PURPOSE: Remove a Git server configuration.
# @PRE: `config_id` corresponds to an existing configuration.
# @POST: The configuration record is removed from the database.
@@ -451,6 +467,7 @@ async def delete_git_config(
# [/DEF:delete_git_config:Function]
# [DEF:test_git_config:Function]
# @TIER: STANDARD
# @PURPOSE: Validate connection to a Git server using provided credentials.
# @PRE: `config` contains provider, url, and pat.
# @POST: Returns success if the connection is validated via GitService.
@@ -482,6 +499,7 @@ async def test_git_config(
# [DEF:list_gitea_repositories:Function]
# @TIER: STANDARD
# @PURPOSE: List repositories in Gitea for a saved Gitea config.
# @PRE: config_id exists and provider is GITEA.
# @POST: Returns repositories visible to PAT user.
@@ -512,6 +530,7 @@ async def list_gitea_repositories(
# [DEF:create_gitea_repository:Function]
# @TIER: STANDARD
# @PURPOSE: Create a repository in Gitea for a saved Gitea config.
# @PRE: config_id exists and provider is GITEA.
# @POST: Returns created repository payload.
@@ -548,6 +567,7 @@ async def create_gitea_repository(
# [DEF:create_remote_repository:Function]
# @TIER: STANDARD
# @PURPOSE: Create repository on remote Git server using selected provider config.
# @PRE: config_id exists and PAT has creation permissions.
# @POST: Returns normalized remote repository payload.
@@ -608,6 +628,7 @@ async def create_remote_repository(
# [DEF:delete_gitea_repository:Function]
# @TIER: STANDARD
# @PURPOSE: Delete repository in Gitea for a saved Gitea config.
# @PRE: config_id exists and provider is GITEA.
# @POST: Target repository is deleted on Gitea.
@@ -633,6 +654,7 @@ async def delete_gitea_repository(
# [/DEF:delete_gitea_repository:Function]
# [DEF:init_repository:Function]
# @TIER: STANDARD
# @PURPOSE: Link a dashboard to a Git repository and perform initial clone/init.
# @PRE: `dashboard_ref` exists and `init_data` contains valid config_id and remote_url.
# @POST: Repository is initialized on disk and a GitRepository record is saved in DB.
@@ -690,6 +712,7 @@ async def init_repository(
# [/DEF:init_repository:Function]
# [DEF:get_repository_binding:Function]
# @TIER: STANDARD
# @PURPOSE: Return repository binding with provider metadata for selected dashboard.
# @PRE: `dashboard_ref` resolves to a valid dashboard and repository is initialized.
# @POST: Returns dashboard repository binding and linked provider.
@@ -724,6 +747,7 @@ async def get_repository_binding(
# [/DEF:get_repository_binding:Function]
# [DEF:delete_repository:Function]
# @TIER: STANDARD
# @PURPOSE: Delete local repository workspace and DB binding for selected dashboard.
# @PRE: `dashboard_ref` resolves to a valid dashboard.
# @POST: Repository files and binding record are removed when present.
@@ -748,6 +772,7 @@ async def delete_repository(
# [/DEF:delete_repository:Function]
# [DEF:get_branches:Function]
# @TIER: STANDARD
# @PURPOSE: List all branches for a dashboard's repository.
# @PRE: Repository for `dashboard_ref` is initialized.
# @POST: Returns a list of branches from the local repository.
@@ -771,6 +796,7 @@ async def get_branches(
# [/DEF:get_branches:Function]
# [DEF:create_branch:Function]
# @TIER: STANDARD
# @PURPOSE: Create a new branch in the dashboard's repository.
# @PRE: `dashboard_ref` repository exists and `branch_data` has name and from_branch.
# @POST: A new branch is created in the local repository.
@@ -799,6 +825,7 @@ async def create_branch(
# [/DEF:create_branch:Function]
# [DEF:checkout_branch:Function]
# @TIER: STANDARD
# @PURPOSE: Switch the dashboard's repository to a specific branch.
# @PRE: `dashboard_ref` repository exists and branch `checkout_data.name` exists.
# @POST: The local repository HEAD is moved to the specified branch.
@@ -824,6 +851,7 @@ async def checkout_branch(
# [/DEF:checkout_branch:Function]
# [DEF:commit_changes:Function]
# @TIER: STANDARD
# @PURPOSE: Stage and commit changes in the dashboard's repository.
# @PRE: `dashboard_ref` repository exists and `commit_data` has message and files.
# @POST: Specified files are staged and a new commit is created.
@@ -852,6 +880,7 @@ async def commit_changes(
# [/DEF:commit_changes:Function]
# [DEF:push_changes:Function]
# @TIER: STANDARD
# @PURPOSE: Push local commits to the remote repository.
# @PRE: `dashboard_ref` repository exists and has a remote configured.
# @POST: Local commits are pushed to the remote repository.
@@ -875,6 +904,7 @@ async def push_changes(
# [/DEF:push_changes:Function]
# [DEF:pull_changes:Function]
# @TIER: STANDARD
# @PURPOSE: Pull changes from the remote repository.
# @PRE: `dashboard_ref` repository exists and has a remote configured.
# @POST: Remote changes are fetched and merged into the local branch.
@@ -922,6 +952,7 @@ async def pull_changes(
# [/DEF:pull_changes:Function]
# [DEF:get_merge_status:Function]
# @TIER: STANDARD
# @PURPOSE: Return unfinished-merge status for repository (web-only recovery support).
# @PRE: `dashboard_ref` resolves to a valid dashboard repository.
# @POST: Returns merge status payload.
@@ -944,6 +975,7 @@ async def get_merge_status(
# [DEF:get_merge_conflicts:Function]
# @TIER: STANDARD
# @PURPOSE: Return conflicted files with mine/theirs previews for web conflict resolver.
# @PRE: `dashboard_ref` resolves to a valid dashboard repository.
# @POST: Returns conflict file list.
@@ -966,6 +998,7 @@ async def get_merge_conflicts(
# [DEF:resolve_merge_conflicts:Function]
# @TIER: STANDARD
# @PURPOSE: Apply mine/theirs/manual conflict resolutions from WebUI and stage files.
# @PRE: `dashboard_ref` resolves; request contains at least one resolution item.
# @POST: Resolved files are staged in index.
@@ -993,6 +1026,7 @@ async def resolve_merge_conflicts(
# [DEF:abort_merge:Function]
# @TIER: STANDARD
# @PURPOSE: Abort unfinished merge from WebUI flow.
# @PRE: `dashboard_ref` resolves to repository.
# @POST: Merge operation is aborted or reports no active merge.
@@ -1015,6 +1049,7 @@ async def abort_merge(
# [DEF:continue_merge:Function]
# @TIER: STANDARD
# @PURPOSE: Finalize unfinished merge from WebUI flow.
# @PRE: All conflicts are resolved and staged.
# @POST: Merge commit is created.
@@ -1038,6 +1073,7 @@ async def continue_merge(
# [DEF:sync_dashboard:Function]
# @TIER: STANDARD
# @PURPOSE: Sync dashboard state from Superset to Git using the GitPlugin.
# @PRE: `dashboard_ref` is valid; GitPlugin is available.
# @POST: Dashboard YAMLs are exported from Superset and committed to Git.
@@ -1069,6 +1105,7 @@ async def sync_dashboard(
# [DEF:promote_dashboard:Function]
# @TIER: STANDARD
# @PURPOSE: Promote changes between branches via MR or direct merge.
# @PRE: dashboard repository is initialized and Git config is valid.
# @POST: Returns promotion result metadata.
@@ -1171,6 +1208,7 @@ async def promote_dashboard(
# [/DEF:promote_dashboard:Function]
# [DEF:get_environments:Function]
# @TIER: STANDARD
# @PURPOSE: List all deployment environments.
# @PRE: Config manager is accessible.
# @POST: Returns a list of DeploymentEnvironmentSchema objects.
@@ -1193,6 +1231,7 @@ async def get_environments(
# [/DEF:get_environments:Function]
# [DEF:deploy_dashboard:Function]
# @TIER: STANDARD
# @PURPOSE: Deploy dashboard from Git to a target environment.
# @PRE: `dashboard_ref` and `deploy_data.environment_id` are valid.
# @POST: Dashboard YAMLs are read from Git and imported into the target Superset.
@@ -1223,6 +1262,7 @@ async def deploy_dashboard(
# [/DEF:deploy_dashboard:Function]
# [DEF:get_history:Function]
# @TIER: STANDARD
# @PURPOSE: View commit history for a dashboard's repository.
# @PRE: `dashboard_ref` repository exists.
# @POST: Returns a list of recent commits from the repository.
@@ -1248,6 +1288,7 @@ async def get_history(
# [/DEF:get_history:Function]
# [DEF:get_repository_status:Function]
# @TIER: STANDARD
# @PURPOSE: Get current Git status for a dashboard repository.
# @PRE: `dashboard_ref` resolves to a valid dashboard.
# @POST: Returns repository status; if repo is not initialized, returns `NO_REPO` payload.
@@ -1272,6 +1313,7 @@ async def get_repository_status(
# [DEF:get_repository_status_batch:Function]
# @TIER: STANDARD
# @PURPOSE: Get Git statuses for multiple dashboard repositories in one request.
# @PRE: `request.dashboard_ids` is provided.
# @POST: Returns `statuses` map where each key is dashboard ID and value is repository status payload.
@@ -1315,6 +1357,7 @@ async def get_repository_status_batch(
# [/DEF:get_repository_status_batch:Function]
# [DEF:get_repository_diff:Function]
# @TIER: STANDARD
# @PURPOSE: Get Git diff for a dashboard repository.
# @PRE: `dashboard_ref` repository exists.
# @POST: Returns the diff text for the specified file or all changes.
@@ -1343,6 +1386,7 @@ async def get_repository_diff(
# [/DEF:get_repository_diff:Function]
# [DEF:generate_commit_message:Function]
# @TIER: STANDARD
# @PURPOSE: Generate a suggested commit message using LLM.
# @PRE: Repository for `dashboard_ref` is initialized.
# @POST: Returns a suggested commit message string.

View File

@@ -3,14 +3,18 @@
# @SEMANTICS: api, migration, dashboards, sync, dry-run
# @PURPOSE: HTTP contract layer for migration orchestration, settings, dry-run, and mapping sync endpoints.
# @LAYER: Infra
# @RELATION: [DEPENDS_ON] ->[backend.src.dependencies]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.database]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.superset_client]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.migration.dry_run_orchestrator]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.mapping_service]
# @RELATION: [DEPENDS_ON] ->[backend.src.models.dashboard]
# @RELATION: [DEPENDS_ON] ->[backend.src.models.mapping]
# @RELATION: DEPENDS_ON ->[backend.src.dependencies]
# @RELATION: DEPENDS_ON ->[backend.src.core.database]
# @RELATION: DEPENDS_ON ->[backend.src.core.superset_client.SupersetClient]
# @RELATION: DEPENDS_ON ->[backend.src.core.migration.dry_run_orchestrator.MigrationDryRunService]
# @RELATION: DEPENDS_ON ->[backend.src.core.mapping_service.IdMappingService]
# @RELATION: DEPENDS_ON ->[backend.src.models.dashboard]
# @RELATION: DEPENDS_ON ->[backend.src.models.mapping]
# @INVARIANT: Migration endpoints never execute with invalid environment references and always return explicit HTTP errors on guard failures.
# @PRE: Backend core services initialized and Database session available.
# @POST: Migration tasks are enqueued or dry-run results are computed and returned.
# @SIDE_EFFECT: Enqueues long-running tasks, potentially mutates ResourceMapping table, and performs remote Superset API calls.
# @DATA_CONTRACT: [DashboardSelection | QueryParams] -> [TaskResponse | DryRunResult | MappingSummary]
# @TEST_CONTRACT: [DashboardSelection + configured envs] -> [task_id | dry-run result | sync summary]
# @TEST_SCENARIO: [invalid_environment] -> [HTTP_400_or_404]
# @TEST_SCENARIO: [valid_execution] -> [success_payload_with_required_fields]
@@ -34,6 +38,7 @@ from ...models.mapping import ResourceMapping
router = APIRouter(prefix="/api", tags=["migration"])
# [DEF:get_dashboards:Function]
# @TIER: STANDARD
# @PURPOSE: Fetch dashboard metadata from a requested environment for migration selection UI.
# @PRE: env_id is provided and exists in configured environments.
# @POST: Returns List[DashboardMetadata] for the resolved environment; emits HTTP_404 when environment is absent.
@@ -61,6 +66,7 @@ async def get_dashboards(
# [/DEF:get_dashboards:Function]
# [DEF:execute_migration:Function]
# @TIER: CRITICAL
# @PURPOSE: Validate migration selection and enqueue asynchronous migration task execution.
# @PRE: DashboardSelection payload is valid and both source/target environments exist.
# @POST: Returns {"task_id": str, "message": str} when task creation succeeds; emits HTTP_400/HTTP_500 on failure.
@@ -102,6 +108,7 @@ async def execute_migration(
# [DEF:dry_run_migration:Function]
# @TIER: CRITICAL
# @PURPOSE: Build pre-flight migration diff and risk summary without mutating target systems.
# @PRE: DashboardSelection is valid, source and target environments exist, differ, and selected_ids is non-empty.
# @POST: Returns deterministic dry-run payload; emits HTTP_400 for guard violations and HTTP_500 for orchestrator value errors.
@@ -153,6 +160,7 @@ async def dry_run_migration(
# [/DEF:dry_run_migration:Function]
# [DEF:get_migration_settings:Function]
# @TIER: STANDARD
# @PURPOSE: Read and return configured migration synchronization cron expression.
# @PRE: Configuration store is available and requester has READ permission.
# @POST: Returns {"cron": str} reflecting current persisted settings value.
@@ -170,6 +178,7 @@ async def get_migration_settings(
# [/DEF:get_migration_settings:Function]
# [DEF:update_migration_settings:Function]
# @TIER: STANDARD
# @PURPOSE: Validate and persist migration synchronization cron expression update.
# @PRE: Payload includes "cron" key and requester has WRITE permission.
# @POST: Returns {"cron": str, "status": "updated"} and persists updated cron value.
@@ -195,6 +204,7 @@ async def update_migration_settings(
# [/DEF:update_migration_settings:Function]
# [DEF:get_resource_mappings:Function]
# @TIER: STANDARD
# @PURPOSE: Fetch synchronized resource mappings with optional filters and pagination for migration mappings view.
# @PRE: skip>=0, 1<=limit<=500, DB session is active, requester has READ permission.
# @POST: Returns {"items": [...], "total": int} where items reflect applied filters and pagination.
@@ -245,6 +255,7 @@ async def get_resource_mappings(
# [/DEF:get_resource_mappings:Function]
# [DEF:trigger_sync_now:Function]
# @TIER: STANDARD
# @PURPOSE: Trigger immediate ID synchronization for every configured environment.
# @PRE: At least one environment is configured and requester has EXECUTE permission.
# @POST: Returns sync summary with synced/failed counts after attempting all environments.

View File

@@ -3,9 +3,13 @@
# @SEMANTICS: api, reports, list, detail, pagination, filters
# @PURPOSE: FastAPI router for unified task report list and detail retrieval endpoints.
# @LAYER: UI (API)
# @RELATION: DEPENDS_ON -> backend.src.services.reports.report_service.ReportsService
# @RELATION: DEPENDS_ON -> backend.src.dependencies
# @RELATION: DEPENDS_ON -> [backend.src.services.reports.report_service.ReportsService]
# @RELATION: DEPENDS_ON -> [backend.src.dependencies]
# @INVARIANT: Endpoints are read-only and do not trigger long-running tasks.
# @PRE: Reports service and dependencies are initialized.
# @POST: Router is configured and endpoints are ready for registration.
# @SIDE_EFFECT: None
# @DATA_CONTRACT: [ReportQuery] -> [ReportCollection | ReportDetailView]
# [SECTION: IMPORTS]
from datetime import datetime
@@ -25,6 +29,7 @@ router = APIRouter(prefix="/api/reports", tags=["Reports"])
# [DEF:_parse_csv_enum_list:Function]
# @TIER: TRIVIAL
# @PURPOSE: Parse comma-separated query value into enum list.
# @PRE: raw may be None/empty or comma-separated values.
# @POST: Returns enum list or raises HTTP 400 with deterministic machine-readable payload.
@@ -59,6 +64,7 @@ def _parse_csv_enum_list(raw: Optional[str], enum_cls, field_name: str) -> List:
# [DEF:list_reports:Function]
# @TIER: STANDARD
# @PURPOSE: Return paginated unified reports list.
# @PRE: authenticated/authorized request and validated query params.
# @POST: returns {items,total,page,page_size,has_next,applied_filters}.
@@ -125,6 +131,7 @@ async def list_reports(
# [DEF:get_report_detail:Function]
# @TIER: STANDARD
# @PURPOSE: Return one normalized report detail with diagnostics and next actions.
# @PRE: authenticated/authorized request and existing report_id.
# @POST: returns normalized detail envelope or 404 when report is not found.

File diff suppressed because it is too large Load Diff

View File

@@ -4,7 +4,7 @@
# @SEMANTICS: storage, files, upload, download, backup, repository
# @PURPOSE: API endpoints for file storage management (backups and repositories).
# @LAYER: API
# @RELATION: DEPENDS_ON -> backend.src.models.storage
# @RELATION: DEPENDS_ON -> [backend.src.models.storage]
#
# @INVARIANT: All paths must be validated against path traversal.
@@ -22,6 +22,7 @@ from ...core.logger import belief_scope
router = APIRouter(tags=["storage"])
# [DEF:list_files:Function]
# @TIER: STANDARD
# @PURPOSE: List all files and directories in the storage system.
#
# @PRE: None.
@@ -31,7 +32,7 @@ router = APIRouter(tags=["storage"])
# @PARAM: path (Optional[str]) - Subpath within the category.
# @RETURN: List[StoredFile] - List of files/directories.
#
# @RELATION: CALLS -> StoragePlugin.list_files
# @RELATION: CALLS -> [backend.src.plugins.storage.plugin.StoragePlugin.list_files]
@router.get("/files", response_model=List[StoredFile])
async def list_files(
category: Optional[FileCategory] = None,
@@ -48,6 +49,7 @@ async def list_files(
# [/DEF:list_files:Function]
# [DEF:upload_file:Function]
# @TIER: STANDARD
# @PURPOSE: Upload a file to the storage system.
#
# @PRE: category must be a valid FileCategory.
@@ -61,7 +63,7 @@ async def list_files(
#
# @SIDE_EFFECT: Writes file to the filesystem.
#
# @RELATION: CALLS -> StoragePlugin.save_file
# @RELATION: CALLS -> [backend.src.plugins.storage.plugin.StoragePlugin.save_file]
@router.post("/upload", response_model=StoredFile, status_code=201)
async def upload_file(
category: FileCategory = Form(...),
@@ -81,6 +83,7 @@ async def upload_file(
# [/DEF:upload_file:Function]
# [DEF:delete_file:Function]
# @TIER: STANDARD
# @PURPOSE: Delete a specific file or directory.
#
# @PRE: category must be a valid FileCategory.
@@ -92,7 +95,7 @@ async def upload_file(
#
# @SIDE_EFFECT: Deletes item from the filesystem.
#
# @RELATION: CALLS -> StoragePlugin.delete_file
# @RELATION: CALLS -> [backend.src.plugins.storage.plugin.StoragePlugin.delete_file]
@router.delete("/files/{category}/{path:path}", status_code=204)
async def delete_file(
category: FileCategory,
@@ -113,6 +116,7 @@ async def delete_file(
# [/DEF:delete_file:Function]
# [DEF:download_file:Function]
# @TIER: STANDARD
# @PURPOSE: Retrieve a file for download.
#
# @PRE: category must be a valid FileCategory.
@@ -122,7 +126,7 @@ async def delete_file(
# @PARAM: path (str) - Relative path of the file.
# @RETURN: FileResponse - The file content.
#
# @RELATION: CALLS -> StoragePlugin.get_file_path
# @RELATION: CALLS -> [backend.src.plugins.storage.plugin.StoragePlugin.get_file_path]
@router.get("/download/{category}/{path:path}")
async def download_file(
category: FileCategory,
@@ -145,6 +149,7 @@ async def download_file(
# [/DEF:download_file:Function]
# [DEF:get_file_by_path:Function]
# @TIER: STANDARD
# @PURPOSE: Retrieve a file by validated absolute/relative path under storage root.
#
# @PRE: path must resolve under configured storage root.
@@ -153,8 +158,8 @@ async def download_file(
# @PARAM: path (str) - Absolute or storage-root-relative file path.
# @RETURN: FileResponse - The file content.
#
# @RELATION: CALLS -> StoragePlugin.get_storage_root
# @RELATION: CALLS -> StoragePlugin.validate_path
# @RELATION: CALLS -> [backend.src.plugins.storage.plugin.StoragePlugin.get_storage_root]
# @RELATION: CALLS -> [backend.src.plugins.storage.plugin.StoragePlugin.validate_path]
@router.get("/file")
async def get_file_by_path(
path: str,

View File

@@ -1,348 +1,324 @@
# [DEF:TasksRouter:Module]
# @TIER: STANDARD
# @SEMANTICS: api, router, tasks, create, list, get, logs
# @PURPOSE: Defines the FastAPI router for task-related endpoints, allowing clients to create, list, and get the status of tasks.
# @LAYER: UI (API)
# @RELATION: Depends on the TaskManager. It is included by the main app.
from typing import List, Dict, Any, Optional
from fastapi import APIRouter, Depends, HTTPException, status, Query
from pydantic import BaseModel
from ...core.logger import belief_scope
from ...core.task_manager import TaskManager, Task, TaskStatus, LogEntry
from ...core.task_manager.models import LogFilter, LogStats
from ...dependencies import get_task_manager, has_permission, get_current_user, get_config_manager
from ...core.config_manager import ConfigManager
from ...services.llm_prompt_templates import (
is_multimodal_model,
normalize_llm_settings,
resolve_bound_provider_id,
)
router = APIRouter()
TASK_TYPE_PLUGIN_MAP = {
"llm_validation": ["llm_dashboard_validation"],
"backup": ["superset-backup"],
"migration": ["superset-migration"],
}
class CreateTaskRequest(BaseModel):
plugin_id: str
params: Dict[str, Any]
class ResolveTaskRequest(BaseModel):
resolution_params: Dict[str, Any]
class ResumeTaskRequest(BaseModel):
passwords: Dict[str, str]
@router.post("", response_model=Task, status_code=status.HTTP_201_CREATED)
# [DEF:create_task:Function]
# @PURPOSE: Create and start a new task for a given plugin.
# @PARAM: request (CreateTaskRequest) - The request body containing plugin_id and params.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: plugin_id must exist and params must be valid for that plugin.
# @POST: A new task is created and started.
# @RETURN: Task - The created task instance.
async def create_task(
request: CreateTaskRequest,
task_manager: TaskManager = Depends(get_task_manager),
current_user = Depends(get_current_user),
config_manager: ConfigManager = Depends(get_config_manager),
):
# Dynamic permission check based on plugin_id
has_permission(f"plugin:{request.plugin_id}", "EXECUTE")(current_user)
"""
Create and start a new task for a given plugin.
"""
with belief_scope("create_task"):
try:
# Special handling for LLM tasks to resolve provider config by task binding.
if request.plugin_id in {"llm_dashboard_validation", "llm_documentation"}:
from ...core.database import SessionLocal
from ...services.llm_provider import LLMProviderService
db = SessionLocal()
try:
llm_service = LLMProviderService(db)
provider_id = request.params.get("provider_id")
if not provider_id:
llm_settings = normalize_llm_settings(config_manager.get_config().settings.llm)
binding_key = "dashboard_validation" if request.plugin_id == "llm_dashboard_validation" else "documentation"
provider_id = resolve_bound_provider_id(llm_settings, binding_key)
if provider_id:
request.params["provider_id"] = provider_id
if not provider_id:
providers = llm_service.get_all_providers()
active_provider = next((p for p in providers if p.is_active), None)
if active_provider:
provider_id = active_provider.id
request.params["provider_id"] = provider_id
if provider_id:
db_provider = llm_service.get_provider(provider_id)
if not db_provider:
raise ValueError(f"LLM Provider {provider_id} not found")
if request.plugin_id == "llm_dashboard_validation" and not is_multimodal_model(
db_provider.default_model,
db_provider.provider_type,
):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Selected provider model is not multimodal for dashboard validation",
)
finally:
db.close()
task = await task_manager.create_task(
plugin_id=request.plugin_id,
params=request.params
)
return task
except ValueError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
# [/DEF:create_task:Function]
@router.get("", response_model=List[Task])
# [DEF:list_tasks:Function]
# @PURPOSE: Retrieve a list of tasks with pagination and optional status filter.
# @PARAM: limit (int) - Maximum number of tasks to return.
# @PARAM: offset (int) - Number of tasks to skip.
# @PARAM: status (Optional[TaskStatus]) - Filter by task status.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_manager must be available.
# @POST: Returns a list of tasks.
# @RETURN: List[Task] - List of tasks.
async def list_tasks(
limit: int = 10,
offset: int = 0,
status_filter: Optional[TaskStatus] = Query(None, alias="status"),
task_type: Optional[str] = Query(None, description="Task category: llm_validation, backup, migration"),
plugin_id: Optional[List[str]] = Query(None, description="Filter by plugin_id (repeatable query param)"),
completed_only: bool = Query(False, description="Return only completed tasks (SUCCESS/FAILED)"),
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "READ"))
):
"""
Retrieve a list of tasks with pagination and optional status filter.
"""
with belief_scope("list_tasks"):
plugin_filters = list(plugin_id) if plugin_id else []
if task_type:
if task_type not in TASK_TYPE_PLUGIN_MAP:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Unsupported task_type '{task_type}'. Allowed: {', '.join(TASK_TYPE_PLUGIN_MAP.keys())}"
)
plugin_filters.extend(TASK_TYPE_PLUGIN_MAP[task_type])
return task_manager.get_tasks(
limit=limit,
offset=offset,
status=status_filter,
plugin_ids=plugin_filters or None,
completed_only=completed_only
)
# [/DEF:list_tasks:Function]
@router.get("/{task_id}", response_model=Task)
# [DEF:get_task:Function]
# @PURPOSE: Retrieve the details of a specific task.
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_id must exist.
# @POST: Returns task details or raises 404.
# @RETURN: Task - The task details.
async def get_task(
task_id: str,
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "READ"))
):
"""
Retrieve the details of a specific task.
"""
with belief_scope("get_task"):
task = task_manager.get_task(task_id)
if not task:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
return task
# [/DEF:get_task:Function]
@router.get("/{task_id}/logs", response_model=List[LogEntry])
# [DEF:get_task_logs:Function]
# @PURPOSE: Retrieve logs for a specific task with optional filtering.
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: level (Optional[str]) - Filter by log level (DEBUG, INFO, WARNING, ERROR).
# @PARAM: source (Optional[str]) - Filter by source component.
# @PARAM: search (Optional[str]) - Text search in message.
# @PARAM: offset (int) - Number of logs to skip.
# @PARAM: limit (int) - Maximum number of logs to return.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_id must exist.
# @POST: Returns a list of log entries or raises 404.
# @RETURN: List[LogEntry] - List of log entries.
# @TIER: CRITICAL
# @TEST_CONTRACT get_task_logs_api ->
# {
# required_params: {task_id: str},
# optional_params: {level: str, source: str, search: str},
# invariants: ["returns 404 for non-existent task", "applies filters correctly"]
# }
# @TEST_FIXTURE valid_task_logs_request -> {"task_id": "test_1", "level": "INFO"}
# @TEST_EDGE task_not_found -> raises 404
# @TEST_EDGE invalid_limit -> Query(limit=0) returns 422
# @TEST_INVARIANT response_purity -> verifies: [valid_task_logs_request]
# @TEST_CONTRACT: TaskLogQueryInput -> List[LogEntry]
# @TEST_SCENARIO: existing_task_logs_filtered -> Returns filtered logs by level/source/search with pagination.
# @TEST_FIXTURE: valid_task_with_mixed_logs -> backend/tests/fixtures/task_logs/valid_task_with_mixed_logs.json
# @TEST_EDGE: missing_task -> Unknown task_id returns 404 Task not found.
# @TEST_EDGE: invalid_level_type -> Non-string/invalid level query rejected by validation or yields empty result.
# @TEST_EDGE: pagination_bounds -> offset=0 and limit=1000 remain within API bounds and do not overflow.
# @TEST_INVARIANT: logs_only_for_existing_task -> VERIFIED_BY: [existing_task_logs_filtered, missing_task]
async def get_task_logs(
task_id: str,
level: Optional[str] = Query(None, description="Filter by log level (DEBUG, INFO, WARNING, ERROR)"),
source: Optional[str] = Query(None, description="Filter by source component"),
search: Optional[str] = Query(None, description="Text search in message"),
offset: int = Query(0, ge=0, description="Number of logs to skip"),
limit: int = Query(100, ge=1, le=1000, description="Maximum number of logs to return"),
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "READ"))
):
"""
Retrieve logs for a specific task with optional filtering.
Supports filtering by level, source, and text search.
"""
with belief_scope("get_task_logs"):
task = task_manager.get_task(task_id)
if not task:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
log_filter = LogFilter(
level=level.upper() if level else None,
source=source,
search=search,
offset=offset,
limit=limit
)
return task_manager.get_task_logs(task_id, log_filter)
# [/DEF:get_task_logs:Function]
@router.get("/{task_id}/logs/stats", response_model=LogStats)
# [DEF:get_task_log_stats:Function]
# @PURPOSE: Get statistics about logs for a task (counts by level and source).
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_id must exist.
# @POST: Returns log statistics or raises 404.
# @RETURN: LogStats - Statistics about task logs.
async def get_task_log_stats(
task_id: str,
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "READ"))
):
"""
Get statistics about logs for a task (counts by level and source).
"""
with belief_scope("get_task_log_stats"):
task = task_manager.get_task(task_id)
if not task:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
return task_manager.get_task_log_stats(task_id)
# [/DEF:get_task_log_stats:Function]
@router.get("/{task_id}/logs/sources", response_model=List[str])
# [DEF:get_task_log_sources:Function]
# @PURPOSE: Get unique sources for a task's logs.
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_id must exist.
# @POST: Returns list of unique source names or raises 404.
# @RETURN: List[str] - Unique source names.
async def get_task_log_sources(
task_id: str,
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "READ"))
):
"""
Get unique sources for a task's logs.
"""
with belief_scope("get_task_log_sources"):
task = task_manager.get_task(task_id)
if not task:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
return task_manager.get_task_log_sources(task_id)
# [/DEF:get_task_log_sources:Function]
@router.post("/{task_id}/resolve", response_model=Task)
# [DEF:resolve_task:Function]
# @PURPOSE: Resolve a task that is awaiting mapping.
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: request (ResolveTaskRequest) - The resolution parameters.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task must be in AWAITING_MAPPING status.
# @POST: Task is resolved and resumes execution.
# @RETURN: Task - The updated task object.
async def resolve_task(
task_id: str,
request: ResolveTaskRequest,
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "WRITE"))
):
"""
Resolve a task that is awaiting mapping.
"""
with belief_scope("resolve_task"):
try:
await task_manager.resolve_task(task_id, request.resolution_params)
return task_manager.get_task(task_id)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
# [/DEF:resolve_task:Function]
@router.post("/{task_id}/resume", response_model=Task)
# [DEF:resume_task:Function]
# @PURPOSE: Resume a task that is awaiting input (e.g., passwords).
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: request (ResumeTaskRequest) - The input (passwords).
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task must be in AWAITING_INPUT status.
# @POST: Task resumes execution with provided input.
# @RETURN: Task - The updated task object.
async def resume_task(
task_id: str,
request: ResumeTaskRequest,
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "WRITE"))
):
"""
Resume a task that is awaiting input (e.g., passwords).
"""
with belief_scope("resume_task"):
try:
task_manager.resume_task_with_password(task_id, request.passwords)
return task_manager.get_task(task_id)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
# [/DEF:resume_task:Function]
@router.delete("", status_code=status.HTTP_204_NO_CONTENT)
# [DEF:clear_tasks:Function]
# @PURPOSE: Clear tasks matching the status filter.
# @PARAM: status (Optional[TaskStatus]) - Filter by task status.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_manager is available.
# @POST: Tasks are removed from memory/persistence.
async def clear_tasks(
status: Optional[TaskStatus] = None,
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "WRITE"))
):
"""
Clear tasks matching the status filter. If no filter, clears all non-running tasks.
"""
with belief_scope("clear_tasks", f"status={status}"):
task_manager.clear_tasks(status)
return
# [/DEF:clear_tasks:Function]
# [/DEF:TasksRouter:Module]
# [DEF:TasksRouter:Module]
# @TIER: STANDARD
# @SEMANTICS: api, router, tasks, create, list, get, logs
# @PURPOSE: Defines the FastAPI router for task-related endpoints, allowing clients to create, list, and get the status of tasks.
# @LAYER: UI (API)
# @RELATION: DEPENDS_ON -> [backend.src.core.task_manager.manager.TaskManager]
# @RELATION: DEPENDS_ON -> [backend.src.core.config_manager.ConfigManager]
# @RELATION: DEPENDS_ON -> [backend.src.services.llm_provider.LLMProviderService]
# [SECTION: IMPORTS]
from typing import List, Dict, Any, Optional
from fastapi import APIRouter, Depends, HTTPException, status, Query
from pydantic import BaseModel
from ...core.logger import belief_scope
from ...core.task_manager import TaskManager, Task, TaskStatus, LogEntry
from ...core.task_manager.models import LogFilter, LogStats
from ...dependencies import get_task_manager, has_permission, get_current_user, get_config_manager
from ...core.config_manager import ConfigManager
from ...services.llm_prompt_templates import (
is_multimodal_model,
normalize_llm_settings,
resolve_bound_provider_id,
)
# [/SECTION]
router = APIRouter()
TASK_TYPE_PLUGIN_MAP = {
"llm_validation": ["llm_dashboard_validation"],
"backup": ["superset-backup"],
"migration": ["superset-migration"],
}
class CreateTaskRequest(BaseModel):
plugin_id: str
params: Dict[str, Any]
class ResolveTaskRequest(BaseModel):
resolution_params: Dict[str, Any]
class ResumeTaskRequest(BaseModel):
passwords: Dict[str, str]
# [DEF:create_task:Function]
# @TIER: STANDARD
# @PURPOSE: Create and start a new task for a given plugin.
# @PARAM: request (CreateTaskRequest) - The request body containing plugin_id and params.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: plugin_id must exist and params must be valid for that plugin.
# @POST: A new task is created and started.
# @RETURN: Task - The created task instance.
@router.post("", response_model=Task, status_code=status.HTTP_201_CREATED)
async def create_task(
request: CreateTaskRequest,
task_manager: TaskManager = Depends(get_task_manager),
current_user = Depends(get_current_user),
config_manager: ConfigManager = Depends(get_config_manager),
):
# Dynamic permission check based on plugin_id
has_permission(f"plugin:{request.plugin_id}", "EXECUTE")(current_user)
with belief_scope("create_task"):
try:
# Special handling for LLM tasks to resolve provider config by task binding.
if request.plugin_id in {"llm_dashboard_validation", "llm_documentation"}:
from ...core.database import SessionLocal
from ...services.llm_provider import LLMProviderService
db = SessionLocal()
try:
llm_service = LLMProviderService(db)
provider_id = request.params.get("provider_id")
if not provider_id:
llm_settings = normalize_llm_settings(config_manager.get_config().settings.llm)
binding_key = "dashboard_validation" if request.plugin_id == "llm_dashboard_validation" else "documentation"
provider_id = resolve_bound_provider_id(llm_settings, binding_key)
if provider_id:
request.params["provider_id"] = provider_id
if not provider_id:
providers = llm_service.get_all_providers()
active_provider = next((p for p in providers if p.is_active), None)
if active_provider:
provider_id = active_provider.id
request.params["provider_id"] = provider_id
if provider_id:
db_provider = llm_service.get_provider(provider_id)
if not db_provider:
raise ValueError(f"LLM Provider {provider_id} not found")
if request.plugin_id == "llm_dashboard_validation" and not is_multimodal_model(
db_provider.default_model,
db_provider.provider_type,
):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Selected provider model is not multimodal for dashboard validation",
)
finally:
db.close()
task = await task_manager.create_task(
plugin_id=request.plugin_id,
params=request.params
)
return task
except ValueError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
# [/DEF:create_task:Function]
# [DEF:list_tasks:Function]
# @TIER: STANDARD
# @PURPOSE: Retrieve a list of tasks with pagination and optional status filter.
# @PARAM: limit (int) - Maximum number of tasks to return.
# @PARAM: offset (int) - Number of tasks to skip.
# @PARAM: status (Optional[TaskStatus]) - Filter by task status.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_manager must be available.
# @POST: Returns a list of tasks.
# @RETURN: List[Task] - List of tasks.
@router.get("", response_model=List[Task])
async def list_tasks(
limit: int = 10,
offset: int = 0,
status_filter: Optional[TaskStatus] = Query(None, alias="status"),
task_type: Optional[str] = Query(None, description="Task category: llm_validation, backup, migration"),
plugin_id: Optional[List[str]] = Query(None, description="Filter by plugin_id (repeatable query param)"),
completed_only: bool = Query(False, description="Return only completed tasks (SUCCESS/FAILED)"),
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "READ"))
):
with belief_scope("list_tasks"):
plugin_filters = list(plugin_id) if plugin_id else []
if task_type:
if task_type not in TASK_TYPE_PLUGIN_MAP:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Unsupported task_type '{task_type}'. Allowed: {', '.join(TASK_TYPE_PLUGIN_MAP.keys())}"
)
plugin_filters.extend(TASK_TYPE_PLUGIN_MAP[task_type])
return task_manager.get_tasks(
limit=limit,
offset=offset,
status=status_filter,
plugin_ids=plugin_filters or None,
completed_only=completed_only
)
# [/DEF:list_tasks:Function]
# [DEF:get_task:Function]
# @TIER: STANDARD
# @PURPOSE: Retrieve the details of a specific task.
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_id must exist.
# @POST: Returns task details or raises 404.
# @RETURN: Task - The task details.
@router.get("/{task_id}", response_model=Task)
async def get_task(
task_id: str,
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "READ"))
):
with belief_scope("get_task"):
task = task_manager.get_task(task_id)
if not task:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
return task
# [/DEF:get_task:Function]
# [DEF:get_task_logs:Function]
# @TIER: CRITICAL
# @PURPOSE: Retrieve logs for a specific task with optional filtering.
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: level (Optional[str]) - Filter by log level (DEBUG, INFO, WARNING, ERROR).
# @PARAM: source (Optional[str]) - Filter by source component.
# @PARAM: search (Optional[str]) - Text search in message.
# @PARAM: offset (int) - Number of logs to skip.
# @PARAM: limit (int) - Maximum number of logs to return.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_id must exist.
# @POST: Returns a list of log entries or raises 404.
# @RETURN: List[LogEntry] - List of log entries.
# @TEST_CONTRACT: TaskLogQueryInput -> List[LogEntry]
# @TEST_SCENARIO: existing_task_logs_filtered -> Returns filtered logs by level/source/search with pagination.
# @TEST_FIXTURE: valid_task_with_mixed_logs -> backend/tests/fixtures/task_logs/valid_task_with_mixed_logs.json
# @TEST_EDGE: missing_task -> Unknown task_id returns 404 Task not found.
# @TEST_EDGE: invalid_level_type -> Non-string/invalid level query rejected by validation or yields empty result.
# @TEST_EDGE: pagination_bounds -> offset=0 and limit=1000 remain within API bounds and do not overflow.
# @TEST_INVARIANT: logs_only_for_existing_task -> VERIFIED_BY: [existing_task_logs_filtered, missing_task]
@router.get("/{task_id}/logs", response_model=List[LogEntry])
async def get_task_logs(
task_id: str,
level: Optional[str] = Query(None, description="Filter by log level (DEBUG, INFO, WARNING, ERROR)"),
source: Optional[str] = Query(None, description="Filter by source component"),
search: Optional[str] = Query(None, description="Text search in message"),
offset: int = Query(0, ge=0, description="Number of logs to skip"),
limit: int = Query(100, ge=1, le=1000, description="Maximum number of logs to return"),
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "READ"))
):
with belief_scope("get_task_logs"):
task = task_manager.get_task(task_id)
if not task:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
log_filter = LogFilter(
level=level.upper() if level else None,
source=source,
search=search,
offset=offset,
limit=limit
)
return task_manager.get_task_logs(task_id, log_filter)
# [/DEF:get_task_logs:Function]
# [DEF:get_task_log_stats:Function]
# @TIER: STANDARD
# @PURPOSE: Get statistics about logs for a task (counts by level and source).
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_id must exist.
# @POST: Returns log statistics or raises 404.
# @RETURN: LogStats - Statistics about task logs.
@router.get("/{task_id}/logs/stats", response_model=LogStats)
async def get_task_log_stats(
task_id: str,
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "READ"))
):
with belief_scope("get_task_log_stats"):
task = task_manager.get_task(task_id)
if not task:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
return task_manager.get_task_log_stats(task_id)
# [/DEF:get_task_log_stats:Function]
# [DEF:get_task_log_sources:Function]
# @TIER: STANDARD
# @PURPOSE: Get unique sources for a task's logs.
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_id must exist.
# @POST: Returns list of unique source names or raises 404.
# @RETURN: List[str] - Unique source names.
@router.get("/{task_id}/logs/sources", response_model=List[str])
async def get_task_log_sources(
task_id: str,
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "READ"))
):
with belief_scope("get_task_log_sources"):
task = task_manager.get_task(task_id)
if not task:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
return task_manager.get_task_log_sources(task_id)
# [/DEF:get_task_log_sources:Function]
# [DEF:resolve_task:Function]
# @TIER: STANDARD
# @PURPOSE: Resolve a task that is awaiting mapping.
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: request (ResolveTaskRequest) - The resolution parameters.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task must be in AWAITING_MAPPING status.
# @POST: Task is resolved and resumes execution.
# @RETURN: Task - The updated task object.
@router.post("/{task_id}/resolve", response_model=Task)
async def resolve_task(
task_id: str,
request: ResolveTaskRequest,
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "WRITE"))
):
with belief_scope("resolve_task"):
try:
await task_manager.resolve_task(task_id, request.resolution_params)
return task_manager.get_task(task_id)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
# [/DEF:resolve_task:Function]
# [DEF:resume_task:Function]
# @TIER: STANDARD
# @PURPOSE: Resume a task that is awaiting input (e.g., passwords).
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: request (ResumeTaskRequest) - The input (passwords).
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task must be in AWAITING_INPUT status.
# @POST: Task resumes execution with provided input.
# @RETURN: Task - The updated task object.
@router.post("/{task_id}/resume", response_model=Task)
async def resume_task(
task_id: str,
request: ResumeTaskRequest,
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "WRITE"))
):
with belief_scope("resume_task"):
try:
task_manager.resume_task_with_password(task_id, request.passwords)
return task_manager.get_task(task_id)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
# [/DEF:resume_task:Function]
# [DEF:clear_tasks:Function]
# @TIER: STANDARD
# @PURPOSE: Clear tasks matching the status filter.
# @PARAM: status (Optional[TaskStatus]) - Filter by task status.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_manager is available.
# @POST: Tasks are removed from memory/persistence.
@router.delete("", status_code=status.HTTP_204_NO_CONTENT)
async def clear_tasks(
status: Optional[TaskStatus] = None,
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "WRITE"))
):
with belief_scope("clear_tasks", f"status={status}"):
task_manager.clear_tasks(status)
return
# [/DEF:clear_tasks:Function]
# [/DEF:TasksRouter:Module]