# [DEF:MigrationPlugin:Module]
# @SEMANTICS: migration, superset, automation, dashboard, plugin
# @PURPOSE: A plugin that provides functionality to migrate Superset dashboards between environments.
# @LAYER: App
# @RELATION: IMPLEMENTS -> PluginBase
# @RELATION: DEPENDS_ON -> superset_tool.client
# @RELATION: DEPENDS_ON -> superset_tool.utils
from typing import Dict, Any, List
from pathlib import Path
import zipfile
import re

from ..core.plugin_base import PluginBase
from superset_tool.client import SupersetClient
from superset_tool.utils.init_clients import setup_clients
from superset_tool.utils.fileio import create_temp_file, update_yamls, create_dashboard_export
from ..dependencies import get_config_manager
from superset_tool.utils.logger import SupersetLogger
from ..core.migration_engine import MigrationEngine
from ..core.database import SessionLocal
from ..models.mapping import DatabaseMapping, Environment


class MigrationPlugin(PluginBase):
    """
    A plugin to migrate Superset dashboards between environments.

    Exposes a JSON-schema-driven parameter form (``get_schema``) and an
    async ``execute`` entry point that exports dashboards from a source
    environment, optionally rewrites database references via stored
    mappings, and imports them into a target environment.
    """

    @property
    def id(self) -> str:
        # Stable plugin identifier used for registration/lookup.
        return "superset-migration"

    @property
    def name(self) -> str:
        return "Superset Dashboard Migration"

    @property
    def description(self) -> str:
        return "Migrates dashboards between Superset environments."

    @property
    def version(self) -> str:
        return "1.0.0"

    def get_schema(self) -> Dict[str, Any]:
        """
        Return the JSON schema describing the parameters accepted by
        :meth:`execute`.

        Environment names are read live from the config manager; if none
        are configured yet, a ["dev", "prod"] placeholder enum is offered.
        """
        config_manager = get_config_manager()
        envs = [e.name for e in config_manager.get_environments()]
        return {
            "type": "object",
            "properties": {
                "from_env": {
                    "type": "string",
                    "title": "Source Environment",
                    "description": "The environment to migrate from.",
                    "enum": envs if envs else ["dev", "prod"],
                },
                "to_env": {
                    "type": "string",
                    "title": "Target Environment",
                    "description": "The environment to migrate to.",
                    "enum": envs if envs else ["dev", "prod"],
                },
                "dashboard_regex": {
                    "type": "string",
                    "title": "Dashboard Regex",
                    "description": "A regular expression to filter dashboards to migrate.",
                },
                "replace_db_config": {
                    "type": "boolean",
                    "title": "Replace DB Config",
                    "description": "Whether to replace the database configuration.",
                    "default": False,
                },
                "from_db_id": {
                    "type": "integer",
                    "title": "Source DB ID",
                    "description": "The ID of the source database to replace (if replacing).",
                },
                "to_db_id": {
                    "type": "integer",
                    "title": "Target DB ID",
                    "description": "The ID of the target database to replace with (if replacing).",
                },
            },
            "required": ["from_env", "to_env", "dashboard_regex"],
        }

    async def execute(self, params: Dict[str, Any]):
        # [DEF:MigrationPlugin.execute:Action]
        # @PURPOSE: Execute the migration logic with proper task logging.
        #
        # Params may arrive in two shapes: environment IDs
        # (source_env_id/target_env_id + selected_ids) or legacy names
        # (from_env/to_env + dashboard_regex).
        source_env_id = params.get("source_env_id")
        target_env_id = params.get("target_env_id")
        selected_ids = params.get("selected_ids")
        # Legacy support or alternative params
        from_env_name = params.get("from_env")
        to_env_name = params.get("to_env")
        dashboard_regex = params.get("dashboard_regex")
        replace_db_config = params.get("replace_db_config", False)
        # NOTE(review): from_db_id/to_db_id are declared in the schema and read
        # here but never used below — confirm whether they are still needed.
        from_db_id = params.get("from_db_id")
        to_db_id = params.get("to_db_id")

        task_id = params.get("_task_id")
        from ..dependencies import get_task_manager
        tm = get_task_manager()

        class TaskLoggerProxy(SupersetLogger):
            """Routes all log calls into the task manager's per-task log."""

            def __init__(self):
                # Initialize parent with dummy values since we override methods
                super().__init__(console=False)

            def debug(self, msg, *args, extra=None, **kwargs):
                if task_id:
                    tm._add_log(task_id, "DEBUG", msg, extra or {})

            def info(self, msg, *args, extra=None, **kwargs):
                if task_id:
                    tm._add_log(task_id, "INFO", msg, extra or {})

            def warning(self, msg, *args, extra=None, **kwargs):
                if task_id:
                    tm._add_log(task_id, "WARNING", msg, extra or {})

            def error(self, msg, *args, extra=None, **kwargs):
                if task_id:
                    tm._add_log(task_id, "ERROR", msg, extra or {})

            def critical(self, msg, *args, extra=None, **kwargs):
                # Task log has no CRITICAL level; downgrade to ERROR.
                if task_id:
                    tm._add_log(task_id, "ERROR", msg, extra or {})

            def exception(self, msg, *args, **kwargs):
                if task_id:
                    tm._add_log(task_id, "ERROR", msg, {"exception": True})

        logger = TaskLoggerProxy()
        logger.info("[MigrationPlugin][Entry] Starting migration task.")
        logger.info(f"[MigrationPlugin][Action] Params: {params}")

        try:
            config_manager = get_config_manager()
            environments = config_manager.get_environments()

            # Resolve environments: prefer explicit IDs, fall back to names.
            src_env = None
            tgt_env = None
            if source_env_id:
                src_env = next((e for e in environments if e.id == source_env_id), None)
            elif from_env_name:
                src_env = next((e for e in environments if e.name == from_env_name), None)
            if target_env_id:
                tgt_env = next((e for e in environments if e.id == target_env_id), None)
            elif to_env_name:
                tgt_env = next((e for e in environments if e.name == to_env_name), None)

            if not src_env or not tgt_env:
                # BUGFIX: the original f-string literal was broken across a raw
                # newline; reassembled into a single valid message.
                raise ValueError(
                    f"Could not resolve source or target environment. "
                    f"Source: {source_env_id or from_env_name}, "
                    f"Target: {target_env_id or to_env_name}"
                )

            from_env_name = src_env.name
            to_env_name = tgt_env.name
            logger.info(f"[MigrationPlugin][State] Resolved environments: {from_env_name} -> {to_env_name}")

            all_clients = setup_clients(logger, custom_envs=environments)
            from_c = all_clients.get(from_env_name)
            to_c = all_clients.get(to_env_name)
            if not from_c or not to_c:
                raise ValueError(f"Clients not initialized for environments: {from_env_name}, {to_env_name}")

            _, all_dashboards = from_c.get_dashboards()
            dashboards_to_migrate = []
            if selected_ids:
                dashboards_to_migrate = [d for d in all_dashboards if d["id"] in selected_ids]
            elif dashboard_regex:
                regex_str = str(dashboard_regex)
                dashboards_to_migrate = [
                    d for d in all_dashboards
                    if re.search(regex_str, d["dashboard_title"], re.IGNORECASE)
                ]
            else:
                logger.warning("[MigrationPlugin][State] No selection criteria provided (selected_ids or dashboard_regex).")
                return

            if not dashboards_to_migrate:
                logger.warning("[MigrationPlugin][State] No dashboards found matching criteria.")
                return

            # Fetch source->target database UUID mappings from the local DB.
            db_mapping = {}
            if replace_db_config:
                db = SessionLocal()
                try:
                    # Find environment IDs by name
                    src_env = db.query(Environment).filter(Environment.name == from_env_name).first()
                    tgt_env = db.query(Environment).filter(Environment.name == to_env_name).first()
                    if src_env and tgt_env:
                        mappings = db.query(DatabaseMapping).filter(
                            DatabaseMapping.source_env_id == src_env.id,
                            DatabaseMapping.target_env_id == tgt_env.id
                        ).all()
                        db_mapping = {m.source_db_uuid: m.target_db_uuid for m in mappings}
                        logger.info(f"[MigrationPlugin][State] Loaded {len(db_mapping)} database mappings.")
                finally:
                    db.close()

            engine = MigrationEngine()
            for dash in dashboards_to_migrate:
                dash_id, dash_slug, title = dash["id"], dash.get("slug"), dash["dashboard_title"]
                try:
                    exported_content, _ = from_c.export_dashboard(dash_id)
                    with create_temp_file(content=exported_content, dry_run=True, suffix=".zip", logger=logger) as tmp_zip_path:
                        # NOTE(review): original comment said "strip databases to
                        # avoid password errors" but strip_databases=False is
                        # passed — confirm intended behavior.
                        with create_temp_file(suffix=".zip", dry_run=True, logger=logger) as tmp_new_zip:
                            success = engine.transform_zip(str(tmp_zip_path), str(tmp_new_zip), db_mapping, strip_databases=False)
                            if not success and replace_db_config:
                                # Signal missing mapping and wait (only if we care about mappings)
                                if task_id:
                                    logger.info(f"[MigrationPlugin][Action] Pausing for missing mapping in task {task_id}")
                                    # In a real scenario, we'd pass the missing DB info to the frontend
                                    # For this task, we'll just simulate the wait
                                    await tm.wait_for_resolution(task_id)
                                    # After resolution, retry transformation with updated mappings
                                    # (Mappings would be updated in task.params by resolve_task)
                                    db = SessionLocal()
                                    try:
                                        src_env = db.query(Environment).filter(Environment.name == from_env_name).first()
                                        tgt_env = db.query(Environment).filter(Environment.name == to_env_name).first()
                                        mappings = db.query(DatabaseMapping).filter(
                                            DatabaseMapping.source_env_id == src_env.id,
                                            DatabaseMapping.target_env_id == tgt_env.id
                                        ).all()
                                        db_mapping = {m.source_db_uuid: m.target_db_uuid for m in mappings}
                                    finally:
                                        db.close()
                                    success = engine.transform_zip(str(tmp_zip_path), str(tmp_new_zip), db_mapping, strip_databases=False)
                            if success:
                                to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug)
                                # BUGFIX: success log moved inside the success
                                # branch — it previously fired even when the
                                # transform failed.
                                logger.info(f"[MigrationPlugin][Success] Dashboard {title} imported.")
                            else:
                                logger.error(f"[MigrationPlugin][Failure] Failed to transform ZIP for dashboard {title}")
                except Exception as exc:
                    # Check for password error
                    error_msg = str(exc)
                    # The error message from Superset is often a JSON string inside a string.
                    # We need to robustly detect the password requirement.
                    # Typical error: "Error importing dashboard: databases/PostgreSQL.yaml: {'_schema': ['Must provide a password for the database']}"
                    if "Must provide a password for the database" in error_msg:
                        # Extract database name: try "databases/DBNAME.yaml" pattern
                        # (module-level `re` is used; the redundant local import
                        # was removed).
                        db_name = "unknown"
                        match = re.search(r"databases/([^.]+)\.yaml", error_msg)
                        if match:
                            db_name = match.group(1)
                        else:
                            # Fallback: try to find 'database 'NAME'' pattern
                            match_alt = re.search(r"database '([^']+)'", error_msg)
                            if match_alt:
                                db_name = match_alt.group(1)
                        logger.warning(f"[MigrationPlugin][Action] Detected missing password for database: {db_name}")
                        if task_id:
                            input_request = {
                                "type": "database_password",
                                "databases": [db_name],
                                "error_message": error_msg
                            }
                            tm.await_input(task_id, input_request)
                            # Wait for user input
                            await tm.wait_for_input(task_id)
                            # Resume with passwords
                            task = tm.get_task(task_id)
                            passwords = task.params.get("passwords", {})
                            # Retry import with password
                            if passwords:
                                logger.info(f"[MigrationPlugin][Action] Retrying import for {title} with provided passwords.")
                                # NOTE(review): tmp_new_zip's context manager has
                                # already exited here; dry_run=True presumably
                                # keeps the file on disk — confirm.
                                to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug, passwords=passwords)
                                logger.info(f"[MigrationPlugin][Success] Dashboard {title} imported after password injection.")
                                # Clear passwords from params after use for security
                                if "passwords" in task.params:
                                    del task.params["passwords"]
                                continue
                    logger.error(f"[MigrationPlugin][Failure] Failed to migrate dashboard {title}: {exc}", exc_info=True)

            logger.info("[MigrationPlugin][Exit] Migration finished.")
        except Exception as e:
            logger.critical(f"[MigrationPlugin][Failure] Fatal error during migration: {e}", exc_info=True)
            # BUGFIX: bare `raise` preserves the original traceback
            # (`raise e` re-anchored it here).
            raise
# [/DEF:MigrationPlugin]