# [DEF:backend.src.scripts.seed_superset_load_test:Module] # # @COMPLEXITY: 3 # @SEMANTICS: superset, load-test, charts, dashboards, seed, stress # @PURPOSE: Creates randomized load-test data in Superset by cloning chart configurations and creating dashboards in target environments. # @LAYER: Scripts # @RELATION: USES -> backend.src.core.config_manager.ConfigManager # @RELATION: USES -> backend.src.core.superset_client.SupersetClient # @INVARIANT: Created chart and dashboard names are globally unique for one script run. # [SECTION: IMPORTS] import argparse import json import random import sys import uuid from pathlib import Path from typing import Dict, List, Optional sys.path.append(str(Path(__file__).parent.parent.parent)) from src.core.config_manager import ConfigManager from src.core.config_models import Environment from src.core.logger import belief_scope, logger from src.core.superset_client import SupersetClient # [/SECTION] # [DEF:_parse_args:Function] # @PURPOSE: Parses CLI arguments for load-test data generation. # @PRE: Script is called from CLI. # @POST: Returns validated argument namespace. 
def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Seed Superset with load-test charts and dashboards") parser.add_argument("--envs", nargs="+", default=["ss1", "ss2"], help="Target environment IDs") parser.add_argument("--charts", type=int, default=10000, help="Target number of charts to create") parser.add_argument("--dashboards", type=int, default=500, help="Target number of dashboards to create") parser.add_argument("--template-pool-size", type=int, default=200, help="How many source charts to sample as templates per env") parser.add_argument("--seed", type=int, default=None, help="Optional RNG seed for reproducibility") parser.add_argument("--max-errors", type=int, default=100, help="Stop early if errors exceed this threshold") parser.add_argument("--dry-run", action="store_true", help="Do not write data, only validate setup") return parser.parse_args() # [/DEF:_parse_args:Function] # [DEF:_extract_result_payload:Function] # @PURPOSE: Normalizes Superset API payloads that may be wrapped in `result`. # @PRE: payload is a JSON-decoded API response. # @POST: Returns the unwrapped object when present. def _extract_result_payload(payload: Dict) -> Dict: result = payload.get("result") if isinstance(result, dict): return result return payload # [/DEF:_extract_result_payload:Function] # [DEF:_extract_created_id:Function] # @PURPOSE: Extracts object ID from create/update API response. # @PRE: payload is a JSON-decoded API response. # @POST: Returns integer object ID or None if missing. def _extract_created_id(payload: Dict) -> Optional[int]: direct_id = payload.get("id") if isinstance(direct_id, int): return direct_id result = payload.get("result") if isinstance(result, dict) and isinstance(result.get("id"), int): return int(result["id"]) return None # [/DEF:_extract_created_id:Function] # [DEF:_generate_unique_name:Function] # @PURPOSE: Generates globally unique random names for charts/dashboards. 
# @PRE: used_names is mutable set for collision tracking. # @POST: Returns a unique string and stores it in used_names. def _generate_unique_name(prefix: str, used_names: set[str], rng: random.Random) -> str: adjectives = ["amber", "rapid", "frozen", "delta", "lunar", "vector", "cobalt", "silent", "neon", "solar"] nouns = ["falcon", "matrix", "signal", "harbor", "stream", "vertex", "bridge", "orbit", "pulse", "forge"] while True: token = uuid.uuid4().hex[:8] candidate = f"{prefix}_{rng.choice(adjectives)}_{rng.choice(nouns)}_{rng.randint(100, 999)}_{token}" if candidate not in used_names: used_names.add(candidate) return candidate # [/DEF:_generate_unique_name:Function] # [DEF:_resolve_target_envs:Function] # @PURPOSE: Resolves requested environment IDs from configuration. # @PRE: env_ids is non-empty. # @POST: Returns mapping env_id -> configured environment object. def _resolve_target_envs(env_ids: List[str]) -> Dict[str, Environment]: config_manager = ConfigManager() configured = {env.id: env for env in config_manager.get_environments()} resolved: Dict[str, Environment] = {} if not configured: for config_path in [Path("config.json"), Path("backend/config.json")]: if not config_path.exists(): continue try: payload = json.loads(config_path.read_text(encoding="utf-8")) env_rows = payload.get("environments", []) for row in env_rows: env = Environment(**row) configured[env.id] = env except Exception as exc: logger.warning(f"[REFLECT] Failed loading environments from {config_path}: {exc}") for env_id in env_ids: env = configured.get(env_id) if env is None: raise ValueError(f"Environment '{env_id}' not found in configuration") resolved[env_id] = env return resolved # [/DEF:_resolve_target_envs:Function] # [DEF:_build_chart_template_pool:Function] # @PURPOSE: Builds a pool of source chart templates to clone in one environment. # @PRE: Client is authenticated. # @POST: Returns non-empty list of chart payload templates. 
def _build_chart_template_pool(client: SupersetClient, pool_size: int, rng: random.Random) -> List[Dict]: list_query = { "page": 0, "page_size": 1000, "columns": ["id", "slice_name", "datasource_id", "datasource_type", "viz_type", "params", "query_context"], } rows = client.network.fetch_paginated_data( endpoint="/chart/", pagination_options={"base_query": list_query, "results_field": "result"}, ) candidates = [row for row in rows if isinstance(row, dict) and row.get("id")] if not candidates: raise RuntimeError("No source charts available for templating") selected = candidates if len(candidates) <= pool_size else rng.sample(candidates, pool_size) templates: List[Dict] = [] for row in selected: chart_id = int(row["id"]) detail_payload = client.get_chart(chart_id) detail = _extract_result_payload(detail_payload) datasource_id = detail.get("datasource_id") datasource_type = detail.get("datasource_type") or row.get("datasource_type") or "table" if datasource_id is None: continue params_value = detail.get("params") if isinstance(params_value, dict): params_value = json.dumps(params_value) query_context_value = detail.get("query_context") if isinstance(query_context_value, dict): query_context_value = json.dumps(query_context_value) templates.append( { "datasource_id": int(datasource_id), "datasource_type": str(datasource_type), "viz_type": detail.get("viz_type") or row.get("viz_type"), "params": params_value, "query_context": query_context_value, } ) if not templates: raise RuntimeError("Could not build templates with datasource metadata") return templates # [/DEF:_build_chart_template_pool:Function] # [DEF:seed_superset_load_data:Function] # @PURPOSE: Creates dashboards and cloned charts for load testing across target environments. # @PRE: Target environments must be reachable and authenticated. # @POST: Returns execution statistics dictionary. # @SIDE_EFFECT: Creates objects in Superset environments. 
def seed_superset_load_data(args: argparse.Namespace) -> Dict:
    """Create dashboards and cloned charts for load testing across target envs.

    Authenticates one client per requested environment, builds a chart-template
    pool for each, then creates dashboards and charts bound to them.

    Returns a statistics dictionary. Raises RuntimeError when the error budget
    (args.max_errors) is exhausted or when a prerequisite is missing.
    """
    rng = random.Random(args.seed)
    env_map = _resolve_target_envs(args.envs)
    clients: Dict[str, SupersetClient] = {}
    templates_by_env: Dict[str, List[Dict]] = {}
    created_dashboards: Dict[str, List[int]] = {env_id: [] for env_id in env_map}
    created_charts: Dict[str, List[int]] = {env_id: [] for env_id in env_map}
    used_chart_names: set[str] = set()
    used_dashboard_names: set[str] = set()

    # Authenticate and build the template pool for every environment up front,
    # so setup failures surface before any objects are created.
    for env_id, env in env_map.items():
        client = SupersetClient(env)
        client.authenticate()
        clients[env_id] = client
        templates_by_env[env_id] = _build_chart_template_pool(client, args.template_pool_size, rng)
        logger.info(f"[REASON] Environment {env_id}: loaded {len(templates_by_env[env_id])} chart templates")

    env_ids = list(env_map.keys())
    errors = _seed_dashboards(args, rng, clients, env_ids, created_dashboards, used_dashboard_names)

    if args.dry_run:
        return {
            "dry_run": True,
            "templates_by_env": {k: len(v) for k, v in templates_by_env.items()},
            "charts_target": args.charts,
            "dashboards_target": args.dashboards,
        }

    # Every environment needs at least one dashboard to attach charts to.
    for env_id in env_ids:
        if not created_dashboards[env_id]:
            raise RuntimeError(f"No dashboards created in environment {env_id}; cannot bind charts")

    errors = _seed_charts(
        args, rng, clients, env_ids, templates_by_env, created_dashboards, created_charts, used_chart_names, errors
    )

    return {
        "dry_run": False,
        "errors": errors,
        "dashboards": {env_id: len(ids) for env_id, ids in created_dashboards.items()},
        "charts": {env_id: len(ids) for env_id, ids in created_charts.items()},
        "total_dashboards": sum(len(ids) for ids in created_dashboards.values()),
        "total_charts": sum(len(ids) for ids in created_charts.values()),
    }
