diff --git a/backend/src/core/async_superset_client.py b/backend/src/core/async_superset_client.py index 64e193c4..39346b25 100644 --- a/backend/src/core/async_superset_client.py +++ b/backend/src/core/async_superset_client.py @@ -28,12 +28,16 @@ from .utils.async_network import AsyncAPIClient # [DEF:AsyncSupersetClient:Class] # @TIER: STANDARD # @PURPOSE: Async sibling of SupersetClient for dashboard read paths. +# @RELATION: [INHERITS] ->[backend.src.core.superset_client.SupersetClient] +# @RELATION: [DEPENDS_ON] ->[backend.src.core.utils.async_network.AsyncAPIClient] +# @RELATION: [CALLS] ->[backend.src.core.utils.async_network.AsyncAPIClient.request] class AsyncSupersetClient(SupersetClient): # [DEF:__init__:Function] # @TIER: STANDARD # @PURPOSE: Initialize async Superset client with AsyncAPIClient transport. - # @PRE: env is valid. + # @PRE: env is valid Environment instance. # @POST: Client uses async network transport and inherited projection helpers. + # @DATA_CONTRACT: Input[Environment] -> self.network[AsyncAPIClient] def __init__(self, env: Environment): self.env = env auth_payload = { @@ -54,6 +58,7 @@ class AsyncSupersetClient(SupersetClient): # @TIER: STANDARD # @PURPOSE: Close async transport resources. # @POST: Underlying AsyncAPIClient is closed. + # @SIDE_EFFECT: Closes network sockets. async def aclose(self) -> None: await self.network.aclose() # [/DEF:aclose:Function] @@ -62,6 +67,7 @@ class AsyncSupersetClient(SupersetClient): # @TIER: STANDARD # @PURPOSE: Fetch one dashboards page asynchronously. # @POST: Returns total count and page result list. 
+ # @DATA_CONTRACT: Input[query: Optional[Dict]] -> Output[Tuple[int, List[Dict]]] async def get_dashboards_page_async(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]: with belief_scope("AsyncSupersetClient.get_dashboards_page_async"): validated_query = self._validate_query_params(query or {}) @@ -96,6 +102,7 @@ class AsyncSupersetClient(SupersetClient): # @TIER: STANDARD # @PURPOSE: Fetch one dashboard payload asynchronously. # @POST: Returns raw dashboard payload from Superset API. + # @DATA_CONTRACT: Input[dashboard_id: int] -> Output[Dict] async def get_dashboard_async(self, dashboard_id: int) -> Dict: with belief_scope("AsyncSupersetClient.get_dashboard_async", f"id={dashboard_id}"): response = await self.network.request(method="GET", endpoint=f"/dashboard/{dashboard_id}") @@ -106,6 +113,7 @@ class AsyncSupersetClient(SupersetClient): # @TIER: STANDARD # @PURPOSE: Fetch one chart payload asynchronously. # @POST: Returns raw chart payload from Superset API. + # @DATA_CONTRACT: Input[chart_id: int] -> Output[Dict] async def get_chart_async(self, chart_id: int) -> Dict: with belief_scope("AsyncSupersetClient.get_chart_async", f"id={chart_id}"): response = await self.network.request(method="GET", endpoint=f"/chart/{chart_id}") @@ -116,6 +124,9 @@ class AsyncSupersetClient(SupersetClient): # @TIER: STANDARD # @PURPOSE: Fetch dashboard detail asynchronously with concurrent charts/datasets requests. # @POST: Returns dashboard detail payload for overview page. 
+ # @DATA_CONTRACT: Input[dashboard_id: int] -> Output[Dict] + # @RELATION: [CALLS] ->[self.get_dashboard_async] + # @RELATION: [CALLS] ->[self.get_chart_async] async def get_dashboard_detail_async(self, dashboard_id: int) -> Dict: with belief_scope("AsyncSupersetClient.get_dashboard_detail_async", f"id={dashboard_id}"): dashboard_response = await self.get_dashboard_async(dashboard_id) @@ -280,7 +291,7 @@ class AsyncSupersetClient(SupersetClient): db_name = db_payload.get("database_name") if isinstance(db_payload, dict) else None table_name = dataset_data.get("table_name") or dataset_data.get("datasource_name") or dataset_data.get("name") or f"Dataset {dataset_id}" schema = dataset_data.get("schema") - fq_name = f"{schema}.{table_name}" if schema else table_name + fq_name = f"{schema}.{table_name}" if schema else table_name datasets.append({ "id": int(dataset_id), "table_name": table_name, diff --git a/backend/src/core/database.py b/backend/src/core/database.py index 747025fd..c9534cdf 100644 --- a/backend/src/core/database.py +++ b/backend/src/core/database.py @@ -364,8 +364,9 @@ def get_tasks_db(): # @TIER: STANDARD # @PURPOSE: Dependency for getting an authentication database session. # @PRE: AuthSessionLocal is initialized. -# @POST: Session is closed after use. -# @RETURN: Generator[Session, None, None] +# @POST: Session is closed after use. +# @DATA_CONTRACT: None -> Output[sqlalchemy.orm.Session] +# @RETURN: Generator[Session, None, None] def get_auth_db(): with belief_scope("get_auth_db"): db = AuthSessionLocal() diff --git a/backend/src/core/superset_client.py b/backend/src/core/superset_client.py index 8f0335e2..fae7ca55 100644 --- a/backend/src/core/superset_client.py +++ b/backend/src/core/superset_client.py @@ -24,13 +24,17 @@ from .config_models import Environment # [/SECTION] # [DEF:SupersetClient:Class] +# @TIER: STANDARD # @PURPOSE: Класс-обёртка над Superset REST API, предоставляющий методы для работы с дашбордами и датасетами. 
+# @RELATION: [DEPENDS_ON] ->[backend.src.core.utils.network.APIClient] +# @RELATION: [DEPENDS_ON] ->[backend.src.core.config_models.Environment] class SupersetClient: # [DEF:__init__:Function] + # @TIER: STANDARD # @PURPOSE: Инициализирует клиент, проверяет конфигурацию и создает сетевой клиент. # @PRE: `env` должен быть валидным объектом Environment. # @POST: Атрибуты `env` и `network` созданы и готовы к работе. - # @PARAM: env (Environment) - Конфигурация окружения. + # @DATA_CONTRACT: Input[Environment] -> self.network[APIClient] def __init__(self, env: Environment): with belief_scope("__init__"): app_logger.info("[SupersetClient.__init__][Enter] Initializing SupersetClient for env %s.", env.name) @@ -55,10 +59,12 @@ class SupersetClient: # [/DEF:__init__:Function] # [DEF:authenticate:Function] + # @TIER: STANDARD # @PURPOSE: Authenticates the client using the configured credentials. # @PRE: self.network must be initialized with valid auth configuration. # @POST: Client is authenticated and tokens are stored. - # @RETURN: Dict[str, str] - Authentication tokens. + # @DATA_CONTRACT: None -> Output[Dict[str, str]] + # @RELATION: [CALLS] ->[self.network.authenticate] def authenticate(self) -> Dict[str, str]: with belief_scope("SupersetClient.authenticate"): return self.network.authenticate() @@ -66,6 +72,7 @@ class SupersetClient: @property # [DEF:headers:Function] + # @TIER: TRIVIAL # @PURPOSE: Возвращает базовые HTTP-заголовки, используемые сетевым клиентом. # @PRE: APIClient is initialized and authenticated. # @POST: Returns a dictionary of HTTP headers. @@ -77,11 +84,12 @@ class SupersetClient: # [SECTION: DASHBOARD OPERATIONS] # [DEF:get_dashboards:Function] + # @TIER: STANDARD # @PURPOSE: Получает полный список дашбордов, автоматически обрабатывая пагинацию. - # @PARAM: query (Optional[Dict]) - Дополнительные параметры запроса для API. # @PRE: Client is authenticated. # @POST: Returns a tuple with total count and list of dashboards. 
- # @RETURN: Tuple[int, List[Dict]] - Кортеж (общее количество, список дашбордов). + # @DATA_CONTRACT: Input[query: Optional[Dict]] -> Output[Tuple[int, List[Dict]]] + # @RELATION: [CALLS] ->[self._fetch_all_pages] def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]: with belief_scope("get_dashboards"): app_logger.info("[get_dashboards][Enter] Fetching dashboards.") @@ -110,11 +118,12 @@ class SupersetClient: # [/DEF:get_dashboards:Function] # [DEF:get_dashboards_page:Function] + # @TIER: STANDARD # @PURPOSE: Fetches a single dashboards page from Superset without iterating all pages. - # @PARAM: query (Optional[Dict]) - Query with page/page_size and optional columns. # @PRE: Client is authenticated. # @POST: Returns total count and one page of dashboards. - # @RETURN: Tuple[int, List[Dict]] + # @DATA_CONTRACT: Input[query: Optional[Dict]] -> Output[Tuple[int, List[Dict]]] + # @RELATION: [CALLS] ->[self.network.request] def get_dashboards_page(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]: with belief_scope("get_dashboards_page"): validated_query = self._validate_query_params(query or {}) @@ -146,10 +155,12 @@ class SupersetClient: # [/DEF:get_dashboards_page:Function] # [DEF:get_dashboards_summary:Function] + # @TIER: STANDARD # @PURPOSE: Fetches dashboard metadata optimized for the grid. # @PRE: Client is authenticated. # @POST: Returns a list of dashboard metadata summaries. - # @RETURN: List[Dict] + # @DATA_CONTRACT: None -> Output[List[Dict]] + # @RELATION: [CALLS] ->[self.get_dashboards] def get_dashboards_summary(self, require_slug: bool = False) -> List[Dict]: with belief_scope("SupersetClient.get_dashboards_summary"): # Rely on list endpoint default projection to stay compatible @@ -229,12 +240,12 @@ class SupersetClient: # [/DEF:get_dashboards_summary:Function] # [DEF:get_dashboards_summary_page:Function] + # @TIER: STANDARD # @PURPOSE: Fetches one page of dashboard metadata optimized for the grid. 
- # @PARAM: page (int) - 1-based page number from API route contract. - # @PARAM: page_size (int) - Number of items per page. # @PRE: page >= 1 and page_size > 0. # @POST: Returns mapped summaries and total dashboard count. - # @RETURN: Tuple[int, List[Dict]] + # @DATA_CONTRACT: Input[page: int, page_size: int] -> Output[Tuple[int, List[Dict]]] + # @RELATION: [CALLS] ->[self.get_dashboards_page] def get_dashboards_summary_page( self, page: int, @@ -302,10 +313,11 @@ class SupersetClient: # [/DEF:get_dashboards_summary_page:Function] # [DEF:_extract_owner_labels:Function] + # @TIER: TRIVIAL # @PURPOSE: Normalize dashboard owners payload to stable display labels. # @PRE: owners payload can be scalar, object or list. # @POST: Returns deduplicated non-empty owner labels preserving order. - # @RETURN: List[str] + # @DATA_CONTRACT: Input[Any] -> Output[List[str]] def _extract_owner_labels(self, owners_payload: Any) -> List[str]: if owners_payload is None: return [] @@ -329,10 +341,11 @@ class SupersetClient: # [/DEF:_extract_owner_labels:Function] # [DEF:_extract_user_display:Function] + # @TIER: TRIVIAL # @PURPOSE: Normalize user payload to a stable display name. # @PRE: user payload can be string, dict or None. # @POST: Returns compact non-empty display value or None. - # @RETURN: Optional[str] + # @DATA_CONTRACT: Input[Optional[str], Optional[Dict]] -> Output[Optional[str]] def _extract_user_display(self, preferred_value: Optional[str], user_payload: Optional[Dict]) -> Optional[str]: preferred = self._sanitize_user_text(preferred_value) if preferred: @@ -357,10 +370,10 @@ class SupersetClient: # [/DEF:_extract_user_display:Function] # [DEF:_sanitize_user_text:Function] + # @TIER: TRIVIAL # @PURPOSE: Convert scalar value to non-empty user-facing text. # @PRE: value can be any scalar type. # @POST: Returns trimmed string or None. 
- # @RETURN: Optional[str] def _sanitize_user_text(self, value: Optional[Union[str, int]]) -> Optional[str]: if value is None: return None @@ -371,10 +384,12 @@ class SupersetClient: # [/DEF:_sanitize_user_text:Function] # [DEF:get_dashboard:Function] + # @TIER: STANDARD # @PURPOSE: Fetches a single dashboard by ID. # @PRE: Client is authenticated and dashboard_id exists. # @POST: Returns dashboard payload from Superset API. - # @RETURN: Dict + # @DATA_CONTRACT: Input[dashboard_id: int] -> Output[Dict] + # @RELATION: [CALLS] ->[self.network.request] def get_dashboard(self, dashboard_id: int) -> Dict: with belief_scope("SupersetClient.get_dashboard", f"id={dashboard_id}"): response = self.network.request(method="GET", endpoint=f"/dashboard/{dashboard_id}") @@ -382,10 +397,12 @@ class SupersetClient: # [/DEF:get_dashboard:Function] # [DEF:get_chart:Function] + # @TIER: STANDARD # @PURPOSE: Fetches a single chart by ID. # @PRE: Client is authenticated and chart_id exists. # @POST: Returns chart payload from Superset API. - # @RETURN: Dict + # @DATA_CONTRACT: Input[chart_id: int] -> Output[Dict] + # @RELATION: [CALLS] ->[self.network.request] def get_chart(self, chart_id: int) -> Dict: with belief_scope("SupersetClient.get_chart", f"id={chart_id}"): response = self.network.request(method="GET", endpoint=f"/chart/{chart_id}") @@ -393,10 +410,13 @@ class SupersetClient: # [/DEF:get_chart:Function] # [DEF:get_dashboard_detail:Function] + # @TIER: STANDARD # @PURPOSE: Fetches detailed dashboard information including related charts and datasets. # @PRE: Client is authenticated and dashboard_id exists. # @POST: Returns dashboard metadata with charts and datasets lists. 
- # @RETURN: Dict + # @DATA_CONTRACT: Input[dashboard_id: int] -> Output[Dict] + # @RELATION: [CALLS] ->[self.get_dashboard] + # @RELATION: [CALLS] ->[self.get_chart] def get_dashboard_detail(self, dashboard_id: int) -> Dict: with belief_scope("SupersetClient.get_dashboard_detail", f"id={dashboard_id}"): dashboard_response = self.get_dashboard(dashboard_id) @@ -585,11 +605,12 @@ class SupersetClient: # [/DEF:get_dashboard_detail:Function] # [DEF:get_charts:Function] + # @TIER: STANDARD # @PURPOSE: Fetches all charts with pagination support. - # @PARAM: query (Optional[Dict]) - Optional query params/columns/filters. # @PRE: Client is authenticated. # @POST: Returns total count and charts list. - # @RETURN: Tuple[int, List[Dict]] + # @DATA_CONTRACT: Input[query: Optional[Dict]] -> Output[Tuple[int, List[Dict]]] + # @RELATION: [CALLS] ->[self._fetch_all_pages] def get_charts(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]: with belief_scope("get_charts"): validated_query = self._validate_query_params(query or {}) @@ -636,11 +657,13 @@ class SupersetClient: # [/DEF:_extract_chart_ids_from_layout:Function] # [DEF:export_dashboard:Function] + # @TIER: STANDARD # @PURPOSE: Экспортирует дашборд в виде ZIP-архива. - # @PARAM: dashboard_id (int) - ID дашборда для экспорта. # @PRE: dashboard_id must exist in Superset. # @POST: Returns ZIP content and filename. - # @RETURN: Tuple[bytes, str] - Бинарное содержимое ZIP-архива и имя файла. + # @DATA_CONTRACT: Input[dashboard_id: int] -> Output[Tuple[bytes, str]] + # @SIDE_EFFECT: Performs network I/O to download archive. 
+ # @RELATION: [CALLS] ->[self.network.request] def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]: with belief_scope("export_dashboard"): app_logger.info("[export_dashboard][Enter] Exporting dashboard %s.", dashboard_id) @@ -659,13 +682,14 @@ class SupersetClient: # [/DEF:export_dashboard:Function] # [DEF:import_dashboard:Function] + # @TIER: STANDARD # @PURPOSE: Импортирует дашборд из ZIP-файла. - # @PARAM: file_name (Union[str, Path]) - Путь к ZIP-архиву. - # @PARAM: dash_id (Optional[int]) - ID дашборда для удаления при сбое. - # @PARAM: dash_slug (Optional[str]) - Slug дашборда для поиска ID. # @PRE: file_name must be a valid ZIP dashboard export. # @POST: Dashboard is imported or re-imported after deletion. - # @RETURN: Dict - Ответ API в случае успеха. + # @DATA_CONTRACT: Input[file_name: Union[str, Path]] -> Output[Dict] + # @SIDE_EFFECT: Performs network I/O to upload archive. + # @RELATION: [CALLS] ->[self._do_import] + # @RELATION: [CALLS] ->[self.delete_dashboard] def import_dashboard(self, file_name: Union[str, Path], dash_id: Optional[int] = None, dash_slug: Optional[str] = None) -> Dict: with belief_scope("import_dashboard"): if file_name is None: @@ -690,10 +714,12 @@ class SupersetClient: # [/DEF:import_dashboard:Function] # [DEF:delete_dashboard:Function] + # @TIER: STANDARD # @PURPOSE: Удаляет дашборд по его ID или slug. - # @PARAM: dashboard_id (Union[int, str]) - ID или slug дашборда. # @PRE: dashboard_id must exist. # @POST: Dashboard is removed from Superset. + # @SIDE_EFFECT: Deletes resource from upstream Superset environment. 
+ # @RELATION: [CALLS] ->[self.network.request] def delete_dashboard(self, dashboard_id: Union[int, str]) -> None: with belief_scope("delete_dashboard"): app_logger.info("[delete_dashboard][Enter] Deleting dashboard %s.", dashboard_id) @@ -710,11 +736,12 @@ class SupersetClient: # [SECTION: DATASET OPERATIONS] # [DEF:get_datasets:Function] + # @TIER: STANDARD # @PURPOSE: Получает полный список датасетов, автоматически обрабатывая пагинацию. - # @PARAM: query (Optional[Dict]) - Дополнительные параметры запроса. # @PRE: Client is authenticated. # @POST: Returns total count and list of datasets. - # @RETURN: Tuple[int, List[Dict]] - Кортеж (общее количество, список датасетов). + # @DATA_CONTRACT: Input[query: Optional[Dict]] -> Output[Tuple[int, List[Dict]]] + # @RELATION: [CALLS] ->[self._fetch_all_pages] def get_datasets(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]: with belief_scope("get_datasets"): app_logger.info("[get_datasets][Enter] Fetching datasets.") @@ -866,11 +893,12 @@ class SupersetClient: # [/DEF:get_dataset_detail:Function] # [DEF:get_dataset:Function] + # @TIER: STANDARD # @PURPOSE: Получает информацию о конкретном датасете по его ID. - # @PARAM: dataset_id (int) - ID датасета. # @PRE: dataset_id must exist. # @POST: Returns dataset details. - # @RETURN: Dict - Информация о датасете. + # @DATA_CONTRACT: Input[dataset_id: int] -> Output[Dict] + # @RELATION: [CALLS] ->[self.network.request] def get_dataset(self, dataset_id: int) -> Dict: with belief_scope("SupersetClient.get_dataset", f"id={dataset_id}"): app_logger.info("[get_dataset][Enter] Fetching dataset %s.", dataset_id) @@ -881,12 +909,13 @@ class SupersetClient: # [/DEF:get_dataset:Function] # [DEF:update_dataset:Function] + # @TIER: STANDARD # @PURPOSE: Обновляет данные датасета по его ID. - # @PARAM: dataset_id (int) - ID датасета. - # @PARAM: data (Dict) - Данные для обновления. # @PRE: dataset_id must exist. # @POST: Dataset is updated in Superset. 
- # @RETURN: Dict - Ответ API. + # @DATA_CONTRACT: Input[dataset_id: int, data: Dict] -> Output[Dict] + # @SIDE_EFFECT: Modifies resource in upstream Superset environment. + # @RELATION: [CALLS] ->[self.network.request] def update_dataset(self, dataset_id: int, data: Dict) -> Dict: with belief_scope("SupersetClient.update_dataset", f"id={dataset_id}"): app_logger.info("[update_dataset][Enter] Updating dataset %s.", dataset_id) @@ -906,11 +935,12 @@ class SupersetClient: # [SECTION: DATABASE OPERATIONS] # [DEF:get_databases:Function] + # @TIER: STANDARD # @PURPOSE: Получает полный список баз данных. - # @PARAM: query (Optional[Dict]) - Дополнительные параметры запроса. # @PRE: Client is authenticated. # @POST: Returns total count and list of databases. - # @RETURN: Tuple[int, List[Dict]] - Кортеж (общее количество, список баз данных). + # @DATA_CONTRACT: Input[query: Optional[Dict]] -> Output[Tuple[int, List[Dict]]] + # @RELATION: [CALLS] ->[self._fetch_all_pages] def get_databases(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]: with belief_scope("get_databases"): app_logger.info("[get_databases][Enter] Fetching databases.") @@ -928,11 +958,12 @@ class SupersetClient: # [/DEF:get_databases:Function] # [DEF:get_database:Function] + # @TIER: STANDARD # @PURPOSE: Получает информацию о конкретной базе данных по её ID. - # @PARAM: database_id (int) - ID базы данных. # @PRE: database_id must exist. # @POST: Returns database details. - # @RETURN: Dict - Информация о базе данных. + # @DATA_CONTRACT: Input[database_id: int] -> Output[Dict] + # @RELATION: [CALLS] ->[self.network.request] def get_database(self, database_id: int) -> Dict: with belief_scope("get_database"): app_logger.info("[get_database][Enter] Fetching database %s.", database_id) @@ -943,10 +974,12 @@ class SupersetClient: # [/DEF:get_database:Function] # [DEF:get_databases_summary:Function] + # @TIER: STANDARD # @PURPOSE: Fetch a summary of databases including uuid, name, and engine. 
# @PRE: Client is authenticated. # @POST: Returns list of database summaries. - # @RETURN: List[Dict] - Summary of databases. + # @DATA_CONTRACT: None -> Output[List[Dict]] + # @RELATION: [CALLS] ->[self.get_databases] def get_databases_summary(self) -> List[Dict]: with belief_scope("SupersetClient.get_databases_summary"): query = { @@ -962,11 +995,12 @@ class SupersetClient: # [/DEF:get_databases_summary:Function] # [DEF:get_database_by_uuid:Function] + # @TIER: STANDARD # @PURPOSE: Find a database by its UUID. - # @PARAM: db_uuid (str) - The UUID of the database. # @PRE: db_uuid must be a valid UUID string. # @POST: Returns database info or None. - # @RETURN: Optional[Dict] - Database info if found, else None. + # @DATA_CONTRACT: Input[db_uuid: str] -> Output[Optional[Dict]] + # @RELATION: [CALLS] ->[self.get_databases] def get_database_by_uuid(self, db_uuid: str) -> Optional[Dict]: with belief_scope("SupersetClient.get_database_by_uuid", f"uuid={db_uuid}"): query = { @@ -981,9 +1015,11 @@ class SupersetClient: # [SECTION: HELPERS] # [DEF:_resolve_target_id_for_delete:Function] + # @TIER: TRIVIAL # @PURPOSE: Resolves a dashboard ID from either an ID or a slug. # @PRE: Either dash_id or dash_slug should be provided. # @POST: Returns the resolved ID or None. + # @RELATION: [CALLS] ->[self.get_dashboards] def _resolve_target_id_for_delete(self, dash_id: Optional[int], dash_slug: Optional[str]) -> Optional[int]: with belief_scope("_resolve_target_id_for_delete"): if dash_id is not None: @@ -1002,9 +1038,11 @@ class SupersetClient: # [/DEF:_resolve_target_id_for_delete:Function] # [DEF:_do_import:Function] + # @TIER: TRIVIAL # @PURPOSE: Performs the actual multipart upload for import. # @PRE: file_name must be a path to an existing ZIP file. # @POST: Returns the API response from the upload. 
+ # @RELATION: [CALLS] ->[self.network.upload_file] def _do_import(self, file_name: Union[str, Path]) -> Dict: with belief_scope("_do_import"): app_logger.debug(f"[_do_import][State] Uploading file: {file_name}") @@ -1022,6 +1060,7 @@ class SupersetClient: # [/DEF:_do_import:Function] # [DEF:_validate_export_response:Function] + # @TIER: TRIVIAL # @PURPOSE: Validates that the export response is a non-empty ZIP archive. # @PRE: response must be a valid requests.Response object. # @POST: Raises SupersetAPIError if validation fails. @@ -1035,6 +1074,7 @@ class SupersetClient: # [/DEF:_validate_export_response:Function] # [DEF:_resolve_export_filename:Function] + # @TIER: TRIVIAL # @PURPOSE: Determines the filename for an exported dashboard. # @PRE: response must contain Content-Disposition header or dashboard_id must be provided. # @POST: Returns a sanitized filename string. @@ -1050,6 +1090,7 @@ class SupersetClient: # [/DEF:_resolve_export_filename:Function] # [DEF:_validate_query_params:Function] + # @TIER: TRIVIAL # @PURPOSE: Ensures query parameters have default page and page_size. # @PRE: query can be None or a dictionary. # @POST: Returns a dictionary with at least page and page_size. @@ -1062,9 +1103,11 @@ class SupersetClient: # [/DEF:_validate_query_params:Function] # [DEF:_fetch_total_object_count:Function] + # @TIER: TRIVIAL # @PURPOSE: Fetches the total number of items for a given endpoint. # @PRE: endpoint must be a valid Superset API path. # @POST: Returns the total count as an integer. 
+ # @RELATION: [CALLS] ->[self.network.fetch_paginated_count] def _fetch_total_object_count(self, endpoint: str) -> int: with belief_scope("_fetch_total_object_count"): return self.network.fetch_paginated_count( diff --git a/backend/src/core/task_manager/persistence.py b/backend/src/core/task_manager/persistence.py index a11a8537..99707708 100644 --- a/backend/src/core/task_manager/persistence.py +++ b/backend/src/core/task_manager/persistence.py @@ -29,6 +29,10 @@ from ..logger import logger, belief_scope # @TIER: CRITICAL # @SEMANTICS: persistence, service, database, sqlalchemy # @PURPOSE: Provides methods to save and load tasks from the tasks.db database using SQLAlchemy. +# @RELATION: [DEPENDS_ON] ->[backend.src.core.database.TasksSessionLocal] +# @RELATION: [DEPENDS_ON] ->[backend.src.models.task.TaskRecord] +# @RELATION: [DEPENDS_ON] ->[backend.src.models.mapping.Environment] +# @RELATION: [USED_BY] ->[backend.src.core.task_manager.manager.TaskManager] # @INVARIANT: Persistence must handle potentially missing task fields natively. # # @TEST_CONTRACT: TaskPersistenceService -> @@ -46,6 +50,7 @@ from ..logger import logger, belief_scope # @TEST_INVARIANT: accurate_round_trip -> verifies: [valid_task_persistence, load_corrupt_json_params] class TaskPersistenceService: # [DEF:_json_load_if_needed:Function] + # @TIER: TRIVIAL # @PURPOSE: Safely load JSON strings from DB if necessary # @PRE: value is an arbitrary database value # @POST: Returns parsed JSON object, list, string, or primitive @@ -68,6 +73,7 @@ class TaskPersistenceService: # [/DEF:_json_load_if_needed:Function] # [DEF:_parse_datetime:Function] + # @TIER: TRIVIAL # @PURPOSE: Safely parse a datetime string from the database # @PRE: value is an ISO string or datetime object # @POST: Returns datetime object or None @@ -89,6 +95,7 @@ class TaskPersistenceService: # @PURPOSE: Resolve environment id into existing environments.id value to satisfy FK constraints. 
# @PRE: Session is active # @POST: Returns existing environments.id or None when unresolved. + # @DATA_CONTRACT: Input[env_id: Optional[str]] -> Output[Optional[str]] @staticmethod def _resolve_environment_id(session: Session, env_id: Optional[str]) -> Optional[str]: with belief_scope("_resolve_environment_id"): @@ -140,6 +147,8 @@ class TaskPersistenceService: # @POST: Task record created or updated in database. # @PARAM: task (Task) - The task object to persist. # @SIDE_EFFECT: Writes to task_records table in tasks.db + # @DATA_CONTRACT: Input[Task] -> Model[TaskRecord] + # @RELATION: [CALLS] ->[self._resolve_environment_id] def persist_task(self, task: Task) -> None: with belief_scope("TaskPersistenceService.persist_task", f"task_id={task.id}"): session: Session = TasksSessionLocal() @@ -202,6 +211,7 @@ class TaskPersistenceService: # @PRE: isinstance(tasks, list) # @POST: All tasks in list are persisted. # @PARAM: tasks (List[Task]) - The list of tasks to persist. + # @RELATION: [CALLS] ->[self.persist_task] def persist_tasks(self, tasks: List[Task]) -> None: with belief_scope("TaskPersistenceService.persist_tasks"): for task in tasks: @@ -216,6 +226,9 @@ class TaskPersistenceService: # @PARAM: limit (int) - Max tasks to load. # @PARAM: status (Optional[TaskStatus]) - Filter by status. # @RETURN: List[Task] - The loaded tasks. + # @DATA_CONTRACT: Model[TaskRecord] -> Output[List[Task]] + # @RELATION: [CALLS] ->[self._json_load_if_needed] + # @RELATION: [CALLS] ->[self._parse_datetime] def load_tasks(self, limit: int = 100, status: Optional[TaskStatus] = None) -> List[Task]: with belief_scope("TaskPersistenceService.load_tasks"): session: Session = TasksSessionLocal() @@ -269,6 +282,7 @@ class TaskPersistenceService: # @PRE: task_ids is a list of strings. # @POST: Specified task records deleted from database. # @PARAM: task_ids (List[str]) - List of task IDs to delete. + # @SIDE_EFFECT: Deletes rows from task_records table. 
def delete_tasks(self, task_ids: List[str]) -> None: if not task_ids: return @@ -290,7 +304,9 @@ class TaskPersistenceService: # @TIER: CRITICAL # @SEMANTICS: persistence, service, database, log, sqlalchemy # @PURPOSE: Provides methods to save and query task logs from the task_logs table. -# @RELATION: DEPENDS_ON -> TaskLogRecord +# @RELATION: [DEPENDS_ON] ->[backend.src.models.task.TaskLogRecord] +# @RELATION: [DEPENDS_ON] ->[backend.src.core.database.TasksSessionLocal] +# @RELATION: [USED_BY] ->[backend.src.core.task_manager.manager.TaskManager] # @INVARIANT: Log entries are batch-inserted for performance. # # @TEST_CONTRACT: TaskLogPersistenceService -> @@ -328,6 +344,7 @@ class TaskLogPersistenceService: # @PARAM: task_id (str) - The task ID. # @PARAM: logs (List[LogEntry]) - Log entries to insert. # @SIDE_EFFECT: Writes to task_logs table. + # @DATA_CONTRACT: Input[List[LogEntry]] -> Model[TaskLogRecord] def add_logs(self, task_id: str, logs: List[LogEntry]) -> None: if not logs: return @@ -360,6 +377,7 @@ class TaskLogPersistenceService: # @PARAM: task_id (str) - The task ID. # @PARAM: log_filter (LogFilter) - Filter parameters. # @RETURN: List[TaskLog] - Filtered log entries. + # @DATA_CONTRACT: Model[TaskLogRecord] -> Output[List[TaskLog]] def get_logs(self, task_id: str, log_filter: LogFilter) -> List[TaskLog]: with belief_scope("TaskLogPersistenceService.get_logs", f"task_id={task_id}"): session: Session = TasksSessionLocal() @@ -412,6 +430,7 @@ class TaskLogPersistenceService: # @POST: Returns LogStats with counts by level and source. # @PARAM: task_id (str) - The task ID. # @RETURN: LogStats - Statistics about task logs. + # @DATA_CONTRACT: Model[TaskLogRecord] -> Output[LogStats] def get_log_stats(self, task_id: str) -> LogStats: with belief_scope("TaskLogPersistenceService.get_log_stats", f"task_id={task_id}"): session: Session = TasksSessionLocal() @@ -458,6 +477,7 @@ class TaskLogPersistenceService: # @POST: Returns list of unique source strings. 
# @PARAM: task_id (str) - The task ID. # @RETURN: List[str] - Unique source names. + # @DATA_CONTRACT: Model[TaskLogRecord] -> Output[List[str]] def get_sources(self, task_id: str) -> List[str]: with belief_scope("TaskLogPersistenceService.get_sources", f"task_id={task_id}"): session: Session = TasksSessionLocal() @@ -499,6 +519,7 @@ class TaskLogPersistenceService: # @PRE: task_ids is a list of task IDs. # @POST: All logs for the tasks are deleted. # @PARAM: task_ids (List[str]) - List of task IDs. + # @SIDE_EFFECT: Deletes rows from task_logs table. def delete_logs_for_tasks(self, task_ids: List[str]) -> None: if not task_ids: return diff --git a/backend/src/core/utils/async_network.py b/backend/src/core/utils/async_network.py index d1ac5cb9..742d96a4 100644 --- a/backend/src/core/utils/async_network.py +++ b/backend/src/core/utils/async_network.py @@ -26,15 +26,21 @@ from .network import ( # [DEF:AsyncAPIClient:Class] +# @TIER: STANDARD # @PURPOSE: Async Superset API client backed by httpx.AsyncClient with shared auth cache. +# @RELATION: [DEPENDS_ON] ->[backend.src.core.utils.network.SupersetAuthCache] +# @RELATION: [CALLS] ->[backend.src.core.utils.network.SupersetAuthCache.get] +# @RELATION: [CALLS] ->[backend.src.core.utils.network.SupersetAuthCache.set] class AsyncAPIClient: DEFAULT_TIMEOUT = 30 _auth_locks: Dict[tuple[str, str, bool], asyncio.Lock] = {} # [DEF:__init__:Function] + # @TIER: STANDARD # @PURPOSE: Initialize async API client for one environment. # @PRE: config contains base_url and auth payload. # @POST: Client is ready for async request/authentication flow. 
+ # @DATA_CONTRACT: Input[config: Dict[str, Any]] -> self._auth_cache_key[str] def __init__(self, config: Dict[str, Any], verify_ssl: bool = True, timeout: int = DEFAULT_TIMEOUT): self.base_url: str = self._normalize_base_url(config.get("base_url", "")) self.api_base_url: str = f"{self.base_url}/api/v1" @@ -56,6 +62,7 @@ class AsyncAPIClient: # [/DEF:__init__:Function] # [DEF:_normalize_base_url:Function] + # @TIER: TRIVIAL # @PURPOSE: Normalize base URL for Superset API root construction. # @POST: Returns canonical base URL without trailing slash and duplicate /api/v1 suffix. def _normalize_base_url(self, raw_url: str) -> str: @@ -66,6 +73,7 @@ class AsyncAPIClient: # [/DEF:_normalize_base_url:Function] # [DEF:_build_api_url:Function] + # @TIER: TRIVIAL # @PURPOSE: Build full API URL from relative Superset endpoint. # @POST: Returns absolute URL for upstream request. def _build_api_url(self, endpoint: str) -> str: @@ -80,6 +88,7 @@ class AsyncAPIClient: # [/DEF:_build_api_url:Function] # [DEF:_get_auth_lock:Function] + # @TIER: TRIVIAL # @PURPOSE: Return per-cache-key async lock to serialize fresh login attempts. # @POST: Returns stable asyncio.Lock instance. @classmethod @@ -93,8 +102,11 @@ class AsyncAPIClient: # [/DEF:_get_auth_lock:Function] # [DEF:authenticate:Function] + # @TIER: STANDARD # @PURPOSE: Authenticate against Superset and cache access/csrf tokens. # @POST: Client tokens are populated and reusable across requests. + # @SIDE_EFFECT: Performs network requests to Superset authentication endpoints. + # @DATA_CONTRACT: None -> Output[Dict[str, str]] async def authenticate(self) -> Dict[str, str]: cached_tokens = SupersetAuthCache.get(self._auth_cache_key) if cached_tokens and cached_tokens.get("access_token") and cached_tokens.get("csrf_token"): @@ -150,8 +162,10 @@ class AsyncAPIClient: # [/DEF:authenticate:Function] # [DEF:get_headers:Function] + # @TIER: STANDARD # @PURPOSE: Return authenticated Superset headers for async requests. 
# @POST: Headers include Authorization and CSRF tokens. + # @RELATION: [CALLS] ->[self.authenticate] async def get_headers(self) -> Dict[str, str]: if not self._authenticated: await self.authenticate() @@ -164,8 +178,13 @@ # [/DEF:get_headers:Function] # [DEF:request:Function] + # @TIER: STANDARD # @PURPOSE: Perform one authenticated async Superset API request. # @POST: Returns JSON payload or raw httpx.Response when raw_response=true. + # @SIDE_EFFECT: Performs network I/O. + # @RELATION: [CALLS] ->[self.get_headers] + # @RELATION: [CALLS] ->[self._handle_http_error] + # @RELATION: [CALLS] ->[self._handle_network_error] async def request( self, method: str, @@ -196,8 +215,10 @@ # [/DEF:request:Function] # [DEF:_handle_http_error:Function] + # @TIER: STANDARD # @PURPOSE: Translate upstream HTTP errors into stable domain exceptions. # @POST: Raises domain-specific exception for caller flow control. + # @DATA_CONTRACT: Input[httpx.HTTPStatusError] -> Exception def _handle_http_error(self, exc: httpx.HTTPStatusError, endpoint: str) -> None: with belief_scope("AsyncAPIClient._handle_http_error"): status_code = exc.response.status_code @@ -213,8 +234,10 @@ # [/DEF:_handle_http_error:Function] # [DEF:_handle_network_error:Function] + # @TIER: STANDARD # @PURPOSE: Translate generic httpx errors into NetworkError. # @POST: Raises NetworkError with URL context. + # @DATA_CONTRACT: Input[httpx.HTTPError] -> NetworkError def _handle_network_error(self, exc: httpx.HTTPError, url: str) -> None: with belief_scope("AsyncAPIClient._handle_network_error"): if isinstance(exc, httpx.TimeoutException): @@ -227,8 +250,10 @@ # [/DEF:_handle_network_error:Function] # [DEF:aclose:Function] + # @TIER: STANDARD # @PURPOSE: Close underlying httpx client. # @POST: Client resources are released. + # @SIDE_EFFECT: Closes network connections. 
async def aclose(self) -> None: await self._client.aclose() # [/DEF:aclose:Function] diff --git a/backend/src/core/utils/network.py b/backend/src/core/utils/network.py index a4bb5409..7183b9e1 100644 --- a/backend/src/core/utils/network.py +++ b/backend/src/core/utils/network.py @@ -145,7 +145,10 @@ class SupersetAuthCache: # [/DEF:SupersetAuthCache:Class] # [DEF:APIClient:Class] -# @PURPOSE: Инкапсулирует HTTP-логику для работы с API, включая сессии, аутентификацию, и обработку запросов. +# @TIER: STANDARD +# @PURPOSE: Synchronous Superset API client with process-local auth token caching. +# @RELATION: [DEPENDS_ON] ->[backend.src.core.utils.network.SupersetAuthCache] +# @RELATION: [DEPENDS_ON] ->[backend.src.core.logger.logger] class APIClient: DEFAULT_TIMEOUT = 30 diff --git a/backend/src/services/health_service.py b/backend/src/services/health_service.py index a15fd1ac..189093ce 100644 --- a/backend/src/services/health_service.py +++ b/backend/src/services/health_service.py @@ -22,8 +22,11 @@ from ..core.task_manager import TaskManager # [DEF:HealthService:Class] # @TIER: STANDARD # @PURPOSE: Aggregate latest dashboard validation state and manage persisted health report lifecycle. -# @RELATION: CALLS -> backend.src.core.superset_client.SupersetClient -# @RELATION: CALLS -> backend.src.core.task_manager.cleanup.TaskCleanupService +# @RELATION: [DEPENDS_ON] ->[backend.src.models.llm.ValidationRecord] +# @RELATION: [DEPENDS_ON] ->[backend.src.schemas.health.DashboardHealthItem] +# @RELATION: [DEPENDS_ON] ->[backend.src.schemas.health.HealthSummaryResponse] +# @RELATION: [CALLS] ->[backend.src.core.superset_client.SupersetClient] +# @RELATION: [CALLS] ->[backend.src.core.task_manager.cleanup.TaskCleanupService] class HealthService: _dashboard_summary_cache: Dict[str, Tuple[float, Dict[str, Dict[str, Optional[str]]]]] = {} _dashboard_summary_cache_ttl_seconds = 60.0 @@ -32,6 +35,7 @@ class HealthService: @PURPOSE: Service for managing and querying dashboard health data. 
""" # [DEF:HealthService.__init__:Function] + # @TIER: STANDARD # @PURPOSE: Initialize health service with DB session and optional config access for dashboard metadata resolution. # @PRE: db is a valid SQLAlchemy session. # @POST: Service is ready to aggregate summaries and delete health reports. @@ -42,10 +46,12 @@ class HealthService: # [/DEF:HealthService.__init__:Function] # [DEF:HealthService._prime_dashboard_meta_cache:Function] + # @TIER: STANDARD # @PURPOSE: Warm dashboard slug/title cache with one Superset list fetch per environment. # @PRE: records may contain mixed numeric and slug dashboard identifiers. # @POST: Numeric dashboard ids for known environments are cached when discoverable. # @SIDE_EFFECT: May call Superset dashboard list API once per referenced environment. + # @RELATION: [CALLS] ->[backend.src.core.superset_client.SupersetClient.get_dashboards_summary] def _prime_dashboard_meta_cache(self, records: List[ValidationRecord]) -> None: if not self.config_manager or not records: return @@ -121,10 +127,10 @@ class HealthService: # [/DEF:HealthService._prime_dashboard_meta_cache:Function] # [DEF:HealthService._resolve_dashboard_meta:Function] + # @TIER: TRIVIAL # @PURPOSE: Resolve slug/title for a dashboard referenced by persisted validation record. # @PRE: dashboard_id may be numeric or slug-like; environment_id may be empty. # @POST: Returns dict with `slug` and `title` keys, using cache when possible. - # @SIDE_EFFECT: May call Superset API through SupersetClient. 
def _resolve_dashboard_meta(self, dashboard_id: str, environment_id: Optional[str]) -> Dict[str, Optional[str]]: normalized_dashboard_id = str(dashboard_id or "").strip() normalized_environment_id = str(environment_id or "").strip() @@ -148,10 +154,14 @@ class HealthService: # [/DEF:HealthService._resolve_dashboard_meta:Function] # [DEF:HealthService.get_health_summary:Function] + # @TIER: STANDARD # @PURPOSE: Aggregate latest validation status per dashboard and enrich rows with dashboard slug/title. # @PRE: environment_id may be omitted to aggregate across all environments. # @POST: Returns HealthSummaryResponse with counts and latest record row per dashboard. # @SIDE_EFFECT: May call Superset API to resolve dashboard metadata. + # @DATA_CONTRACT: Input[environment_id: Optional[str]] -> Output[HealthSummaryResponse] + # @RELATION: [CALLS] ->[self._prime_dashboard_meta_cache] + # @RELATION: [CALLS] ->[self._resolve_dashboard_meta] async def get_health_summary(self, environment_id: str = None) -> HealthSummaryResponse: """ @PURPOSE: Aggregates the latest validation status for all dashboards. @@ -222,10 +232,13 @@ class HealthService: # [/DEF:HealthService.get_health_summary:Function] # [DEF:HealthService.delete_validation_report:Function] + # @TIER: STANDARD # @PURPOSE: Delete one persisted health report and optionally clean linked task/log artifacts. # @PRE: record_id is a validation record identifier. # @POST: Returns True only when a matching record was deleted. - # @SIDE_EFFECT: Deletes DB row, optional screenshot file, and optional task/log persistence. + # @SIDE_EFFECT: Deletes DB rows, optional screenshot file, and optional task/log persistence. 
+ # @DATA_CONTRACT: Input[record_id: str, task_manager: Optional[TaskManager]] -> Output[bool] + # @RELATION: [CALLS] ->[backend.src.core.task_manager.cleanup.TaskCleanupService.delete_task_with_logs] def delete_validation_report(self, record_id: str, task_manager: Optional[TaskManager] = None) -> bool: record = self.db.query(ValidationRecord).filter(ValidationRecord.id == record_id).first() if not record: