Files
ss-tools/backend/src/core/superset_client.py
2026-03-20 20:01:58 +03:00

2277 lines
100 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# [DEF:SupersetClientModule:Module]
#
# @COMPLEXITY: 3
# @SEMANTICS: superset, api, client, rest, http, dashboard, dataset, import, export
# @PURPOSE: Предоставляет высокоуровневый клиент для взаимодействия с Superset REST API, инкапсулируя логику запросов, обработку ошибок и пагинацию.
# @LAYER: Core
# @RELATION: [DEPENDS_ON] ->[APIClient.__init__]
#
# @INVARIANT: All network operations must use the internal APIClient instance.
# @PUBLIC_API: SupersetClient
# [SECTION: IMPORTS]
import json
import re
import zipfile
from copy import deepcopy
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, cast
from requests import Response
from datetime import datetime
from .logger import logger as app_logger, belief_scope
from .utils.network import APIClient, SupersetAPIError
from .utils.fileio import get_filename_from_headers
from .config_models import Environment
app_logger = cast(Any, app_logger)
# [/SECTION]
# [DEF:SupersetClient:Class]
# @COMPLEXITY: 3
# @PURPOSE: Класс-обёртка над Superset REST API, предоставляющий методы для работы с дашбордами и датасетами.
# @RELATION: [DEPENDS_ON] ->[APIClient]
class SupersetClient:
# [DEF:SupersetClient.__init__:Function]
# @COMPLEXITY: 3
# @PURPOSE: Инициализирует клиент, проверяет конфигурацию и создает сетевой клиент.
# @PRE: `env` должен быть валидным объектом Environment.
# @POST: Атрибуты `env` и `network` созданы и готовы к работе.
# @DATA_CONTRACT: Input[Environment] -> self.network[APIClient]
# @RELATION: [DEPENDS_ON] ->[Environment]
# @RELATION: [DEPENDS_ON] ->[APIClient.__init__]
def __init__(self, env: Environment):
    """Build a Superset client bound to one configured environment.

    Validates nothing beyond what Environment already guarantees; it wires
    up the APIClient that performs all network I/O for this instance.
    """
    with belief_scope("__init__"):
        app_logger.info(
            "[SupersetClient.__init__][Enter] Initializing SupersetClient for env %s.",
            env.name,
        )
        self.env = env
        # Credential payload in the exact shape Superset's login API expects.
        credentials = {
            "username": env.username,
            "password": env.password,
            "provider": "db",
            "refresh": "true",
        }
        self.network = APIClient(
            config={"base_url": env.url, "auth": credentials},
            verify_ssl=env.verify_ssl,
            timeout=env.timeout,
        )
        # Import behavior flag; callers may flip it before import_dashboard.
        self.delete_before_reimport: bool = False
        app_logger.info("[SupersetClient.__init__][Exit] SupersetClient initialized.")
# [/DEF:SupersetClient.__init__:Function]
# [DEF:SupersetClient.authenticate:Function]
# @COMPLEXITY: 3
# @PURPOSE: Authenticates the client using the configured credentials.
# @PRE: self.network must be initialized with valid auth configuration.
# @POST: Client is authenticated and tokens are stored.
# @DATA_CONTRACT: None -> Output[Dict[str, str]]
# @RELATION: [CALLS] ->[APIClient.authenticate]
def authenticate(self) -> Dict[str, str]:
    """Authenticate via the underlying APIClient and return its token map."""
    with belief_scope("SupersetClient.authenticate"):
        tokens = self.network.authenticate()
        return tokens
# [/DEF:SupersetClient.authenticate:Function]
@property
# [DEF:SupersetClient.headers:Function]
# @COMPLEXITY: 1
# @PURPOSE: Возвращает базовые HTTP-заголовки, используемые сетевым клиентом.
# @PRE: APIClient is initialized and authenticated.
# @POST: Returns a dictionary of HTTP headers.
def headers(self) -> dict:
    """Return the HTTP headers currently used by the network client."""
    with belief_scope("headers"):
        current_headers = self.network.headers
        return current_headers
# [/DEF:SupersetClient.headers:Function]
# [SECTION: DASHBOARD OPERATIONS]
# [DEF:SupersetClient.get_dashboards:Function]
# @COMPLEXITY: 3
# @PURPOSE: Получает полный список дашбордов, автоматически обрабатывая пагинацию.
# @PRE: Client is authenticated.
# @POST: Returns a tuple with total count and list of dashboards.
# @DATA_CONTRACT: Input[query: Optional[Dict]] -> Output[Tuple[int, List[Dict]]]
# @RELATION: [CALLS] ->[_fetch_all_pages]
def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
    """Return every dashboard, transparently walking all result pages.

    Args:
        query: Optional Superset list-API query; validated before use.

    Returns:
        (total_count, dashboards) where total_count is the length of the
        fully-fetched list.
    """
    with belief_scope("get_dashboards"):
        app_logger.info("[get_dashboards][Enter] Fetching dashboards.")
        effective_query = self._validate_query_params(query or {})
        # Default projection: grid fields plus actor metadata in one request.
        effective_query.setdefault(
            "columns",
            [
                "slug",
                "id",
                "url",
                "changed_on_utc",
                "dashboard_title",
                "published",
                "created_by",
                "changed_by",
                "changed_by_name",
                "owners",
            ],
        )
        dashboards = self._fetch_all_pages(
            endpoint="/dashboard/",
            pagination_options={
                "base_query": effective_query,
                "results_field": "result",
            },
        )
        app_logger.info("[get_dashboards][Exit] Found %d dashboards.", len(dashboards))
        return len(dashboards), dashboards
# [/DEF:SupersetClient.get_dashboards:Function]
# [DEF:SupersetClient.get_dashboards_page:Function]
# @COMPLEXITY: 3
# @PURPOSE: Fetches a single dashboards page from Superset without iterating all pages.
# @PRE: Client is authenticated.
# @POST: Returns total count and one page of dashboards.
# @DATA_CONTRACT: Input[query: Optional[Dict]] -> Output[Tuple[int, List[Dict]]]
# @RELATION: [CALLS] ->[request]
def get_dashboards_page(
    self, query: Optional[Dict] = None
) -> Tuple[int, List[Dict]]:
    """Fetch exactly one page of dashboards plus the server-side total count.

    Unlike get_dashboards, this performs a single request and never iterates
    subsequent pages.
    """
    with belief_scope("get_dashboards_page"):
        effective_query = self._validate_query_params(query or {})
        # Same projection as get_dashboards so both code paths map identically.
        effective_query.setdefault(
            "columns",
            [
                "slug",
                "id",
                "url",
                "changed_on_utc",
                "dashboard_title",
                "published",
                "created_by",
                "changed_by",
                "changed_by_name",
                "owners",
            ],
        )
        payload = cast(
            Dict[str, Any],
            self.network.request(
                method="GET",
                endpoint="/dashboard/",
                params={"q": json.dumps(effective_query)},
            ),
        )
        page_items = payload.get("result", [])
        # `count` is the server-side total; fall back to the page length.
        return payload.get("count", len(page_items)), page_items
# [/DEF:SupersetClient.get_dashboards_page:Function]
# [DEF:SupersetClient.get_dashboards_summary:Function]
# @COMPLEXITY: 3
# @PURPOSE: Fetches dashboard metadata optimized for the grid.
# @PRE: Client is authenticated.
# @POST: Returns a list of dashboard metadata summaries.
# @DATA_CONTRACT: None -> Output[List[Dict]]
# @RELATION: [CALLS] ->[SupersetClient.get_dashboards]
def get_dashboards_summary(self, require_slug: bool = False) -> List[Dict]:
    """Return all dashboards mapped to the DashboardMetadata grid schema.

    Args:
        require_slug: When True, filter server-side to dashboards whose
            slug is non-empty.

    Returns:
        List of dicts with id/slug/title/url/last_modified/status and the
        projected actor fields (created_by, modified_by, owners).
    """
    with belief_scope("SupersetClient.get_dashboards_summary"):
        # Rely on list endpoint default projection to stay compatible
        # across Superset versions and preserve owners in one request.
        query: Dict[str, Any] = {}
        if require_slug:
            query["filters"] = [
                {
                    "col": "slug",
                    "opr": "neq",
                    "value": "",
                }
            ]
        _, dashboards = self.get_dashboards(query=query)
        # Map fields to DashboardMetadata schema
        result = []
        # Only the first N dashboards get a verbose projection log sample.
        max_debug_samples = 12
        for index, dash in enumerate(dashboards):
            raw_owners = dash.get("owners")
            raw_created_by = dash.get("created_by")
            raw_changed_by = dash.get("changed_by")
            raw_changed_by_name = dash.get("changed_by_name")
            owners = self._extract_owner_labels(raw_owners)
            # No per-dashboard detail requests here: keep list endpoint O(1).
            if not owners:
                # Fall back to creator/modifier labels when owners are absent.
                owners = self._extract_owner_labels(
                    [raw_created_by, raw_changed_by],
                )
            projected_created_by = self._extract_user_display(
                None,
                raw_created_by,
            )
            projected_modified_by = self._extract_user_display(
                raw_changed_by_name,
                raw_changed_by,
            )
            # Collect raw usernames purely for the diagnostic log line below.
            raw_owner_usernames: List[str] = []
            if isinstance(raw_owners, list):
                for owner_payload in raw_owners:
                    if isinstance(owner_payload, dict):
                        owner_username = self._sanitize_user_text(
                            owner_payload.get("username")
                        )
                        if owner_username:
                            raw_owner_usernames.append(owner_username)
            result.append(
                {
                    "id": dash.get("id"),
                    "slug": dash.get("slug"),
                    "title": dash.get("dashboard_title"),
                    "url": dash.get("url"),
                    "last_modified": dash.get("changed_on_utc"),
                    "status": "published" if dash.get("published") else "draft",
                    "created_by": projected_created_by,
                    "modified_by": projected_modified_by,
                    "owners": owners,
                }
            )
            if index < max_debug_samples:
                app_logger.reflect(
                    "[REFLECT] Dashboard actor projection sample "
                    f"(env={getattr(self.env, 'id', None)}, dashboard_id={dash.get('id')}, "
                    f"raw_owners={raw_owners!r}, raw_owner_usernames={raw_owner_usernames!r}, "
                    f"raw_created_by={raw_created_by!r}, raw_changed_by={raw_changed_by!r}, "
                    f"raw_changed_by_name={raw_changed_by_name!r}, projected_owners={owners!r}, "
                    f"projected_created_by={projected_created_by!r}, projected_modified_by={projected_modified_by!r})"
                )
        app_logger.reflect(
            "[REFLECT] Dashboard actor projection summary "
            f"(env={getattr(self.env, 'id', None)}, dashboards={len(result)}, "
            f"sampled={min(len(result), max_debug_samples)})"
        )
        return result
