semantic update

This commit is contained in:
2026-03-10 21:33:09 +03:00
parent 4f1c33e02b
commit 5e527a8af2
10 changed files with 2084 additions and 1499 deletions

View File

@@ -26,7 +26,7 @@ from ...dependencies import get_config_manager, get_task_manager, has_permission
from ...core.database import get_db
from ...models.dashboard import DashboardMetadata, DashboardSelection
from ...core.superset_client import SupersetClient
from ...core.logger import belief_scope
from ...core.logger import logger, belief_scope
from ...core.migration.dry_run_orchestrator import MigrationDryRunService
from ...core.mapping_service import IdMappingService
from ...models.mapping import ResourceMapping
@@ -46,14 +46,18 @@ async def get_dashboards(
_ = Depends(has_permission("plugin:migration", "EXECUTE"))
):
with belief_scope("get_dashboards", f"env_id={env_id}"):
logger.reason(f"Fetching dashboards for environment: {env_id}")
environments = config_manager.get_environments()
env = next((e for e in environments if e.id == env_id), None)
if not env:
raise HTTPException(status_code=404, detail="Environment not found")
env = next((e for e in environments if e.id == env_id), None)
if not env:
logger.explore(f"Environment {env_id} not found in configuration")
raise HTTPException(status_code=404, detail="Environment not found")
client = SupersetClient(env)
dashboards = client.get_dashboards_summary()
return dashboards
client = SupersetClient(env)
dashboards = client.get_dashboards_summary()
logger.reflect(f"Retrieved {len(dashboards)} dashboards from {env_id}")
return dashboards
# [/DEF:get_dashboards:Function]
# [DEF:execute_migration:Function]
@@ -70,31 +74,30 @@ async def execute_migration(
_ = Depends(has_permission("plugin:migration", "EXECUTE"))
):
with belief_scope("execute_migration"):
logger.reason(f"Initiating migration from {selection.source_env_id} to {selection.target_env_id}")
# Validate environments exist
environments = config_manager.get_environments()
env_ids = {e.id for e in environments}
if selection.source_env_id not in env_ids or selection.target_env_id not in env_ids:
raise HTTPException(status_code=400, detail="Invalid source or target environment")
env_ids = {e.id for e in environments}
if selection.source_env_id not in env_ids or selection.target_env_id not in env_ids:
logger.explore("Invalid environment selection", extra={"source": selection.source_env_id, "target": selection.target_env_id})
raise HTTPException(status_code=400, detail="Invalid source or target environment")
# Create migration task with debug logging
from ...core.logger import logger
# Include replace_db_config and fix_cross_filters in the task parameters
task_params = selection.dict()
task_params['replace_db_config'] = selection.replace_db_config
task_params['fix_cross_filters'] = selection.fix_cross_filters
logger.info(f"Creating migration task with params: {task_params}")
logger.info(f"Available environments: {env_ids}")
logger.info(f"Source env: {selection.source_env_id}, Target env: {selection.target_env_id}")
try:
task = await task_manager.create_task("superset-migration", task_params)
logger.info(f"Task created successfully: {task.id}")
return {"task_id": task.id, "message": "Migration initiated"}
except Exception as e:
logger.error(f"Task creation failed: {e}")
raise HTTPException(status_code=500, detail=f"Failed to create migration task: {str(e)}")
# Include replace_db_config and fix_cross_filters in the task parameters
task_params = selection.dict()
task_params['replace_db_config'] = selection.replace_db_config
task_params['fix_cross_filters'] = selection.fix_cross_filters
logger.reason(f"Creating migration task with {len(selection.selected_ids)} dashboards")
try:
task = await task_manager.create_task("superset-migration", task_params)
logger.reflect(f"Migration task created: {task.id}")
return {"task_id": task.id, "message": "Migration initiated"}
except Exception as e:
logger.explore(f"Task creation failed: {e}")
raise HTTPException(status_code=500, detail=f"Failed to create migration task: {str(e)}")
# [/DEF:execute_migration:Function]
@@ -112,29 +115,41 @@ async def dry_run_migration(
_ = Depends(has_permission("plugin:migration", "EXECUTE"))
):
with belief_scope("dry_run_migration"):
logger.reason(f"Starting dry run: {selection.source_env_id} -> {selection.target_env_id}")
environments = config_manager.get_environments()
env_map = {env.id: env for env in environments}
source_env = env_map.get(selection.source_env_id)
target_env = env_map.get(selection.target_env_id)
if not source_env or not target_env:
raise HTTPException(status_code=400, detail="Invalid source or target environment")
if selection.source_env_id == selection.target_env_id:
raise HTTPException(status_code=400, detail="Source and target environments must be different")
if not selection.selected_ids:
raise HTTPException(status_code=400, detail="No dashboards selected for dry run")
env_map = {env.id: env for env in environments}
source_env = env_map.get(selection.source_env_id)
target_env = env_map.get(selection.target_env_id)
if not source_env or not target_env:
logger.explore("Invalid environment selection for dry run")
raise HTTPException(status_code=400, detail="Invalid source or target environment")
if selection.source_env_id == selection.target_env_id:
logger.explore("Source and target environments are identical")
raise HTTPException(status_code=400, detail="Source and target environments must be different")
if not selection.selected_ids:
logger.explore("No dashboards selected for dry run")
raise HTTPException(status_code=400, detail="No dashboards selected for dry run")
service = MigrationDryRunService()
source_client = SupersetClient(source_env)
target_client = SupersetClient(target_env)
try:
return service.run(
selection=selection,
source_client=source_client,
target_client=target_client,
db=db,
)
except ValueError as exc:
raise HTTPException(status_code=500, detail=str(exc)) from exc
service = MigrationDryRunService()
source_client = SupersetClient(source_env)
target_client = SupersetClient(target_env)
try:
result = service.run(
selection=selection,
source_client=source_client,
target_client=target_client,
db=db,
)
logger.reflect("Dry run analysis complete")
return result
except ValueError as exc:
logger.explore(f"Dry run orchestrator failed: {exc}")
raise HTTPException(status_code=500, detail=str(exc)) from exc
# [/DEF:dry_run_migration:Function]
# [DEF:get_migration_settings:Function]

