feat: restore legacy data and add typed task result views

This commit is contained in:
2026-02-21 23:17:56 +03:00
parent d67d24e7e6
commit 5f70a239a7
10 changed files with 411 additions and 195 deletions

View File

@@ -165,11 +165,11 @@ class MigrationPlugin(PluginBase):
superset_log = log.with_source("superset_api") if context else log
migration_log = log.with_source("migration") if context else log
log.info("Starting migration task.")
log.debug(f"Params: {params}")
try:
with belief_scope("execute"):
log.info("Starting migration task.")
log.debug(f"Params: {params}")
try:
with belief_scope("execute"):
config_manager = get_config_manager()
environments = config_manager.get_environments()
@@ -192,11 +192,20 @@ class MigrationPlugin(PluginBase):
from_env_name = src_env.name
to_env_name = tgt_env.name
log.info(f"Resolved environments: {from_env_name} -> {to_env_name}")
from_c = SupersetClient(src_env)
to_c = SupersetClient(tgt_env)
log.info(f"Resolved environments: {from_env_name} -> {to_env_name}")
migration_result = {
"status": "SUCCESS",
"source_environment": from_env_name,
"target_environment": to_env_name,
"selected_dashboards": 0,
"migrated_dashboards": [],
"failed_dashboards": [],
"mapping_count": 0
}
from_c = SupersetClient(src_env)
to_c = SupersetClient(tgt_env)
if not from_c or not to_c:
raise ValueError(f"Clients not initialized for environments: {from_env_name}, {to_env_name}")
@@ -204,20 +213,24 @@ class MigrationPlugin(PluginBase):
_, all_dashboards = from_c.get_dashboards()
dashboards_to_migrate = []
if selected_ids:
dashboards_to_migrate = [d for d in all_dashboards if d["id"] in selected_ids]
elif dashboard_regex:
regex_str = str(dashboard_regex)
dashboards_to_migrate = [
if selected_ids:
dashboards_to_migrate = [d for d in all_dashboards if d["id"] in selected_ids]
elif dashboard_regex:
regex_str = str(dashboard_regex)
dashboards_to_migrate = [
d for d in all_dashboards if re.search(regex_str, d["dashboard_title"], re.IGNORECASE)
]
else:
log.warning("No selection criteria provided (selected_ids or dashboard_regex).")
return
if not dashboards_to_migrate:
log.warning("No dashboards found matching criteria.")
return
else:
log.warning("No selection criteria provided (selected_ids or dashboard_regex).")
migration_result["status"] = "NO_SELECTION"
return migration_result
if not dashboards_to_migrate:
log.warning("No dashboards found matching criteria.")
migration_result["status"] = "NO_MATCHES"
return migration_result
migration_result["selected_dashboards"] = len(dashboards_to_migrate)
# Get mappings from params
db_mapping = params.get("db_mappings", {})
@@ -238,17 +251,18 @@ class MigrationPlugin(PluginBase):
DatabaseMapping.target_env_id == tgt_env_db.id
).all()
# Provided mappings override stored ones
stored_map_dict = {m.source_db_uuid: m.target_db_uuid for m in stored_mappings}
stored_map_dict.update(db_mapping)
db_mapping = stored_map_dict
log.info(f"Loaded {len(stored_mappings)} database mappings from database.")
finally:
db.close()
engine = MigrationEngine()
for dash in dashboards_to_migrate:
dash_id, dash_slug, title = dash["id"], dash.get("slug"), dash["dashboard_title"]
stored_map_dict = {m.source_db_uuid: m.target_db_uuid for m in stored_mappings}
stored_map_dict.update(db_mapping)
db_mapping = stored_map_dict
log.info(f"Loaded {len(stored_mappings)} database mappings from database.")
finally:
db.close()
migration_result["mapping_count"] = len(db_mapping)
engine = MigrationEngine()
for dash in dashboards_to_migrate:
dash_id, dash_slug, title = dash["id"], dash.get("slug"), dash["dashboard_title"]
try:
exported_content, _ = from_c.export_dashboard(dash_id)
@@ -279,13 +293,22 @@ class MigrationPlugin(PluginBase):
db.close()
success = engine.transform_zip(str(tmp_zip_path), str(tmp_new_zip), db_mapping, strip_databases=False)
if success:
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug)
else:
migration_log.error(f"Failed to transform ZIP for dashboard {title}")
superset_log.info(f"Dashboard {title} imported.")
except Exception as exc:
if success:
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug)
migration_result["migrated_dashboards"].append({
"id": dash_id,
"title": title
})
else:
migration_log.error(f"Failed to transform ZIP for dashboard {title}")
migration_result["failed_dashboards"].append({
"id": dash_id,
"title": title,
"error": "Failed to transform ZIP"
})
superset_log.info(f"Dashboard {title} imported.")
except Exception as exc:
# Check for password error
error_msg = str(exc)
# The error message from Superset is often a JSON string inside a string.
@@ -324,22 +347,34 @@ class MigrationPlugin(PluginBase):
passwords = task.params.get("passwords", {})
# Retry import with password
if passwords:
app_logger.info(f"[MigrationPlugin][Action] Retrying import for {title} with provided passwords.")
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug, passwords=passwords)
app_logger.info(f"[MigrationPlugin][Success] Dashboard {title} imported after password injection.")
# Clear passwords from params after use for security
if "passwords" in task.params:
del task.params["passwords"]
continue
app_logger.error(f"[MigrationPlugin][Failure] Failed to migrate dashboard {title}: {exc}", exc_info=True)
app_logger.info("[MigrationPlugin][Exit] Migration finished.")
except Exception as e:
app_logger.critical(f"[MigrationPlugin][Failure] Fatal error during migration: {e}", exc_info=True)
raise e
if passwords:
app_logger.info(f"[MigrationPlugin][Action] Retrying import for {title} with provided passwords.")
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug, passwords=passwords)
app_logger.info(f"[MigrationPlugin][Success] Dashboard {title} imported after password injection.")
migration_result["migrated_dashboards"].append({
"id": dash_id,
"title": title
})
# Clear passwords from params after use for security
if "passwords" in task.params:
del task.params["passwords"]
continue
app_logger.error(f"[MigrationPlugin][Failure] Failed to migrate dashboard {title}: {exc}", exc_info=True)
migration_result["failed_dashboards"].append({
"id": dash_id,
"title": title,
"error": str(exc)
})
app_logger.info("[MigrationPlugin][Exit] Migration finished.")
if migration_result["failed_dashboards"]:
migration_result["status"] = "PARTIAL_SUCCESS"
return migration_result
except Exception as e:
app_logger.critical(f"[MigrationPlugin][Failure] Fatal error during migration: {e}", exc_info=True)
raise e
# [/DEF:MigrationPlugin.execute:Action]
# [/DEF:execute:Function]
# [/DEF:MigrationPlugin:Class]
# [/DEF:MigrationPlugin:Module]
# [/DEF:MigrationPlugin:Module]