feat(clean-release): complete compliance redesign phases and polish tasks T047-T052
This commit is contained in:
298
backend/src/core/async_superset_client.py
Normal file
298
backend/src/core/async_superset_client.py
Normal file
@@ -0,0 +1,298 @@
|
||||
# [DEF:backend.src.core.async_superset_client:Module]
|
||||
#
|
||||
# @TIER: CRITICAL
|
||||
# @SEMANTICS: superset, async, client, httpx, dashboards, datasets
|
||||
# @PURPOSE: Async Superset client for dashboard hot-path requests without blocking FastAPI event loop.
|
||||
# @LAYER: Core
|
||||
# @RELATION: DEPENDS_ON -> backend.src.core.superset_client
|
||||
# @RELATION: DEPENDS_ON -> backend.src.core.utils.async_network.AsyncAPIClient
|
||||
# @INVARIANT: Async dashboard operations reuse shared auth cache and avoid sync requests in async routes.
|
||||
|
||||
# [SECTION: IMPORTS]
|
||||
import asyncio
|
||||
import json
|
||||
import re
|
||||
from typing import Any, Dict, List, Optional, Tuple, cast
|
||||
|
||||
from .config_models import Environment
|
||||
from .logger import logger as app_logger, belief_scope
|
||||
from .superset_client import SupersetClient
|
||||
from .utils.async_network import AsyncAPIClient
|
||||
# [/SECTION]
|
||||
|
||||
|
||||
# [DEF:AsyncSupersetClient:Class]
|
||||
# @PURPOSE: Async sibling of SupersetClient for dashboard read paths.
|
||||
class AsyncSupersetClient(SupersetClient):
|
||||
# [DEF:__init__:Function]
|
||||
# @PURPOSE: Initialize async Superset client with AsyncAPIClient transport.
|
||||
# @PRE: env is valid.
|
||||
# @POST: Client uses async network transport and inherited projection helpers.
|
||||
def __init__(self, env: Environment):
|
||||
self.env = env
|
||||
auth_payload = {
|
||||
"username": env.username,
|
||||
"password": env.password,
|
||||
"provider": "db",
|
||||
"refresh": "true",
|
||||
}
|
||||
self.network = AsyncAPIClient(
|
||||
config={"base_url": env.url, "auth": auth_payload},
|
||||
verify_ssl=env.verify_ssl,
|
||||
timeout=env.timeout,
|
||||
)
|
||||
self.delete_before_reimport = False
|
||||
# [/DEF:__init__:Function]
|
||||
|
||||
# [DEF:aclose:Function]
|
||||
# @PURPOSE: Close async transport resources.
|
||||
# @POST: Underlying AsyncAPIClient is closed.
|
||||
async def aclose(self) -> None:
|
||||
await self.network.aclose()
|
||||
# [/DEF:aclose:Function]
|
||||
|
||||
# [DEF:get_dashboards_page_async:Function]
|
||||
# @PURPOSE: Fetch one dashboards page asynchronously.
|
||||
# @POST: Returns total count and page result list.
|
||||
async def get_dashboards_page_async(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
|
||||
with belief_scope("AsyncSupersetClient.get_dashboards_page_async"):
|
||||
validated_query = self._validate_query_params(query or {})
|
||||
if "columns" not in validated_query:
|
||||
validated_query["columns"] = [
|
||||
"slug",
|
||||
"id",
|
||||
"url",
|
||||
"changed_on_utc",
|
||||
"dashboard_title",
|
||||
"published",
|
||||
"created_by",
|
||||
"changed_by",
|
||||
"changed_by_name",
|
||||
"owners",
|
||||
]
|
||||
|
||||
response_json = cast(
|
||||
Dict[str, Any],
|
||||
await self.network.request(
|
||||
method="GET",
|
||||
endpoint="/dashboard/",
|
||||
params={"q": json.dumps(validated_query)},
|
||||
),
|
||||
)
|
||||
result = response_json.get("result", [])
|
||||
total_count = response_json.get("count", len(result))
|
||||
return total_count, result
|
||||
# [/DEF:get_dashboards_page_async:Function]
|
||||
|
||||
# [DEF:get_dashboard_async:Function]
|
||||
# @PURPOSE: Fetch one dashboard payload asynchronously.
|
||||
# @POST: Returns raw dashboard payload from Superset API.
|
||||
async def get_dashboard_async(self, dashboard_id: int) -> Dict:
|
||||
with belief_scope("AsyncSupersetClient.get_dashboard_async", f"id={dashboard_id}"):
|
||||
response = await self.network.request(method="GET", endpoint=f"/dashboard/{dashboard_id}")
|
||||
return cast(Dict, response)
|
||||
# [/DEF:get_dashboard_async:Function]
|
||||
|
||||
# [DEF:get_chart_async:Function]
|
||||
# @PURPOSE: Fetch one chart payload asynchronously.
|
||||
# @POST: Returns raw chart payload from Superset API.
|
||||
async def get_chart_async(self, chart_id: int) -> Dict:
|
||||
with belief_scope("AsyncSupersetClient.get_chart_async", f"id={chart_id}"):
|
||||
response = await self.network.request(method="GET", endpoint=f"/chart/{chart_id}")
|
||||
return cast(Dict, response)
|
||||
# [/DEF:get_chart_async:Function]
|
||||
|
||||
# [DEF:get_dashboard_detail_async:Function]
|
||||
# @PURPOSE: Fetch dashboard detail asynchronously with concurrent charts/datasets requests.
|
||||
# @POST: Returns dashboard detail payload for overview page.
|
||||
async def get_dashboard_detail_async(self, dashboard_id: int) -> Dict:
|
||||
with belief_scope("AsyncSupersetClient.get_dashboard_detail_async", f"id={dashboard_id}"):
|
||||
dashboard_response = await self.get_dashboard_async(dashboard_id)
|
||||
dashboard_data = dashboard_response.get("result", dashboard_response)
|
||||
|
||||
charts: List[Dict] = []
|
||||
datasets: List[Dict] = []
|
||||
|
||||
def extract_dataset_id_from_form_data(form_data: Optional[Dict]) -> Optional[int]:
|
||||
if not isinstance(form_data, dict):
|
||||
return None
|
||||
datasource = form_data.get("datasource")
|
||||
if isinstance(datasource, str):
|
||||
matched = re.match(r"^(\d+)__", datasource)
|
||||
if matched:
|
||||
try:
|
||||
return int(matched.group(1))
|
||||
except ValueError:
|
||||
return None
|
||||
if isinstance(datasource, dict):
|
||||
ds_id = datasource.get("id")
|
||||
try:
|
||||
return int(ds_id) if ds_id is not None else None
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
ds_id = form_data.get("datasource_id")
|
||||
try:
|
||||
return int(ds_id) if ds_id is not None else None
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
chart_task = self.network.request(
|
||||
method="GET",
|
||||
endpoint=f"/dashboard/{dashboard_id}/charts",
|
||||
)
|
||||
dataset_task = self.network.request(
|
||||
method="GET",
|
||||
endpoint=f"/dashboard/{dashboard_id}/datasets",
|
||||
)
|
||||
charts_response, datasets_response = await asyncio.gather(
|
||||
chart_task,
|
||||
dataset_task,
|
||||
return_exceptions=True,
|
||||
)
|
||||
|
||||
if not isinstance(charts_response, Exception):
|
||||
charts_payload = charts_response.get("result", []) if isinstance(charts_response, dict) else []
|
||||
for chart_obj in charts_payload:
|
||||
if not isinstance(chart_obj, dict):
|
||||
continue
|
||||
chart_id = chart_obj.get("id")
|
||||
if chart_id is None:
|
||||
continue
|
||||
form_data = chart_obj.get("form_data")
|
||||
if isinstance(form_data, str):
|
||||
try:
|
||||
form_data = json.loads(form_data)
|
||||
except Exception:
|
||||
form_data = {}
|
||||
dataset_id = extract_dataset_id_from_form_data(form_data) or chart_obj.get("datasource_id")
|
||||
charts.append({
|
||||
"id": int(chart_id),
|
||||
"title": chart_obj.get("slice_name") or chart_obj.get("name") or f"Chart {chart_id}",
|
||||
"viz_type": (form_data.get("viz_type") if isinstance(form_data, dict) else None),
|
||||
"dataset_id": int(dataset_id) if dataset_id is not None else None,
|
||||
"last_modified": chart_obj.get("changed_on"),
|
||||
"overview": chart_obj.get("description") or (form_data.get("viz_type") if isinstance(form_data, dict) else None) or "Chart",
|
||||
})
|
||||
else:
|
||||
app_logger.warning("[get_dashboard_detail_async][Warning] Failed to fetch dashboard charts: %s", charts_response)
|
||||
|
||||
if not isinstance(datasets_response, Exception):
|
||||
datasets_payload = datasets_response.get("result", []) if isinstance(datasets_response, dict) else []
|
||||
for dataset_obj in datasets_payload:
|
||||
if not isinstance(dataset_obj, dict):
|
||||
continue
|
||||
dataset_id = dataset_obj.get("id")
|
||||
if dataset_id is None:
|
||||
continue
|
||||
db_payload = dataset_obj.get("database")
|
||||
db_name = db_payload.get("database_name") if isinstance(db_payload, dict) else None
|
||||
table_name = dataset_obj.get("table_name") or dataset_obj.get("datasource_name") or dataset_obj.get("name") or f"Dataset {dataset_id}"
|
||||
schema = dataset_obj.get("schema")
|
||||
fq_name = f"{schema}.{table_name}" if schema else table_name
|
||||
datasets.append({
|
||||
"id": int(dataset_id),
|
||||
"table_name": table_name,
|
||||
"schema": schema,
|
||||
"database": db_name or dataset_obj.get("database_name") or "Unknown",
|
||||
"last_modified": dataset_obj.get("changed_on"),
|
||||
"overview": fq_name,
|
||||
})
|
||||
else:
|
||||
app_logger.warning("[get_dashboard_detail_async][Warning] Failed to fetch dashboard datasets: %s", datasets_response)
|
||||
|
||||
if not charts:
|
||||
raw_position_json = dashboard_data.get("position_json")
|
||||
chart_ids_from_position = set()
|
||||
if isinstance(raw_position_json, str) and raw_position_json:
|
||||
try:
|
||||
parsed_position = json.loads(raw_position_json)
|
||||
chart_ids_from_position.update(self._extract_chart_ids_from_layout(parsed_position))
|
||||
except Exception:
|
||||
pass
|
||||
elif isinstance(raw_position_json, dict):
|
||||
chart_ids_from_position.update(self._extract_chart_ids_from_layout(raw_position_json))
|
||||
|
||||
raw_json_metadata = dashboard_data.get("json_metadata")
|
||||
if isinstance(raw_json_metadata, str) and raw_json_metadata:
|
||||
try:
|
||||
parsed_metadata = json.loads(raw_json_metadata)
|
||||
chart_ids_from_position.update(self._extract_chart_ids_from_layout(parsed_metadata))
|
||||
except Exception:
|
||||
pass
|
||||
elif isinstance(raw_json_metadata, dict):
|
||||
chart_ids_from_position.update(self._extract_chart_ids_from_layout(raw_json_metadata))
|
||||
|
||||
fallback_chart_tasks = [
|
||||
self.get_chart_async(int(chart_id))
|
||||
for chart_id in sorted(chart_ids_from_position)
|
||||
]
|
||||
fallback_chart_responses = await asyncio.gather(
|
||||
*fallback_chart_tasks,
|
||||
return_exceptions=True,
|
||||
)
|
||||
for chart_id, chart_response in zip(sorted(chart_ids_from_position), fallback_chart_responses):
|
||||
if isinstance(chart_response, Exception):
|
||||
app_logger.warning("[get_dashboard_detail_async][Warning] Failed to resolve fallback chart %s: %s", chart_id, chart_response)
|
||||
continue
|
||||
chart_data = chart_response.get("result", chart_response)
|
||||
charts.append({
|
||||
"id": int(chart_id),
|
||||
"title": chart_data.get("slice_name") or chart_data.get("name") or f"Chart {chart_id}",
|
||||
"viz_type": chart_data.get("viz_type"),
|
||||
"dataset_id": chart_data.get("datasource_id"),
|
||||
"last_modified": chart_data.get("changed_on"),
|
||||
"overview": chart_data.get("description") or chart_data.get("viz_type") or "Chart",
|
||||
})
|
||||
|
||||
dataset_ids_from_charts = {
|
||||
c.get("dataset_id")
|
||||
for c in charts
|
||||
if c.get("dataset_id") is not None
|
||||
}
|
||||
known_dataset_ids = {d.get("id") for d in datasets if d.get("id") is not None}
|
||||
missing_dataset_ids = sorted(int(item) for item in dataset_ids_from_charts if item not in known_dataset_ids)
|
||||
if missing_dataset_ids:
|
||||
dataset_fetch_tasks = [
|
||||
self.network.request(method="GET", endpoint=f"/dataset/{dataset_id}")
|
||||
for dataset_id in missing_dataset_ids
|
||||
]
|
||||
dataset_fetch_responses = await asyncio.gather(
|
||||
*dataset_fetch_tasks,
|
||||
return_exceptions=True,
|
||||
)
|
||||
for dataset_id, dataset_response in zip(missing_dataset_ids, dataset_fetch_responses):
|
||||
if isinstance(dataset_response, Exception):
|
||||
app_logger.warning("[get_dashboard_detail_async][Warning] Failed to backfill dataset %s: %s", dataset_id, dataset_response)
|
||||
continue
|
||||
dataset_data = dataset_response.get("result", dataset_response) if isinstance(dataset_response, dict) else {}
|
||||
db_payload = dataset_data.get("database")
|
||||
db_name = db_payload.get("database_name") if isinstance(db_payload, dict) else None
|
||||
table_name = dataset_data.get("table_name") or dataset_data.get("datasource_name") or dataset_data.get("name") or f"Dataset {dataset_id}"
|
||||
schema = dataset_data.get("schema")
|
||||
fq_name = f"{schema}.{table_name}" if schema else table_name
|
||||
datasets.append({
|
||||
"id": int(dataset_id),
|
||||
"table_name": table_name,
|
||||
"schema": schema,
|
||||
"database": db_name or dataset_data.get("database_name") or "Unknown",
|
||||
"last_modified": dataset_data.get("changed_on"),
|
||||
"overview": fq_name,
|
||||
})
|
||||
|
||||
return {
|
||||
"id": int(dashboard_data.get("id") or dashboard_id),
|
||||
"title": dashboard_data.get("dashboard_title") or dashboard_data.get("title") or f"Dashboard {dashboard_id}",
|
||||
"slug": dashboard_data.get("slug"),
|
||||
"url": dashboard_data.get("url"),
|
||||
"description": dashboard_data.get("description"),
|
||||
"last_modified": dashboard_data.get("changed_on_utc") or dashboard_data.get("changed_on"),
|
||||
"published": dashboard_data.get("published"),
|
||||
"charts": charts,
|
||||
"datasets": datasets,
|
||||
"chart_count": len(charts),
|
||||
"dataset_count": len(datasets),
|
||||
}
|
||||
# [/DEF:get_dashboard_detail_async:Function]
|
||||
# [/DEF:AsyncSupersetClient:Class]
|
||||
|
||||
# [/DEF:backend.src.core.async_superset_client:Module]
|
||||
@@ -24,19 +24,19 @@ class Schedule(BaseModel):
|
||||
|
||||
# [DEF:Environment:DataClass]
|
||||
# @PURPOSE: Represents a Superset environment configuration.
|
||||
class Environment(BaseModel):
|
||||
id: str
|
||||
name: str
|
||||
url: str
|
||||
username: str
|
||||
password: str # Will be masked in UI
|
||||
stage: str = Field(default="DEV", pattern="^(DEV|PREPROD|PROD)$")
|
||||
verify_ssl: bool = True
|
||||
timeout: int = 30
|
||||
is_default: bool = False
|
||||
is_production: bool = False
|
||||
backup_schedule: Schedule = Field(default_factory=Schedule)
|
||||
# [/DEF:Environment:DataClass]
|
||||
class Environment(BaseModel):
|
||||
id: str
|
||||
name: str
|
||||
url: str
|
||||
username: str
|
||||
password: str # Will be masked in UI
|
||||
stage: str = Field(default="DEV", pattern="^(DEV|PREPROD|PROD)$")
|
||||
verify_ssl: bool = True
|
||||
timeout: int = 30
|
||||
is_default: bool = False
|
||||
is_production: bool = False
|
||||
backup_schedule: Schedule = Field(default_factory=Schedule)
|
||||
# [/DEF:Environment:DataClass]
|
||||
|
||||
# [DEF:LoggingConfig:DataClass]
|
||||
# @PURPOSE: Defines the configuration for the application's logging system.
|
||||
@@ -49,10 +49,18 @@ class LoggingConfig(BaseModel):
|
||||
enable_belief_state: bool = True
|
||||
# [/DEF:LoggingConfig:DataClass]
|
||||
|
||||
# [DEF:CleanReleaseConfig:DataClass]
|
||||
# @PURPOSE: Configuration for clean release compliance subsystem.
|
||||
class CleanReleaseConfig(BaseModel):
|
||||
active_policy_id: Optional[str] = None
|
||||
active_registry_id: Optional[str] = None
|
||||
# [/DEF:CleanReleaseConfig:DataClass]
|
||||
|
||||
# [DEF:GlobalSettings:DataClass]
|
||||
# @PURPOSE: Represents global application settings.
|
||||
class GlobalSettings(BaseModel):
|
||||
storage: StorageConfig = Field(default_factory=StorageConfig)
|
||||
clean_release: CleanReleaseConfig = Field(default_factory=CleanReleaseConfig)
|
||||
default_environment_id: Optional[str] = None
|
||||
logging: LoggingConfig = Field(default_factory=LoggingConfig)
|
||||
connections: List[dict] = []
|
||||
|
||||
@@ -21,6 +21,7 @@ from ..models import config as _config_models # noqa: F401
|
||||
from ..models import llm as _llm_models # noqa: F401
|
||||
from ..models import assistant as _assistant_models # noqa: F401
|
||||
from ..models import profile as _profile_models # noqa: F401
|
||||
from ..models import clean_release as _clean_release_models # noqa: F401
|
||||
from .logger import belief_scope, logger
|
||||
from .auth.config import auth_config
|
||||
import os
|
||||
|
||||
237
backend/src/core/utils/async_network.py
Normal file
237
backend/src/core/utils/async_network.py
Normal file
@@ -0,0 +1,237 @@
|
||||
# [DEF:backend.src.core.utils.async_network:Module]
|
||||
#
|
||||
# @TIER: CRITICAL
|
||||
# @SEMANTICS: network, httpx, async, superset, authentication, cache
|
||||
# @PURPOSE: Provides async Superset API client with shared auth-token cache to avoid per-request re-login.
|
||||
# @LAYER: Infra
|
||||
# @RELATION: DEPENDS_ON -> backend.src.core.utils.network.SupersetAuthCache
|
||||
# @INVARIANT: Async client reuses cached auth tokens per environment credentials and invalidates on 401.
|
||||
|
||||
# [SECTION: IMPORTS]
|
||||
from typing import Optional, Dict, Any, Union
|
||||
import asyncio
|
||||
|
||||
import httpx
|
||||
|
||||
from ..logger import logger as app_logger, belief_scope
|
||||
from .network import (
|
||||
AuthenticationError,
|
||||
DashboardNotFoundError,
|
||||
NetworkError,
|
||||
PermissionDeniedError,
|
||||
SupersetAPIError,
|
||||
SupersetAuthCache,
|
||||
)
|
||||
# [/SECTION]
|
||||
|
||||
|
||||
# [DEF:AsyncAPIClient:Class]
|
||||
# @PURPOSE: Async Superset API client backed by httpx.AsyncClient with shared auth cache.
|
||||
class AsyncAPIClient:
|
||||
DEFAULT_TIMEOUT = 30
|
||||
_auth_locks: Dict[tuple[str, str, bool], asyncio.Lock] = {}
|
||||
|
||||
# [DEF:__init__:Function]
|
||||
# @PURPOSE: Initialize async API client for one environment.
|
||||
# @PRE: config contains base_url and auth payload.
|
||||
# @POST: Client is ready for async request/authentication flow.
|
||||
def __init__(self, config: Dict[str, Any], verify_ssl: bool = True, timeout: int = DEFAULT_TIMEOUT):
|
||||
self.base_url: str = self._normalize_base_url(config.get("base_url", ""))
|
||||
self.api_base_url: str = f"{self.base_url}/api/v1"
|
||||
self.auth = config.get("auth")
|
||||
self.request_settings = {"verify_ssl": verify_ssl, "timeout": timeout}
|
||||
self._client = httpx.AsyncClient(
|
||||
verify=verify_ssl,
|
||||
timeout=httpx.Timeout(timeout),
|
||||
follow_redirects=True,
|
||||
)
|
||||
self._tokens: Dict[str, str] = {}
|
||||
self._authenticated = False
|
||||
self._auth_cache_key = SupersetAuthCache.build_key(
|
||||
self.base_url,
|
||||
self.auth,
|
||||
verify_ssl,
|
||||
)
|
||||
|
||||
# [/DEF:__init__:Function]
|
||||
|
||||
# [DEF:_normalize_base_url:Function]
|
||||
# @PURPOSE: Normalize base URL for Superset API root construction.
|
||||
# @POST: Returns canonical base URL without trailing slash and duplicate /api/v1 suffix.
|
||||
def _normalize_base_url(self, raw_url: str) -> str:
|
||||
normalized = str(raw_url or "").strip().rstrip("/")
|
||||
if normalized.lower().endswith("/api/v1"):
|
||||
normalized = normalized[:-len("/api/v1")]
|
||||
return normalized.rstrip("/")
|
||||
# [/DEF:_normalize_base_url:Function]
|
||||
|
||||
# [DEF:_build_api_url:Function]
|
||||
# @PURPOSE: Build full API URL from relative Superset endpoint.
|
||||
# @POST: Returns absolute URL for upstream request.
|
||||
def _build_api_url(self, endpoint: str) -> str:
|
||||
normalized_endpoint = str(endpoint or "").strip()
|
||||
if normalized_endpoint.startswith("http://") or normalized_endpoint.startswith("https://"):
|
||||
return normalized_endpoint
|
||||
if not normalized_endpoint.startswith("/"):
|
||||
normalized_endpoint = f"/{normalized_endpoint}"
|
||||
if normalized_endpoint.startswith("/api/v1/") or normalized_endpoint == "/api/v1":
|
||||
return f"{self.base_url}{normalized_endpoint}"
|
||||
return f"{self.api_base_url}{normalized_endpoint}"
|
||||
# [/DEF:_build_api_url:Function]
|
||||
|
||||
# [DEF:_get_auth_lock:Function]
|
||||
# @PURPOSE: Return per-cache-key async lock to serialize fresh login attempts.
|
||||
# @POST: Returns stable asyncio.Lock instance.
|
||||
@classmethod
|
||||
def _get_auth_lock(cls, cache_key: tuple[str, str, bool]) -> asyncio.Lock:
|
||||
existing_lock = cls._auth_locks.get(cache_key)
|
||||
if existing_lock is not None:
|
||||
return existing_lock
|
||||
created_lock = asyncio.Lock()
|
||||
cls._auth_locks[cache_key] = created_lock
|
||||
return created_lock
|
||||
# [/DEF:_get_auth_lock:Function]
|
||||
|
||||
# [DEF:authenticate:Function]
|
||||
# @PURPOSE: Authenticate against Superset and cache access/csrf tokens.
|
||||
# @POST: Client tokens are populated and reusable across requests.
|
||||
async def authenticate(self) -> Dict[str, str]:
|
||||
cached_tokens = SupersetAuthCache.get(self._auth_cache_key)
|
||||
if cached_tokens and cached_tokens.get("access_token") and cached_tokens.get("csrf_token"):
|
||||
self._tokens = cached_tokens
|
||||
self._authenticated = True
|
||||
app_logger.info("[async_authenticate][CacheHit] Reusing cached Superset auth tokens for %s", self.base_url)
|
||||
return self._tokens
|
||||
|
||||
auth_lock = self._get_auth_lock(self._auth_cache_key)
|
||||
async with auth_lock:
|
||||
cached_tokens = SupersetAuthCache.get(self._auth_cache_key)
|
||||
if cached_tokens and cached_tokens.get("access_token") and cached_tokens.get("csrf_token"):
|
||||
self._tokens = cached_tokens
|
||||
self._authenticated = True
|
||||
app_logger.info("[async_authenticate][CacheHitAfterWait] Reusing cached Superset auth tokens for %s", self.base_url)
|
||||
return self._tokens
|
||||
|
||||
with belief_scope("AsyncAPIClient.authenticate"):
|
||||
app_logger.info("[async_authenticate][Enter] Authenticating to %s", self.base_url)
|
||||
try:
|
||||
login_url = f"{self.api_base_url}/security/login"
|
||||
response = await self._client.post(login_url, json=self.auth)
|
||||
response.raise_for_status()
|
||||
access_token = response.json()["access_token"]
|
||||
|
||||
csrf_url = f"{self.api_base_url}/security/csrf_token/"
|
||||
csrf_response = await self._client.get(
|
||||
csrf_url,
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
)
|
||||
csrf_response.raise_for_status()
|
||||
|
||||
self._tokens = {
|
||||
"access_token": access_token,
|
||||
"csrf_token": csrf_response.json()["result"],
|
||||
}
|
||||
self._authenticated = True
|
||||
SupersetAuthCache.set(self._auth_cache_key, self._tokens)
|
||||
app_logger.info("[async_authenticate][Exit] Authenticated successfully.")
|
||||
return self._tokens
|
||||
except httpx.HTTPStatusError as exc:
|
||||
SupersetAuthCache.invalidate(self._auth_cache_key)
|
||||
status_code = exc.response.status_code if exc.response is not None else None
|
||||
if status_code in [502, 503, 504]:
|
||||
raise NetworkError(
|
||||
f"Environment unavailable during authentication (Status {status_code})",
|
||||
status_code=status_code,
|
||||
) from exc
|
||||
raise AuthenticationError(f"Authentication failed: {exc}") from exc
|
||||
except (httpx.HTTPError, KeyError) as exc:
|
||||
SupersetAuthCache.invalidate(self._auth_cache_key)
|
||||
raise NetworkError(f"Network or parsing error during authentication: {exc}") from exc
|
||||
# [/DEF:authenticate:Function]
|
||||
|
||||
# [DEF:get_headers:Function]
|
||||
# @PURPOSE: Return authenticated Superset headers for async requests.
|
||||
# @POST: Headers include Authorization and CSRF tokens.
|
||||
async def get_headers(self) -> Dict[str, str]:
|
||||
if not self._authenticated:
|
||||
await self.authenticate()
|
||||
return {
|
||||
"Authorization": f"Bearer {self._tokens['access_token']}",
|
||||
"X-CSRFToken": self._tokens.get("csrf_token", ""),
|
||||
"Referer": self.base_url,
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
# [/DEF:get_headers:Function]
|
||||
|
||||
# [DEF:request:Function]
|
||||
# @PURPOSE: Perform one authenticated async Superset API request.
|
||||
# @POST: Returns JSON payload or raw httpx.Response when raw_response=true.
|
||||
async def request(
|
||||
self,
|
||||
method: str,
|
||||
endpoint: str,
|
||||
headers: Optional[Dict[str, str]] = None,
|
||||
raw_response: bool = False,
|
||||
**kwargs,
|
||||
) -> Union[httpx.Response, Dict[str, Any]]:
|
||||
full_url = self._build_api_url(endpoint)
|
||||
request_headers = await self.get_headers()
|
||||
if headers:
|
||||
request_headers.update(headers)
|
||||
if "allow_redirects" in kwargs and "follow_redirects" not in kwargs:
|
||||
kwargs["follow_redirects"] = bool(kwargs.pop("allow_redirects"))
|
||||
|
||||
try:
|
||||
response = await self._client.request(method, full_url, headers=request_headers, **kwargs)
|
||||
response.raise_for_status()
|
||||
return response if raw_response else response.json()
|
||||
except httpx.HTTPStatusError as exc:
|
||||
if exc.response is not None and exc.response.status_code == 401:
|
||||
self._authenticated = False
|
||||
self._tokens = {}
|
||||
SupersetAuthCache.invalidate(self._auth_cache_key)
|
||||
self._handle_http_error(exc, endpoint)
|
||||
except httpx.HTTPError as exc:
|
||||
self._handle_network_error(exc, full_url)
|
||||
# [/DEF:request:Function]
|
||||
|
||||
# [DEF:_handle_http_error:Function]
|
||||
# @PURPOSE: Translate upstream HTTP errors into stable domain exceptions.
|
||||
# @POST: Raises domain-specific exception for caller flow control.
|
||||
def _handle_http_error(self, exc: httpx.HTTPStatusError, endpoint: str) -> None:
|
||||
with belief_scope("AsyncAPIClient._handle_http_error"):
|
||||
status_code = exc.response.status_code
|
||||
if status_code in [502, 503, 504]:
|
||||
raise NetworkError(f"Environment unavailable (Status {status_code})", status_code=status_code) from exc
|
||||
if status_code == 404:
|
||||
raise DashboardNotFoundError(endpoint) from exc
|
||||
if status_code == 403:
|
||||
raise PermissionDeniedError() from exc
|
||||
if status_code == 401:
|
||||
raise AuthenticationError() from exc
|
||||
raise SupersetAPIError(f"API Error {status_code}: {exc.response.text}") from exc
|
||||
# [/DEF:_handle_http_error:Function]
|
||||
|
||||
# [DEF:_handle_network_error:Function]
|
||||
# @PURPOSE: Translate generic httpx errors into NetworkError.
|
||||
# @POST: Raises NetworkError with URL context.
|
||||
def _handle_network_error(self, exc: httpx.HTTPError, url: str) -> None:
|
||||
with belief_scope("AsyncAPIClient._handle_network_error"):
|
||||
if isinstance(exc, httpx.TimeoutException):
|
||||
message = "Request timeout"
|
||||
elif isinstance(exc, httpx.ConnectError):
|
||||
message = "Connection error"
|
||||
else:
|
||||
message = f"Unknown network error: {exc}"
|
||||
raise NetworkError(message, url=url) from exc
|
||||
# [/DEF:_handle_network_error:Function]
|
||||
|
||||
# [DEF:aclose:Function]
|
||||
# @PURPOSE: Close underlying httpx client.
|
||||
# @POST: Client resources are released.
|
||||
async def aclose(self) -> None:
|
||||
await self._client.aclose()
|
||||
# [/DEF:aclose:Function]
|
||||
# [/DEF:AsyncAPIClient:Class]
|
||||
|
||||
# [/DEF:backend.src.core.utils.async_network:Module]
|
||||
@@ -8,10 +8,12 @@
|
||||
# @PUBLIC_API: APIClient
|
||||
|
||||
# [SECTION: IMPORTS]
|
||||
from typing import Optional, Dict, Any, List, Union, cast
|
||||
from typing import Optional, Dict, Any, List, Union, cast, Tuple
|
||||
import json
|
||||
import io
|
||||
from pathlib import Path
|
||||
import threading
|
||||
import time
|
||||
import requests
|
||||
from requests.adapters import HTTPAdapter
|
||||
import urllib3
|
||||
@@ -86,6 +88,62 @@ class NetworkError(Exception):
|
||||
# [/DEF:__init__:Function]
|
||||
# [/DEF:NetworkError:Class]
|
||||
|
||||
|
||||
# [DEF:SupersetAuthCache:Class]
|
||||
# @PURPOSE: Process-local cache for Superset access/csrf tokens keyed by environment credentials.
|
||||
# @PRE: base_url and username are stable strings.
|
||||
# @POST: Cached entries expire automatically by TTL and can be reused across requests.
|
||||
class SupersetAuthCache:
|
||||
TTL_SECONDS = 300
|
||||
|
||||
_lock = threading.Lock()
|
||||
_entries: Dict[Tuple[str, str, bool], Dict[str, Any]] = {}
|
||||
|
||||
@classmethod
|
||||
def build_key(cls, base_url: str, auth: Optional[Dict[str, Any]], verify_ssl: bool) -> Tuple[str, str, bool]:
|
||||
username = ""
|
||||
if isinstance(auth, dict):
|
||||
username = str(auth.get("username") or "").strip()
|
||||
return (str(base_url or "").strip(), username, bool(verify_ssl))
|
||||
|
||||
@classmethod
|
||||
def get(cls, key: Tuple[str, str, bool]) -> Optional[Dict[str, str]]:
|
||||
now = time.time()
|
||||
with cls._lock:
|
||||
payload = cls._entries.get(key)
|
||||
if not payload:
|
||||
return None
|
||||
expires_at = float(payload.get("expires_at") or 0)
|
||||
if expires_at <= now:
|
||||
cls._entries.pop(key, None)
|
||||
return None
|
||||
tokens = payload.get("tokens")
|
||||
if not isinstance(tokens, dict):
|
||||
cls._entries.pop(key, None)
|
||||
return None
|
||||
return {
|
||||
"access_token": str(tokens.get("access_token") or ""),
|
||||
"csrf_token": str(tokens.get("csrf_token") or ""),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def set(cls, key: Tuple[str, str, bool], tokens: Dict[str, str], ttl_seconds: Optional[int] = None) -> None:
|
||||
normalized_ttl = max(int(ttl_seconds or cls.TTL_SECONDS), 1)
|
||||
with cls._lock:
|
||||
cls._entries[key] = {
|
||||
"tokens": {
|
||||
"access_token": str(tokens.get("access_token") or ""),
|
||||
"csrf_token": str(tokens.get("csrf_token") or ""),
|
||||
},
|
||||
"expires_at": time.time() + normalized_ttl,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def invalidate(cls, key: Tuple[str, str, bool]) -> None:
|
||||
with cls._lock:
|
||||
cls._entries.pop(key, None)
|
||||
# [/DEF:SupersetAuthCache:Class]
|
||||
|
||||
# [DEF:APIClient:Class]
|
||||
# @PURPOSE: Инкапсулирует HTTP-логику для работы с API, включая сессии, аутентификацию, и обработку запросов.
|
||||
class APIClient:
|
||||
@@ -107,6 +165,11 @@ class APIClient:
|
||||
self.request_settings = {"verify_ssl": verify_ssl, "timeout": timeout}
|
||||
self.session = self._init_session()
|
||||
self._tokens: Dict[str, str] = {}
|
||||
self._auth_cache_key = SupersetAuthCache.build_key(
|
||||
self.base_url,
|
||||
self.auth,
|
||||
verify_ssl,
|
||||
)
|
||||
self._authenticated = False
|
||||
app_logger.info("[APIClient.__init__][Exit] APIClient initialized.")
|
||||
# [/DEF:__init__:Function]
|
||||
@@ -194,6 +257,12 @@ class APIClient:
|
||||
def authenticate(self) -> Dict[str, str]:
|
||||
with belief_scope("authenticate"):
|
||||
app_logger.info("[authenticate][Enter] Authenticating to %s", self.base_url)
|
||||
cached_tokens = SupersetAuthCache.get(self._auth_cache_key)
|
||||
if cached_tokens and cached_tokens.get("access_token") and cached_tokens.get("csrf_token"):
|
||||
self._tokens = cached_tokens
|
||||
self._authenticated = True
|
||||
app_logger.info("[authenticate][CacheHit] Reusing cached Superset auth tokens for %s", self.base_url)
|
||||
return self._tokens
|
||||
try:
|
||||
login_url = f"{self.api_base_url}/security/login"
|
||||
# Log the payload keys and values (masking password)
|
||||
@@ -215,14 +284,17 @@ class APIClient:
|
||||
|
||||
self._tokens = {"access_token": access_token, "csrf_token": csrf_response.json()["result"]}
|
||||
self._authenticated = True
|
||||
SupersetAuthCache.set(self._auth_cache_key, self._tokens)
|
||||
app_logger.info("[authenticate][Exit] Authenticated successfully.")
|
||||
return self._tokens
|
||||
except requests.exceptions.HTTPError as e:
|
||||
SupersetAuthCache.invalidate(self._auth_cache_key)
|
||||
status_code = e.response.status_code if e.response is not None else None
|
||||
if status_code in [502, 503, 504]:
|
||||
raise NetworkError(f"Environment unavailable during authentication (Status {status_code})", status_code=status_code) from e
|
||||
raise AuthenticationError(f"Authentication failed: {e}") from e
|
||||
except (requests.exceptions.RequestException, KeyError) as e:
|
||||
SupersetAuthCache.invalidate(self._auth_cache_key)
|
||||
raise NetworkError(f"Network or parsing error during authentication: {e}") from e
|
||||
# [/DEF:authenticate:Function]
|
||||
|
||||
@@ -263,6 +335,10 @@ class APIClient:
|
||||
response.raise_for_status()
|
||||
return response if raw_response else response.json()
|
||||
except requests.exceptions.HTTPError as e:
|
||||
if e.response is not None and e.response.status_code == 401:
|
||||
self._authenticated = False
|
||||
self._tokens = {}
|
||||
SupersetAuthCache.invalidate(self._auth_cache_key)
|
||||
self._handle_http_error(e, endpoint)
|
||||
except requests.exceptions.RequestException as e:
|
||||
self._handle_network_error(e, full_url)
|
||||
|
||||
Reference in New Issue
Block a user