# [/DEF:seed_superset_load_data:Function]

# [DEF:_seed_dashboards:Function]
# @PURPOSE: Creates (or dry-run logs) load-test dashboards; returns the running error count.
def _seed_dashboards(args, rng, clients, env_ids, created_dashboards, used_dashboard_names) -> int:
    """Create args.dashboards dashboards spread across the environments."""
    errors = 0
    for idx in range(args.dashboards):
        # First pass round-robins across environments so each gets at least one,
        # then environments are picked at random.
        env_id = env_ids[idx % len(env_ids)] if idx < len(env_ids) else rng.choice(env_ids)
        dashboard_title = _generate_unique_name("lt_dash", used_dashboard_names, rng)
        if args.dry_run:
            logger.info(f"[REFLECT] Dry-run dashboard create: env={env_id}, title={dashboard_title}")
            continue
        try:
            payload = {"dashboard_title": dashboard_title, "published": False}
            created = clients[env_id].network.request("POST", "/dashboard/", data=json.dumps(payload))
            dashboard_id = _extract_created_id(created)
            if dashboard_id is None:
                raise RuntimeError(f"Dashboard create response missing id: {created}")
            created_dashboards[env_id].append(dashboard_id)
        except Exception as exc:
            errors += 1
            logger.error(f"[EXPLORE] Failed creating dashboard in {env_id}: {exc}")
            if errors >= args.max_errors:
                raise RuntimeError(f"Stopping due to max errors reached ({errors})") from exc
    return errors