View File

@@ -32,7 +32,13 @@ class AuthRepository:
# @DATA_CONTRACT: Input[Session] -> Output[None]
def __init__(self, db: Session):
with belief_scope("AuthRepository.__init__"):
if not isinstance(db, Session):
logger.explore("Invalid session provided to AuthRepository", extra={"type": type(db)})
raise TypeError("db must be an instance of sqlalchemy.orm.Session")
logger.reason("Binding AuthRepository to database session")
self.db = db
logger.reflect("AuthRepository initialized")
# [/DEF:__init__:Function]
# [DEF:get_user_by_username:Function]
@@ -43,7 +49,17 @@ class AuthRepository:
# @DATA_CONTRACT: Input[str] -> Output[Optional[User]]
def get_user_by_username(self, username: str) -> Optional[User]:
with belief_scope("AuthRepository.get_user_by_username"):
return self.db.query(User).filter(User.username == username).first()
if not username or not isinstance(username, str):
raise ValueError("username must be a non-empty string")
logger.reason(f"Querying user by username: {username}")
user = self.db.query(User).filter(User.username == username).first()
if user:
logger.reflect(f"User found: {username}")
else:
logger.explore(f"User not found: {username}")
return user
# [/DEF:get_user_by_username:Function]
# [DEF:get_user_by_id:Function]
@@ -54,7 +70,17 @@ class AuthRepository:
# @DATA_CONTRACT: Input[str] -> Output[Optional[User]]
def get_user_by_id(self, user_id: str) -> Optional[User]:
with belief_scope("AuthRepository.get_user_by_id"):
return self.db.query(User).filter(User.id == user_id).first()
if not user_id or not isinstance(user_id, str):
raise ValueError("user_id must be a non-empty string")
logger.reason(f"Querying user by ID: {user_id}")
user = self.db.query(User).filter(User.id == user_id).first()
if user:
logger.reflect(f"User found by ID: {user_id}")
else:
logger.explore(f"User not found by ID: {user_id}")
return user
# [/DEF:get_user_by_id:Function]
# [DEF:get_role_by_name:Function]
@@ -76,10 +102,15 @@ class AuthRepository:
# @DATA_CONTRACT: Input[User] -> Output[None]
def update_last_login(self, user: User):
with belief_scope("AuthRepository.update_last_login"):
if not isinstance(user, User):
raise TypeError("user must be an instance of User")
from datetime import datetime
logger.reason(f"Updating last login for user: {user.username}")
user.last_login = datetime.utcnow()
self.db.add(user)
self.db.commit()
logger.reflect(f"Last login updated and committed for user: {user.username}")
# [/DEF:update_last_login:Function]
# [DEF:get_role_by_id:Function]
@@ -144,9 +175,14 @@ class AuthRepository:
preference: UserDashboardPreference,
) -> UserDashboardPreference:
with belief_scope("AuthRepository.save_user_dashboard_preference"):
if not isinstance(preference, UserDashboardPreference):
raise TypeError("preference must be an instance of UserDashboardPreference")
logger.reason(f"Saving dashboard preference for user: {preference.user_id}")
self.db.add(preference)
self.db.commit()
self.db.refresh(preference)
logger.reflect(f"Dashboard preference saved and refreshed for user: {preference.user_id}")
return preference
# [/DEF:save_user_dashboard_preference:Function]

