# [DEF:backend.src.core.migration_engine:Module]
#
# @COMPLEXITY: 5
# @SEMANTICS: migration, engine, zip, yaml, transformation, cross-filter, id-mapping
# @PURPOSE: Transforms Superset export ZIP archives while preserving archive integrity and patching mapped identifiers.
# @LAYER: Domain
# @RELATION: [DEPENDS_ON] -> [src.core.logger]
# @RELATION: [DEPENDS_ON] -> [src.core.mapping_service.IdMappingService]
# @RELATION: [DEPENDS_ON] -> [src.models.mapping.ResourceType]
# @RELATION: [DEPENDS_ON] -> [yaml]
#
# @INVARIANT: ZIP structure and non-targeted metadata must remain valid after transformation.

# [SECTION: IMPORTS]
import zipfile
import yaml
import os
import json
import re
import tempfile
from pathlib import Path
from typing import Dict, Optional

from .logger import logger, belief_scope
from src.core.mapping_service import IdMappingService
from src.models.mapping import ResourceType
# [/SECTION]


# [DEF:MigrationEngine:Class]
# @PURPOSE: Engine for transforming Superset export ZIPs.
# @RELATION: CONTAINS -> [__init__, transform_zip, _transform_yaml, _extract_chart_uuids_from_archive, _patch_dashboard_metadata]
class MigrationEngine:
    # [DEF:__init__:Function]
    # @PURPOSE: Initializes migration orchestration dependencies for ZIP/YAML metadata transformations.
    # @PRE: mapping_service is None or implements batch remote ID lookup for ResourceType.CHART.
    # @POST: self.mapping_service is assigned and available for optional cross-filter patching flows.
    # @SIDE_EFFECT: Mutates in-memory engine state by storing the dependency reference.
    # @DATA_CONTRACT: Input[Optional[IdMappingService]] -> Output[MigrationEngine]
    # @PARAM: mapping_service (Optional[IdMappingService]) - Used for resolving target-environment integer IDs.
    def __init__(self, mapping_service: Optional[IdMappingService] = None):
        with belief_scope("MigrationEngine.__init__"):
            logger.reason("Initializing MigrationEngine")
            self.mapping_service = mapping_service
            logger.reflect("MigrationEngine initialized")
    # [/DEF:__init__:Function]
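    # Construction sketch (hedged): the engine works standalone for database
    # UUID patching; cross-filter fixing additionally needs an injected
    # IdMappingService, which in this codebase is assumed to be wired by the
    # caller rather than constructed here.
    #
    #   engine = MigrationEngine()                      # UUID patching only
    #   engine = MigrationEngine(mapping_service=svc)   # + cross-filter fixes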
    # [DEF:transform_zip:Function]
    # @PURPOSE: Extracts the ZIP, replaces database UUIDs in YAMLs, optionally patches cross-filters, and re-packages.
    # @RELATION: DEPENDS_ON -> [_transform_yaml, _extract_chart_uuids_from_archive, _patch_dashboard_metadata]
    # @PARAM: zip_path (str) - Path to the source ZIP file.
    # @PARAM: output_path (str) - Path where the transformed ZIP will be saved.
    # @PARAM: db_mapping (Dict[str, str]) - Mapping of source UUID to target UUID.
    # @PARAM: strip_databases (bool) - Whether to remove the databases directory from the archive.
    # @PARAM: target_env_id (Optional[str]) - Used when fix_cross_filters is True to select the environment mapping.
    # @PARAM: fix_cross_filters (bool) - Whether to patch dashboard json_metadata.
    # @PRE: zip_path points to a readable ZIP; output_path parent is writable; db_mapping keys/values are UUID strings.
    # @POST: Returns True only when extraction, transformation, and packaging complete without exception.
    # @SIDE_EFFECT: Reads/writes filesystem archives, creates a temporary directory, emits structured logs.
    # @DATA_CONTRACT: Input[(str zip_path, str output_path, Dict[str,str] db_mapping, bool strip_databases, Optional[str] target_env_id, bool fix_cross_filters)] -> Output[bool]
    # @RETURN: bool - True if successful.
    def transform_zip(
        self,
        zip_path: str,
        output_path: str,
        db_mapping: Dict[str, str],
        strip_databases: bool = True,
        target_env_id: Optional[str] = None,
        fix_cross_filters: bool = False,
    ) -> bool:
        """Transform a Superset export ZIP by replacing database UUIDs and optionally fixing cross-filters."""
        with belief_scope("MigrationEngine.transform_zip"):
            logger.reason(f"Starting ZIP transformation: {zip_path} -> {output_path}")
            with tempfile.TemporaryDirectory() as temp_dir_str:
                temp_dir = Path(temp_dir_str)
                try:
                    # 1. Extract the source archive into a scratch directory.
                    logger.reason(f"Extracting source archive to {temp_dir}")
                    with zipfile.ZipFile(zip_path, "r") as zf:
                        zf.extractall(temp_dir)

                    # 2. Transform dataset YAMLs (database UUID remapping).
                    dataset_files = list(temp_dir.glob("**/datasets/**/*.yaml")) + list(
                        temp_dir.glob("**/datasets/*.yaml")
                    )
                    dataset_files = list(set(dataset_files))
                    logger.reason(f"Transforming {len(dataset_files)} dataset YAML files")
                    for ds_file in dataset_files:
                        self._transform_yaml(ds_file, db_mapping)

                    # 2.5 Patch cross-filters (dashboards).
                    if fix_cross_filters:
                        if self.mapping_service and target_env_id:
                            dash_files = list(
                                temp_dir.glob("**/dashboards/**/*.yaml")
                            ) + list(temp_dir.glob("**/dashboards/*.yaml"))
                            dash_files = list(set(dash_files))
                            logger.reason(
                                f"Patching cross-filters for {len(dash_files)} dashboards"
                            )
                            # Gather all source ID-to-UUID mappings from the archive first.
                            source_id_to_uuid_map = (
                                self._extract_chart_uuids_from_archive(temp_dir)
                            )
                            for dash_file in dash_files:
                                self._patch_dashboard_metadata(
                                    dash_file, target_env_id, source_id_to_uuid_map
                                )
                        else:
                            logger.explore(
                                "Cross-filter patching requested but mapping service or target_env_id is missing"
                            )

                    # 3. Re-package the transformed tree into the output archive.
                    logger.reason(
                        f"Re-packaging transformed archive (strip_databases={strip_databases})"
                    )
                    with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zf:
                        for root, _dirs, files in os.walk(temp_dir):
                            rel_root = Path(root).relative_to(temp_dir)
                            # Optionally drop database connection definitions from the archive.
                            if strip_databases and "databases" in rel_root.parts:
                                continue
                            for file in files:
                                file_path = Path(root) / file
                                arcname = file_path.relative_to(temp_dir)
                                zf.write(file_path, arcname)

                    logger.reflect("ZIP transformation completed successfully")
                    return True
                except Exception as e:
                    logger.explore(f"Error transforming ZIP: {e}")
                    return False
    # [/DEF:transform_zip:Function]
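    # Archive layout sketch (hedged): the glob patterns in transform_zip assume
    # the standard Superset export structure; the file names below are
    # illustrative, not from a real export.
    #
    #   dashboard_export/
    #     metadata.yaml
    #     databases/examples.yaml          <- dropped when strip_databases=True
    #     datasets/examples/my_table.yaml  <- database_uuid patched here
    #     charts/my_chart_123.yaml         <- scanned for id/uuid pairs
    #     dashboards/my_dashboard_45.yaml  <- json_metadata patched here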
    # [DEF:_transform_yaml:Function]
    # @PURPOSE: Replaces database_uuid in a single YAML file.
    # @PARAM: file_path (Path) - Path to the YAML file.
    # @PARAM: db_mapping (Dict[str, str]) - UUID mapping dictionary.
    # @PRE: file_path exists, is readable YAML, and db_mapping contains source->target UUID pairs.
    # @POST: database_uuid is replaced in place only when the source UUID is present in db_mapping.
    # @SIDE_EFFECT: Reads and conditionally rewrites the YAML file on disk.
    # @DATA_CONTRACT: Input[(Path file_path, Dict[str,str] db_mapping)] -> Output[None]
    def _transform_yaml(self, file_path: Path, db_mapping: Dict[str, str]):
        with belief_scope("MigrationEngine._transform_yaml"):
            if not file_path.exists():
                logger.explore(f"YAML file not found: {file_path}")
                raise FileNotFoundError(str(file_path))

            with open(file_path, "r") as f:
                data = yaml.safe_load(f)

            if not data:
                return

            source_uuid = data.get("database_uuid")
            if source_uuid in db_mapping:
                logger.reason(f"Replacing database UUID in {file_path.name}")
                data["database_uuid"] = db_mapping[source_uuid]
                with open(file_path, "w") as f:
                    # Preserve key order to keep the rewritten file close to the original.
                    yaml.dump(data, f, sort_keys=False)
                logger.reflect(f"Database UUID patched in {file_path.name}")
    # [/DEF:_transform_yaml:Function]

    # [DEF:_extract_chart_uuids_from_archive:Function]
    # @PURPOSE: Scans extracted chart YAML files and builds a source chart ID to UUID lookup map.
    # @PRE: temp_dir exists and points to the extracted archive root with optional chart YAML resources.
    # @POST: Returns a best-effort Dict[int, str] containing only parseable chart id/uuid pairs.
    # @SIDE_EFFECT: Reads chart YAML files from the filesystem; suppresses per-file parsing failures.
    # @DATA_CONTRACT: Input[Path] -> Output[Dict[int,str]]
    # @PARAM: temp_dir (Path) - Root directory of the unpacked archive.
    # @RETURN: Dict[int, str] - Mapping of source integer ID to UUID.
    def _extract_chart_uuids_from_archive(self, temp_dir: Path) -> Dict[int, str]:
        with belief_scope("MigrationEngine._extract_chart_uuids_from_archive"):
            # Implementation note: this is a placeholder for the logic that extracts
            # actual source IDs. In a real scenario, this involves parsing chart YAMLs
            # or the export metadata structure where source IDs are stored.
            # For simplicity in the US1 MVP, we read them from chart files if present.
            mapping = {}
            chart_files = list(temp_dir.glob("**/charts/**/*.yaml")) + list(
                temp_dir.glob("**/charts/*.yaml")
            )
            for cf in set(chart_files):
                try:
                    with open(cf, "r") as f:
                        cdata = yaml.safe_load(f)
                    if cdata and "id" in cdata and "uuid" in cdata:
                        mapping[cdata["id"]] = cdata["uuid"]
                except Exception:
                    # Best-effort: skip unparseable chart files.
                    continue
            return mapping
    # [/DEF:_extract_chart_uuids_from_archive:Function]
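    # Input sketch (hedged): a chart YAML only contributes to the map when it
    # carries both keys; stock Superset exports do not always include a numeric
    # `id`, which is why the extractor above is best-effort. The values below
    # are illustrative, not from a real export.
    #
    #   slice_name: Revenue by Region
    #   uuid: 3c2c4ba7-0000-0000-0000-000000000000
    #   id: 123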
    # [DEF:_patch_dashboard_metadata:Function]
    # @PURPOSE: Rewrites dashboard json_metadata chart/dataset integer identifiers using target environment mappings.
    # @PRE: file_path points to a dashboard YAML with json_metadata; target_env_id is non-empty; source_map contains source id->uuid.
    # @POST: json_metadata is re-serialized with mapped integer IDs when remote mappings are available; otherwise the file remains unchanged.
    # @SIDE_EFFECT: Reads/writes the YAML file, performs mapping lookups via mapping_service, emits logs for recoverable/terminal failures.
    # @DATA_CONTRACT: Input[(Path file_path, str target_env_id, Dict[int,str] source_map)] -> Output[None]
    # @PARAM: file_path (Path)
    # @PARAM: target_env_id (str)
    # @PARAM: source_map (Dict[int, str])
    def _patch_dashboard_metadata(
        self, file_path: Path, target_env_id: str, source_map: Dict[int, str]
    ):
        with belief_scope("MigrationEngine._patch_dashboard_metadata"):
            try:
                if not file_path.exists():
                    return

                with open(file_path, "r") as f:
                    data = yaml.safe_load(f)

                if not data or "json_metadata" not in data:
                    return

                metadata_str = data["json_metadata"]
                if not metadata_str:
                    return

                # Fetch target UUIDs for every chart we know about.
                uuids_needed = list(source_map.values())
                logger.reason(
                    f"Resolving {len(uuids_needed)} remote IDs for dashboard metadata patching"
                )
                target_ids = self.mapping_service.get_remote_ids_batch(
                    target_env_id, ResourceType.CHART, uuids_needed
                )

                if not target_ids:
                    logger.reflect(
                        "No remote target IDs found in mapping database for this dashboard."
                    )
                    return

                # Map source integer ID -> target integer ID via the shared UUID.
                source_to_target = {}
                missing_targets = []
                for s_id, s_uuid in source_map.items():
                    if s_uuid in target_ids:
                        source_to_target[s_id] = target_ids[s_uuid]
                    else:
                        missing_targets.append(s_id)

                if missing_targets:
                    logger.explore(
                        f"Missing target IDs for source IDs: {missing_targets}. Cross-filters might break."
                    )

                if not source_to_target:
                    logger.reflect("No source IDs matched remotely. Skipping patch.")
                    return

                logger.reason(
                    f"Patching {len(source_to_target)} ID references in json_metadata"
                )

                # Rewrite all references in a single pass. Iterated re.sub calls
                # can chain: a freshly written target ID that happens to equal
                # another source ID would be rewritten again on a later pass.
                id_ref_pattern = re.compile(r'("(?:datasetId|chartId)"\s*:\s*)(\d+)\b')

                def _swap_id(match):
                    s_id = int(match.group(2))
                    # Unmapped IDs are written back unchanged.
                    return match.group(1) + str(source_to_target.get(s_id, s_id))

                new_metadata_str = id_ref_pattern.sub(_swap_id, metadata_str)

                # Re-parse to confirm the patched string is still valid JSON.
                data["json_metadata"] = json.dumps(json.loads(new_metadata_str))

                with open(file_path, "w") as f:
                    # Preserve key order to keep the rewritten file close to the original.
                    yaml.dump(data, f, sort_keys=False)

                logger.reflect(f"Dashboard metadata patched and saved: {file_path.name}")
            except Exception as e:
                logger.explore(f"Metadata patch failed for {file_path.name}: {e}")
    # [/DEF:_patch_dashboard_metadata:Function]
# [/DEF:MigrationEngine:Class]
# [/DEF:backend.src.core.migration_engine:Module]
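
# [SECTION: USAGE_EXAMPLE]
# Minimal driver sketch (hedged): the paths and UUIDs below are hypothetical
# placeholders, and wiring an IdMappingService is out of scope here, so only
# the database-UUID remapping path is exercised. Run via the package entry
# point so the relative imports resolve.
if __name__ == "__main__":
    engine = MigrationEngine()
    ok = engine.transform_zip(
        zip_path="dashboard_export.zip",
        output_path="dashboard_export_patched.zip",
        db_mapping={
            # source database UUID -> target database UUID (hypothetical values)
            "11111111-1111-1111-1111-111111111111": "22222222-2222-2222-2222-222222222222",
        },
        strip_databases=True,
    )
    print("transform succeeded" if ok else "transform failed")
# [/SECTION]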