# [/DEF:_seed_dashboards:Function]

# [DEF:_seed_charts:Function]
# @PURPOSE: Clones templates into new charts bound to created dashboards; returns the error count.
def _seed_charts(args, rng, clients, env_ids, templates_by_env, created_dashboards, created_charts, used_chart_names, errors) -> int:
    """Create args.charts charts, each cloned from a random template."""
    for index in range(args.charts):
        env_id = rng.choice(env_ids)
        client = clients[env_id]
        template = rng.choice(templates_by_env[env_id])
        dashboard_id = rng.choice(created_dashboards[env_id])
        chart_name = _generate_unique_name("lt_chart", used_chart_names, rng)
        payload = {
            "slice_name": chart_name,
            "datasource_id": template["datasource_id"],
            "datasource_type": template["datasource_type"],
            "dashboards": [dashboard_id],
        }
        # Optional fields are only sent when the template provided them.
        if template.get("viz_type"):
            payload["viz_type"] = template["viz_type"]
        if template.get("params"):
            payload["params"] = template["params"]
        if template.get("query_context"):
            payload["query_context"] = template["query_context"]
        try:
            created = client.network.request("POST", "/chart/", data=json.dumps(payload))
            chart_id = _extract_created_id(created)
            if chart_id is None:
                raise RuntimeError(f"Chart create response missing id: {created}")
            created_charts[env_id].append(chart_id)
            if (index + 1) % 500 == 0:
                logger.info(f"[REASON] Created {index + 1}/{args.charts} charts")
        except Exception as exc:
            errors += 1
            logger.error(f"[EXPLORE] Failed creating chart in {env_id}: {exc}")
            if errors >= args.max_errors:
                raise RuntimeError(f"Stopping due to max errors reached ({errors})") from exc
    return errors
# [/DEF:_seed_charts:Function]

# [DEF:main:Function]
# @PURPOSE: CLI entrypoint for Superset load-test data seeding.
# @PRE: Command line arguments are valid.
# @POST: Prints summary and exits with non-zero status on failure.
def main() -> None:
    """CLI entrypoint: parse arguments, run the seeder, log a summary."""
    with belief_scope("seed_superset_load_test.main"):
        summary = seed_superset_load_data(_parse_args())
        encoded = json.dumps(summary, ensure_ascii=True)
        logger.info(f"[COHERENCE:OK] Result summary: {encoded}")
# [/DEF:main:Function]

if __name__ == "__main__":
    main()
# [/DEF:backend.src.scripts.seed_superset_load_test:Module]