mcp
This commit is contained in:
@@ -1,11 +1,14 @@
|
||||
# [DEF:MigrationPlugin:Module]
|
||||
# @SEMANTICS: migration, superset, automation, dashboard, plugin
|
||||
# @PURPOSE: A plugin that provides functionality to migrate Superset dashboards between environments.
|
||||
# @TIER: CRITICAL
|
||||
# @SEMANTICS: migration, superset, automation, dashboard, plugin, transformation
|
||||
# @PURPOSE: Orchestrates export, DB-mapping transformation, and import of Superset dashboards across environments.
|
||||
# @LAYER: App
|
||||
# @RELATION: IMPLEMENTS -> PluginBase
|
||||
# @RELATION: DEPENDS_ON -> superset_tool.client
|
||||
# @RELATION: DEPENDS_ON -> superset_tool.utils
|
||||
# @RELATION: DEPENDS_ON -> SupersetClient
|
||||
# @RELATION: DEPENDS_ON -> MigrationEngine
|
||||
# @RELATION: DEPENDS_ON -> IdMappingService
|
||||
# @RELATION: USES -> TaskContext
|
||||
# @INVARIANT: Dashboards must never be imported with unmapped/source DB connections to prevent data leaks or cross-environment pollution.
|
||||
|
||||
from typing import Dict, Any, Optional
|
||||
import re
|
||||
@@ -22,7 +25,13 @@ from ..core.mapping_service import IdMappingService
|
||||
from ..core.task_manager.context import TaskContext
|
||||
|
||||
# [DEF:MigrationPlugin:Class]
|
||||
# @PURPOSE: Implementation of the migration plugin logic.
|
||||
# @PURPOSE: Implementation of the migration plugin workflow and transformation orchestration.
|
||||
# @PRE: Plugin loader must register this instance.
|
||||
# @POST: Provides migration UI schema and executes atomic dashboard transfers.
|
||||
# @TEST_FIXTURE: superset_export_zip -> file:backend/tests/fixtures/migration/dashboard_export.zip
|
||||
# @TEST_FIXTURE: db_mapping_payload -> INLINE_JSON: {"db_mappings": {"source_uuid_1": "target_uuid_2"}}
|
||||
# @TEST_FIXTURE: password_inject_payload -> INLINE_JSON: {"passwords": {"PostgreSQL": "secret123"}}
|
||||
# @TEST_INVARIANT: strict_db_isolation -> VERIFIED_BY: [successful_dashboard_transfer, missing_mapping_resolution]
|
||||
class MigrationPlugin(PluginBase):
|
||||
"""
|
||||
A plugin to migrate Superset dashboards between environments.
|
||||
@@ -31,377 +40,345 @@ class MigrationPlugin(PluginBase):
|
||||
@property
|
||||
# [DEF:id:Function]
|
||||
# @PURPOSE: Returns the unique identifier for the migration plugin.
|
||||
# @PRE: None.
|
||||
# @POST: Returns "superset-migration".
|
||||
# @RETURN: str - "superset-migration"
|
||||
# @PRE: None.
|
||||
# @POST: Returns stable string "superset-migration".
|
||||
# @RETURN: str
|
||||
def id(self) -> str:
|
||||
with belief_scope("id"):
|
||||
with belief_scope("MigrationPlugin.id"):
|
||||
return "superset-migration"
|
||||
# [/DEF:id:Function]
|
||||
|
||||
@property
|
||||
# [DEF:name:Function]
|
||||
# @PURPOSE: Returns the human-readable name of the migration plugin.
|
||||
# @PRE: None.
|
||||
# @POST: Returns the plugin name.
|
||||
# @RETURN: str - Plugin name.
|
||||
# @PURPOSE: Returns the human-readable name of the plugin.
|
||||
# @PRE: None.
|
||||
# @POST: Returns "Superset Dashboard Migration".
|
||||
# @RETURN: str
|
||||
def name(self) -> str:
|
||||
with belief_scope("name"):
|
||||
with belief_scope("MigrationPlugin.name"):
|
||||
return "Superset Dashboard Migration"
|
||||
# [/DEF:name:Function]
|
||||
|
||||
@property
|
||||
# [DEF:description:Function]
|
||||
# @PURPOSE: Returns a description of the migration plugin.
|
||||
# @PRE: None.
|
||||
# @POST: Returns the plugin description.
|
||||
# @RETURN: str - Plugin description.
|
||||
# @PURPOSE: Returns the semantic description of the plugin.
|
||||
# @PRE: None.
|
||||
# @POST: Returns description string.
|
||||
# @RETURN: str
|
||||
def description(self) -> str:
|
||||
with belief_scope("description"):
|
||||
with belief_scope("MigrationPlugin.description"):
|
||||
return "Migrates dashboards between Superset environments."
|
||||
# [/DEF:description:Function]
|
||||
|
||||
@property
|
||||
# [DEF:version:Function]
|
||||
# @PURPOSE: Returns the version of the migration plugin.
|
||||
# @PRE: None.
|
||||
# @POST: Returns "1.0.0".
|
||||
# @RETURN: str - "1.0.0"
|
||||
# @PURPOSE: Returns the semantic version of the migration plugin.
|
||||
# @PRE: None.
|
||||
# @POST: Returns "1.0.0".
|
||||
# @RETURN: str
|
||||
def version(self) -> str:
|
||||
with belief_scope("version"):
|
||||
with belief_scope("MigrationPlugin.version"):
|
||||
return "1.0.0"
|
||||
# [/DEF:version:Function]
|
||||
|
||||
@property
|
||||
# [DEF:ui_route:Function]
|
||||
# @PURPOSE: Returns the frontend route for the migration plugin.
|
||||
# @RETURN: str - "/migration"
|
||||
# @PURPOSE: Returns the frontend routing anchor for the plugin.
|
||||
# @PRE: None.
|
||||
# @POST: Returns "/migration".
|
||||
# @RETURN: str
|
||||
def ui_route(self) -> str:
|
||||
with belief_scope("ui_route"):
|
||||
with belief_scope("MigrationPlugin.ui_route"):
|
||||
return "/migration"
|
||||
# [/DEF:ui_route:Function]
|
||||
|
||||
# [DEF:get_schema:Function]
|
||||
# @PURPOSE: Returns the JSON schema for migration plugin parameters.
|
||||
# @PRE: Config manager is available.
|
||||
# @POST: Returns a valid JSON schema dictionary.
|
||||
# @RETURN: Dict[str, Any] - JSON schema.
|
||||
# @PURPOSE: Generates the JSON Schema for the plugin execution form dynamically.
|
||||
# @PRE: ConfigManager is accessible and environments are defined.
|
||||
# @POST: Returns a JSON Schema dict matching current system environments.
|
||||
# @RETURN: Dict[str, Any]
|
||||
def get_schema(self) -> Dict[str, Any]:
|
||||
with belief_scope("get_schema"):
|
||||
with belief_scope("MigrationPlugin.get_schema"):
|
||||
app_logger.reason("Generating migration UI schema")
|
||||
config_manager = get_config_manager()
|
||||
envs = [e.name for e in config_manager.get_environments()]
|
||||
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"from_env": {
|
||||
"type": "string",
|
||||
"title": "Source Environment",
|
||||
"description": "The environment to migrate from.",
|
||||
"enum": envs if envs else ["dev", "prod"],
|
||||
envs = [e.name for e in config_manager.get_environments()]
|
||||
|
||||
schema = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"from_env": {
|
||||
"type": "string",
|
||||
"title": "Source Environment",
|
||||
"description": "The environment to migrate from.",
|
||||
"enum": envs if envs else ["dev", "prod"],
|
||||
},
|
||||
"to_env": {
|
||||
"type": "string",
|
||||
"title": "Target Environment",
|
||||
"description": "The environment to migrate to.",
|
||||
"enum": envs if envs else ["dev", "prod"],
|
||||
},
|
||||
"dashboard_regex": {
|
||||
"type": "string",
|
||||
"title": "Dashboard Regex",
|
||||
"description": "A regular expression to filter dashboards to migrate.",
|
||||
},
|
||||
"replace_db_config": {
|
||||
"type": "boolean",
|
||||
"title": "Replace DB Config",
|
||||
"description": "Whether to replace the database configuration.",
|
||||
"default": False,
|
||||
},
|
||||
"from_db_id": {
|
||||
"type": "integer",
|
||||
"title": "Source DB ID",
|
||||
"description": "The ID of the source database to replace (if replacing).",
|
||||
},
|
||||
"to_db_id": {
|
||||
"type": "integer",
|
||||
"title": "Target DB ID",
|
||||
"description": "The ID of the target database to replace with (if replacing).",
|
||||
},
|
||||
},
|
||||
"to_env": {
|
||||
"type": "string",
|
||||
"title": "Target Environment",
|
||||
"description": "The environment to migrate to.",
|
||||
"enum": envs if envs else ["dev", "prod"],
|
||||
},
|
||||
"dashboard_regex": {
|
||||
"type": "string",
|
||||
"title": "Dashboard Regex",
|
||||
"description": "A regular expression to filter dashboards to migrate.",
|
||||
},
|
||||
"replace_db_config": {
|
||||
"type": "boolean",
|
||||
"title": "Replace DB Config",
|
||||
"description": "Whether to replace the database configuration.",
|
||||
"default": False,
|
||||
},
|
||||
"from_db_id": {
|
||||
"type": "integer",
|
||||
"title": "Source DB ID",
|
||||
"description": "The ID of the source database to replace (if replacing).",
|
||||
},
|
||||
"to_db_id": {
|
||||
"type": "integer",
|
||||
"title": "Target DB ID",
|
||||
"description": "The ID of the target database to replace with (if replacing).",
|
||||
},
|
||||
},
|
||||
"required": ["from_env", "to_env", "dashboard_regex"],
|
||||
}
|
||||
"required": ["from_env", "to_env", "dashboard_regex"],
|
||||
}
|
||||
app_logger.reflect("Schema generated successfully", extra={"environments_count": len(envs)})
|
||||
return schema
|
||||
# [/DEF:get_schema:Function]
|
||||
|
||||
# [DEF:execute:Function]
|
||||
# @PURPOSE: Executes the dashboard migration logic with TaskContext support.
|
||||
# @PARAM: params (Dict[str, Any]) - Migration parameters.
|
||||
# @PARAM: context (Optional[TaskContext]) - Task context for logging with source attribution.
|
||||
# @PRE: Source and target environments must be configured.
|
||||
# @POST: Selected dashboards are migrated.
|
||||
# @PURPOSE: Orchestrates the dashboard migration pipeline including extraction, AST mutation, and ingestion.
|
||||
# @PARAM: params (Dict[str, Any]) - Extracted parameters from UI/API execution request.
|
||||
# @PARAM: context (Optional[TaskContext]) - Dependency injected TaskContext for IO tracing.
|
||||
# @PRE: Source and target environments must resolve. Matching dashboards must exist.
|
||||
# @POST: Dashboard ZIP bundles are transformed and imported. ID mappings are synchronized.
|
||||
# @SIDE_EFFECT: Creates temp files, mutates target Superset state, blocks on user input (passwords/mappings).
|
||||
# @TEST_CONTRACT: Dict[str, Any] -> Dict[str, Any]
|
||||
# @TEST_SCENARIO: successful_dashboard_transfer -> ZIP is downloaded, DB mappings applied via AST, target import succeeds.
|
||||
# @TEST_SCENARIO: missing_password_injection -> Target import fails on auth, TaskManager pauses for user input, retries with password successfully.
|
||||
# @TEST_SCENARIO: empty_selection -> Returns NO_MATCHES gracefully when regex finds zero dashboards.
|
||||
# @TEST_EDGE: missing_env_field -> [ValueError: Could not resolve source or target environment]
|
||||
# @TEST_EDGE: invalid_regex_pattern -> [Regex compilation exception is thrown or caught gracefully]
|
||||
# @TEST_EDGE: target_api_timeout -> [Dashboard added to failed_dashboards, task concludes with PARTIAL_SUCCESS]
|
||||
async def execute(self, params: Dict[str, Any], context: Optional[TaskContext] = None):
|
||||
with belief_scope("MigrationPlugin.execute"):
|
||||
app_logger.reason("Evaluating migration task parameters", extra={"params": params})
|
||||
|
||||
source_env_id = params.get("source_env_id")
|
||||
target_env_id = params.get("target_env_id")
|
||||
selected_ids = params.get("selected_ids")
|
||||
|
||||
# Legacy support or alternative params
|
||||
from_env_name = params.get("from_env")
|
||||
to_env_name = params.get("to_env")
|
||||
dashboard_regex = params.get("dashboard_regex")
|
||||
|
||||
replace_db_config = params.get("replace_db_config", False)
|
||||
fix_cross_filters = params.get("fix_cross_filters", True)
|
||||
params.get("from_db_id")
|
||||
params.get("to_db_id")
|
||||
|
||||
# [DEF:MigrationPlugin.execute:Action]
|
||||
# @PURPOSE: Execute the migration logic with proper task logging.
|
||||
task_id = params.get("_task_id")
|
||||
from ..dependencies import get_task_manager
|
||||
tm = get_task_manager()
|
||||
|
||||
# Use TaskContext logger if available, otherwise fall back to app_logger
|
||||
log = context.logger if context else app_logger
|
||||
|
||||
# Create sub-loggers for different components
|
||||
superset_log = log.with_source("superset_api") if context else log
|
||||
migration_log = log.with_source("migration") if context else log
|
||||
|
||||
log.info("Starting migration task.")
|
||||
log.debug(f"Params: {params}")
|
||||
|
||||
try:
|
||||
with belief_scope("execute"):
|
||||
config_manager = get_config_manager()
|
||||
environments = config_manager.get_environments()
|
||||
|
||||
# Resolve environments
|
||||
src_env = None
|
||||
tgt_env = None
|
||||
from_env_name = params.get("from_env")
|
||||
to_env_name = params.get("to_env")
|
||||
dashboard_regex = params.get("dashboard_regex")
|
||||
replace_db_config = params.get("replace_db_config", False)
|
||||
fix_cross_filters = params.get("fix_cross_filters", True)
|
||||
|
||||
if source_env_id:
|
||||
src_env = next((e for e in environments if e.id == source_env_id), None)
|
||||
elif from_env_name:
|
||||
src_env = next((e for e in environments if e.name == from_env_name), None)
|
||||
|
||||
if target_env_id:
|
||||
tgt_env = next((e for e in environments if e.id == target_env_id), None)
|
||||
elif to_env_name:
|
||||
tgt_env = next((e for e in environments if e.name == to_env_name), None)
|
||||
task_id = params.get("_task_id")
|
||||
from ..dependencies import get_task_manager
|
||||
tm = get_task_manager()
|
||||
|
||||
log = context.logger if context else app_logger
|
||||
superset_log = log.with_source("superset_api") if context else log
|
||||
migration_log = log.with_source("migration") if context else log
|
||||
|
||||
if not src_env or not tgt_env:
|
||||
raise ValueError(f"Could not resolve source or target environment. Source: {source_env_id or from_env_name}, Target: {target_env_id or to_env_name}")
|
||||
log.info("Starting migration task.")
|
||||
|
||||
from_env_name = src_env.name
|
||||
to_env_name = tgt_env.name
|
||||
|
||||
log.info(f"Resolved environments: {from_env_name} -> {to_env_name}")
|
||||
migration_result = {
|
||||
"status": "SUCCESS",
|
||||
"source_environment": from_env_name,
|
||||
"target_environment": to_env_name,
|
||||
"selected_dashboards": 0,
|
||||
"migrated_dashboards": [],
|
||||
"failed_dashboards": [],
|
||||
"mapping_count": 0
|
||||
}
|
||||
|
||||
from_c = SupersetClient(src_env)
|
||||
to_c = SupersetClient(tgt_env)
|
||||
|
||||
if not from_c or not to_c:
|
||||
raise ValueError(f"Clients not initialized for environments: {from_env_name}, {to_env_name}")
|
||||
|
||||
_, all_dashboards = from_c.get_dashboards()
|
||||
|
||||
dashboards_to_migrate = []
|
||||
if selected_ids:
|
||||
dashboards_to_migrate = [d for d in all_dashboards if d["id"] in selected_ids]
|
||||
elif dashboard_regex:
|
||||
regex_pattern = re.compile(str(dashboard_regex), re.IGNORECASE)
|
||||
dashboards_to_migrate = [
|
||||
d for d in all_dashboards if regex_pattern.search(d.get("dashboard_title", ""))
|
||||
]
|
||||
else:
|
||||
log.warning("No selection criteria provided (selected_ids or dashboard_regex).")
|
||||
migration_result["status"] = "NO_SELECTION"
|
||||
return migration_result
|
||||
|
||||
if not dashboards_to_migrate:
|
||||
log.warning("No dashboards found matching criteria.")
|
||||
migration_result["status"] = "NO_MATCHES"
|
||||
return migration_result
|
||||
|
||||
migration_result["selected_dashboards"] = len(dashboards_to_migrate)
|
||||
|
||||
# Get mappings from params
|
||||
db_mapping = params.get("db_mappings", {})
|
||||
if not isinstance(db_mapping, dict):
|
||||
db_mapping = {}
|
||||
|
||||
# Fetch additional mappings from database if requested
|
||||
if replace_db_config:
|
||||
db = SessionLocal()
|
||||
try:
|
||||
# Find environment IDs by name
|
||||
src_env_db = db.query(Environment).filter(Environment.name == from_env_name).first()
|
||||
tgt_env_db = db.query(Environment).filter(Environment.name == to_env_name).first()
|
||||
|
||||
if src_env_db and tgt_env_db:
|
||||
stored_mappings = db.query(DatabaseMapping).filter(
|
||||
DatabaseMapping.source_env_id == src_env_db.id,
|
||||
DatabaseMapping.target_env_id == tgt_env_db.id
|
||||
).all()
|
||||
# Provided mappings override stored ones
|
||||
stored_map_dict = {m.source_db_uuid: m.target_db_uuid for m in stored_mappings}
|
||||
stored_map_dict.update(db_mapping)
|
||||
db_mapping = stored_map_dict
|
||||
log.info(f"Loaded {len(stored_mappings)} database mappings from database.")
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
migration_result["mapping_count"] = len(db_mapping)
|
||||
engine = MigrationEngine()
|
||||
|
||||
for dash in dashboards_to_migrate:
|
||||
dash_id, dash_slug, title = dash["id"], dash.get("slug"), dash["dashboard_title"]
|
||||
|
||||
try:
|
||||
exported_content, _ = from_c.export_dashboard(dash_id)
|
||||
with create_temp_file(content=exported_content, dry_run=True, suffix=".zip") as tmp_zip_path:
|
||||
# Always transform to strip databases to avoid password errors
|
||||
with create_temp_file(suffix=".zip", dry_run=True) as tmp_new_zip:
|
||||
success = engine.transform_zip(
|
||||
str(tmp_zip_path),
|
||||
str(tmp_new_zip),
|
||||
db_mapping,
|
||||
strip_databases=False,
|
||||
target_env_id=tgt_env.id if tgt_env else None,
|
||||
fix_cross_filters=fix_cross_filters
|
||||
)
|
||||
|
||||
if not success and replace_db_config:
|
||||
# Signal missing mapping and wait (only if we care about mappings)
|
||||
if task_id:
|
||||
log.info(f"Pausing for missing mapping in task {task_id}")
|
||||
# In a real scenario, we'd pass the missing DB info to the frontend
|
||||
# For this task, we'll just simulate the wait
|
||||
await tm.wait_for_resolution(task_id)
|
||||
# After resolution, retry transformation with updated mappings
|
||||
# (Mappings would be updated in task.params by resolve_task)
|
||||
db = SessionLocal()
|
||||
try:
|
||||
src_env_rt = db.query(Environment).filter(Environment.name == from_env_name).first()
|
||||
tgt_env_rt = db.query(Environment).filter(Environment.name == to_env_name).first()
|
||||
mappings = db.query(DatabaseMapping).filter(
|
||||
DatabaseMapping.source_env_id == src_env_rt.id,
|
||||
DatabaseMapping.target_env_id == tgt_env_rt.id
|
||||
).all()
|
||||
db_mapping = {m.source_db_uuid: m.target_db_uuid for m in mappings}
|
||||
finally:
|
||||
db.close()
|
||||
success = engine.transform_zip(
|
||||
str(tmp_zip_path),
|
||||
str(tmp_new_zip),
|
||||
db_mapping,
|
||||
strip_databases=False,
|
||||
target_env_id=tgt_env.id if tgt_env else None,
|
||||
fix_cross_filters=fix_cross_filters
|
||||
)
|
||||
|
||||
if success:
|
||||
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug)
|
||||
migration_result["migrated_dashboards"].append({
|
||||
"id": dash_id,
|
||||
"title": title
|
||||
})
|
||||
else:
|
||||
migration_log.error(f"Failed to transform ZIP for dashboard {title}")
|
||||
migration_result["failed_dashboards"].append({
|
||||
"id": dash_id,
|
||||
"title": title,
|
||||
"error": "Failed to transform ZIP"
|
||||
})
|
||||
|
||||
superset_log.info(f"Dashboard {title} imported.")
|
||||
except Exception as exc:
|
||||
# Check for password error
|
||||
error_msg = str(exc)
|
||||
# The error message from Superset is often a JSON string inside a string.
|
||||
# We need to robustly detect the password requirement.
|
||||
# Typical error: "Error importing dashboard: databases/PostgreSQL.yaml: {'_schema': ['Must provide a password for the database']}"
|
||||
|
||||
if "Must provide a password for the database" in error_msg:
|
||||
# Extract database name
|
||||
# Try to find "databases/DBNAME.yaml" pattern
|
||||
import re
|
||||
db_name = "unknown"
|
||||
match = re.search(r"databases/([^.]+)\.yaml", error_msg)
|
||||
if match:
|
||||
db_name = match.group(1)
|
||||
else:
|
||||
# Fallback: try to find 'database 'NAME'' pattern
|
||||
match_alt = re.search(r"database '([^']+)'", error_msg)
|
||||
if match_alt:
|
||||
db_name = match_alt.group(1)
|
||||
|
||||
app_logger.warning(f"[MigrationPlugin][Action] Detected missing password for database: {db_name}")
|
||||
|
||||
if task_id:
|
||||
input_request = {
|
||||
"type": "database_password",
|
||||
"databases": [db_name],
|
||||
"error_message": error_msg
|
||||
}
|
||||
tm.await_input(task_id, input_request)
|
||||
|
||||
# Wait for user input
|
||||
await tm.wait_for_input(task_id)
|
||||
|
||||
# Resume with passwords
|
||||
task = tm.get_task(task_id)
|
||||
passwords = task.params.get("passwords", {})
|
||||
|
||||
# Retry import with password
|
||||
if passwords:
|
||||
app_logger.info(f"[MigrationPlugin][Action] Retrying import for {title} with provided passwords.")
|
||||
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug, passwords=passwords)
|
||||
app_logger.info(f"[MigrationPlugin][Success] Dashboard {title} imported after password injection.")
|
||||
migration_result["migrated_dashboards"].append({
|
||||
"id": dash_id,
|
||||
"title": title
|
||||
})
|
||||
# Clear passwords from params after use for security
|
||||
if "passwords" in task.params:
|
||||
del task.params["passwords"]
|
||||
continue
|
||||
|
||||
app_logger.error(f"[MigrationPlugin][Failure] Failed to migrate dashboard {title}: {exc}", exc_info=True)
|
||||
migration_result["failed_dashboards"].append({
|
||||
"id": dash_id,
|
||||
"title": title,
|
||||
"error": str(exc)
|
||||
})
|
||||
|
||||
app_logger.info("[MigrationPlugin][Exit] Migration finished.")
|
||||
if migration_result["failed_dashboards"]:
|
||||
migration_result["status"] = "PARTIAL_SUCCESS"
|
||||
|
||||
# Perform incremental sync to rapidly update local mappings with new imported resources
|
||||
try:
|
||||
db_session = SessionLocal()
|
||||
mapping_service = IdMappingService(db_session)
|
||||
mapping_service.sync_environment(tgt_env.id, to_c, incremental=True)
|
||||
db_session.close()
|
||||
log.info(f"[MigrationPlugin][Action] Completed incremental sync for target environment {to_env_name}")
|
||||
except Exception as sync_exc:
|
||||
log.error(f"[MigrationPlugin][Error] Failed incremental sync for {to_env_name}: {sync_exc}")
|
||||
config_manager = get_config_manager()
|
||||
environments = config_manager.get_environments()
|
||||
|
||||
# Resolve environments
|
||||
src_env = next((e for e in environments if e.id == source_env_id), None) if source_env_id else next((e for e in environments if e.name == from_env_name), None)
|
||||
tgt_env = next((e for e in environments if e.id == target_env_id), None) if target_env_id else next((e for e in environments if e.name == to_env_name), None)
|
||||
|
||||
if not src_env or not tgt_env:
|
||||
app_logger.explore("Environment resolution failed", extra={"src": source_env_id or from_env_name, "tgt": target_env_id or to_env_name})
|
||||
raise ValueError(f"Could not resolve source or target environment. Source: {source_env_id or from_env_name}, Target: {target_env_id or to_env_name}")
|
||||
|
||||
return migration_result
|
||||
except Exception as e:
|
||||
app_logger.critical(f"[MigrationPlugin][Failure] Fatal error during migration: {e}", exc_info=True)
|
||||
raise e
|
||||
# [/DEF:MigrationPlugin.execute:Action]
|
||||
from_env_name = src_env.name
|
||||
to_env_name = tgt_env.name
|
||||
|
||||
app_logger.reason("Environments resolved successfully", extra={"from": from_env_name, "to": to_env_name})
|
||||
|
||||
migration_result = {
|
||||
"status": "SUCCESS",
|
||||
"source_environment": from_env_name,
|
||||
"target_environment": to_env_name,
|
||||
"selected_dashboards": 0,
|
||||
"migrated_dashboards": [],
|
||||
"failed_dashboards": [],
|
||||
"mapping_count": 0
|
||||
}
|
||||
|
||||
from_c = SupersetClient(src_env)
|
||||
to_c = SupersetClient(tgt_env)
|
||||
|
||||
if not from_c or not to_c:
|
||||
raise ValueError(f"Clients not initialized for environments: {from_env_name}, {to_env_name}")
|
||||
|
||||
_, all_dashboards = from_c.get_dashboards()
|
||||
|
||||
# Selection Logic
|
||||
if selected_ids:
|
||||
dashboards_to_migrate = [d for d in all_dashboards if d["id"] in selected_ids]
|
||||
elif dashboard_regex:
|
||||
regex_pattern = re.compile(str(dashboard_regex), re.IGNORECASE)
|
||||
dashboards_to_migrate = [d for d in all_dashboards if regex_pattern.search(d.get("dashboard_title", ""))]
|
||||
else:
|
||||
app_logger.explore("No deterministic selection criteria provided")
|
||||
migration_result["status"] = "NO_SELECTION"
|
||||
return migration_result
|
||||
|
||||
if not dashboards_to_migrate:
|
||||
app_logger.explore("Zero dashboards match selection criteria")
|
||||
migration_result["status"] = "NO_MATCHES"
|
||||
return migration_result
|
||||
|
||||
migration_result["selected_dashboards"] = len(dashboards_to_migrate)
|
||||
|
||||
# Database Mapping Resolution
|
||||
db_mapping = params.get("db_mappings", {})
|
||||
if not isinstance(db_mapping, dict):
|
||||
db_mapping = {}
|
||||
|
||||
if replace_db_config:
|
||||
app_logger.reason("Fetching environment DB mappings from catalog")
|
||||
db = SessionLocal()
|
||||
try:
|
||||
src_env_db = db.query(Environment).filter(Environment.name == from_env_name).first()
|
||||
tgt_env_db = db.query(Environment).filter(Environment.name == to_env_name).first()
|
||||
|
||||
if src_env_db and tgt_env_db:
|
||||
stored_mappings = db.query(DatabaseMapping).filter(
|
||||
DatabaseMapping.source_env_id == src_env_db.id,
|
||||
DatabaseMapping.target_env_id == tgt_env_db.id
|
||||
).all()
|
||||
stored_map_dict = {m.source_db_uuid: m.target_db_uuid for m in stored_mappings}
|
||||
stored_map_dict.update(db_mapping)
|
||||
db_mapping = stored_map_dict
|
||||
log.info(f"Loaded {len(stored_mappings)} database mappings from database.")
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
migration_result["mapping_count"] = len(db_mapping)
|
||||
engine = MigrationEngine()
|
||||
|
||||
# Migration Loop
|
||||
for dash in dashboards_to_migrate:
|
||||
dash_id, dash_slug, title = dash["id"], dash.get("slug"), dash["dashboard_title"]
|
||||
app_logger.reason(f"Starting pipeline for dashboard '{title}'", extra={"dash_id": dash_id})
|
||||
|
||||
try:
|
||||
exported_content, _ = from_c.export_dashboard(dash_id)
|
||||
with create_temp_file(content=exported_content, dry_run=True, suffix=".zip") as tmp_zip_path:
|
||||
with create_temp_file(suffix=".zip", dry_run=True) as tmp_new_zip:
|
||||
|
||||
success = engine.transform_zip(
|
||||
str(tmp_zip_path),
|
||||
str(tmp_new_zip),
|
||||
db_mapping,
|
||||
strip_databases=False,
|
||||
target_env_id=tgt_env.id if tgt_env else None,
|
||||
fix_cross_filters=fix_cross_filters
|
||||
)
|
||||
|
||||
if not success and replace_db_config:
|
||||
if task_id:
|
||||
app_logger.explore("Missing mapping blocks AST transform. Pausing task for user intervention.", extra={"task_id": task_id})
|
||||
await tm.wait_for_resolution(task_id)
|
||||
|
||||
app_logger.reason("Task resumed, re-evaluating mapping states")
|
||||
db = SessionLocal()
|
||||
try:
|
||||
src_env_rt = db.query(Environment).filter(Environment.name == from_env_name).first()
|
||||
tgt_env_rt = db.query(Environment).filter(Environment.name == to_env_name).first()
|
||||
mappings = db.query(DatabaseMapping).filter(
|
||||
DatabaseMapping.source_env_id == src_env_rt.id,
|
||||
DatabaseMapping.target_env_id == tgt_env_rt.id
|
||||
).all()
|
||||
db_mapping = {m.source_db_uuid: m.target_db_uuid for m in mappings}
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
success = engine.transform_zip(
|
||||
str(tmp_zip_path),
|
||||
str(tmp_new_zip),
|
||||
db_mapping,
|
||||
strip_databases=False,
|
||||
target_env_id=tgt_env.id if tgt_env else None,
|
||||
fix_cross_filters=fix_cross_filters
|
||||
)
|
||||
|
||||
if success:
|
||||
app_logger.reason("Pushing transformed ZIP to target Superset")
|
||||
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug)
|
||||
migration_result["migrated_dashboards"].append({"id": dash_id, "title": title})
|
||||
app_logger.reflect("Import successful", extra={"title": title})
|
||||
else:
|
||||
app_logger.explore("Transformation strictly failed, bypassing ingestion")
|
||||
migration_log.error(f"Failed to transform ZIP for dashboard {title}")
|
||||
migration_result["failed_dashboards"].append({
|
||||
"id": dash_id, "title": title, "error": "Failed to transform ZIP"
|
||||
})
|
||||
|
||||
except Exception as exc:
|
||||
error_msg = str(exc)
|
||||
if "Must provide a password for the database" in error_msg:
|
||||
db_name = "unknown"
|
||||
match = re.search(r"databases/([^.]+)\.yaml", error_msg)
|
||||
if match:
|
||||
db_name = match.group(1)
|
||||
else:
|
||||
match_alt = re.search(r"database '([^']+)'", error_msg)
|
||||
if match_alt:
|
||||
db_name = match_alt.group(1)
|
||||
|
||||
app_logger.explore(f"Missing DB password detected during ingestion. Escalating to UI.", extra={"db_name": db_name})
|
||||
|
||||
if task_id:
|
||||
tm.await_input(task_id, {
|
||||
"type": "database_password",
|
||||
"databases": [db_name],
|
||||
"error_message": error_msg
|
||||
})
|
||||
|
||||
await tm.wait_for_input(task_id)
|
||||
task = tm.get_task(task_id)
|
||||
passwords = task.params.get("passwords", {})
|
||||
|
||||
if passwords:
|
||||
app_logger.reason(f"Retrying import for {title} with injected credentials")
|
||||
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug, passwords=passwords)
|
||||
migration_result["migrated_dashboards"].append({"id": dash_id, "title": title})
|
||||
app_logger.reflect("Password injection unblocked import")
|
||||
if "passwords" in task.params:
|
||||
del task.params["passwords"]
|
||||
continue
|
||||
|
||||
app_logger.explore(f"Catastrophic dashboard ingestion failure: {exc}")
|
||||
migration_result["failed_dashboards"].append({"id": dash_id, "title": title, "error": str(exc)})
|
||||
|
||||
if migration_result["failed_dashboards"]:
|
||||
migration_result["status"] = "PARTIAL_SUCCESS"
|
||||
|
||||
# Post-Migration ID Mapping Synchronization
|
||||
try:
|
||||
app_logger.reason("Executing incremental ID catalog sync on target")
|
||||
db_session = SessionLocal()
|
||||
mapping_service = IdMappingService(db_session)
|
||||
mapping_service.sync_environment(tgt_env.id, to_c, incremental=True)
|
||||
db_session.close()
|
||||
app_logger.reflect("Incremental catalog sync closed out cleanly")
|
||||
except Exception as sync_exc:
|
||||
app_logger.explore(f"ID Mapping sync failed, mapping state might be degraded: {sync_exc}")
|
||||
|
||||
app_logger.reflect("Migration cycle fully resolved", extra={"result": migration_result})
|
||||
return migration_result
|
||||
|
||||
except Exception as e:
|
||||
app_logger.explore(f"Fatal plugin failure: {e}", exc_info=True)
|
||||
raise e
|
||||
# [/DEF:execute:Function]
|
||||
# [/DEF:MigrationPlugin:Class]
|
||||
# [/DEF:MigrationPlugin:Module]
|
||||
# [/DEF:MigrationPlugin:Module]
|
||||
Reference in New Issue
Block a user