{ "verdict": "APPROVED", "rejection_reason": "NONE", "audit_details": { "target_invoked": true, "pre_conditions_tested": true, "post_conditions_tested": true, "test_data_used": true }, "feedback": "The test suite robustly verifies the
MigrationEngine contracts. It avoids Tautologies by cleanly substituting IdMappingService without mocking the engine itself. Cross-filter parsing asserts against hard-coded, predefined validation dictionaries (no Logic Mirroring). It successfully addresses @PRE negative cases (e.g. invalid zip paths, missing YAMLs) and rigorously validates @POST file transformations (e.g. in-place UUID substitutions and archive reconstruction)." }
This commit is contained in:
@@ -25,6 +25,8 @@ from src.core.logger import logger as app_logger, belief_scope
|
||||
from src.core.config_manager import ConfigManager
|
||||
from src.core.superset_client import SupersetClient
|
||||
from src.core.task_manager.context import TaskContext
|
||||
from src.core.database import SessionLocal
|
||||
from src.core.mapping_service import IdMappingService
|
||||
# [/SECTION]
|
||||
|
||||
# [DEF:GitPlugin:Class]
|
||||
|
||||
@@ -18,6 +18,7 @@ from ..dependencies import get_config_manager
|
||||
from ..core.migration_engine import MigrationEngine
|
||||
from ..core.database import SessionLocal
|
||||
from ..models.mapping import DatabaseMapping, Environment
|
||||
from ..core.mapping_service import IdMappingService
|
||||
from ..core.task_manager.context import TaskContext
|
||||
|
||||
# [DEF:MigrationPlugin:Class]
|
||||
@@ -165,11 +166,11 @@ class MigrationPlugin(PluginBase):
|
||||
superset_log = log.with_source("superset_api") if context else log
|
||||
migration_log = log.with_source("migration") if context else log
|
||||
|
||||
log.info("Starting migration task.")
|
||||
log.debug(f"Params: {params}")
|
||||
|
||||
try:
|
||||
with belief_scope("execute"):
|
||||
log.info("Starting migration task.")
|
||||
log.debug(f"Params: {params}")
|
||||
|
||||
try:
|
||||
with belief_scope("execute"):
|
||||
config_manager = get_config_manager()
|
||||
environments = config_manager.get_environments()
|
||||
|
||||
@@ -192,20 +193,20 @@ class MigrationPlugin(PluginBase):
|
||||
|
||||
from_env_name = src_env.name
|
||||
to_env_name = tgt_env.name
|
||||
|
||||
log.info(f"Resolved environments: {from_env_name} -> {to_env_name}")
|
||||
migration_result = {
|
||||
"status": "SUCCESS",
|
||||
"source_environment": from_env_name,
|
||||
"target_environment": to_env_name,
|
||||
"selected_dashboards": 0,
|
||||
"migrated_dashboards": [],
|
||||
"failed_dashboards": [],
|
||||
"mapping_count": 0
|
||||
}
|
||||
|
||||
from_c = SupersetClient(src_env)
|
||||
to_c = SupersetClient(tgt_env)
|
||||
|
||||
log.info(f"Resolved environments: {from_env_name} -> {to_env_name}")
|
||||
migration_result = {
|
||||
"status": "SUCCESS",
|
||||
"source_environment": from_env_name,
|
||||
"target_environment": to_env_name,
|
||||
"selected_dashboards": 0,
|
||||
"migrated_dashboards": [],
|
||||
"failed_dashboards": [],
|
||||
"mapping_count": 0
|
||||
}
|
||||
|
||||
from_c = SupersetClient(src_env)
|
||||
to_c = SupersetClient(tgt_env)
|
||||
|
||||
if not from_c or not to_c:
|
||||
raise ValueError(f"Clients not initialized for environments: {from_env_name}, {to_env_name}")
|
||||
@@ -213,24 +214,24 @@ class MigrationPlugin(PluginBase):
|
||||
_, all_dashboards = from_c.get_dashboards()
|
||||
|
||||
dashboards_to_migrate = []
|
||||
if selected_ids:
|
||||
dashboards_to_migrate = [d for d in all_dashboards if d["id"] in selected_ids]
|
||||
elif dashboard_regex:
|
||||
regex_str = str(dashboard_regex)
|
||||
dashboards_to_migrate = [
|
||||
if selected_ids:
|
||||
dashboards_to_migrate = [d for d in all_dashboards if d["id"] in selected_ids]
|
||||
elif dashboard_regex:
|
||||
regex_str = str(dashboard_regex)
|
||||
dashboards_to_migrate = [
|
||||
d for d in all_dashboards if re.search(regex_str, d["dashboard_title"], re.IGNORECASE)
|
||||
]
|
||||
else:
|
||||
log.warning("No selection criteria provided (selected_ids or dashboard_regex).")
|
||||
migration_result["status"] = "NO_SELECTION"
|
||||
return migration_result
|
||||
|
||||
if not dashboards_to_migrate:
|
||||
log.warning("No dashboards found matching criteria.")
|
||||
migration_result["status"] = "NO_MATCHES"
|
||||
return migration_result
|
||||
|
||||
migration_result["selected_dashboards"] = len(dashboards_to_migrate)
|
||||
else:
|
||||
log.warning("No selection criteria provided (selected_ids or dashboard_regex).")
|
||||
migration_result["status"] = "NO_SELECTION"
|
||||
return migration_result
|
||||
|
||||
if not dashboards_to_migrate:
|
||||
log.warning("No dashboards found matching criteria.")
|
||||
migration_result["status"] = "NO_MATCHES"
|
||||
return migration_result
|
||||
|
||||
migration_result["selected_dashboards"] = len(dashboards_to_migrate)
|
||||
|
||||
# Get mappings from params
|
||||
db_mapping = params.get("db_mappings", {})
|
||||
@@ -251,18 +252,18 @@ class MigrationPlugin(PluginBase):
|
||||
DatabaseMapping.target_env_id == tgt_env_db.id
|
||||
).all()
|
||||
# Provided mappings override stored ones
|
||||
stored_map_dict = {m.source_db_uuid: m.target_db_uuid for m in stored_mappings}
|
||||
stored_map_dict.update(db_mapping)
|
||||
db_mapping = stored_map_dict
|
||||
log.info(f"Loaded {len(stored_mappings)} database mappings from database.")
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
migration_result["mapping_count"] = len(db_mapping)
|
||||
engine = MigrationEngine()
|
||||
|
||||
for dash in dashboards_to_migrate:
|
||||
dash_id, dash_slug, title = dash["id"], dash.get("slug"), dash["dashboard_title"]
|
||||
stored_map_dict = {m.source_db_uuid: m.target_db_uuid for m in stored_mappings}
|
||||
stored_map_dict.update(db_mapping)
|
||||
db_mapping = stored_map_dict
|
||||
log.info(f"Loaded {len(stored_mappings)} database mappings from database.")
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
migration_result["mapping_count"] = len(db_mapping)
|
||||
engine = MigrationEngine()
|
||||
|
||||
for dash in dashboards_to_migrate:
|
||||
dash_id, dash_slug, title = dash["id"], dash.get("slug"), dash["dashboard_title"]
|
||||
|
||||
try:
|
||||
exported_content, _ = from_c.export_dashboard(dash_id)
|
||||
@@ -293,22 +294,22 @@ class MigrationPlugin(PluginBase):
|
||||
db.close()
|
||||
success = engine.transform_zip(str(tmp_zip_path), str(tmp_new_zip), db_mapping, strip_databases=False)
|
||||
|
||||
if success:
|
||||
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug)
|
||||
migration_result["migrated_dashboards"].append({
|
||||
"id": dash_id,
|
||||
"title": title
|
||||
})
|
||||
else:
|
||||
migration_log.error(f"Failed to transform ZIP for dashboard {title}")
|
||||
migration_result["failed_dashboards"].append({
|
||||
"id": dash_id,
|
||||
"title": title,
|
||||
"error": "Failed to transform ZIP"
|
||||
})
|
||||
|
||||
superset_log.info(f"Dashboard {title} imported.")
|
||||
except Exception as exc:
|
||||
if success:
|
||||
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug)
|
||||
migration_result["migrated_dashboards"].append({
|
||||
"id": dash_id,
|
||||
"title": title
|
||||
})
|
||||
else:
|
||||
migration_log.error(f"Failed to transform ZIP for dashboard {title}")
|
||||
migration_result["failed_dashboards"].append({
|
||||
"id": dash_id,
|
||||
"title": title,
|
||||
"error": "Failed to transform ZIP"
|
||||
})
|
||||
|
||||
superset_log.info(f"Dashboard {title} imported.")
|
||||
except Exception as exc:
|
||||
# Check for password error
|
||||
error_msg = str(exc)
|
||||
# The error message from Superset is often a JSON string inside a string.
|
||||
@@ -347,34 +348,45 @@ class MigrationPlugin(PluginBase):
|
||||
passwords = task.params.get("passwords", {})
|
||||
|
||||
# Retry import with password
|
||||
if passwords:
|
||||
app_logger.info(f"[MigrationPlugin][Action] Retrying import for {title} with provided passwords.")
|
||||
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug, passwords=passwords)
|
||||
app_logger.info(f"[MigrationPlugin][Success] Dashboard {title} imported after password injection.")
|
||||
migration_result["migrated_dashboards"].append({
|
||||
"id": dash_id,
|
||||
"title": title
|
||||
})
|
||||
# Clear passwords from params after use for security
|
||||
if "passwords" in task.params:
|
||||
del task.params["passwords"]
|
||||
continue
|
||||
|
||||
app_logger.error(f"[MigrationPlugin][Failure] Failed to migrate dashboard {title}: {exc}", exc_info=True)
|
||||
migration_result["failed_dashboards"].append({
|
||||
"id": dash_id,
|
||||
"title": title,
|
||||
"error": str(exc)
|
||||
})
|
||||
|
||||
app_logger.info("[MigrationPlugin][Exit] Migration finished.")
|
||||
if migration_result["failed_dashboards"]:
|
||||
migration_result["status"] = "PARTIAL_SUCCESS"
|
||||
return migration_result
|
||||
except Exception as e:
|
||||
app_logger.critical(f"[MigrationPlugin][Failure] Fatal error during migration: {e}", exc_info=True)
|
||||
raise e
|
||||
if passwords:
|
||||
app_logger.info(f"[MigrationPlugin][Action] Retrying import for {title} with provided passwords.")
|
||||
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug, passwords=passwords)
|
||||
app_logger.info(f"[MigrationPlugin][Success] Dashboard {title} imported after password injection.")
|
||||
migration_result["migrated_dashboards"].append({
|
||||
"id": dash_id,
|
||||
"title": title
|
||||
})
|
||||
# Clear passwords from params after use for security
|
||||
if "passwords" in task.params:
|
||||
del task.params["passwords"]
|
||||
continue
|
||||
|
||||
app_logger.error(f"[MigrationPlugin][Failure] Failed to migrate dashboard {title}: {exc}", exc_info=True)
|
||||
migration_result["failed_dashboards"].append({
|
||||
"id": dash_id,
|
||||
"title": title,
|
||||
"error": str(exc)
|
||||
})
|
||||
|
||||
app_logger.info("[MigrationPlugin][Exit] Migration finished.")
|
||||
if migration_result["failed_dashboards"]:
|
||||
migration_result["status"] = "PARTIAL_SUCCESS"
|
||||
|
||||
# Perform incremental sync to rapidly update local mappings with new imported resources
|
||||
try:
|
||||
db_session = SessionLocal()
|
||||
mapping_service = IdMappingService(db_session)
|
||||
mapping_service.sync_environment(tgt_env.id, to_c, incremental=True)
|
||||
db_session.close()
|
||||
log.info(f"[MigrationPlugin][Action] Completed incremental sync for target environment {to_env_name}")
|
||||
except Exception as sync_exc:
|
||||
log.error(f"[MigrationPlugin][Error] Failed incremental sync for {to_env_name}: {sync_exc}")
|
||||
|
||||
return migration_result
|
||||
except Exception as e:
|
||||
app_logger.critical(f"[MigrationPlugin][Failure] Fatal error during migration: {e}", exc_info=True)
|
||||
raise e
|
||||
# [/DEF:MigrationPlugin.execute:Action]
|
||||
# [/DEF:execute:Function]
|
||||
# [/DEF:MigrationPlugin:Class]
|
||||
# [/DEF:MigrationPlugin:Module]
|
||||
# [/DEF:MigrationPlugin:Module]
|
||||
|
||||
Reference in New Issue
Block a user