semantic update
This commit is contained in:
@@ -26,7 +26,7 @@ from ...dependencies import get_config_manager, get_task_manager, has_permission
|
||||
from ...core.database import get_db
|
||||
from ...models.dashboard import DashboardMetadata, DashboardSelection
|
||||
from ...core.superset_client import SupersetClient
|
||||
from ...core.logger import belief_scope
|
||||
from ...core.logger import logger, belief_scope
|
||||
from ...core.migration.dry_run_orchestrator import MigrationDryRunService
|
||||
from ...core.mapping_service import IdMappingService
|
||||
from ...models.mapping import ResourceMapping
|
||||
@@ -46,14 +46,18 @@ async def get_dashboards(
|
||||
_ = Depends(has_permission("plugin:migration", "EXECUTE"))
|
||||
):
|
||||
with belief_scope("get_dashboards", f"env_id={env_id}"):
|
||||
logger.reason(f"Fetching dashboards for environment: {env_id}")
|
||||
environments = config_manager.get_environments()
|
||||
env = next((e for e in environments if e.id == env_id), None)
|
||||
if not env:
|
||||
raise HTTPException(status_code=404, detail="Environment not found")
|
||||
env = next((e for e in environments if e.id == env_id), None)
|
||||
|
||||
if not env:
|
||||
logger.explore(f"Environment {env_id} not found in configuration")
|
||||
raise HTTPException(status_code=404, detail="Environment not found")
|
||||
|
||||
client = SupersetClient(env)
|
||||
dashboards = client.get_dashboards_summary()
|
||||
return dashboards
|
||||
client = SupersetClient(env)
|
||||
dashboards = client.get_dashboards_summary()
|
||||
logger.reflect(f"Retrieved {len(dashboards)} dashboards from {env_id}")
|
||||
return dashboards
|
||||
# [/DEF:get_dashboards:Function]
|
||||
|
||||
# [DEF:execute_migration:Function]
|
||||
@@ -70,31 +74,30 @@ async def execute_migration(
|
||||
_ = Depends(has_permission("plugin:migration", "EXECUTE"))
|
||||
):
|
||||
with belief_scope("execute_migration"):
|
||||
logger.reason(f"Initiating migration from {selection.source_env_id} to {selection.target_env_id}")
|
||||
|
||||
# Validate environments exist
|
||||
environments = config_manager.get_environments()
|
||||
env_ids = {e.id for e in environments}
|
||||
if selection.source_env_id not in env_ids or selection.target_env_id not in env_ids:
|
||||
raise HTTPException(status_code=400, detail="Invalid source or target environment")
|
||||
env_ids = {e.id for e in environments}
|
||||
|
||||
if selection.source_env_id not in env_ids or selection.target_env_id not in env_ids:
|
||||
logger.explore("Invalid environment selection", extra={"source": selection.source_env_id, "target": selection.target_env_id})
|
||||
raise HTTPException(status_code=400, detail="Invalid source or target environment")
|
||||
|
||||
# Create migration task with debug logging
|
||||
from ...core.logger import logger
|
||||
|
||||
# Include replace_db_config and fix_cross_filters in the task parameters
|
||||
task_params = selection.dict()
|
||||
task_params['replace_db_config'] = selection.replace_db_config
|
||||
task_params['fix_cross_filters'] = selection.fix_cross_filters
|
||||
|
||||
logger.info(f"Creating migration task with params: {task_params}")
|
||||
logger.info(f"Available environments: {env_ids}")
|
||||
logger.info(f"Source env: {selection.source_env_id}, Target env: {selection.target_env_id}")
|
||||
|
||||
try:
|
||||
task = await task_manager.create_task("superset-migration", task_params)
|
||||
logger.info(f"Task created successfully: {task.id}")
|
||||
return {"task_id": task.id, "message": "Migration initiated"}
|
||||
except Exception as e:
|
||||
logger.error(f"Task creation failed: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to create migration task: {str(e)}")
|
||||
# Include replace_db_config and fix_cross_filters in the task parameters
|
||||
task_params = selection.dict()
|
||||
task_params['replace_db_config'] = selection.replace_db_config
|
||||
task_params['fix_cross_filters'] = selection.fix_cross_filters
|
||||
|
||||
logger.reason(f"Creating migration task with {len(selection.selected_ids)} dashboards")
|
||||
|
||||
try:
|
||||
task = await task_manager.create_task("superset-migration", task_params)
|
||||
logger.reflect(f"Migration task created: {task.id}")
|
||||
return {"task_id": task.id, "message": "Migration initiated"}
|
||||
except Exception as e:
|
||||
logger.explore(f"Task creation failed: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to create migration task: {str(e)}")
|
||||
# [/DEF:execute_migration:Function]
|
||||
|
||||
|
||||
@@ -112,29 +115,41 @@ async def dry_run_migration(
|
||||
_ = Depends(has_permission("plugin:migration", "EXECUTE"))
|
||||
):
|
||||
with belief_scope("dry_run_migration"):
|
||||
logger.reason(f"Starting dry run: {selection.source_env_id} -> {selection.target_env_id}")
|
||||
|
||||
environments = config_manager.get_environments()
|
||||
env_map = {env.id: env for env in environments}
|
||||
source_env = env_map.get(selection.source_env_id)
|
||||
target_env = env_map.get(selection.target_env_id)
|
||||
if not source_env or not target_env:
|
||||
raise HTTPException(status_code=400, detail="Invalid source or target environment")
|
||||
if selection.source_env_id == selection.target_env_id:
|
||||
raise HTTPException(status_code=400, detail="Source and target environments must be different")
|
||||
if not selection.selected_ids:
|
||||
raise HTTPException(status_code=400, detail="No dashboards selected for dry run")
|
||||
env_map = {env.id: env for env in environments}
|
||||
source_env = env_map.get(selection.source_env_id)
|
||||
target_env = env_map.get(selection.target_env_id)
|
||||
|
||||
if not source_env or not target_env:
|
||||
logger.explore("Invalid environment selection for dry run")
|
||||
raise HTTPException(status_code=400, detail="Invalid source or target environment")
|
||||
|
||||
if selection.source_env_id == selection.target_env_id:
|
||||
logger.explore("Source and target environments are identical")
|
||||
raise HTTPException(status_code=400, detail="Source and target environments must be different")
|
||||
|
||||
if not selection.selected_ids:
|
||||
logger.explore("No dashboards selected for dry run")
|
||||
raise HTTPException(status_code=400, detail="No dashboards selected for dry run")
|
||||
|
||||
service = MigrationDryRunService()
|
||||
source_client = SupersetClient(source_env)
|
||||
target_client = SupersetClient(target_env)
|
||||
try:
|
||||
return service.run(
|
||||
selection=selection,
|
||||
source_client=source_client,
|
||||
target_client=target_client,
|
||||
db=db,
|
||||
)
|
||||
except ValueError as exc:
|
||||
raise HTTPException(status_code=500, detail=str(exc)) from exc
|
||||
service = MigrationDryRunService()
|
||||
source_client = SupersetClient(source_env)
|
||||
target_client = SupersetClient(target_env)
|
||||
|
||||
try:
|
||||
result = service.run(
|
||||
selection=selection,
|
||||
source_client=source_client,
|
||||
target_client=target_client,
|
||||
db=db,
|
||||
)
|
||||
logger.reflect("Dry run analysis complete")
|
||||
return result
|
||||
except ValueError as exc:
|
||||
logger.explore(f"Dry run orchestrator failed: {exc}")
|
||||
raise HTTPException(status_code=500, detail=str(exc)) from exc
|
||||
# [/DEF:dry_run_migration:Function]
|
||||
|
||||
# [DEF:get_migration_settings:Function]
|
||||
|
||||
@@ -32,7 +32,13 @@ class AuthRepository:
|
||||
# @DATA_CONTRACT: Input[Session] -> Output[None]
|
||||
def __init__(self, db: Session):
|
||||
with belief_scope("AuthRepository.__init__"):
|
||||
if not isinstance(db, Session):
|
||||
logger.explore("Invalid session provided to AuthRepository", extra={"type": type(db)})
|
||||
raise TypeError("db must be an instance of sqlalchemy.orm.Session")
|
||||
|
||||
logger.reason("Binding AuthRepository to database session")
|
||||
self.db = db
|
||||
logger.reflect("AuthRepository initialized")
|
||||
# [/DEF:__init__:Function]
|
||||
|
||||
# [DEF:get_user_by_username:Function]
|
||||
@@ -43,7 +49,17 @@ class AuthRepository:
|
||||
# @DATA_CONTRACT: Input[str] -> Output[Optional[User]]
|
||||
def get_user_by_username(self, username: str) -> Optional[User]:
|
||||
with belief_scope("AuthRepository.get_user_by_username"):
|
||||
return self.db.query(User).filter(User.username == username).first()
|
||||
if not username or not isinstance(username, str):
|
||||
raise ValueError("username must be a non-empty string")
|
||||
|
||||
logger.reason(f"Querying user by username: {username}")
|
||||
user = self.db.query(User).filter(User.username == username).first()
|
||||
|
||||
if user:
|
||||
logger.reflect(f"User found: {username}")
|
||||
else:
|
||||
logger.explore(f"User not found: {username}")
|
||||
return user
|
||||
# [/DEF:get_user_by_username:Function]
|
||||
|
||||
# [DEF:get_user_by_id:Function]
|
||||
@@ -54,7 +70,17 @@ class AuthRepository:
|
||||
# @DATA_CONTRACT: Input[str] -> Output[Optional[User]]
|
||||
def get_user_by_id(self, user_id: str) -> Optional[User]:
|
||||
with belief_scope("AuthRepository.get_user_by_id"):
|
||||
return self.db.query(User).filter(User.id == user_id).first()
|
||||
if not user_id or not isinstance(user_id, str):
|
||||
raise ValueError("user_id must be a non-empty string")
|
||||
|
||||
logger.reason(f"Querying user by ID: {user_id}")
|
||||
user = self.db.query(User).filter(User.id == user_id).first()
|
||||
|
||||
if user:
|
||||
logger.reflect(f"User found by ID: {user_id}")
|
||||
else:
|
||||
logger.explore(f"User not found by ID: {user_id}")
|
||||
return user
|
||||
# [/DEF:get_user_by_id:Function]
|
||||
|
||||
# [DEF:get_role_by_name:Function]
|
||||
@@ -76,10 +102,15 @@ class AuthRepository:
|
||||
# @DATA_CONTRACT: Input[User] -> Output[None]
|
||||
def update_last_login(self, user: User):
|
||||
with belief_scope("AuthRepository.update_last_login"):
|
||||
if not isinstance(user, User):
|
||||
raise TypeError("user must be an instance of User")
|
||||
|
||||
from datetime import datetime
|
||||
logger.reason(f"Updating last login for user: {user.username}")
|
||||
user.last_login = datetime.utcnow()
|
||||
self.db.add(user)
|
||||
self.db.commit()
|
||||
logger.reflect(f"Last login updated and committed for user: {user.username}")
|
||||
# [/DEF:update_last_login:Function]
|
||||
|
||||
# [DEF:get_role_by_id:Function]
|
||||
@@ -144,9 +175,14 @@ class AuthRepository:
|
||||
preference: UserDashboardPreference,
|
||||
) -> UserDashboardPreference:
|
||||
with belief_scope("AuthRepository.save_user_dashboard_preference"):
|
||||
if not isinstance(preference, UserDashboardPreference):
|
||||
raise TypeError("preference must be an instance of UserDashboardPreference")
|
||||
|
||||
logger.reason(f"Saving dashboard preference for user: {preference.user_id}")
|
||||
self.db.add(preference)
|
||||
self.db.commit()
|
||||
self.db.refresh(preference)
|
||||
logger.reflect(f"Dashboard preference saved and refreshed for user: {preference.user_id}")
|
||||
return preference
|
||||
# [/DEF:save_user_dashboard_preference:Function]
|
||||
|
||||
|
||||
@@ -36,18 +36,23 @@ class ConfigManager:
|
||||
# @SIDE_EFFECT: Reads config sources and updates logging configuration.
|
||||
# @DATA_CONTRACT: Input(str config_path) -> Output(None; self.config: AppConfig)
|
||||
def __init__(self, config_path: str = "config.json"):
|
||||
with belief_scope("__init__"):
|
||||
assert isinstance(config_path, str) and config_path, "config_path must be a non-empty string"
|
||||
with belief_scope("ConfigManager.__init__"):
|
||||
if not isinstance(config_path, str) or not config_path:
|
||||
logger.explore("Invalid config_path provided", extra={"path": config_path})
|
||||
raise ValueError("config_path must be a non-empty string")
|
||||
|
||||
logger.info(f"[ConfigManager][Entry] Initializing with legacy path {config_path}")
|
||||
logger.reason(f"Initializing ConfigManager with legacy path: {config_path}")
|
||||
|
||||
self.config_path = Path(config_path)
|
||||
self.config: AppConfig = self._load_config()
|
||||
|
||||
configure_logger(self.config.settings.logging)
|
||||
assert isinstance(self.config, AppConfig), "self.config must be an instance of AppConfig"
|
||||
|
||||
if not isinstance(self.config, AppConfig):
|
||||
logger.explore("Config loading resulted in invalid type", extra={"type": type(self.config)})
|
||||
raise TypeError("self.config must be an instance of AppConfig")
|
||||
|
||||
logger.info("[ConfigManager][Exit] Initialized")
|
||||
logger.reflect("ConfigManager initialization complete")
|
||||
# [/DEF:__init__:Function]
|
||||
|
||||
# [DEF:_default_config:Function]
|
||||
@@ -104,20 +109,23 @@ class ConfigManager:
|
||||
# @SIDE_EFFECT: Database read/write, possible migration write, logging.
|
||||
# @DATA_CONTRACT: Input(None) -> Output(AppConfig)
|
||||
def _load_config(self) -> AppConfig:
|
||||
with belief_scope("_load_config"):
|
||||
with belief_scope("ConfigManager._load_config"):
|
||||
session: Session = SessionLocal()
|
||||
try:
|
||||
record = self._get_record(session)
|
||||
if record and record.payload:
|
||||
logger.info("[_load_config][Coherence:OK] Configuration loaded from database")
|
||||
return AppConfig(**record.payload)
|
||||
logger.reason("Configuration found in database")
|
||||
config = AppConfig(**record.payload)
|
||||
logger.reflect("Database configuration validated")
|
||||
return config
|
||||
|
||||
logger.info("[_load_config][Action] No database config found, migrating legacy config")
|
||||
logger.reason("No database config found, initiating legacy migration")
|
||||
config = self._load_from_legacy_file()
|
||||
self._save_config_to_db(config, session=session)
|
||||
logger.reflect("Legacy configuration migrated to database")
|
||||
return config
|
||||
except Exception as e:
|
||||
logger.error(f"[_load_config][Coherence:Failed] Error loading config from DB: {e}")
|
||||
logger.explore(f"Error loading config from DB: {e}")
|
||||
return self._default_config()
|
||||
finally:
|
||||
session.close()
|
||||
@@ -130,8 +138,9 @@ class ConfigManager:
|
||||
# @SIDE_EFFECT: Database insert/update, commit/rollback, logging.
|
||||
# @DATA_CONTRACT: Input(AppConfig, Optional[Session]) -> Output(None)
|
||||
def _save_config_to_db(self, config: AppConfig, session: Optional[Session] = None):
|
||||
with belief_scope("_save_config_to_db"):
|
||||
assert isinstance(config, AppConfig), "config must be an instance of AppConfig"
|
||||
with belief_scope("ConfigManager._save_config_to_db"):
|
||||
if not isinstance(config, AppConfig):
|
||||
raise TypeError("config must be an instance of AppConfig")
|
||||
|
||||
owns_session = session is None
|
||||
db = session or SessionLocal()
|
||||
@@ -139,15 +148,17 @@ class ConfigManager:
|
||||
record = self._get_record(db)
|
||||
payload = config.model_dump()
|
||||
if record is None:
|
||||
logger.reason("Creating new global configuration record")
|
||||
record = AppConfigRecord(id="global", payload=payload)
|
||||
db.add(record)
|
||||
else:
|
||||
logger.reason("Updating existing global configuration record")
|
||||
record.payload = payload
|
||||
db.commit()
|
||||
logger.info("[_save_config_to_db][Action] Configuration saved to database")
|
||||
logger.reflect("Configuration successfully committed to database")
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
logger.error(f"[_save_config_to_db][Coherence:Failed] Failed to save: {e}")
|
||||
logger.explore(f"Failed to save configuration: {e}")
|
||||
raise
|
||||
finally:
|
||||
if owns_session:
|
||||
@@ -183,14 +194,15 @@ class ConfigManager:
|
||||
# @SIDE_EFFECT: Mutates self.config, DB write, logger reconfiguration, logging.
|
||||
# @DATA_CONTRACT: Input(GlobalSettings) -> Output(None)
|
||||
def update_global_settings(self, settings: GlobalSettings):
|
||||
with belief_scope("update_global_settings"):
|
||||
logger.info("[update_global_settings][Entry] Updating settings")
|
||||
|
||||
assert isinstance(settings, GlobalSettings), "settings must be an instance of GlobalSettings"
|
||||
with belief_scope("ConfigManager.update_global_settings"):
|
||||
if not isinstance(settings, GlobalSettings):
|
||||
raise TypeError("settings must be an instance of GlobalSettings")
|
||||
|
||||
logger.reason("Updating global settings and persisting")
|
||||
self.config.settings = settings
|
||||
self.save()
|
||||
configure_logger(settings.logging)
|
||||
logger.info("[update_global_settings][Exit] Settings updated")
|
||||
logger.reflect("Global settings updated and logger reconfigured")
|
||||
# [/DEF:update_global_settings:Function]
|
||||
|
||||
# [DEF:validate_path:Function]
|
||||
@@ -257,14 +269,15 @@ class ConfigManager:
|
||||
# @SIDE_EFFECT: Mutates environment list, DB write, logging.
|
||||
# @DATA_CONTRACT: Input(Environment) -> Output(None)
|
||||
def add_environment(self, env: Environment):
|
||||
with belief_scope("add_environment"):
|
||||
logger.info(f"[add_environment][Entry] Adding environment {env.id}")
|
||||
assert isinstance(env, Environment), "env must be an instance of Environment"
|
||||
with belief_scope("ConfigManager.add_environment"):
|
||||
if not isinstance(env, Environment):
|
||||
raise TypeError("env must be an instance of Environment")
|
||||
|
||||
logger.reason(f"Adding/Updating environment: {env.id}")
|
||||
self.config.environments = [e for e in self.config.environments if e.id != env.id]
|
||||
self.config.environments.append(env)
|
||||
self.save()
|
||||
logger.info("[add_environment][Exit] Environment added")
|
||||
logger.reflect(f"Environment {env.id} persisted")
|
||||
# [/DEF:add_environment:Function]
|
||||
|
||||
# [DEF:update_environment:Function]
|
||||
@@ -274,22 +287,25 @@ class ConfigManager:
|
||||
# @SIDE_EFFECT: May mutate environment list, DB write, logging.
|
||||
# @DATA_CONTRACT: Input(str env_id, Environment updated_env) -> Output(bool)
|
||||
def update_environment(self, env_id: str, updated_env: Environment) -> bool:
|
||||
with belief_scope("update_environment"):
|
||||
logger.info(f"[update_environment][Entry] Updating {env_id}")
|
||||
assert env_id and isinstance(env_id, str), "env_id must be a non-empty string"
|
||||
assert isinstance(updated_env, Environment), "updated_env must be an instance of Environment"
|
||||
with belief_scope("ConfigManager.update_environment"):
|
||||
if not env_id or not isinstance(env_id, str):
|
||||
raise ValueError("env_id must be a non-empty string")
|
||||
if not isinstance(updated_env, Environment):
|
||||
raise TypeError("updated_env must be an instance of Environment")
|
||||
|
||||
logger.reason(f"Attempting to update environment: {env_id}")
|
||||
for i, env in enumerate(self.config.environments):
|
||||
if env.id == env_id:
|
||||
if updated_env.password == "********":
|
||||
logger.reason("Preserving existing password for masked update")
|
||||
updated_env.password = env.password
|
||||
|
||||
self.config.environments[i] = updated_env
|
||||
self.save()
|
||||
logger.info(f"[update_environment][Coherence:OK] Updated {env_id}")
|
||||
logger.reflect(f"Environment {env_id} updated and saved")
|
||||
return True
|
||||
|
||||
logger.warning(f"[update_environment][Coherence:Failed] Environment {env_id} not found")
|
||||
logger.explore(f"Environment {env_id} not found for update")
|
||||
return False
|
||||
# [/DEF:update_environment:Function]
|
||||
|
||||
@@ -300,18 +316,19 @@ class ConfigManager:
|
||||
# @SIDE_EFFECT: May mutate environment list, conditional DB write, logging.
|
||||
# @DATA_CONTRACT: Input(str env_id) -> Output(None)
|
||||
def delete_environment(self, env_id: str):
|
||||
with belief_scope("delete_environment"):
|
||||
logger.info(f"[delete_environment][Entry] Deleting {env_id}")
|
||||
assert env_id and isinstance(env_id, str), "env_id must be a non-empty string"
|
||||
with belief_scope("ConfigManager.delete_environment"):
|
||||
if not env_id or not isinstance(env_id, str):
|
||||
raise ValueError("env_id must be a non-empty string")
|
||||
|
||||
logger.reason(f"Attempting to delete environment: {env_id}")
|
||||
original_count = len(self.config.environments)
|
||||
self.config.environments = [e for e in self.config.environments if e.id != env_id]
|
||||
|
||||
if len(self.config.environments) < original_count:
|
||||
self.save()
|
||||
logger.info(f"[delete_environment][Action] Deleted {env_id}")
|
||||
logger.reflect(f"Environment {env_id} deleted and configuration saved")
|
||||
else:
|
||||
logger.warning(f"[delete_environment][Coherence:Failed] Environment {env_id} not found")
|
||||
logger.explore(f"Environment {env_id} not found for deletion")
|
||||
# [/DEF:delete_environment:Function]
|
||||
|
||||
|
||||
|
||||
@@ -38,7 +38,9 @@ class MigrationEngine:
|
||||
# @PARAM: mapping_service (Optional[IdMappingService]) - Used for resolving target environment integer IDs.
|
||||
def __init__(self, mapping_service: Optional[IdMappingService] = None):
|
||||
with belief_scope("MigrationEngine.__init__"):
|
||||
logger.reason("Initializing MigrationEngine")
|
||||
self.mapping_service = mapping_service
|
||||
logger.reflect("MigrationEngine initialized")
|
||||
# [/DEF:__init__:Function]
|
||||
|
||||
# [DEF:transform_zip:Function]
|
||||
@@ -59,12 +61,14 @@ class MigrationEngine:
|
||||
Transform a Superset export ZIP by replacing database UUIDs and optionally fixing cross-filters.
|
||||
"""
|
||||
with belief_scope("MigrationEngine.transform_zip"):
|
||||
logger.reason(f"Starting ZIP transformation: {zip_path} -> {output_path}")
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir_str:
|
||||
temp_dir = Path(temp_dir_str)
|
||||
|
||||
try:
|
||||
# 1. Extract
|
||||
logger.info(f"[MigrationEngine.transform_zip][Action] Extracting ZIP: {zip_path}")
|
||||
logger.reason(f"Extracting source archive to {temp_dir}")
|
||||
with zipfile.ZipFile(zip_path, 'r') as zf:
|
||||
zf.extractall(temp_dir)
|
||||
|
||||
@@ -72,33 +76,33 @@ class MigrationEngine:
|
||||
dataset_files = list(temp_dir.glob("**/datasets/**/*.yaml")) + list(temp_dir.glob("**/datasets/*.yaml"))
|
||||
dataset_files = list(set(dataset_files))
|
||||
|
||||
logger.info(f"[MigrationEngine.transform_zip][State] Found {len(dataset_files)} dataset files.")
|
||||
logger.reason(f"Transforming {len(dataset_files)} dataset YAML files")
|
||||
for ds_file in dataset_files:
|
||||
logger.info(f"[MigrationEngine.transform_zip][Action] Transforming dataset: {ds_file}")
|
||||
self._transform_yaml(ds_file, db_mapping)
|
||||
|
||||
# 2.5 Patch Cross-Filters (Dashboards)
|
||||
if fix_cross_filters and self.mapping_service and target_env_id:
|
||||
dash_files = list(temp_dir.glob("**/dashboards/**/*.yaml")) + list(temp_dir.glob("**/dashboards/*.yaml"))
|
||||
dash_files = list(set(dash_files))
|
||||
|
||||
logger.info(f"[MigrationEngine.transform_zip][State] Found {len(dash_files)} dashboard files for patching.")
|
||||
|
||||
# Gather all source UUID-to-ID mappings from the archive first
|
||||
source_id_to_uuid_map = self._extract_chart_uuids_from_archive(temp_dir)
|
||||
|
||||
for dash_file in dash_files:
|
||||
logger.info(f"[MigrationEngine.transform_zip][Action] Patching dashboard: {dash_file}")
|
||||
self._patch_dashboard_metadata(dash_file, target_env_id, source_id_to_uuid_map)
|
||||
if fix_cross_filters:
|
||||
if self.mapping_service and target_env_id:
|
||||
dash_files = list(temp_dir.glob("**/dashboards/**/*.yaml")) + list(temp_dir.glob("**/dashboards/*.yaml"))
|
||||
dash_files = list(set(dash_files))
|
||||
|
||||
logger.reason(f"Patching cross-filters for {len(dash_files)} dashboards")
|
||||
|
||||
# Gather all source UUID-to-ID mappings from the archive first
|
||||
source_id_to_uuid_map = self._extract_chart_uuids_from_archive(temp_dir)
|
||||
|
||||
for dash_file in dash_files:
|
||||
self._patch_dashboard_metadata(dash_file, target_env_id, source_id_to_uuid_map)
|
||||
else:
|
||||
logger.explore("Cross-filter patching requested but mapping service or target_env_id is missing")
|
||||
|
||||
# 3. Re-package
|
||||
logger.info(f"[MigrationEngine.transform_zip][Action] Re-packaging ZIP to: {output_path} (strip_databases={strip_databases})")
|
||||
logger.reason(f"Re-packaging transformed archive (strip_databases={strip_databases})")
|
||||
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
|
||||
for root, dirs, files in os.walk(temp_dir):
|
||||
rel_root = Path(root).relative_to(temp_dir)
|
||||
|
||||
if strip_databases and "databases" in rel_root.parts:
|
||||
logger.info(f"[MigrationEngine.transform_zip][Action] Skipping file in databases directory: {rel_root}")
|
||||
continue
|
||||
|
||||
for file in files:
|
||||
@@ -106,9 +110,10 @@ class MigrationEngine:
|
||||
arcname = file_path.relative_to(temp_dir)
|
||||
zf.write(file_path, arcname)
|
||||
|
||||
logger.reflect("ZIP transformation completed successfully")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"[MigrationEngine.transform_zip][Coherence:Failed] Error transforming ZIP: {e}")
|
||||
logger.explore(f"Error transforming ZIP: {e}")
|
||||
return False
|
||||
# [/DEF:transform_zip:Function]
|
||||
|
||||
@@ -122,19 +127,23 @@ class MigrationEngine:
|
||||
# @DATA_CONTRACT: Input[(Path file_path, Dict[str,str] db_mapping)] -> Output[None]
|
||||
def _transform_yaml(self, file_path: Path, db_mapping: Dict[str, str]):
|
||||
with belief_scope("MigrationEngine._transform_yaml"):
|
||||
if not file_path.exists():
|
||||
logger.explore(f"YAML file not found: {file_path}")
|
||||
return
|
||||
|
||||
with open(file_path, 'r') as f:
|
||||
data = yaml.safe_load(f)
|
||||
|
||||
if not data:
|
||||
return
|
||||
|
||||
# Superset dataset YAML structure:
|
||||
# database_uuid: ...
|
||||
source_uuid = data.get('database_uuid')
|
||||
if source_uuid in db_mapping:
|
||||
logger.reason(f"Replacing database UUID in {file_path.name}")
|
||||
data['database_uuid'] = db_mapping[source_uuid]
|
||||
with open(file_path, 'w') as f:
|
||||
yaml.dump(data, f)
|
||||
logger.reflect(f"Database UUID patched in {file_path.name}")
|
||||
# [/DEF:_transform_yaml:Function]
|
||||
|
||||
# [DEF:_extract_chart_uuids_from_archive:Function]
|
||||
@@ -176,6 +185,9 @@ class MigrationEngine:
|
||||
def _patch_dashboard_metadata(self, file_path: Path, target_env_id: str, source_map: Dict[int, str]):
|
||||
with belief_scope("MigrationEngine._patch_dashboard_metadata"):
|
||||
try:
|
||||
if not file_path.exists():
|
||||
return
|
||||
|
||||
with open(file_path, 'r') as f:
|
||||
data = yaml.safe_load(f)
|
||||
|
||||
@@ -186,18 +198,13 @@ class MigrationEngine:
|
||||
if not metadata_str:
|
||||
return
|
||||
|
||||
metadata = json.loads(metadata_str)
|
||||
modified = False
|
||||
|
||||
# We need to deeply traverse and replace. For MVP, string replacement over the raw JSON is an option,
|
||||
# but careful dict traversal is safer.
|
||||
|
||||
# Fetch target UUIDs for everything we know:
|
||||
uuids_needed = list(source_map.values())
|
||||
logger.reason(f"Resolving {len(uuids_needed)} remote IDs for dashboard metadata patching")
|
||||
target_ids = self.mapping_service.get_remote_ids_batch(target_env_id, ResourceType.CHART, uuids_needed)
|
||||
|
||||
if not target_ids:
|
||||
logger.info("[MigrationEngine._patch_dashboard_metadata][Reflect] No remote target IDs found in mapping database.")
|
||||
logger.reflect("No remote target IDs found in mapping database for this dashboard.")
|
||||
return
|
||||
|
||||
# Map Source Int -> Target Int
|
||||
@@ -210,21 +217,16 @@ class MigrationEngine:
|
||||
missing_targets.append(s_id)
|
||||
|
||||
if missing_targets:
|
||||
logger.warning(f"[MigrationEngine._patch_dashboard_metadata][Coherence:Recoverable] Missing target IDs for source IDs: {missing_targets}. Cross-filters for these IDs might break.")
|
||||
logger.explore(f"Missing target IDs for source IDs: {missing_targets}. Cross-filters might break.")
|
||||
|
||||
if not source_to_target:
|
||||
logger.info("[MigrationEngine._patch_dashboard_metadata][Reflect] No source IDs matched remotely. Skipping patch.")
|
||||
logger.reflect("No source IDs matched remotely. Skipping patch.")
|
||||
return
|
||||
|
||||
# Complex metadata traversal would go here (e.g. for native_filter_configuration)
|
||||
# We use regex replacement over the string for safety over unknown nested dicts.
|
||||
|
||||
logger.reason(f"Patching {len(source_to_target)} ID references in json_metadata")
|
||||
new_metadata_str = metadata_str
|
||||
|
||||
# Replace chartId and datasetId assignments explicitly.
|
||||
# Pattern: "datasetId": 42 or "chartId": 42
|
||||
for s_id, t_id in source_to_target.items():
|
||||
# Replace in native_filter_configuration targets
|
||||
new_metadata_str = re.sub(r'("datasetId"\s*:\s*)' + str(s_id) + r'(\b)', r'\g<1>' + str(t_id) + r'\g<2>', new_metadata_str)
|
||||
new_metadata_str = re.sub(r'("chartId"\s*:\s*)' + str(s_id) + r'(\b)', r'\g<1>' + str(t_id) + r'\g<2>', new_metadata_str)
|
||||
|
||||
@@ -233,10 +235,10 @@ class MigrationEngine:
|
||||
|
||||
with open(file_path, 'w') as f:
|
||||
yaml.dump(data, f)
|
||||
logger.info(f"[MigrationEngine._patch_dashboard_metadata][Reason] Re-serialized modified JSON metadata for dashboard.")
|
||||
logger.reflect(f"Dashboard metadata patched and saved: {file_path.name}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[MigrationEngine._patch_dashboard_metadata][Coherence:Failed] Metadata patch failed: {e}")
|
||||
logger.explore(f"Metadata patch failed for {file_path.name}: {e}")
|
||||
|
||||
# [/DEF:_patch_dashboard_metadata:Function]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user