View File

@@ -36,18 +36,23 @@ class ConfigManager:
# @SIDE_EFFECT: Reads config sources and updates logging configuration.
# @DATA_CONTRACT: Input(str config_path) -> Output(None; self.config: AppConfig)
def __init__(self, config_path: str = "config.json"):
with belief_scope("__init__"):
assert isinstance(config_path, str) and config_path, "config_path must be a non-empty string"
with belief_scope("ConfigManager.__init__"):
if not isinstance(config_path, str) or not config_path:
logger.explore("Invalid config_path provided", extra={"path": config_path})
raise ValueError("config_path must be a non-empty string")
logger.info(f"[ConfigManager][Entry] Initializing with legacy path {config_path}")
logger.reason(f"Initializing ConfigManager with legacy path: {config_path}")
self.config_path = Path(config_path)
self.config: AppConfig = self._load_config()
configure_logger(self.config.settings.logging)
assert isinstance(self.config, AppConfig), "self.config must be an instance of AppConfig"
if not isinstance(self.config, AppConfig):
logger.explore("Config loading resulted in invalid type", extra={"type": type(self.config)})
raise TypeError("self.config must be an instance of AppConfig")
logger.info("[ConfigManager][Exit] Initialized")
logger.reflect("ConfigManager initialization complete")
# [/DEF:__init__:Function]
# [DEF:_default_config:Function]
@@ -104,20 +109,23 @@ class ConfigManager:
# @SIDE_EFFECT: Database read/write, possible migration write, logging.
# @DATA_CONTRACT: Input(None) -> Output(AppConfig)
def _load_config(self) -> AppConfig:
with belief_scope("_load_config"):
with belief_scope("ConfigManager._load_config"):
session: Session = SessionLocal()
try:
record = self._get_record(session)
if record and record.payload:
logger.info("[_load_config][Coherence:OK] Configuration loaded from database")
return AppConfig(**record.payload)
logger.reason("Configuration found in database")
config = AppConfig(**record.payload)
logger.reflect("Database configuration validated")
return config
logger.info("[_load_config][Action] No database config found, migrating legacy config")
logger.reason("No database config found, initiating legacy migration")
config = self._load_from_legacy_file()
self._save_config_to_db(config, session=session)
logger.reflect("Legacy configuration migrated to database")
return config
except Exception as e:
logger.error(f"[_load_config][Coherence:Failed] Error loading config from DB: {e}")
logger.explore(f"Error loading config from DB: {e}")
return self._default_config()
finally:
session.close()
@@ -130,8 +138,9 @@ class ConfigManager:
# @SIDE_EFFECT: Database insert/update, commit/rollback, logging.
# @DATA_CONTRACT: Input(AppConfig, Optional[Session]) -> Output(None)
def _save_config_to_db(self, config: AppConfig, session: Optional[Session] = None):
with belief_scope("_save_config_to_db"):
assert isinstance(config, AppConfig), "config must be an instance of AppConfig"
with belief_scope("ConfigManager._save_config_to_db"):
if not isinstance(config, AppConfig):
raise TypeError("config must be an instance of AppConfig")
owns_session = session is None
db = session or SessionLocal()
@@ -139,15 +148,17 @@ class ConfigManager:
record = self._get_record(db)
payload = config.model_dump()
if record is None:
logger.reason("Creating new global configuration record")
record = AppConfigRecord(id="global", payload=payload)
db.add(record)
else:
logger.reason("Updating existing global configuration record")
record.payload = payload
db.commit()
logger.info("[_save_config_to_db][Action] Configuration saved to database")
logger.reflect("Configuration successfully committed to database")
except Exception as e:
db.rollback()
logger.error(f"[_save_config_to_db][Coherence:Failed] Failed to save: {e}")
logger.explore(f"Failed to save configuration: {e}")
raise
finally:
if owns_session:
@@ -183,14 +194,15 @@ class ConfigManager:
# @SIDE_EFFECT: Mutates self.config, DB write, logger reconfiguration, logging.
# @DATA_CONTRACT: Input(GlobalSettings) -> Output(None)
def update_global_settings(self, settings: GlobalSettings):
with belief_scope("update_global_settings"):
logger.info("[update_global_settings][Entry] Updating settings")
assert isinstance(settings, GlobalSettings), "settings must be an instance of GlobalSettings"
with belief_scope("ConfigManager.update_global_settings"):
if not isinstance(settings, GlobalSettings):
raise TypeError("settings must be an instance of GlobalSettings")
logger.reason("Updating global settings and persisting")
self.config.settings = settings
self.save()
configure_logger(settings.logging)
logger.info("[update_global_settings][Exit] Settings updated")
logger.reflect("Global settings updated and logger reconfigured")
# [/DEF:update_global_settings:Function]
# [DEF:validate_path:Function]
@@ -257,14 +269,15 @@ class ConfigManager:
# @SIDE_EFFECT: Mutates environment list, DB write, logging.
# @DATA_CONTRACT: Input(Environment) -> Output(None)
def add_environment(self, env: Environment):
with belief_scope("add_environment"):
logger.info(f"[add_environment][Entry] Adding environment {env.id}")
assert isinstance(env, Environment), "env must be an instance of Environment"
with belief_scope("ConfigManager.add_environment"):
if not isinstance(env, Environment):
raise TypeError("env must be an instance of Environment")
logger.reason(f"Adding/Updating environment: {env.id}")
self.config.environments = [e for e in self.config.environments if e.id != env.id]
self.config.environments.append(env)
self.save()
logger.info("[add_environment][Exit] Environment added")
logger.reflect(f"Environment {env.id} persisted")
# [/DEF:add_environment:Function]
# [DEF:update_environment:Function]
@@ -274,22 +287,25 @@ class ConfigManager:
# @SIDE_EFFECT: May mutate environment list, DB write, logging.
# @DATA_CONTRACT: Input(str env_id, Environment updated_env) -> Output(bool)
def update_environment(self, env_id: str, updated_env: Environment) -> bool:
with belief_scope("update_environment"):
logger.info(f"[update_environment][Entry] Updating {env_id}")
assert env_id and isinstance(env_id, str), "env_id must be a non-empty string"
assert isinstance(updated_env, Environment), "updated_env must be an instance of Environment"
with belief_scope("ConfigManager.update_environment"):
if not env_id or not isinstance(env_id, str):
raise ValueError("env_id must be a non-empty string")
if not isinstance(updated_env, Environment):
raise TypeError("updated_env must be an instance of Environment")
logger.reason(f"Attempting to update environment: {env_id}")
for i, env in enumerate(self.config.environments):
if env.id == env_id:
if updated_env.password == "********":
logger.reason("Preserving existing password for masked update")
updated_env.password = env.password
self.config.environments[i] = updated_env
self.save()
logger.info(f"[update_environment][Coherence:OK] Updated {env_id}")
logger.reflect(f"Environment {env_id} updated and saved")
return True
logger.warning(f"[update_environment][Coherence:Failed] Environment {env_id} not found")
logger.explore(f"Environment {env_id} not found for update")
return False
# [/DEF:update_environment:Function]
@@ -300,18 +316,19 @@ class ConfigManager:
# @SIDE_EFFECT: May mutate environment list, conditional DB write, logging.
# @DATA_CONTRACT: Input(str env_id) -> Output(None)
def delete_environment(self, env_id: str):
with belief_scope("delete_environment"):
logger.info(f"[delete_environment][Entry] Deleting {env_id}")
assert env_id and isinstance(env_id, str), "env_id must be a non-empty string"
with belief_scope("ConfigManager.delete_environment"):
if not env_id or not isinstance(env_id, str):
raise ValueError("env_id must be a non-empty string")
logger.reason(f"Attempting to delete environment: {env_id}")
original_count = len(self.config.environments)
self.config.environments = [e for e in self.config.environments if e.id != env_id]
if len(self.config.environments) < original_count:
self.save()
logger.info(f"[delete_environment][Action] Deleted {env_id}")
logger.reflect(f"Environment {env_id} deleted and configuration saved")
else:
logger.warning(f"[delete_environment][Coherence:Failed] Environment {env_id} not found")
logger.explore(f"Environment {env_id} not found for deletion")
# [/DEF:delete_environment:Function]