# [/DEF:SupersetClient.get_dashboards_summary:Function]
# [DEF:SupersetClient.get_dashboards_summary_page:Function]
# @COMPLEXITY: 3
# @PURPOSE: Fetches one page of dashboard metadata optimized for the grid.
# @PRE: page >= 1 and page_size > 0.
# @POST: Returns mapped summaries and total dashboard count.
# @DATA_CONTRACT: Input[page: int, page_size: int] -> Output[Tuple[int, List[Dict]]]
# @RELATION: [CALLS] ->[SupersetClient.get_dashboards_page]
def get_dashboards_summary_page(
    self,
    page: int,
    page_size: int,
    search: Optional[str] = None,
    require_slug: bool = False,
) -> Tuple[int, List[Dict]]:
    """Return one grid-ready page of dashboard summaries plus the total count.

    Args:
        page: 1-based page number (clamped; Superset pages are 0-based).
        page_size: Number of dashboards per page.
        search: Optional substring match against dashboard_title.
        require_slug: When True, exclude dashboards with empty slugs.
    """
    with belief_scope("SupersetClient.get_dashboards_summary_page"):
        query: Dict[str, Any] = {
            "page": max(page - 1, 0),
            "page_size": page_size,
        }
        filters: List[Dict[str, Any]] = []
        if require_slug:
            filters.append({"col": "slug", "opr": "neq", "value": ""})
        normalized_search = (search or "").strip()
        if normalized_search:
            # Superset list API supports filter objects with `opr` operator.
            # `ct` -> contains (ILIKE on most Superset backends).
            filters.append(
                {
                    "col": "dashboard_title",
                    "opr": "ct",
                    "value": normalized_search,
                }
            )
        if filters:
            query["filters"] = filters
        total_count, dashboards = self.get_dashboards_page(query=query)
        summaries = []
        for dash in dashboards:
            owner_labels = self._extract_owner_labels(dash.get("owners"))
            if not owner_labels:
                # Fall back to creator/modifier when no explicit owners exist.
                owner_labels = self._extract_owner_labels(
                    [dash.get("created_by"), dash.get("changed_by")],
                )
            summaries.append(
                {
                    "id": dash.get("id"),
                    "slug": dash.get("slug"),
                    "title": dash.get("dashboard_title"),
                    "url": dash.get("url"),
                    "last_modified": dash.get("changed_on_utc"),
                    "status": "published" if dash.get("published") else "draft",
                    "created_by": self._extract_user_display(
                        None,
                        dash.get("created_by"),
                    ),
                    "modified_by": self._extract_user_display(
                        dash.get("changed_by_name"),
                        dash.get("changed_by"),
                    ),
                    "owners": owner_labels,
                }
            )
        return total_count, summaries
# [/DEF:SupersetClient.get_dashboards_summary_page:Function]
# [DEF:SupersetClient._extract_owner_labels:Function]
# @COMPLEXITY: 1
# @PURPOSE: Normalize dashboard owners payload to stable display labels.
# @PRE: owners payload can be scalar, object or list.
# @POST: Returns deduplicated non-empty owner labels preserving order.
# @DATA_CONTRACT: Input[Any] -> Output[List[str]]
def _extract_owner_labels(self, owners_payload: Any) -> List[str]:
    """Normalize an owners payload into deduplicated display labels.

    Accepts a scalar, a dict, or a list of either; returns non-empty labels
    with first-occurrence order preserved.
    """
    if owners_payload is None:
        return []
    candidates = (
        owners_payload if isinstance(owners_payload, list) else [owners_payload]
    )
    labels: List[str] = []
    for entry in candidates:
        if isinstance(entry, dict):
            label = self._extract_user_display(None, entry)
        else:
            label = self._sanitize_user_text(entry)
        # Skip blanks and duplicates while keeping input order.
        if label and label not in labels:
            labels.append(label)
    return labels
# [/DEF:SupersetClient._extract_owner_labels:Function]
# [DEF:SupersetClient._extract_user_display:Function]
# @COMPLEXITY: 1
# @PURPOSE: Normalize user payload to a stable display name.
# @PRE: user payload can be string, dict or None.
# @POST: Returns compact non-empty display value or None.
# @DATA_CONTRACT: Input[Optional[str], Optional[Dict]] -> Output[Optional[str]]
def _extract_user_display(
    self, preferred_value: Optional[str], user_payload: Optional[Dict]
) -> Optional[str]:
    """Resolve the best display name for a user.

    Precedence: preferred_value, then full_name, first+last name,
    username, email from the payload dict. Returns None when nothing
    usable is found.
    """
    preferred = self._sanitize_user_text(preferred_value)
    if preferred:
        return preferred
    if not isinstance(user_payload, dict):
        return None
    full_name = self._sanitize_user_text(user_payload.get("full_name"))
    if full_name:
        return full_name
    name_parts = [
        self._sanitize_user_text(user_payload.get("first_name")) or "",
        self._sanitize_user_text(user_payload.get("last_name")) or "",
    ]
    combined = " ".join(part for part in name_parts if part).strip()
    if combined:
        return combined
    for field in ("username", "email"):
        candidate = self._sanitize_user_text(user_payload.get(field))
        if candidate:
            return candidate
    return None
# [/DEF:SupersetClient._extract_user_display:Function]
# [DEF:SupersetClient._sanitize_user_text:Function]
# @COMPLEXITY: 1
# @PURPOSE: Convert scalar value to non-empty user-facing text.
# @PRE: value can be any scalar type.
# @POST: Returns trimmed string or None.
def _sanitize_user_text(self, value: Optional[Union[str, int]]) -> Optional[str]:
if value is None:
return None
normalized = str(value).strip()
if not normalized:
return None
return normalized
# [/DEF:SupersetClient._sanitize_user_text:Function]
# [DEF:SupersetClient.get_dashboard:Function]
# @COMPLEXITY: 3
# @PURPOSE: Fetches a single dashboard by ID or slug.
# @PRE: Client is authenticated and dashboard_ref exists.
# @POST: Returns dashboard payload from Superset API.
# @DATA_CONTRACT: Input[dashboard_ref: Union[int, str]] -> Output[Dict]
# @RELATION: [CALLS] ->[request]
def get_dashboard(self, dashboard_ref: Union[int, str]) -> Dict:
    """Fetch one dashboard by numeric ID or slug."""
    with belief_scope("SupersetClient.get_dashboard", f"ref={dashboard_ref}"):
        payload = self.network.request(
            method="GET", endpoint=f"/dashboard/{dashboard_ref}"
        )
        return cast(Dict, payload)
# [/DEF:SupersetClient.get_dashboard:Function]
# [DEF:SupersetClient.get_dashboard_permalink_state:Function]
# @COMPLEXITY: 2
# @PURPOSE: Fetches stored dashboard permalink state by permalink key.
# @PRE: Client is authenticated and permalink key exists.
# @POST: Returns dashboard permalink state payload from Superset API.
# @DATA_CONTRACT: Input[permalink_key: str] -> Output[Dict]
# @RELATION: [CALLS] ->[request]
def get_dashboard_permalink_state(self, permalink_key: str) -> Dict:
    """Fetch the stored dashboard state behind a permalink key."""
    with belief_scope(
        "SupersetClient.get_dashboard_permalink_state", f"key={permalink_key}"
    ):
        payload = self.network.request(
            method="GET", endpoint=f"/dashboard/permalink/{permalink_key}"
        )
        return cast(Dict, payload)
# [/DEF:SupersetClient.get_dashboard_permalink_state:Function]
# [DEF:SupersetClient.get_native_filter_state:Function]
# @COMPLEXITY: 2
# @PURPOSE: Fetches stored native filter state by filter state key.
# @PRE: Client is authenticated and filter_state_key exists.
# @POST: Returns native filter state payload from Superset API.
# @DATA_CONTRACT: Input[dashboard_id: Union[int, str], filter_state_key: str] -> Output[Dict]
# @RELATION: [CALLS] ->[request]
def get_native_filter_state(
    self, dashboard_id: Union[int, str], filter_state_key: str
) -> Dict:
    """Fetch a dashboard's stored native filter state by its state key."""
    with belief_scope(
        "SupersetClient.get_native_filter_state",
        f"dashboard={dashboard_id}, key={filter_state_key}",
    ):
        payload = self.network.request(
            method="GET",
            endpoint=f"/dashboard/{dashboard_id}/filter_state/{filter_state_key}",
        )
        return cast(Dict, payload)
# [/DEF:SupersetClient.get_native_filter_state:Function]
# [DEF:SupersetClient.extract_native_filters_from_permalink:Function]
# @COMPLEXITY: 3
# @PURPOSE: Extract native filters dataMask from a permalink key.
# @PRE: Client is authenticated and permalink_key exists.
# @POST: Returns extracted dataMask with filter states.
# @DATA_CONTRACT: Input[permalink_key: str] -> Output[Dict]
# @RELATION: [CALLS] ->[SupersetClient.get_dashboard_permalink_state]
def extract_native_filters_from_permalink(self, permalink_key: str) -> Dict:
    """Resolve a permalink key into its stored native-filter dataMask.

    Returns a dict carrying the per-filter dataMask entries plus activeTabs,
    anchor, chartStates and the echoed permalink_key.
    """
    with belief_scope(
        "SupersetClient.extract_native_filters_from_permalink",
        f"key={permalink_key}",
    ):
        permalink_response = self.get_dashboard_permalink_state(permalink_key)
        # Permalink response structure: { "result": { "state": { "dataMask": {...}, ... } } }
        # or directly: { "state": { "dataMask": {...}, ... } }
        result = permalink_response.get("result", permalink_response)
        state = result.get("state", result)
        data_mask = state.get("dataMask", {})
        extracted_filters = {
            filter_id: {
                "extraFormData": filter_data.get("extraFormData", {}),
                "filterState": filter_data.get("filterState", {}),
                "ownState": filter_data.get("ownState", {}),
            }
            for filter_id, filter_data in data_mask.items()
            if isinstance(filter_data, dict)
        }
        return {
            "dataMask": extracted_filters,
            "activeTabs": state.get("activeTabs", []),
            "anchor": state.get("anchor"),
            "chartStates": state.get("chartStates", {}),
            "permalink_key": permalink_key,
        }
# [/DEF:SupersetClient.extract_native_filters_from_permalink:Function]
# [DEF:SupersetClient.extract_native_filters_from_key:Function]
# @COMPLEXITY: 3
# @PURPOSE: Extract native filters from a native_filters_key URL parameter.
# @PRE: Client is authenticated, dashboard_id and filter_state_key exist.
# @POST: Returns extracted filter state with extraFormData.
# @DATA_CONTRACT: Input[dashboard_id: Union[int, str], filter_state_key: str] -> Output[Dict]
# @RELATION: [CALLS] ->[SupersetClient.get_native_filter_state]
def extract_native_filters_from_key(
    self, dashboard_id: Union[int, str], filter_state_key: str
) -> Dict:
    """Resolve a ``native_filters_key`` into extracted native filter state.

    Args:
        dashboard_id: Dashboard ID used by the filter_state API.
        filter_state_key: Key taken from the URL's ``native_filters_key``.

    Returns:
        Dict with ``dataMask`` (per-filter extraFormData / filterState /
        ownState) plus the echoed ``dashboard_id`` and ``filter_state_key``.
    """
    with belief_scope(
        "SupersetClient.extract_native_filters_from_key",
        f"dashboard={dashboard_id}, key={filter_state_key}",
    ):
        filter_response = self.get_native_filter_state(
            dashboard_id, filter_state_key
        )
        # Filter state response structure: { "result": { "value": "{...json...}" } }
        # or: { "value": "{...json...}" }
        result = filter_response.get("result", filter_response)
        value = result.get("value")
        if isinstance(value, str):
            try:
                parsed_value = json.loads(value)
            except json.JSONDecodeError as e:
                app_logger.warning(
                    "[extract_native_filters_from_key][Warning] Failed to parse filter state JSON: %s",
                    e,
                )
                parsed_value = {}
        elif isinstance(value, dict):
            parsed_value = value
        else:
            parsed_value = {}
        # FIX: the stored value may legally decode to a list or scalar;
        # previously that crashed below on `"id" in` / `.items()`. Degrade
        # such payloads to an empty state instead of raising.
        if not isinstance(parsed_value, dict):
            app_logger.warning(
                "[extract_native_filters_from_key][Warning] Unexpected filter state payload type: %s",
                type(parsed_value).__name__,
            )
            parsed_value = {}
        # The parsed value contains filter state with structure:
        # { "filter_id": { "id": "...", "extraFormData": {...}, "filterState": {...} } }
        # or a single filter: { "id": "...", "extraFormData": {...}, "filterState": {...} }
        extracted_filters = {}
        if "id" in parsed_value and "extraFormData" in parsed_value:
            # Single filter format
            filter_id = parsed_value.get("id", filter_state_key)
            extracted_filters[filter_id] = {
                "extraFormData": parsed_value.get("extraFormData", {}),
                "filterState": parsed_value.get("filterState", {}),
                "ownState": parsed_value.get("ownState", {}),
            }
        else:
            # Multiple filters format
            for filter_id, filter_data in parsed_value.items():
                if not isinstance(filter_data, dict):
                    continue
                extracted_filters[filter_id] = {
                    "extraFormData": filter_data.get("extraFormData", {}),
                    "filterState": filter_data.get("filterState", {}),
                    "ownState": filter_data.get("ownState", {}),
                }
        return {
            "dataMask": extracted_filters,
            "dashboard_id": dashboard_id,
            "filter_state_key": filter_state_key,
        }
# [/DEF:SupersetClient.extract_native_filters_from_key:Function]
# [DEF:SupersetClient.parse_dashboard_url_for_filters:Function]
# @COMPLEXITY: 3
# @PURPOSE: Parse a Superset dashboard URL and extract native filter state if present.
# @PRE: url must be a valid Superset dashboard URL with optional permalink or native_filters_key.
# @POST: Returns extracted filter state or empty dict if no filters found.
# @DATA_CONTRACT: Input[url: str] -> Output[Dict]
# @RELATION: [CALLS] ->[SupersetClient.extract_native_filters_from_permalink]
# @RELATION: [CALLS] ->[SupersetClient.extract_native_filters_from_key]
def parse_dashboard_url_for_filters(self, url: str) -> Dict:
    """Parse a Superset dashboard URL and extract any native filter state.

    Three URL shapes are checked, in order; the first hit returns:
      1. Permalink path segment ``/p/{key}`` -> permalink API.
      2. ``native_filters_key`` query param -> filter_state API (needs a
         numeric dashboard ID, so slugs are resolved first).
      3. Inline ``native_filters`` JSON in the query string.

    Returns a dict with ``url``, ``dashboard_id``, ``filter_type`` and
    ``filters``; ``filter_type`` stays None when nothing was found.
    """
    with belief_scope(
        "SupersetClient.parse_dashboard_url_for_filters", f"url={url}"
    ):
        import urllib.parse

        parsed_url = urllib.parse.urlparse(url)
        query_params = urllib.parse.parse_qs(parsed_url.query)
        path_parts = parsed_url.path.rstrip("/").split("/")
        result = {
            "url": url,
            "dashboard_id": None,
            "filter_type": None,
            "filters": {},
        }
        # Check for permalink URL: /dashboard/p/{key}/ or /superset/dashboard/p/{key}/
        if "p" in path_parts:
            try:
                p_index = path_parts.index("p")
                if p_index + 1 < len(path_parts):
                    permalink_key = path_parts[p_index + 1]
                    filter_data = self.extract_native_filters_from_permalink(
                        permalink_key
                    )
                    result["filter_type"] = "permalink"
                    result["filters"] = filter_data
                    return result
            except ValueError:
                pass
        # Check for native_filters_key in query params
        native_filters_key = query_params.get("native_filters_key", [None])[0]
        if native_filters_key:
            # Extract dashboard ID or slug from URL path
            dashboard_ref = None
            if "dashboard" in path_parts:
                try:
                    dash_index = path_parts.index("dashboard")
                    if dash_index + 1 < len(path_parts):
                        potential_id = path_parts[dash_index + 1]
                        # Skip if it's a reserved word
                        if potential_id not in ("p", "list", "new"):
                            dashboard_ref = potential_id
                except ValueError:
                    pass
            if dashboard_ref:
                # Resolve slug to numeric ID — the filter_state API requires a numeric ID
                resolved_id = None
                try:
                    resolved_id = int(dashboard_ref)
                except (ValueError, TypeError):
                    # Not numeric: treat it as a slug and look the dashboard up.
                    try:
                        dash_resp = self.get_dashboard(dashboard_ref)
                        dash_data = (
                            dash_resp.get("result", dash_resp)
                            if isinstance(dash_resp, dict)
                            else {}
                        )
                        raw_id = dash_data.get("id")
                        if raw_id is not None:
                            resolved_id = int(raw_id)
                    except Exception as e:
                        app_logger.warning(
                            "[parse_dashboard_url_for_filters][Warning] Failed to resolve dashboard slug '%s' to ID: %s",
                            dashboard_ref,
                            e,
                        )
                if resolved_id is not None:
                    filter_data = self.extract_native_filters_from_key(
                        resolved_id, native_filters_key
                    )
                    result["filter_type"] = "native_filters_key"
                    result["dashboard_id"] = resolved_id
                    result["filters"] = filter_data
                    return result
                else:
                    # Unresolvable ref: fall through to the inline-filters check.
                    app_logger.warning(
                        "[parse_dashboard_url_for_filters][Warning] Could not resolve dashboard_id from URL for native_filters_key"
                    )
        # Check for native_filters in query params (direct filter values)
        native_filters = query_params.get("native_filters", [None])[0]
        if native_filters:
            try:
                parsed_filters = json.loads(native_filters)
                result["filter_type"] = "native_filters"
                result["filters"] = {"dataMask": parsed_filters}
                return result
            except json.JSONDecodeError as e:
                app_logger.warning(
                    "[parse_dashboard_url_for_filters][Warning] Failed to parse native_filters JSON: %s",
                    e,
                )
        return result
# [/DEF:SupersetClient.parse_dashboard_url_for_filters:Function]
# [DEF:SupersetClient.get_chart:Function]
# @COMPLEXITY: 3
# @PURPOSE: Fetches a single chart by ID.
# @PRE: Client is authenticated and chart_id exists.
# @POST: Returns chart payload from Superset API.
# @DATA_CONTRACT: Input[chart_id: int] -> Output[Dict]
# @RELATION: [CALLS] ->[APIClient.request]
def get_chart(self, chart_id: int) -> Dict:
    """Fetch a single chart by its numeric ID."""
    with belief_scope("SupersetClient.get_chart", f"id={chart_id}"):
        payload = self.network.request(method="GET", endpoint=f"/chart/{chart_id}")
        return cast(Dict, payload)