View File

@@ -38,7 +38,9 @@ class MigrationEngine:
# @PARAM: mapping_service (Optional[IdMappingService]) - Used for resolving target environment integer IDs.
def __init__(self, mapping_service: Optional[IdMappingService] = None):
with belief_scope("MigrationEngine.__init__"):
logger.reason("Initializing MigrationEngine")
self.mapping_service = mapping_service
logger.reflect("MigrationEngine initialized")
# [/DEF:__init__:Function]
# [DEF:transform_zip:Function]
@@ -59,12 +61,14 @@ class MigrationEngine:
Transform a Superset export ZIP by replacing database UUIDs and optionally fixing cross-filters.
"""
with belief_scope("MigrationEngine.transform_zip"):
logger.reason(f"Starting ZIP transformation: {zip_path} -> {output_path}")
with tempfile.TemporaryDirectory() as temp_dir_str:
temp_dir = Path(temp_dir_str)
try:
# 1. Extract
logger.info(f"[MigrationEngine.transform_zip][Action] Extracting ZIP: {zip_path}")
logger.reason(f"Extracting source archive to {temp_dir}")
with zipfile.ZipFile(zip_path, 'r') as zf:
zf.extractall(temp_dir)
@@ -72,33 +76,33 @@ class MigrationEngine:
dataset_files = list(temp_dir.glob("**/datasets/**/*.yaml")) + list(temp_dir.glob("**/datasets/*.yaml"))
dataset_files = list(set(dataset_files))
logger.info(f"[MigrationEngine.transform_zip][State] Found {len(dataset_files)} dataset files.")
logger.reason(f"Transforming {len(dataset_files)} dataset YAML files")
for ds_file in dataset_files:
logger.info(f"[MigrationEngine.transform_zip][Action] Transforming dataset: {ds_file}")
self._transform_yaml(ds_file, db_mapping)
# 2.5 Patch Cross-Filters (Dashboards)
if fix_cross_filters and self.mapping_service and target_env_id:
dash_files = list(temp_dir.glob("**/dashboards/**/*.yaml")) + list(temp_dir.glob("**/dashboards/*.yaml"))
dash_files = list(set(dash_files))
logger.info(f"[MigrationEngine.transform_zip][State] Found {len(dash_files)} dashboard files for patching.")
# Gather all source UUID-to-ID mappings from the archive first
source_id_to_uuid_map = self._extract_chart_uuids_from_archive(temp_dir)
for dash_file in dash_files:
logger.info(f"[MigrationEngine.transform_zip][Action] Patching dashboard: {dash_file}")
self._patch_dashboard_metadata(dash_file, target_env_id, source_id_to_uuid_map)
if fix_cross_filters:
if self.mapping_service and target_env_id:
dash_files = list(temp_dir.glob("**/dashboards/**/*.yaml")) + list(temp_dir.glob("**/dashboards/*.yaml"))
dash_files = list(set(dash_files))
logger.reason(f"Patching cross-filters for {len(dash_files)} dashboards")
# Gather all source UUID-to-ID mappings from the archive first
source_id_to_uuid_map = self._extract_chart_uuids_from_archive(temp_dir)
for dash_file in dash_files:
self._patch_dashboard_metadata(dash_file, target_env_id, source_id_to_uuid_map)
else:
logger.explore("Cross-filter patching requested but mapping service or target_env_id is missing")
# 3. Re-package
logger.info(f"[MigrationEngine.transform_zip][Action] Re-packaging ZIP to: {output_path} (strip_databases={strip_databases})")
logger.reason(f"Re-packaging transformed archive (strip_databases={strip_databases})")
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
for root, dirs, files in os.walk(temp_dir):
rel_root = Path(root).relative_to(temp_dir)
if strip_databases and "databases" in rel_root.parts:
logger.info(f"[MigrationEngine.transform_zip][Action] Skipping file in databases directory: {rel_root}")
continue
for file in files:
@@ -106,9 +110,10 @@ class MigrationEngine:
arcname = file_path.relative_to(temp_dir)
zf.write(file_path, arcname)
logger.reflect("ZIP transformation completed successfully")
return True
except Exception as e:
logger.error(f"[MigrationEngine.transform_zip][Coherence:Failed] Error transforming ZIP: {e}")
logger.explore(f"Error transforming ZIP: {e}")
return False
# [/DEF:transform_zip:Function]
@@ -122,19 +127,23 @@ class MigrationEngine:
# @DATA_CONTRACT: Input[(Path file_path, Dict[str,str] db_mapping)] -> Output[None]
def _transform_yaml(self, file_path: Path, db_mapping: Dict[str, str]):
with belief_scope("MigrationEngine._transform_yaml"):
if not file_path.exists():
logger.explore(f"YAML file not found: {file_path}")
return
with open(file_path, 'r') as f:
data = yaml.safe_load(f)
if not data:
return
# Superset dataset YAML structure:
# database_uuid: ...
source_uuid = data.get('database_uuid')
if source_uuid in db_mapping:
logger.reason(f"Replacing database UUID in {file_path.name}")
data['database_uuid'] = db_mapping[source_uuid]
with open(file_path, 'w') as f:
yaml.dump(data, f)
logger.reflect(f"Database UUID patched in {file_path.name}")
# [/DEF:_transform_yaml:Function]
# [DEF:_extract_chart_uuids_from_archive:Function]
@@ -176,6 +185,9 @@ class MigrationEngine:
def _patch_dashboard_metadata(self, file_path: Path, target_env_id: str, source_map: Dict[int, str]):
with belief_scope("MigrationEngine._patch_dashboard_metadata"):
try:
if not file_path.exists():
return
with open(file_path, 'r') as f:
data = yaml.safe_load(f)
@@ -186,18 +198,13 @@ class MigrationEngine:
if not metadata_str:
return
metadata = json.loads(metadata_str)
modified = False
# We need to deeply traverse and replace. For MVP, string replacement over the raw JSON is an option,
# but careful dict traversal is safer.
# Fetch target UUIDs for everything we know:
uuids_needed = list(source_map.values())
logger.reason(f"Resolving {len(uuids_needed)} remote IDs for dashboard metadata patching")
target_ids = self.mapping_service.get_remote_ids_batch(target_env_id, ResourceType.CHART, uuids_needed)
if not target_ids:
logger.info("[MigrationEngine._patch_dashboard_metadata][Reflect] No remote target IDs found in mapping database.")
logger.reflect("No remote target IDs found in mapping database for this dashboard.")
return
# Map Source Int -> Target Int
@@ -210,21 +217,16 @@ class MigrationEngine:
missing_targets.append(s_id)
if missing_targets:
logger.warning(f"[MigrationEngine._patch_dashboard_metadata][Coherence:Recoverable] Missing target IDs for source IDs: {missing_targets}. Cross-filters for these IDs might break.")
logger.explore(f"Missing target IDs for source IDs: {missing_targets}. Cross-filters might break.")
if not source_to_target:
logger.info("[MigrationEngine._patch_dashboard_metadata][Reflect] No source IDs matched remotely. Skipping patch.")
logger.reflect("No source IDs matched remotely. Skipping patch.")
return
# Complex metadata traversal would go here (e.g. for native_filter_configuration)
# We use regex replacement over the string for safety over unknown nested dicts.
logger.reason(f"Patching {len(source_to_target)} ID references in json_metadata")
new_metadata_str = metadata_str
# Replace chartId and datasetId assignments explicitly.
# Pattern: "datasetId": 42 or "chartId": 42
for s_id, t_id in source_to_target.items():
# Replace in native_filter_configuration targets
new_metadata_str = re.sub(r'("datasetId"\s*:\s*)' + str(s_id) + r'(\b)', r'\g<1>' + str(t_id) + r'\g<2>', new_metadata_str)
new_metadata_str = re.sub(r'("chartId"\s*:\s*)' + str(s_id) + r'(\b)', r'\g<1>' + str(t_id) + r'\g<2>', new_metadata_str)
@@ -233,10 +235,10 @@ class MigrationEngine:
with open(file_path, 'w') as f:
yaml.dump(data, f)
logger.info(f"[MigrationEngine._patch_dashboard_metadata][Reason] Re-serialized modified JSON metadata for dashboard.")
logger.reflect(f"Dashboard metadata patched and saved: {file_path.name}")
except Exception as e:
logger.error(f"[MigrationEngine._patch_dashboard_metadata][Coherence:Failed] Metadata patch failed: {e}")
logger.explore(f"Metadata patch failed for {file_path.name}: {e}")
# [/DEF:_patch_dashboard_metadata:Function]