# [/DEF:SupersetClient.get_chart:Function]
# [DEF:SupersetClient.get_dashboard_detail:Function]
# @COMPLEXITY: 3
# @PURPOSE: Fetches detailed dashboard information including related charts and datasets.
# @PRE: Client is authenticated and dashboard reference exists.
# @POST: Returns dashboard metadata with charts and datasets lists.
# @DATA_CONTRACT: Input[dashboard_ref: Union[int, str]] -> Output[Dict]
# @RELATION: [CALLS] ->[SupersetClient.get_dashboard]
# @RELATION: [CALLS] ->[SupersetClient.get_chart]
def get_dashboard_detail(self, dashboard_ref: Union[int, str]) -> Dict:
    """Aggregate a dashboard's metadata together with its charts and datasets.

    Strategy (every network failure is logged, never raised):
      1. Fetch the dashboard itself.
      2. Fetch charts and datasets via the canonical
         /dashboard/{ref}/charts and /dashboard/{ref}/datasets endpoints.
      3. If no charts came back, mine chart IDs from the dashboard's
         position_json/json_metadata layout and fetch each chart one by one.
      4. Backfill datasets referenced by charts but missing from step 2.

    Returns:
        Dict with id/title/slug/url/description/last_modified/published plus
        deduplicated ``charts`` and ``datasets`` lists and their counts.
    """
    with belief_scope(
        "SupersetClient.get_dashboard_detail", f"ref={dashboard_ref}"
    ):
        dashboard_response = self.get_dashboard(dashboard_ref)
        dashboard_data = dashboard_response.get("result", dashboard_response)
        charts: List[Dict] = []
        datasets: List[Dict] = []

        # [DEF:extract_dataset_id_from_form_data:Function]
        def extract_dataset_id_from_form_data(
            form_data: Optional[Dict],
        ) -> Optional[int]:
            # The dataset ref may appear as "123__table", as {"id": 123},
            # or as a bare datasource_id; try each shape in turn.
            if not isinstance(form_data, dict):
                return None
            datasource = form_data.get("datasource")
            if isinstance(datasource, str):
                matched = re.match(r"^(\d+)__", datasource)
                if matched:
                    try:
                        return int(matched.group(1))
                    except ValueError:
                        return None
            if isinstance(datasource, dict):
                ds_id = datasource.get("id")
                try:
                    return int(ds_id) if ds_id is not None else None
                except (TypeError, ValueError):
                    return None
            ds_id = form_data.get("datasource_id")
            try:
                return int(ds_id) if ds_id is not None else None
            except (TypeError, ValueError):
                return None
        # [/DEF:extract_dataset_id_from_form_data:Function]

        # Canonical endpoints from Superset OpenAPI:
        # /dashboard/{id_or_slug}/charts and /dashboard/{id_or_slug}/datasets.
        try:
            charts_response = self.network.request(
                method="GET", endpoint=f"/dashboard/{dashboard_ref}/charts"
            )
            charts_payload = (
                charts_response.get("result", [])
                if isinstance(charts_response, dict)
                else []
            )
            for chart_obj in charts_payload:
                if not isinstance(chart_obj, dict):
                    continue
                chart_id = chart_obj.get("id")
                if chart_id is None:
                    continue
                form_data = chart_obj.get("form_data")
                if isinstance(form_data, str):
                    # form_data may arrive as serialized JSON; tolerate bad payloads.
                    try:
                        form_data = json.loads(form_data)
                    except Exception:
                        form_data = {}
                dataset_id = extract_dataset_id_from_form_data(
                    form_data
                ) or chart_obj.get("datasource_id")
                charts.append(
                    {
                        "id": int(chart_id),
                        "title": chart_obj.get("slice_name")
                        or chart_obj.get("name")
                        or f"Chart {chart_id}",
                        "viz_type": (
                            form_data.get("viz_type")
                            if isinstance(form_data, dict)
                            else None
                        ),
                        "dataset_id": int(dataset_id)
                        if dataset_id is not None
                        else None,
                        "last_modified": chart_obj.get("changed_on"),
                        "overview": chart_obj.get("description")
                        or (
                            form_data.get("viz_type")
                            if isinstance(form_data, dict)
                            else None
                        )
                        or "Chart",
                    }
                )
        except Exception as e:
            app_logger.warning(
                "[get_dashboard_detail][Warning] Failed to fetch dashboard charts: %s",
                e,
            )
        try:
            datasets_response = self.network.request(
                method="GET", endpoint=f"/dashboard/{dashboard_ref}/datasets"
            )
            datasets_payload = (
                datasets_response.get("result", [])
                if isinstance(datasets_response, dict)
                else []
            )
            for dataset_obj in datasets_payload:
                if not isinstance(dataset_obj, dict):
                    continue
                dataset_id = dataset_obj.get("id")
                if dataset_id is None:
                    continue
                db_payload = dataset_obj.get("database")
                db_name = (
                    db_payload.get("database_name")
                    if isinstance(db_payload, dict)
                    else None
                )
                table_name = (
                    dataset_obj.get("table_name")
                    or dataset_obj.get("datasource_name")
                    or dataset_obj.get("name")
                    or f"Dataset {dataset_id}"
                )
                schema = dataset_obj.get("schema")
                # Fully-qualified "schema.table" label when a schema is known.
                fq_name = f"{schema}.{table_name}" if schema else table_name
                datasets.append(
                    {
                        "id": int(dataset_id),
                        "table_name": table_name,
                        "schema": schema,
                        "database": db_name
                        or dataset_obj.get("database_name")
                        or "Unknown",
                        "last_modified": dataset_obj.get("changed_on"),
                        "overview": fq_name,
                    }
                )
        except Exception as e:
            app_logger.warning(
                "[get_dashboard_detail][Warning] Failed to fetch dashboard datasets: %s",
                e,
            )
        # Fallback: derive chart IDs from layout metadata if dashboard charts endpoint fails.
        if not charts:
            raw_position_json = dashboard_data.get("position_json")
            chart_ids_from_position = set()
            if isinstance(raw_position_json, str) and raw_position_json:
                try:
                    parsed_position = json.loads(raw_position_json)
                    chart_ids_from_position.update(
                        self._extract_chart_ids_from_layout(parsed_position)
                    )
                except Exception:
                    pass
            elif isinstance(raw_position_json, dict):
                chart_ids_from_position.update(
                    self._extract_chart_ids_from_layout(raw_position_json)
                )
            raw_json_metadata = dashboard_data.get("json_metadata")
            if isinstance(raw_json_metadata, str) and raw_json_metadata:
                try:
                    parsed_metadata = json.loads(raw_json_metadata)
                    chart_ids_from_position.update(
                        self._extract_chart_ids_from_layout(parsed_metadata)
                    )
                except Exception:
                    pass
            elif isinstance(raw_json_metadata, dict):
                chart_ids_from_position.update(
                    self._extract_chart_ids_from_layout(raw_json_metadata)
                )
            app_logger.info(
                "[get_dashboard_detail][State] Extracted %s fallback chart IDs from layout (dashboard_id=%s)",
                len(chart_ids_from_position),
                dashboard_ref,
            )
            for chart_id in sorted(chart_ids_from_position):
                try:
                    chart_response = self.get_chart(int(chart_id))
                    chart_data = chart_response.get("result", chart_response)
                    charts.append(
                        {
                            "id": int(chart_id),
                            "title": chart_data.get("slice_name")
                            or chart_data.get("name")
                            or f"Chart {chart_id}",
                            "viz_type": chart_data.get("viz_type"),
                            "dataset_id": chart_data.get("datasource_id"),
                            "last_modified": chart_data.get("changed_on"),
                            "overview": chart_data.get("description")
                            or chart_data.get("viz_type")
                            or "Chart",
                        }
                    )
                except Exception as e:
                    app_logger.warning(
                        "[get_dashboard_detail][Warning] Failed to resolve fallback chart %s: %s",
                        chart_id,
                        e,
                    )
        # Backfill datasets from chart datasource IDs.
        dataset_ids_from_charts = {
            c.get("dataset_id") for c in charts if c.get("dataset_id") is not None
        }
        known_dataset_ids = {
            d.get("id") for d in datasets if d.get("id") is not None
        }
        missing_dataset_ids: List[int] = []
        for raw_dataset_id in dataset_ids_from_charts:
            if raw_dataset_id is None or raw_dataset_id in known_dataset_ids:
                continue
            try:
                missing_dataset_ids.append(int(raw_dataset_id))
            except (TypeError, ValueError):
                continue
        for dataset_id in missing_dataset_ids:
            try:
                dataset_response = self.get_dataset(int(dataset_id))
                dataset_data = dataset_response.get("result", dataset_response)
                db_payload = dataset_data.get("database")
                db_name = (
                    db_payload.get("database_name")
                    if isinstance(db_payload, dict)
                    else None
                )
                table_name = (
                    dataset_data.get("table_name") or f"Dataset {dataset_id}"
                )
                schema = dataset_data.get("schema")
                fq_name = f"{schema}.{table_name}" if schema else table_name
                datasets.append(
                    {
                        "id": int(dataset_id),
                        "table_name": table_name,
                        "schema": schema,
                        "database": db_name or "Unknown",
                        "last_modified": dataset_data.get("changed_on_utc")
                        or dataset_data.get("changed_on"),
                        "overview": fq_name,
                    }
                )
            except Exception as e:
                app_logger.warning(
                    "[get_dashboard_detail][Warning] Failed to resolve dataset %s: %s",
                    dataset_id,
                    e,
                )
        # Deduplicate by ID; the last occurrence of an ID wins.
        unique_charts = {}
        for chart in charts:
            unique_charts[chart["id"]] = chart
        unique_datasets = {}
        for dataset in datasets:
            unique_datasets[dataset["id"]] = dataset
        resolved_dashboard_id = dashboard_data.get("id", dashboard_ref)
        return {
            "id": resolved_dashboard_id,
            "title": dashboard_data.get("dashboard_title")
            or dashboard_data.get("title")
            or f"Dashboard {resolved_dashboard_id}",
            "slug": dashboard_data.get("slug"),
            "url": dashboard_data.get("url"),
            "description": dashboard_data.get("description") or "",
            "last_modified": dashboard_data.get("changed_on_utc")
            or dashboard_data.get("changed_on"),
            "published": dashboard_data.get("published"),
            "charts": list(unique_charts.values()),
            "datasets": list(unique_datasets.values()),
            "chart_count": len(unique_charts),
            "dataset_count": len(unique_datasets),
        }
# [/DEF:SupersetClient.get_dashboard_detail:Function]
# [DEF:SupersetClient.get_charts:Function]
# @COMPLEXITY: 3
# @PURPOSE: Fetches all charts with pagination support.
# @PRE: Client is authenticated.
# @POST: Returns total count and charts list.
# @DATA_CONTRACT: Input[query: Optional[Dict]] -> Output[Tuple[int, List[Dict]]]
# @RELATION: [CALLS] ->[SupersetClient._fetch_all_pages]
def get_charts(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
    """Return all charts, walking every page of the list endpoint."""
    with belief_scope("get_charts"):
        effective_query = self._validate_query_params(query or {})
        # Minimal projection sufficient for chart identification.
        effective_query.setdefault(
            "columns", ["id", "uuid", "slice_name", "viz_type"]
        )
        charts = self._fetch_all_pages(
            endpoint="/chart/",
            pagination_options={
                "base_query": effective_query,
                "results_field": "result",
            },
        )
        return len(charts), charts
# [/DEF:SupersetClient.get_charts:Function]
# [DEF:SupersetClient._extract_chart_ids_from_layout:Function]
# @COMPLEXITY: 1
# @PURPOSE: Traverses dashboard layout metadata and extracts chart IDs from common keys.
# @PRE: payload can be dict/list/scalar.
# @POST: Returns a set of chart IDs found in nested structures.
def _extract_chart_ids_from_layout(
    self, payload: Union[Dict, List, str, int, None]
) -> set:
    """Walk layout metadata and collect chart IDs from the known key shapes.

    Recognizes chartId/chart_id/slice_id/sliceId values and "CHART-<n>"
    id strings anywhere in the nested dict/list structure.
    """
    with belief_scope("_extract_chart_ids_from_layout"):
        chart_ids: set = set()
        # Iterative traversal with an explicit stack (order is irrelevant
        # because the result is a set).
        pending = [payload]
        while pending:
            node = pending.pop()
            if isinstance(node, dict):
                for key, value in node.items():
                    if key in ("chartId", "chart_id", "slice_id", "sliceId"):
                        try:
                            chart_ids.add(int(value))
                        except (TypeError, ValueError):
                            pass
                    if key == "id" and isinstance(value, str):
                        matched = re.match(r"^CHART-(\d+)$", value)
                        if matched:
                            try:
                                chart_ids.add(int(matched.group(1)))
                            except ValueError:
                                pass
                    pending.append(value)
            elif isinstance(node, list):
                pending.extend(node)
        return chart_ids
# [/DEF:SupersetClient._extract_chart_ids_from_layout:Function]
# [DEF:export_dashboard:Function]
# @COMPLEXITY: 3
# @PURPOSE: Экспортирует дашборд в виде ZIP-архива.
# @PRE: dashboard_id must exist in Superset.
# @POST: Returns ZIP content and filename.
# @DATA_CONTRACT: Input[dashboard_id: int] -> Output[Tuple[bytes, str]]
# @SIDE_EFFECT: Performs network I/O to download archive.
# @RELATION: [CALLS] ->[request]
def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]:
with belief_scope("export_dashboard"):
app_logger.info(
"[export_dashboard][Enter] Exporting dashboard %s.", dashboard_id
)
response = self.network.request(
method="GET",
endpoint="/dashboard/export/",
params={"q": json.dumps([dashboard_id])},
stream=True,
raw_response=True,
)
response = cast(Response, response)
self._validate_export_response(response, dashboard_id)
filename = self._resolve_export_filename(response, dashboard_id)
app_logger.info(
"[export_dashboard][Exit] Exported dashboard %s to %s.",
dashboard_id,
filename,
)
return response.content, filename
# [/DEF:export_dashboard:Function]
    # [DEF:import_dashboard:Function]
    # @COMPLEXITY: 3
    # @PURPOSE: Imports a dashboard from a ZIP file.
    # @PRE: file_name must be a valid ZIP dashboard export.
    # @POST: Dashboard is imported or re-imported after deletion.
    # @DATA_CONTRACT: Input[file_name: Union[str, Path]] -> Output[Dict]
    # @SIDE_EFFECT: Performs network I/O to upload archive.
    # @RELATION: [CALLS] ->[SupersetClient._do_import]
    # @RELATION: [CALLS] ->[delete_dashboard]
    def import_dashboard(
        self,
        file_name: Union[str, Path],
        dash_id: Optional[int] = None,
        dash_slug: Optional[str] = None,
    ) -> Dict:
        with belief_scope("import_dashboard"):
            if file_name is None:
                raise ValueError("file_name cannot be None")
            file_path = str(file_name)
            self._validate_import_file(file_path)
            try:
                # First attempt: upload the archive as-is.
                return self._do_import(file_path)
            except Exception as exc:
                app_logger.error(
                    "[import_dashboard][Failure] First import attempt failed: %s",
                    exc,
                    exc_info=True,
                )
                # Delete-and-retry is opt-in via self.delete_before_reimport
                # (presumably set during client construction — not visible here).
                if not self.delete_before_reimport:
                    raise
                # Resolve a concrete dashboard ID, either directly or via slug lookup.
                target_id = self._resolve_target_id_for_delete(dash_id, dash_slug)
                if target_id is None:
                    app_logger.error(
                        "[import_dashboard][Failure] No ID available for delete-retry."
                    )
                    raise
                # Remove the conflicting dashboard, then retry the import exactly once.
                self.delete_dashboard(target_id)
                app_logger.info(
                    "[import_dashboard][State] Deleted dashboard ID %s, retrying import.",
                    target_id,
                )
                return self._do_import(file_path)
    # [/DEF:import_dashboard:Function]
# [DEF:delete_dashboard:Function]
# @COMPLEXITY: 3
# @PURPOSE: Удаляет дашборд по его ID или slug.
# @PRE: dashboard_id must exist.
# @POST: Dashboard is removed from Superset.
# @SIDE_EFFECT: Deletes resource from upstream Superset environment.
# @RELATION: [CALLS] ->[request]
def delete_dashboard(self, dashboard_id: Union[int, str]) -> None:
with belief_scope("delete_dashboard"):
app_logger.info(
"[delete_dashboard][Enter] Deleting dashboard %s.", dashboard_id
)
response = self.network.request(
method="DELETE", endpoint=f"/dashboard/{dashboard_id}"
)
response = cast(Dict, response)
if response.get("result", True) is not False:
app_logger.info(
"[delete_dashboard][Success] Dashboard %s deleted.", dashboard_id
)
else:
app_logger.warning(
"[delete_dashboard][Warning] Unexpected response while deleting %s: %s",
dashboard_id,
response,
)
# [/DEF:delete_dashboard:Function]
# [DEF:SupersetClient.get_datasets:Function]
# @COMPLEXITY: 3
# @PURPOSE: Получает полный список датасетов, автоматически обрабатывая пагинацию.
# @PRE: Client is authenticated.
# @POST: Returns total count and list of datasets.
# @DATA_CONTRACT: Input[query: Optional[Dict]] -> Output[Tuple[int, List[Dict]]]
# @RELATION: [CALLS] ->[_fetch_all_pages]
def get_datasets(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
with belief_scope("get_datasets"):
app_logger.info("[get_datasets][Enter] Fetching datasets.")
validated_query = self._validate_query_params(query)
paginated_data = self._fetch_all_pages(
endpoint="/dataset/",
pagination_options={
"base_query": validated_query,
"results_field": "result",
},
)
total_count = len(paginated_data)
app_logger.info("[get_datasets][Exit] Found %d datasets.", total_count)
return total_count, paginated_data
# [/DEF:SupersetClient.get_datasets:Function]
# [DEF:SupersetClient.get_datasets_summary:Function]
# @COMPLEXITY: 3
# @PURPOSE: Fetches dataset metadata optimized for the Dataset Hub grid.
# @PRE: Client is authenticated.
# @POST: Returns a list of dataset metadata summaries.
# @RETURN: List[Dict]
# @RELATION: [CALLS] ->[SupersetClient.get_datasets]
def get_datasets_summary(self) -> List[Dict]:
with belief_scope("SupersetClient.get_datasets_summary"):
query = {"columns": ["id", "table_name", "schema", "database"]}
_, datasets = self.get_datasets(query=query)
# Map fields to match the contracts
result = []
for ds in datasets:
result.append(
{
"id": ds.get("id"),
"table_name": ds.get("table_name"),
"schema": ds.get("schema"),
"database": ds.get("database", {}).get(
"database_name", "Unknown"
),
}
)
return result
# [/DEF:SupersetClient.get_datasets_summary:Function]
    # [DEF:get_dataset_detail:Function]
    # @COMPLEXITY: 3
    # @PURPOSE: Fetches detailed dataset information including columns and linked dashboards
    # @PRE: Client is authenticated and dataset_id exists.
    # @POST: Returns detailed dataset info with columns and linked dashboards.
    # @PARAM: dataset_id (int) - The dataset ID to fetch details for.
    # @RETURN: Dict - Dataset details with columns and linked_dashboards.
    # @RELATION: [CALLS] ->[SupersetClient.get_dataset]
    # @RELATION: [CALLS] ->[request]
    def get_dataset_detail(self, dataset_id: int) -> Dict:
        with belief_scope("SupersetClient.get_dataset_detail", f"id={dataset_id}"):
            # Coerce Superset's loosely-typed boolean fields (bool/str/int/None)
            # into real booleans; string forms like "true"/"yes" are accepted.
            def as_bool(value, default=False):
                if value is None:
                    return default
                if isinstance(value, bool):
                    return value
                if isinstance(value, str):
                    return value.strip().lower() in ("1", "true", "yes", "y", "on")
                return bool(value)
            # Get base dataset info
            response = self.get_dataset(dataset_id)
            # If the response is a dict and has a 'result' key, use that (standard Superset API)
            if isinstance(response, dict) and "result" in response:
                dataset = response["result"]
            else:
                dataset = response
            # Extract columns information
            columns = dataset.get("columns", [])
            column_info = []
            for col in columns:
                col_id = col.get("id")
                # Skip malformed column records without an id.
                if col_id is None:
                    continue
                column_info.append(
                    {
                        "id": int(col_id),
                        "name": col.get("column_name"),
                        "type": col.get("type"),
                        "is_dttm": as_bool(col.get("is_dttm"), default=False),
                        "is_active": as_bool(col.get("is_active"), default=True),
                        "description": col.get("description", ""),
                    }
                )
            # Get linked dashboards using related_objects endpoint
            linked_dashboards = []
            try:
                related_objects = self.network.request(
                    method="GET", endpoint=f"/dataset/{dataset_id}/related_objects"
                )
                # Handle different response formats
                if isinstance(related_objects, dict):
                    # Dashboards may be at the top level or nested under "result".
                    if "dashboards" in related_objects:
                        dashboards_data = related_objects["dashboards"]
                    elif "result" in related_objects and isinstance(
                        related_objects["result"], dict
                    ):
                        dashboards_data = related_objects["result"].get(
                            "dashboards", []
                        )
                    else:
                        dashboards_data = []
                    for dash in dashboards_data:
                        if isinstance(dash, dict):
                            dash_id = dash.get("id")
                            if dash_id is None:
                                continue
                            linked_dashboards.append(
                                {
                                    "id": int(dash_id),
                                    "title": dash.get("dashboard_title")
                                    or dash.get("title", f"Dashboard {dash_id}"),
                                    "slug": dash.get("slug"),
                                }
                            )
                        else:
                            # Bare scalar entry: treat it as a dashboard id and
                            # synthesize a placeholder title.
                            try:
                                dash_id = int(dash)
                            except (TypeError, ValueError):
                                continue
                            linked_dashboards.append(
                                {
                                    "id": dash_id,
                                    "title": f"Dashboard {dash_id}",
                                    "slug": None,
                                }
                            )
            except Exception as e:
                # Linked dashboards are best-effort: a failed lookup degrades to
                # an empty list instead of failing the whole detail response.
                app_logger.warning(
                    f"[get_dataset_detail][Warning] Failed to fetch related dashboards: {e}"
                )
                linked_dashboards = []
            # Extract SQL table information
            sql = dataset.get("sql", "")
            result = {
                "id": dataset.get("id"),
                "table_name": dataset.get("table_name"),
                "schema": dataset.get("schema"),
                # "database" may be a nested dict or a flat database_name field.
                "database": (
                    dataset.get("database", {}).get("database_name", "Unknown")
                    if isinstance(dataset.get("database"), dict)
                    else dataset.get("database_name") or "Unknown"
                ),
                "description": dataset.get("description", ""),
                "columns": column_info,
                "column_count": len(column_info),
                "sql": sql,
                "linked_dashboards": linked_dashboards,
                "linked_dashboard_count": len(linked_dashboards),
                "is_sqllab_view": as_bool(dataset.get("is_sqllab_view"), default=False),
                "created_on": dataset.get("created_on"),
                "changed_on": dataset.get("changed_on"),
            }
            app_logger.info(
                f"[get_dataset_detail][Exit] Got dataset {dataset_id} with {len(column_info)} columns and {len(linked_dashboards)} linked dashboards"
            )
            return result
    # [/DEF:get_dataset_detail:Function]
# [DEF:SupersetClient.get_dataset:Function]
# @COMPLEXITY: 3
# @PURPOSE: Получает информацию о конкретном датасете по его ID.
# @PRE: dataset_id must exist.
# @POST: Returns dataset details.
# @DATA_CONTRACT: Input[dataset_id: int] -> Output[Dict]
# @RELATION: [CALLS] ->[request]
def get_dataset(self, dataset_id: int) -> Dict:
with belief_scope("SupersetClient.get_dataset", f"id={dataset_id}"):
app_logger.info("[get_dataset][Enter] Fetching dataset %s.", dataset_id)
response = self.network.request(
method="GET", endpoint=f"/dataset/{dataset_id}"
)
response = cast(Dict, response)
app_logger.info("[get_dataset][Exit] Got dataset %s.", dataset_id)
return response
# [/DEF:SupersetClient.get_dataset:Function]
    # [DEF:SupersetClient.compile_dataset_preview:Function]
    # @COMPLEXITY: 4
    # @PURPOSE: Compile dataset preview SQL through the strongest supported Superset preview endpoint family and return normalized SQL output.
    # @PRE: dataset_id must be valid and template_params/effective_filters must represent the current preview session inputs.
    # @POST: Returns normalized compiled SQL plus raw upstream response, preferring legacy form_data transport with explicit fallback to chart-data.
    # @DATA_CONTRACT: Input[dataset_id:int, template_params:Dict, effective_filters:List[Dict]] -> Output[Dict[str, Any]]
    # @RELATION: [CALLS] ->[SupersetClient.get_dataset]
    # @RELATION: [CALLS] ->[SupersetClient.build_dataset_preview_query_context]
    # @RELATION: [CALLS] ->[SupersetClient.build_dataset_preview_legacy_form_data]
    # @RELATION: [CALLS] ->[request]
    # @RELATION: [CALLS] ->[_extract_compiled_sql_from_preview_response]
    # @SIDE_EFFECT: Performs upstream dataset lookup and preview network I/O against Superset.
    def compile_dataset_preview(
        self,
        dataset_id: int,
        template_params: Optional[Dict[str, Any]] = None,
        effective_filters: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        with belief_scope("SupersetClient.compile_dataset_preview", f"id={dataset_id}"):
            dataset_record = (
                dataset_response.get("result", dataset_response)
                if isinstance(dataset_response := self.get_dataset(dataset_id), dict)
                else {}
            ) if False else None  # placeholder removed below
            dataset_response = self.get_dataset(dataset_id)
            dataset_record = (
                dataset_response.get("result", dataset_response)
                if isinstance(dataset_response, dict)
                else {}
            )
            query_context = self.build_dataset_preview_query_context(
                dataset_id=dataset_id,
                dataset_record=dataset_record,
                template_params=template_params or {},
                effective_filters=effective_filters or [],
            )
            legacy_form_data = self.build_dataset_preview_legacy_form_data(
                dataset_id=dataset_id,
                dataset_record=dataset_record,
                template_params=template_params or {},
                effective_filters=effective_filters or [],
            )
            legacy_form_data_payload = json.dumps(
                legacy_form_data, sort_keys=True, default=str
            )
            request_payload = json.dumps(query_context)
            strategy_attempts: List[Dict[str, Any]] = []
            # Strategies in preference order: the two legacy form_data transports
            # (observed in browser traffic) first, then the v1 chart-data API.
            # The first strategy whose response exposes compiled SQL wins.
            strategy_candidates: List[Dict[str, Any]] = [
                {
                    "endpoint_kind": "legacy_explore_form_data",
                    "endpoint": "/explore_json/form_data",
                    "request_transport": "query_param_form_data",
                    "params": {"form_data": legacy_form_data_payload},
                },
                {
                    "endpoint_kind": "legacy_data_form_data",
                    "endpoint": "/data",
                    "request_transport": "query_param_form_data",
                    "params": {"form_data": legacy_form_data_payload},
                },
                {
                    "endpoint_kind": "v1_chart_data",
                    "endpoint": "/chart/data",
                    "request_transport": "json_body",
                    "data": request_payload,
                    "headers": {"Content-Type": "application/json"},
                },
            ]
            for candidate in strategy_candidates:
                endpoint_kind = candidate["endpoint_kind"]
                endpoint_path = candidate["endpoint"]
                request_transport = candidate["request_transport"]
                request_params = deepcopy(candidate.get("params") or {})
                request_body = candidate.get("data")
                request_headers = deepcopy(candidate.get("headers") or {})
                request_param_keys = sorted(request_params.keys())
                # Collect top-level payload keys for diagnostics, tolerating
                # both serialized-JSON and dict bodies.
                request_payload_keys: List[str] = []
                if isinstance(request_body, str):
                    try:
                        decoded_request_body = json.loads(request_body)
                        if isinstance(decoded_request_body, dict):
                            request_payload_keys = sorted(decoded_request_body.keys())
                    except json.JSONDecodeError:
                        request_payload_keys = []
                elif isinstance(request_body, dict):
                    request_payload_keys = sorted(request_body.keys())
                strategy_diagnostics = {
                    "endpoint": endpoint_path,
                    "endpoint_kind": endpoint_kind,
                    "request_transport": request_transport,
                    "contains_root_datasource": endpoint_kind == "v1_chart_data"
                    and "datasource" in query_context,
                    "contains_form_datasource": endpoint_kind.startswith("legacy_")
                    and "datasource" in legacy_form_data,
                    "contains_query_object_datasource": bool(
                        query_context.get("queries")
                    )
                    and isinstance(query_context["queries"][0], dict)
                    and "datasource" in query_context["queries"][0],
                    "request_param_keys": request_param_keys,
                    "request_payload_keys": request_payload_keys,
                }
                app_logger.reason(
                    "Attempting Superset dataset preview compilation strategy",
                    extra={
                        "dataset_id": dataset_id,
                        **strategy_diagnostics,
                        "request_params": request_params,
                        "request_payload": request_body,
                        "legacy_form_data": legacy_form_data
                        if endpoint_kind.startswith("legacy_")
                        else None,
                        "query_context": query_context
                        if endpoint_kind == "v1_chart_data"
                        else None,
                        "template_param_count": len(template_params or {}),
                        "filter_count": len(effective_filters or []),
                    },
                )
                try:
                    response = self.network.request(
                        method="POST",
                        endpoint=endpoint_path,
                        params=request_params or None,
                        data=request_body,
                        headers=request_headers or None,
                    )
                    normalized = self._extract_compiled_sql_from_preview_response(
                        response
                    )
                    # Enrich the normalized result with the request context and
                    # the full attempt history (including this success).
                    normalized["query_context"] = query_context
                    normalized["legacy_form_data"] = legacy_form_data
                    normalized["endpoint"] = endpoint_path
                    normalized["endpoint_kind"] = endpoint_kind
                    normalized["dataset_id"] = dataset_id
                    normalized["strategy_attempts"] = strategy_attempts + [
                        {
                            **strategy_diagnostics,
                            "success": True,
                        }
                    ]
                    app_logger.reflect(
                        "Dataset preview compilation returned normalized SQL payload",
                        extra={
                            "dataset_id": dataset_id,
                            **strategy_diagnostics,
                            "success": True,
                            "compiled_sql_length": len(
                                str(normalized.get("compiled_sql") or "")
                            ),
                            "response_diagnostics": normalized.get(
                                "response_diagnostics"
                            ),
                        },
                    )
                    return normalized
                except Exception as exc:
                    # Record the failure and fall through to the next strategy.
                    failure_diagnostics = {
                        **strategy_diagnostics,
                        "success": False,
                        "error": str(exc),
                    }
                    strategy_attempts.append(failure_diagnostics)
                    app_logger.explore(
                        "Superset dataset preview compilation strategy failed",
                        extra={
                            "dataset_id": dataset_id,
                            **failure_diagnostics,
                            "request_params": request_params,
                            "request_payload": request_body,
                        },
                    )
            # All strategies exhausted: surface the full attempt log.
            raise SupersetAPIError(
                "Superset preview compilation failed for all known strategies "
                f"(attempts={strategy_attempts!r})"
            )
    # [/DEF:SupersetClient.compile_dataset_preview:Function]
    # [DEF:SupersetClient.build_dataset_preview_legacy_form_data:Function]
    # @COMPLEXITY: 4
    # @PURPOSE: Build browser-style legacy form_data payload for Superset preview endpoints inferred from observed deployment traffic.
    # @PRE: dataset_record should come from Superset dataset detail when possible.
    # @POST: Returns one serialized-ready form_data structure preserving native filter clauses in legacy transport fields.
    # @DATA_CONTRACT: Input[dataset_id:int,dataset_record:Dict,template_params:Dict,effective_filters:List[Dict]] -> Output[Dict[str, Any]]
    # @RELATION: [CALLS] ->[SupersetClient.build_dataset_preview_query_context]
    # @SIDE_EFFECT: Emits reasoning diagnostics describing the inferred legacy payload shape.
    def build_dataset_preview_legacy_form_data(
        self,
        dataset_id: int,
        dataset_record: Dict[str, Any],
        template_params: Dict[str, Any],
        effective_filters: List[Dict[str, Any]],
    ) -> Dict[str, Any]:
        with belief_scope(
            "SupersetClient.build_dataset_preview_legacy_form_data", f"id={dataset_id}"
        ):
            # Derive the legacy payload from the modern query context so both
            # transports are guaranteed to describe the same query.
            query_context = self.build_dataset_preview_query_context(
                dataset_id=dataset_id,
                dataset_record=dataset_record,
                template_params=template_params,
                effective_filters=effective_filters,
            )
            query_object = deepcopy(
                query_context.get("queries", [{}])[0]
                if query_context.get("queries")
                else {}
            )
            legacy_form_data = deepcopy(query_context.get("form_data", {}))
            # Legacy endpoints reject a "datasource" key inside form_data.
            legacy_form_data.pop("datasource", None)
            # Copy query-object fields into the flat legacy layout, with the
            # same defaults used when building the query context.
            legacy_form_data["metrics"] = deepcopy(
                query_object.get("metrics", ["count"])
            )
            legacy_form_data["columns"] = deepcopy(query_object.get("columns", []))
            legacy_form_data["orderby"] = deepcopy(query_object.get("orderby", []))
            legacy_form_data["annotation_layers"] = deepcopy(
                query_object.get("annotation_layers", [])
            )
            legacy_form_data["row_limit"] = query_object.get("row_limit", 1000)
            legacy_form_data["series_limit"] = query_object.get("series_limit", 0)
            legacy_form_data["url_params"] = deepcopy(
                query_object.get("url_params", template_params)
            )
            legacy_form_data["applied_time_extras"] = deepcopy(
                query_object.get("applied_time_extras", {})
            )
            legacy_form_data["result_format"] = query_context.get(
                "result_format", "json"
            )
            legacy_form_data["result_type"] = query_context.get("result_type", "query")
            legacy_form_data["force"] = bool(query_context.get("force", True))
            # Optional fields are only carried over when present upstream.
            extras = query_object.get("extras")
            if isinstance(extras, dict):
                legacy_form_data["extras"] = deepcopy(extras)
            time_range = query_object.get("time_range")
            if time_range:
                legacy_form_data["time_range"] = time_range
            app_logger.reflect(
                "Built Superset legacy preview form_data payload from browser-observed request shape",
                extra={
                    "dataset_id": dataset_id,
                    "legacy_endpoint_inference": "POST /explore_json/form_data?form_data=... primary, POST /data?form_data=... fallback, based on observed browser traffic",
                    "contains_form_datasource": "datasource" in legacy_form_data,
                    "legacy_form_data_keys": sorted(legacy_form_data.keys()),
                    "legacy_extra_filters": legacy_form_data.get("extra_filters", []),
                    "legacy_extra_form_data": legacy_form_data.get(
                        "extra_form_data", {}
                    ),
                },
            )
            return legacy_form_data
    # [/DEF:SupersetClient.build_dataset_preview_legacy_form_data:Function]
    # [DEF:SupersetClient.build_dataset_preview_query_context:Function]
    # @COMPLEXITY: 4
    # @PURPOSE: Build a reduced-scope chart-data query context for deterministic dataset preview compilation.
    # @PRE: dataset_record should come from Superset dataset detail when possible.
    # @POST: Returns an explicit chart-data payload based on current session inputs and dataset metadata.
    # @DATA_CONTRACT: Input[dataset_id:int,dataset_record:Dict,template_params:Dict,effective_filters:List[Dict]] -> Output[Dict[str, Any]]
    # @RELATION: [CALLS] ->[_normalize_effective_filters_for_query_context]
    # @SIDE_EFFECT: Emits reasoning and reflection logs for deterministic preview payload construction.
    def build_dataset_preview_query_context(
        self,
        dataset_id: int,
        dataset_record: Dict[str, Any],
        template_params: Dict[str, Any],
        effective_filters: List[Dict[str, Any]],
    ) -> Dict[str, Any]:
        with belief_scope(
            "SupersetClient.build_dataset_preview_query_context", f"id={dataset_id}"
        ):
            normalized_template_params = deepcopy(template_params or {})
            normalized_filter_payload = (
                self._normalize_effective_filters_for_query_context(
                    effective_filters or []
                )
            )
            normalized_filters = normalized_filter_payload["filters"]
            normalized_extra_form_data = normalized_filter_payload["extra_form_data"]
            # Default datasource: the dataset itself as a table; overridden
            # below when the dataset record carries an explicit datasource.
            datasource_payload: Dict[str, Any] = {
                "id": dataset_id,
                "type": "table",
            }
            datasource = dataset_record.get("datasource")
            if isinstance(datasource, dict):
                datasource_id = datasource.get("id")
                datasource_type = datasource.get("type")
                if datasource_id is not None:
                    datasource_payload["id"] = datasource_id
                if datasource_type:
                    datasource_payload["type"] = datasource_type
            # Merge the dataset's own stored template_params (a JSON string)
            # underneath the session params: session values take precedence.
            serialized_dataset_template_params = dataset_record.get("template_params")
            if (
                isinstance(serialized_dataset_template_params, str)
                and serialized_dataset_template_params.strip()
            ):
                try:
                    parsed_dataset_template_params = json.loads(
                        serialized_dataset_template_params
                    )
                    if isinstance(parsed_dataset_template_params, dict):
                        for key, value in parsed_dataset_template_params.items():
                            normalized_template_params.setdefault(str(key), value)
                except json.JSONDecodeError:
                    app_logger.explore(
                        "Dataset template_params could not be parsed while building preview query context",
                        extra={"dataset_id": dataset_id},
                    )
            extra_form_data: Dict[str, Any] = deepcopy(normalized_extra_form_data)
            if normalized_filters:
                extra_form_data["filters"] = deepcopy(normalized_filters)
            # Minimal query object: count-only metrics, no columns/ordering,
            # capped row limit — enough for SQL compilation, not data fetch.
            query_object: Dict[str, Any] = {
                "filters": normalized_filters,
                "extras": {"where": ""},
                "columns": [],
                "metrics": ["count"],
                "orderby": [],
                "annotation_layers": [],
                "row_limit": 1000,
                "series_limit": 0,
                "url_params": normalized_template_params,
                "applied_time_extras": {},
                "result_type": "query",
            }
            schema = dataset_record.get("schema")
            if schema:
                query_object["schema"] = schema
            # Time range preference: filter-provided range first, then the
            # dataset's default; mirrored into extra_form_data when present.
            time_range = extra_form_data.get("time_range") or dataset_record.get(
                "default_time_range"
            )
            if time_range:
                query_object["time_range"] = time_range
                extra_form_data["time_range"] = time_range
            result_format = dataset_record.get("result_format") or "json"
            result_type = "query"
            form_data: Dict[str, Any] = {
                "datasource": f"{datasource_payload['id']}__{datasource_payload['type']}",
                "datasource_id": datasource_payload["id"],
                "datasource_type": datasource_payload["type"],
                "viz_type": "table",
                "slice_id": None,
                "query_mode": "raw",
                "url_params": normalized_template_params,
                "extra_filters": deepcopy(normalized_filters),
                "adhoc_filters": [],
            }
            if extra_form_data:
                form_data["extra_form_data"] = extra_form_data
            payload = {
                "datasource": datasource_payload,
                "queries": [query_object],
                "form_data": form_data,
                "result_format": result_format,
                "result_type": result_type,
                "force": True,
            }
            app_logger.reflect(
                "Built Superset dataset preview query context",
                extra={
                    "dataset_id": dataset_id,
                    "datasource": datasource_payload,
                    "normalized_effective_filters": normalized_filters,
                    "normalized_filter_diagnostics": normalized_filter_payload[
                        "diagnostics"
                    ],
                    "result_type": result_type,
                    "result_format": result_format,
                },
            )
            return payload
    # [/DEF:SupersetClient.build_dataset_preview_query_context:Function]
    # [DEF:_normalize_effective_filters_for_query_context:Function]
    # @COMPLEXITY: 3
    # @PURPOSE: Convert execution mappings into Superset chart-data filter objects.
    # @PRE: effective_filters may contain mapping metadata and arbitrary scalar/list values.
    # @POST: Returns only valid filter dictionaries suitable for the chart-data query payload.
    # @RELATION: [DEPENDS_ON] ->[FilterStateModels]
    def _normalize_effective_filters_for_query_context(
        self,
        effective_filters: List[Dict[str, Any]],
    ) -> Dict[str, Any]:
        with belief_scope(
            "SupersetClient._normalize_effective_filters_for_query_context"
        ):
            normalized_filters: List[Dict[str, Any]] = []
            merged_extra_form_data: Dict[str, Any] = {}
            diagnostics: List[Dict[str, Any]] = []
            for item in effective_filters:
                # Ignore anything that is not a mapping.
                if not isinstance(item, dict):
                    continue
                # Best-available human-readable name, for diagnostics only.
                display_name = str(
                    item.get("display_name")
                    or item.get("filter_name")
                    or item.get("variable_name")
                    or "unresolved_filter"
                ).strip()
                value = item.get("effective_value")
                normalized_payload = item.get("normalized_filter_payload")
                preserved_clauses: List[Dict[str, Any]] = []
                preserved_extra_form_data: Dict[str, Any] = {}
                used_preserved_clauses = False
                # Prefer clauses/extra_form_data preserved from the original
                # filter state over reconstructing them heuristically.
                if isinstance(normalized_payload, dict):
                    raw_clauses = normalized_payload.get("filter_clauses")
                    if isinstance(raw_clauses, list):
                        preserved_clauses = [
                            deepcopy(clause)
                            for clause in raw_clauses
                            if isinstance(clause, dict)
                        ]
                    raw_extra_form_data = normalized_payload.get("extra_form_data")
                    if isinstance(raw_extra_form_data, dict):
                        preserved_extra_form_data = deepcopy(raw_extra_form_data)
                # Merge preserved extra_form_data, except "filters" which is
                # handled through the clause pipeline below.
                if isinstance(preserved_extra_form_data, dict):
                    for key, extra_value in preserved_extra_form_data.items():
                        if key == "filters":
                            continue
                        merged_extra_form_data[key] = deepcopy(extra_value)
                outgoing_clauses: List[Dict[str, Any]] = []
                if preserved_clauses:
                    # Reuse preserved clauses, filling in a missing "val" from
                    # the session's effective value when available.
                    for clause in preserved_clauses:
                        clause_copy = deepcopy(clause)
                        if "val" not in clause_copy and value is not None:
                            clause_copy["val"] = deepcopy(value)
                        outgoing_clauses.append(clause_copy)
                    used_preserved_clauses = True
                elif preserved_extra_form_data:
                    # extra_form_data alone carries the filter: emit no clauses.
                    outgoing_clauses = []
                else:
                    # Heuristic reconstruction: build a simple col/op/val clause
                    # (IN for lists, equality otherwise).
                    column = str(
                        item.get("variable_name") or item.get("filter_name") or ""
                    ).strip()
                    if column and value is not None:
                        operator = "IN" if isinstance(value, list) else "=="
                        outgoing_clauses.append(
                            {
                                "col": column,
                                "op": operator,
                                "val": value,
                            }
                        )
                normalized_filters.extend(outgoing_clauses)
                diagnostics.append(
                    {
                        "filter_name": display_name,
                        "value_origin": (
                            normalized_payload.get("value_origin")
                            if isinstance(normalized_payload, dict)
                            else None
                        ),
                        "used_preserved_clauses": used_preserved_clauses,
                        "outgoing_clauses": deepcopy(outgoing_clauses),
                    }
                )
                app_logger.reason(
                    "Normalized effective preview filter for Superset query context",
                    extra={
                        "filter_name": display_name,
                        "used_preserved_clauses": used_preserved_clauses,
                        "outgoing_clauses": outgoing_clauses,
                        "value_origin": (
                            normalized_payload.get("value_origin")
                            if isinstance(normalized_payload, dict)
                            else "heuristic_reconstruction"
                        ),
                    },
                )
            return {
                "filters": normalized_filters,
                "extra_form_data": merged_extra_form_data,
                "diagnostics": diagnostics,
            }
    # [/DEF:_normalize_effective_filters_for_query_context:Function]
# [DEF:_extract_compiled_sql_from_preview_response:Function]
# @COMPLEXITY: 3
# @PURPOSE: Normalize compiled SQL from either chart-data or legacy form_data preview responses.
# @PRE: response must be the decoded preview response body from a supported Superset endpoint.
# @POST: Returns compiled SQL and raw response or raises SupersetAPIError when the endpoint does not expose query text.
# @RELATION: [DEPENDS_ON] ->[SupersetAPIError]
def _extract_compiled_sql_from_preview_response(
self, response: Any
) -> Dict[str, Any]:
with belief_scope("SupersetClient._extract_compiled_sql_from_preview_response"):
if not isinstance(response, dict):
raise SupersetAPIError(
"Superset preview response was not a JSON object"
)
response_diagnostics: List[Dict[str, Any]] = []
result_payload = response.get("result")
if isinstance(result_payload, list):
for index, item in enumerate(result_payload):
if not isinstance(item, dict):
continue
compiled_sql = str(
item.get("query")
or item.get("sql")
or item.get("compiled_sql")
or ""
).strip()
response_diagnostics.append(
{
"index": index,
"status": item.get("status"),
"applied_filters": item.get("applied_filters"),
"rejected_filters": item.get("rejected_filters"),
"has_query": bool(compiled_sql),
"source": "result_list",
}
)
if compiled_sql:
return {
"compiled_sql": compiled_sql,
"raw_response": response,
"response_diagnostics": response_diagnostics,
}
top_level_candidates: List[Tuple[str, Any]] = [
("query", response.get("query")),
("sql", response.get("sql")),
("compiled_sql", response.get("compiled_sql")),
]
if isinstance(result_payload, dict):
top_level_candidates.extend(
[
("result.query", result_payload.get("query")),
("result.sql", result_payload.get("sql")),
("result.compiled_sql", result_payload.get("compiled_sql")),
]
)
for source, candidate in top_level_candidates:
compiled_sql = str(candidate or "").strip()
response_diagnostics.append(
{
"source": source,
"has_query": bool(compiled_sql),
}
)
if compiled_sql:
return {
"compiled_sql": compiled_sql,
"raw_response": response,
"response_diagnostics": response_diagnostics,
}
raise SupersetAPIError(
"Superset preview response did not expose compiled SQL "
f"(diagnostics={response_diagnostics!r})"
)
# [/DEF:_extract_compiled_sql_from_preview_response:Function]
# [DEF:SupersetClient.update_dataset:Function]
# @COMPLEXITY: 3
# @PURPOSE: Обновляет данные датасета по его ID.
# @PRE: dataset_id must exist.
# @POST: Dataset is updated in Superset.
# @DATA_CONTRACT: Input[dataset_id: int, data: Dict] -> Output[Dict]
# @SIDE_EFFECT: Modifies resource in upstream Superset environment.
# @RELATION: [CALLS] ->[request]
def update_dataset(self, dataset_id: int, data: Dict) -> Dict:
with belief_scope("SupersetClient.update_dataset", f"id={dataset_id}"):
app_logger.info("[update_dataset][Enter] Updating dataset %s.", dataset_id)
response = self.network.request(
method="PUT",
endpoint=f"/dataset/{dataset_id}",
data=json.dumps(data),
headers={"Content-Type": "application/json"},
)
response = cast(Dict, response)
app_logger.info("[update_dataset][Exit] Updated dataset %s.", dataset_id)
return response
# [/DEF:SupersetClient.update_dataset:Function]
# [DEF:SupersetClient.get_databases:Function]
# @COMPLEXITY: 3
# @PURPOSE: Получает полный список баз данных.
# @PRE: Client is authenticated.
# @POST: Returns total count and list of databases.
# @DATA_CONTRACT: Input[query: Optional[Dict]] -> Output[Tuple[int, List[Dict]]]
# @RELATION: [CALLS] ->[_fetch_all_pages]
def get_databases(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
with belief_scope("get_databases"):
app_logger.info("[get_databases][Enter] Fetching databases.")
validated_query = self._validate_query_params(query or {})
if "columns" not in validated_query:
validated_query["columns"] = []
paginated_data = self._fetch_all_pages(
endpoint="/database/",
pagination_options={
"base_query": validated_query,
"results_field": "result",
},
)
total_count = len(paginated_data)
app_logger.info("[get_databases][Exit] Found %d databases.", total_count)
return total_count, paginated_data
# [/DEF:SupersetClient.get_databases:Function]
# [DEF:get_database:Function]
# @COMPLEXITY: 3
# @PURPOSE: Получает информацию о конкретной базе данных по её ID.
# @PRE: database_id must exist.
# @POST: Returns database details.
# @DATA_CONTRACT: Input[database_id: int] -> Output[Dict]
# @RELATION: [CALLS] ->[request]
def get_database(self, database_id: int) -> Dict:
with belief_scope("get_database"):
app_logger.info("[get_database][Enter] Fetching database %s.", database_id)
response = self.network.request(
method="GET", endpoint=f"/database/{database_id}"
)
response = cast(Dict, response)
app_logger.info("[get_database][Exit] Got database %s.", database_id)
return response
# [/DEF:get_database:Function]
# [DEF:get_databases_summary:Function]
# @COMPLEXITY: 3
# @PURPOSE: Fetch a summary of databases including uuid, name, and engine.
# @PRE: Client is authenticated.
# @POST: Returns list of database summaries.
# @DATA_CONTRACT: None -> Output[List[Dict]]
# @RELATION: [CALLS] ->[SupersetClient.get_databases]
def get_databases_summary(self) -> List[Dict]:
with belief_scope("SupersetClient.get_databases_summary"):
query = {"columns": ["uuid", "database_name", "backend"]}
_, databases = self.get_databases(query=query)
# Map 'backend' to 'engine' for consistency with contracts
for db in databases:
db["engine"] = db.pop("backend", None)
return databases
# [/DEF:get_databases_summary:Function]
# [DEF:get_database_by_uuid:Function]
# @COMPLEXITY: 3
# @PURPOSE: Find a database by its UUID.
# @PRE: db_uuid must be a valid UUID string.
# @POST: Returns database info or None.
# @DATA_CONTRACT: Input[db_uuid: str] -> Output[Optional[Dict]]
# @RELATION: [CALLS] ->[SupersetClient.get_databases]
def get_database_by_uuid(self, db_uuid: str) -> Optional[Dict]:
with belief_scope("SupersetClient.get_database_by_uuid", f"uuid={db_uuid}"):
query = {"filters": [{"col": "uuid", "op": "eq", "value": db_uuid}]}
_, databases = self.get_databases(query=query)
return databases[0] if databases else None
# [/DEF:get_database_by_uuid:Function]
# [DEF:SupersetClient._resolve_target_id_for_delete:Function]
# @COMPLEXITY: 1
# @PURPOSE: Resolves a dashboard ID from either an ID or a slug.
# @PRE: Either dash_id or dash_slug should be provided.
# @POST: Returns the resolved ID or None.
# @RELATION: [CALLS] ->[SupersetClient.get_dashboards]
def _resolve_target_id_for_delete(
self, dash_id: Optional[int], dash_slug: Optional[str]
) -> Optional[int]:
with belief_scope("_resolve_target_id_for_delete"):
if dash_id is not None:
return dash_id
if dash_slug is not None:
app_logger.debug(
"[_resolve_target_id_for_delete][State] Resolving ID by slug '%s'.",
dash_slug,
)
try:
_, candidates = self.get_dashboards(
query={
"filters": [{"col": "slug", "op": "eq", "value": dash_slug}]
}
)
if candidates:
target_id = candidates[0]["id"]
app_logger.debug(
"[_resolve_target_id_for_delete][Success] Resolved slug to ID %s.",
target_id,
)
return target_id
except Exception as e:
app_logger.warning(
"[_resolve_target_id_for_delete][Warning] Could not resolve slug '%s' to ID: %s",
dash_slug,
e,
)
return None
# [/DEF:SupersetClient._resolve_target_id_for_delete:Function]
# [DEF:SupersetClient._do_import:Function]
# @COMPLEXITY: 1
# @PURPOSE: Performs the actual multipart upload for import.
# @PRE: file_name must be a path to an existing ZIP file.
# @POST: Returns the API response from the upload.
# @RELATION: [CALLS] ->[APIClient.upload_file]
def _do_import(self, file_name: Union[str, Path]) -> Dict:
with belief_scope("_do_import"):
app_logger.debug(f"[_do_import][State] Uploading file: {file_name}")
file_path = Path(file_name)
if not file_path.exists():
app_logger.error(
f"[_do_import][Failure] File does not exist: {file_name}"
)
raise FileNotFoundError(f"File does not exist: {file_name}")
return self.network.upload_file(
endpoint="/dashboard/import/",
file_info={
"file_obj": file_path,
"file_name": file_path.name,
"form_field": "formData",
},
extra_data={"overwrite": "true"},
timeout=self.env.timeout * 2,
)
# [/DEF:SupersetClient._do_import:Function]
# [DEF:_validate_export_response:Function]
# @COMPLEXITY: 1
# @PURPOSE: Validates that the export response is a non-empty ZIP archive.
# @PRE: response must be a valid requests.Response object.
# @POST: Raises SupersetAPIError if validation fails.
def _validate_export_response(self, response: Response, dashboard_id: int) -> None:
with belief_scope("_validate_export_response"):
content_type = response.headers.get("Content-Type", "")
if "application/zip" not in content_type:
raise SupersetAPIError(
f"Получен не ZIP-архив (Content-Type: {content_type})"
)
if not response.content:
raise SupersetAPIError("Получены пустые данные при экспорте")
# [/DEF:_validate_export_response:Function]
# [DEF:_resolve_export_filename:Function]
# @COMPLEXITY: 1
# @PURPOSE: Determines the filename for an exported dashboard.
# @PRE: response must contain Content-Disposition header or dashboard_id must be provided.
# @POST: Returns a sanitized filename string.
def _resolve_export_filename(self, response: Response, dashboard_id: int) -> str:
with belief_scope("_resolve_export_filename"):
filename = get_filename_from_headers(dict(response.headers))
if not filename:
from datetime import datetime
timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
filename = f"dashboard_export_{dashboard_id}_{timestamp}.zip"
app_logger.warning(
"[_resolve_export_filename][Warning] Generated filename: %s",
filename,
)
return filename
# [/DEF:_resolve_export_filename:Function]
# [DEF:_validate_query_params:Function]
# @COMPLEXITY: 1
# @PURPOSE: Ensures query parameters have default page and page_size.
# @PRE: query can be None or a dictionary.
# @POST: Returns a dictionary with at least page and page_size.
def _validate_query_params(self, query: Optional[Dict]) -> Dict:
with belief_scope("_validate_query_params"):
# Superset list endpoints commonly cap page_size at 100.
# Using 100 avoids partial fetches when larger values are silently truncated.
base_query = {"page": 0, "page_size": 100}
return {**base_query, **(query or {})}
# [/DEF:_validate_query_params:Function]
# [DEF:_fetch_total_object_count:Function]
# @COMPLEXITY: 1
# @PURPOSE: Fetches the total number of items for a given endpoint.
# @PRE: endpoint must be a valid Superset API path.
# @POST: Returns the total count as an integer.
# @RELATION: [CALLS] ->[fetch_paginated_count]
def _fetch_total_object_count(self, endpoint: str) -> int:
with belief_scope("_fetch_total_object_count"):
return self.network.fetch_paginated_count(
endpoint=endpoint,
query_params={"page": 0, "page_size": 1},
count_field="count",
)
# [/DEF:_fetch_total_object_count:Function]
# [DEF:_fetch_all_pages:Function]
# @COMPLEXITY: 1
# @PURPOSE: Iterates through all pages to collect all data items.
# @PRE: pagination_options must contain base_query, total_count, and results_field.
# @POST: Returns a combined list of all items.
# @RELATION: [CALLS] ->[fetch_paginated_data]
def _fetch_all_pages(self, endpoint: str, pagination_options: Dict) -> List[Dict]:
with belief_scope("_fetch_all_pages"):
return self.network.fetch_paginated_data(
endpoint=endpoint, pagination_options=pagination_options
)
# [/DEF:_fetch_all_pages:Function]
# [DEF:_validate_import_file:Function]
# @COMPLEXITY: 1
# @PURPOSE: Validates that the file to be imported is a valid ZIP with metadata.yaml.
# @PRE: zip_path must be a path to a file.
# @POST: Raises error if file is missing, not a ZIP, or missing metadata.
def _validate_import_file(self, zip_path: Union[str, Path]) -> None:
with belief_scope("_validate_import_file"):
path = Path(zip_path)
if not path.exists():
raise FileNotFoundError(f"Файл {zip_path} не существует")
if not zipfile.is_zipfile(path):
raise SupersetAPIError(f"Файл {zip_path} не является ZIP-архивом")
with zipfile.ZipFile(path, "r") as zf:
if not any(n.endswith("metadata.yaml") for n in zf.namelist()):
raise SupersetAPIError(
f"Архив {zip_path} не содержит 'metadata.yaml'"
)
# [/DEF:_validate_import_file:Function]
# [DEF:get_all_resources:Function]
# @COMPLEXITY: 3
# @PURPOSE: Fetches all resources of a given type with id, uuid, and name columns.
# @PARAM: resource_type (str) - One of "chart", "dataset", "dashboard".
# @PRE: Client is authenticated. resource_type is valid.
# @POST: Returns a list of resource dicts with at minimum id, uuid, and name fields.
# @RETURN: List[Dict]
# @RELATION: [CALLS] ->[_fetch_all_pages]
def get_all_resources(
self, resource_type: str, since_dttm: Optional[datetime] = None
) -> List[Dict]:
with belief_scope(
"SupersetClient.get_all_resources",
f"type={resource_type}, since={since_dttm}",
):
column_map = {
"chart": {
"endpoint": "/chart/",
"columns": ["id", "uuid", "slice_name"],
},
"dataset": {
"endpoint": "/dataset/",
"columns": ["id", "uuid", "table_name"],
},
"dashboard": {
"endpoint": "/dashboard/",
"columns": ["id", "uuid", "slug", "dashboard_title"],
},
}
config = column_map.get(resource_type)
if not config:
app_logger.warning(
"[get_all_resources][Warning] Unknown resource type: %s",
resource_type,
)
return []
query = {"columns": config["columns"]}
if since_dttm:
import math
# Use int milliseconds to be safe
timestamp_ms = math.floor(since_dttm.timestamp() * 1000)
query["filters"] = [
{"col": "changed_on_dttm", "opr": "gt", "value": timestamp_ms}
]
validated = self._validate_query_params(query)
data = self._fetch_all_pages(
endpoint=config["endpoint"],
pagination_options={"base_query": validated, "results_field": "result"},
)
app_logger.info(
"[get_all_resources][Exit] Fetched %d %s resources.",
len(data),
resource_type,
)
return data
# [/DEF:get_all_resources:Function]
# [/DEF:SupersetClient:Class]
# [/DEF:SupersetClientModule:Module]