# [DEF:DatasetReviewOrchestrator:Module] # @COMPLEXITY: 5 # @SEMANTICS: dataset_review, orchestration, session_lifecycle, intake, recovery # @PURPOSE: Coordinate dataset review session startup and lifecycle-safe intake recovery for one authenticated user. # @LAYER: Domain # @RELATION: [DEPENDS_ON] ->[DatasetReviewSessionRepository] # @RELATION: [DEPENDS_ON] ->[SemanticSourceResolver] # @RELATION: [DEPENDS_ON] ->[ClarificationEngine] # @RELATION: [DEPENDS_ON] ->[SupersetContextExtractor] # @RELATION: [DEPENDS_ON] ->[SupersetCompilationAdapter] # @RELATION: [DEPENDS_ON] ->[TaskManager] # @PRE: session mutations must execute inside a persisted session boundary scoped to one authenticated user. # @POST: state transitions are persisted atomically and emit observable progress for long-running steps. # @SIDE_EFFECT: creates task records, updates session aggregates, triggers upstream Superset calls, persists audit artifacts. # @DATA_CONTRACT: Input[SessionCommand] -> Output[DatasetReviewSession | CompiledPreview | DatasetRunContext] # @INVARIANT: Launch is blocked unless a current session has no open blocking findings, all launch-sensitive mappings are approved, and a non-stale Superset-generated compiled preview matches the current input fingerprint. 
from __future__ import annotations # [DEF:DatasetReviewOrchestrator.imports:Block] from dataclasses import dataclass, field from datetime import datetime import hashlib import json from typing import Any, Dict, List, Optional, cast from src.core.config_manager import ConfigManager from src.core.logger import belief_scope, logger from src.core.task_manager import TaskManager from src.core.utils.superset_compilation_adapter import ( PreviewCompilationPayload, SqlLabLaunchPayload, SupersetCompilationAdapter, ) from src.core.utils.superset_context_extractor import ( SupersetContextExtractor, SupersetParsedContext, ) from src.models.auth import User from src.models.dataset_review import ( ApprovalState, BusinessSummarySource, CompiledPreview, ConfidenceState, DatasetProfile, DatasetReviewSession, DatasetRunContext, ExecutionMapping, FilterConfidenceState, FilterRecoveryStatus, FilterSource, FindingArea, FindingSeverity, ImportedFilter, LaunchStatus, MappingMethod, MappingStatus, PreviewStatus, RecommendedAction, ReadinessState, ResolutionState, SessionPhase, SessionStatus, TemplateVariable, ValidationFinding, VariableKind, ) from src.services.dataset_review.repositories.session_repository import ( DatasetReviewSessionRepository, ) from src.services.dataset_review.semantic_resolver import SemanticSourceResolver from src.services.dataset_review.event_logger import SessionEventPayload # [/DEF:DatasetReviewOrchestrator.imports:Block] logger = cast(Any, logger) # [DEF:StartSessionCommand:Class] # @COMPLEXITY: 2 # @PURPOSE: Typed input contract for starting a dataset review session. @dataclass class StartSessionCommand: user: User environment_id: str source_kind: str source_input: str # [/DEF:StartSessionCommand:Class] # [DEF:StartSessionResult:Class] # @COMPLEXITY: 2 # @PURPOSE: Session-start result carrying the persisted session and intake recovery metadata. 
@dataclass
class StartSessionResult:
    """Output of ``DatasetReviewOrchestrator.start_session``."""

    # Session as returned by the repository after the final persistence step.
    session: DatasetReviewSession
    # Set only when the session was started from a Superset link; start_session
    # leaves it as None for dataset selections.
    parsed_context: Optional[SupersetParsedContext] = None
    # Findings collected during intake (partial recovery, discovery failures).
    findings: List[ValidationFinding] = field(default_factory=list)


# [/DEF:StartSessionResult:Class]


# [DEF:PreparePreviewCommand:Class]
# @COMPLEXITY: 2
# @PURPOSE: Typed input contract for compiling one Superset-backed session preview.
@dataclass
class PreparePreviewCommand:
    """Input for ``DatasetReviewOrchestrator.prepare_launch_preview``."""

    # Authenticated user; ``user.id`` must match the session owner.
    user: User
    # Identifier of the session whose preview should be compiled.
    session_id: str


# [/DEF:PreparePreviewCommand:Class]


# [DEF:PreparePreviewResult:Class]
# @COMPLEXITY: 2
# @PURPOSE: Result contract for one persisted compiled preview attempt.
@dataclass
class PreparePreviewResult:
    """Output of ``DatasetReviewOrchestrator.prepare_launch_preview``."""

    # Session after the preview attempt was committed and refreshed.
    session: DatasetReviewSession
    # Preview as returned by ``repository.save_preview``.
    preview: CompiledPreview
    # prepare_launch_preview in this file always passes an empty list here;
    # preview blockers are reported by raising ValueError instead.
    blocked_reasons: List[str] = field(default_factory=list)


# [/DEF:PreparePreviewResult:Class]


# [DEF:LaunchDatasetCommand:Class]
# @COMPLEXITY: 2
# @PURPOSE: Typed input contract for launching one dataset-review session into SQL Lab.
@dataclass
class LaunchDatasetCommand:
    """Input for ``DatasetReviewOrchestrator.launch_dataset``."""

    # Authenticated user; ``user.id`` must match the session owner.
    user: User
    # Identifier of the session to launch.
    session_id: str


# [/DEF:LaunchDatasetCommand:Class]


# [DEF:LaunchDatasetResult:Class]
# @COMPLEXITY: 2
# @PURPOSE: Launch result carrying immutable run context and any gate blockers surfaced before launch.
@dataclass
class LaunchDatasetResult:
    """Output of ``DatasetReviewOrchestrator.launch_dataset``."""

    # Session after the launch outcome was committed and refreshed.
    session: DatasetReviewSession
    # Run context as returned by ``repository.save_run_context``.
    run_context: DatasetRunContext
    # launch_dataset in this file always passes an empty list here; launch
    # blockers are reported by raising ValueError instead.
    blocked_reasons: List[str] = field(default_factory=list)


# [/DEF:LaunchDatasetResult:Class]


# [DEF:DatasetReviewOrchestrator:Class]
# @COMPLEXITY: 5
# @PURPOSE: Coordinate safe session startup while preserving cross-user isolation and explicit partial recovery.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSessionRepository]
# @RELATION: [DEPENDS_ON] ->[SupersetContextExtractor]
# @RELATION: [DEPENDS_ON] ->[TaskManager]
# @RELATION: [DEPENDS_ON] ->[SessionRepo]
# @RELATION: [DEPENDS_ON] ->[ConfigManager]
# @PRE: constructor dependencies are valid and tied to the current request/task scope.
# @POST: orchestrator instance can execute session-scoped mutations for one authenticated user.
# @SIDE_EFFECT: downstream operations may persist session/profile/finding state and enqueue background tasks.
# @DATA_CONTRACT: Input[StartSessionCommand] -> Output[StartSessionResult]
# @INVARIANT: session ownership is preserved on every mutation and recovery remains explicit when partial.
class DatasetReviewOrchestrator:
    # [DEF:DatasetReviewOrchestrator.__init__:Function]
    # @COMPLEXITY: 3
    # @PURPOSE: Bind repository, config, and task dependencies required by the orchestration boundary.
    # @RELATION: [DEPENDS_ON] ->[SessionRepo]
    # @RELATION: [DEPENDS_ON] ->[ConfigManager]
    def __init__(
        self,
        repository: DatasetReviewSessionRepository,
        config_manager: ConfigManager,
        task_manager: Optional[TaskManager] = None,
        semantic_resolver: Optional[SemanticSourceResolver] = None,
    ) -> None:
        """Store the collaborators used by the orchestration methods.

        Args:
            repository: Session repository used for all persistence.
            config_manager: Source of environment configuration.
            task_manager: Optional task manager; stored as given.
            semantic_resolver: Optional resolver; a default
                ``SemanticSourceResolver`` is created when a falsy value
                is passed.
        """
        # Fall back to a fresh resolver only when the caller supplied nothing
        # usable (truthiness check, same as ``x or Default()``).
        if semantic_resolver:
            resolver = semantic_resolver
        else:
            resolver = SemanticSourceResolver()

        self.repository = repository
        self.config_manager = config_manager
        self.task_manager = task_manager
        self.semantic_resolver = resolver

    # [/DEF:DatasetReviewOrchestrator.__init__:Function]

    # [DEF:DatasetReviewOrchestrator.start_session:Function]
    # @COMPLEXITY: 5
    # @PURPOSE: Initialize a new session from a Superset link or dataset selection and trigger context recovery.
    # @RELATION: [DEPENDS_ON] ->[SessionRepo]
    # @RELATION: [CALLS] ->[SupersetContextExtractor.parse_superset_link]
    # @RELATION: [CALLS] ->[create_task]
    # @PRE: source input is non-empty and environment is accessible.
    # @POST: session exists in persisted storage with intake/recovery state and task linkage when async work is required.
    # @SIDE_EFFECT: persists session and may enqueue recovery task.
    # @DATA_CONTRACT: Input[StartSessionCommand] -> Output[StartSessionResult]
    # @INVARIANT: no cross-user session leakage occurs; session and follow-up task remain owned by the authenticated user.
def start_session(self, command: StartSessionCommand) -> StartSessionResult:
    """Create and persist a new dataset review session from intake input.

    The source input, source kind and environment id are stripped before
    use. A Superset link is parsed through ``SupersetContextExtractor``;
    a dataset selection is parsed by ``_parse_dataset_selection``. The
    session shell is persisted first, then the initial profile, findings
    and (for Superset links) any recovered filters, template variables and
    draft mappings. If ``_enqueue_recovery_task`` returns a task id, it is
    linked to the session and committed.

    Args:
        command: Start request carrying the user, environment and source.

    Returns:
        The persisted session, the parsed Superset context (None for
        dataset selections) and the findings collected during intake.

    Raises:
        ValueError: If the source input is empty, the source kind is not
            "superset_link" or "dataset_selection", or the environment is
            not found.
    """
    with belief_scope("DatasetReviewOrchestrator.start_session"):
        normalized_source_kind = str(command.source_kind or "").strip()
        normalized_source_input = str(command.source_input or "").strip()
        normalized_environment_id = str(command.environment_id or "").strip()

        if not normalized_source_input:
            logger.explore(
                "Blocked dataset review session start due to empty source input"
            )
            raise ValueError("source_input must be non-empty")

        if normalized_source_kind not in {"superset_link", "dataset_selection"}:
            logger.explore(
                "Blocked dataset review session start due to unsupported source kind",
                extra={"source_kind": normalized_source_kind},
            )
            raise ValueError(
                "source_kind must be 'superset_link' or 'dataset_selection'"
            )

        environment = self.config_manager.get_environment(normalized_environment_id)
        if environment is None:
            logger.explore(
                "Blocked dataset review session start because environment was not found",
                extra={"environment_id": normalized_environment_id},
            )
            raise ValueError("Environment not found")

        logger.reason(
            "Starting dataset review session",
            extra={
                "user_id": command.user.id,
                "environment_id": normalized_environment_id,
                "source_kind": normalized_source_kind,
            },
        )

        parsed_context: Optional[SupersetParsedContext] = None
        findings: List[ValidationFinding] = []
        dataset_ref = normalized_source_input
        dataset_id: Optional[int] = None
        dashboard_id: Optional[int] = None
        # IMPORTING is only a placeholder: every branch below reassigns
        # readiness_state before the session is constructed.
        readiness_state = ReadinessState.IMPORTING
        recommended_action = RecommendedAction.REVIEW_DOCUMENTATION
        current_phase = SessionPhase.RECOVERY

        if normalized_source_kind == "superset_link":
            extractor = SupersetContextExtractor(environment)
            parsed_context = extractor.parse_superset_link(normalized_source_input)
            dataset_ref = parsed_context.dataset_ref
            dataset_id = parsed_context.dataset_id
            dashboard_id = parsed_context.dashboard_id

            if parsed_context.partial_recovery:
                readiness_state = ReadinessState.RECOVERY_REQUIRED
                # Same value as the default assigned above.
                recommended_action = RecommendedAction.REVIEW_DOCUMENTATION
                findings.extend(
                    self._build_partial_recovery_findings(parsed_context)
                )
            else:
                readiness_state = ReadinessState.REVIEW_READY
        else:
            # Only "dataset_selection" can reach this branch because the
            # source kind was validated above.
            dataset_ref, dataset_id = self._parse_dataset_selection(
                normalized_source_input
            )
            readiness_state = ReadinessState.REVIEW_READY
            current_phase = SessionPhase.REVIEW

        session = DatasetReviewSession(
            user_id=command.user.id,
            environment_id=normalized_environment_id,
            source_kind=normalized_source_kind,
            source_input=normalized_source_input,
            dataset_ref=dataset_ref,
            dataset_id=dataset_id,
            dashboard_id=dashboard_id,
            readiness_state=readiness_state,
            recommended_action=recommended_action,
            status=SessionStatus.ACTIVE,
            current_phase=current_phase,
        )
        # The session shell is persisted before any recovery bootstrap so
        # the bootstrap can reference the persisted session_id.
        persisted_session = cast(Any, self.repository.create_session(session))

        recovered_filters: List[ImportedFilter] = []
        template_variables: List[TemplateVariable] = []
        execution_mappings: List[ExecutionMapping] = []
        if normalized_source_kind == "superset_link" and parsed_context is not None:
            # The bootstrap may append findings, so the returned list
            # replaces the local one.
            recovered_filters, template_variables, execution_mappings, findings = (
                self._build_recovery_bootstrap(
                    environment=environment,
                    session=persisted_session,
                    parsed_context=parsed_context,
                    findings=findings,
                )
            )

        profile = self._build_initial_profile(
            session_id=persisted_session.session_id,
            parsed_context=parsed_context,
            dataset_ref=dataset_ref,
        )

        # The "session_started" event is logged before the profile and
        # findings are saved.
        self.repository.event_logger.log_event(
            SessionEventPayload(
                session_id=persisted_session.session_id,
                actor_user_id=command.user.id,
                event_type="session_started",
                event_summary="Dataset review session shell created",
                current_phase=persisted_session.current_phase.value,
                readiness_state=persisted_session.readiness_state.value,
                event_details={
                    "source_kind": persisted_session.source_kind,
                    "dataset_ref": persisted_session.dataset_ref,
                    "dataset_id": persisted_session.dataset_id,
                    "dashboard_id": persisted_session.dashboard_id,
                    "partial_recovery": bool(
                        parsed_context and parsed_context.partial_recovery
                    ),
                },
            )
        )

        persisted_session = self.repository.save_profile_and_findings(
            persisted_session.session_id,
            command.user.id,
            profile,
            findings,
        )
        # Skip the recovery-state write when the bootstrap produced nothing.
        if recovered_filters or template_variables or execution_mappings:
            persisted_session = self.repository.save_recovery_state(
                persisted_session.session_id,
                command.user.id,
                recovered_filters,
                template_variables,
                execution_mappings,
            )

        active_task_id = self._enqueue_recovery_task(
            command=command,
            session=persisted_session,
            parsed_context=parsed_context,
        )
        if active_task_id:
            # Link the task on the session row and commit through the
            # repository's db handle.
            persisted_session.active_task_id = active_task_id
            self.repository.db.commit()
            self.repository.db.refresh(persisted_session)
            self.repository.event_logger.log_event(
                SessionEventPayload(
                    session_id=persisted_session.session_id,
                    actor_user_id=command.user.id,
                    event_type="recovery_task_linked",
                    event_summary="Recovery task linked to dataset review session",
                    current_phase=persisted_session.current_phase.value,
                    readiness_state=persisted_session.readiness_state.value,
                    event_details={"task_id": active_task_id},
                )
            )
            logger.reason(
                "Linked recovery task to started dataset review session",
                extra={
                    "session_id": persisted_session.session_id,
                    "task_id": active_task_id,
                },
            )

        logger.reflect(
            "Dataset review session start completed",
            extra={
                "session_id": persisted_session.session_id,
                "dataset_ref": persisted_session.dataset_ref,
                "dataset_id": persisted_session.dataset_id,
                "dashboard_id": persisted_session.dashboard_id,
                "readiness_state": persisted_session.readiness_state.value,
                "active_task_id": persisted_session.active_task_id,
                "finding_count": len(findings),
            },
        )
        return StartSessionResult(
            session=persisted_session,
            parsed_context=parsed_context,
            findings=findings,
        )


# [/DEF:DatasetReviewOrchestrator.start_session:Function]

# [DEF:DatasetReviewOrchestrator.prepare_launch_preview:Function]
# @COMPLEXITY: 4
# @PURPOSE: Assemble effective execution inputs and trigger Superset-side preview compilation.
# @RELATION: [CALLS] ->[SupersetCompilationAdapter.compile_preview]
# @PRE: all required variables have candidate values or explicitly accepted defaults.
# @POST: returns preview artifact in pending, ready, failed, or stale state.
# @SIDE_EFFECT: persists preview attempt and upstream compilation diagnostics.
# @DATA_CONTRACT: Input[PreparePreviewCommand] -> Output[PreparePreviewResult]
def prepare_launch_preview(
    self, command: PreparePreviewCommand
) -> PreparePreviewResult:
    """Compile and persist a Superset preview for an owned session.

    Loads the session, builds the execution snapshot, compiles a preview
    through ``SupersetCompilationAdapter`` and saves it. The session phase,
    readiness state and recommended action are then updated and committed.

    Args:
        command: Preview request carrying the user and session id.

    Returns:
        The refreshed session and the persisted preview.
        ``blocked_reasons`` is always an empty list.

    Raises:
        ValueError: If the session is missing or owned by another user,
            the session has no ``dataset_id``, the environment is not
            found, or the snapshot reports preview blockers.
    """
    with belief_scope("DatasetReviewOrchestrator.prepare_launch_preview"):
        session = self.repository.load_session_detail(
            command.session_id, command.user.id
        )
        # Ownership is re-checked here in addition to passing the user id
        # to the repository lookup.
        if session is None or session.user_id != command.user.id:
            logger.explore(
                "Preview preparation rejected because owned session was not found",
                extra={
                    "session_id": command.session_id,
                    "user_id": command.user.id,
                },
            )
            raise ValueError("Session not found")

        if session.dataset_id is None:
            raise ValueError("Preview requires a resolved dataset_id")

        environment = self.config_manager.get_environment(session.environment_id)
        if environment is None:
            raise ValueError("Environment not found")

        execution_snapshot = self._build_execution_snapshot(session)
        preview_blockers = execution_snapshot["preview_blockers"]
        if preview_blockers:
            logger.explore(
                "Preview preparation blocked by incomplete execution context",
                extra={
                    "session_id": session.session_id,
                    "blocked_reasons": preview_blockers,
                },
            )
            raise ValueError("Preview blocked: " + "; ".join(preview_blockers))

        adapter = SupersetCompilationAdapter(environment)
        preview = adapter.compile_preview(
            PreviewCompilationPayload(
                session_id=session.session_id,
                dataset_id=session.dataset_id,
                preview_fingerprint=execution_snapshot["preview_fingerprint"],
                template_params=execution_snapshot["template_params"],
                effective_filters=execution_snapshot["effective_filters"],
            )
        )
        persisted_preview = self.repository.save_preview(
            session.session_id,
            command.user.id,
            preview,
        )

        session.current_phase = SessionPhase.PREVIEW
        session.last_activity_at = datetime.utcnow()
        if persisted_preview.preview_status == PreviewStatus.READY:
            launch_blockers = self._build_launch_blockers(
                session=session,
                execution_snapshot=execution_snapshot,
                preview=persisted_preview,
            )
            # Any remaining launch blocker, whatever its kind, maps to
            # APPROVE_MAPPING here.
            # NOTE(review): launch_blockers only selects the readiness state;
            # the list itself is not returned to the caller — confirm that
            # this is intended.
            if launch_blockers:
                session.readiness_state = ReadinessState.COMPILED_PREVIEW_READY
                session.recommended_action = RecommendedAction.APPROVE_MAPPING
            else:
                session.readiness_state = ReadinessState.RUN_READY
                session.recommended_action = RecommendedAction.LAUNCH_DATASET
        else:
            # A preview that is not READY sends the user back to preview
            # generation.
            session.readiness_state = ReadinessState.PARTIALLY_READY
            session.recommended_action = RecommendedAction.GENERATE_SQL_PREVIEW
        self.repository.db.commit()
        self.repository.db.refresh(session)
        self.repository.event_logger.log_event(
            SessionEventPayload(
                session_id=session.session_id,
                actor_user_id=command.user.id,
                event_type="preview_generated",
                event_summary="Superset preview generation persisted",
                current_phase=session.current_phase.value,
                readiness_state=session.readiness_state.value,
                event_details={
                    "preview_id": persisted_preview.preview_id,
                    "preview_status": persisted_preview.preview_status.value,
                    "preview_fingerprint": persisted_preview.preview_fingerprint,
                },
            )
        )
        logger.reflect(
            "Superset preview preparation completed",
            extra={
                "session_id": session.session_id,
                "preview_id": persisted_preview.preview_id,
                "preview_status": persisted_preview.preview_status.value,
                "preview_fingerprint": persisted_preview.preview_fingerprint,
            },
        )
        return PreparePreviewResult(
            session=session,
            preview=persisted_preview,
            blocked_reasons=[],
        )


# [/DEF:DatasetReviewOrchestrator.prepare_launch_preview:Function]

# [DEF:DatasetReviewOrchestrator.launch_dataset:Function]
# @COMPLEXITY: 5
# @PURPOSE: Start the approved dataset execution through SQL Lab and persist run context for audit/replay.
# @RELATION: [CALLS] ->[SupersetCompilationAdapter.create_sql_lab_session]
# @PRE: session is run-ready and compiled preview is current.
# @POST: returns persisted run context with SQL Lab session reference and launch outcome. # @SIDE_EFFECT: creates SQL Lab execution session and audit snapshot. # @DATA_CONTRACT: Input[LaunchDatasetCommand] -> Output[LaunchDatasetResult] # @INVARIANT: launch remains blocked unless blocking findings are closed, approvals are satisfied, and the latest Superset preview fingerprint matches current execution inputs. def launch_dataset(self, command: LaunchDatasetCommand) -> LaunchDatasetResult: with belief_scope("DatasetReviewOrchestrator.launch_dataset"): session = self.repository.load_session_detail( command.session_id, command.user.id ) if session is None or session.user_id != command.user.id: logger.explore( "Launch rejected because owned session was not found", extra={ "session_id": command.session_id, "user_id": command.user.id, }, ) raise ValueError("Session not found") if session.dataset_id is None: raise ValueError("Launch requires a resolved dataset_id") environment = self.config_manager.get_environment(session.environment_id) if environment is None: raise ValueError("Environment not found") execution_snapshot = self._build_execution_snapshot(session) current_preview = self._get_latest_preview(session) launch_blockers = self._build_launch_blockers( session=session, execution_snapshot=execution_snapshot, preview=current_preview, ) if launch_blockers: logger.explore( "Launch gate blocked dataset execution", extra={ "session_id": session.session_id, "blocked_reasons": launch_blockers, }, ) raise ValueError("Launch blocked: " + "; ".join(launch_blockers)) adapter = SupersetCompilationAdapter(environment) try: sql_lab_session_ref = adapter.create_sql_lab_session( SqlLabLaunchPayload( session_id=session.session_id, dataset_id=session.dataset_id, preview_id=current_preview.preview_id, compiled_sql=str(current_preview.compiled_sql or ""), template_params=execution_snapshot["template_params"], ) ) launch_status = LaunchStatus.STARTED launch_error = None except Exception 
as exc: logger.explore( "SQL Lab launch failed after passing gates", extra={"session_id": session.session_id, "error": str(exc)}, ) sql_lab_session_ref = "unavailable" launch_status = LaunchStatus.FAILED launch_error = str(exc) run_context = DatasetRunContext( session_id=session.session_id, dataset_ref=session.dataset_ref, environment_id=session.environment_id, preview_id=current_preview.preview_id, sql_lab_session_ref=sql_lab_session_ref, effective_filters=execution_snapshot["effective_filters"], template_params=execution_snapshot["template_params"], approved_mapping_ids=execution_snapshot["approved_mapping_ids"], semantic_decision_refs=execution_snapshot["semantic_decision_refs"], open_warning_refs=execution_snapshot["open_warning_refs"], launch_status=launch_status, launch_error=launch_error, ) persisted_run_context = self.repository.save_run_context( session.session_id, command.user.id, run_context, ) session.current_phase = SessionPhase.LAUNCH session.last_activity_at = datetime.utcnow() if launch_status == LaunchStatus.FAILED: session.readiness_state = ReadinessState.COMPILED_PREVIEW_READY session.recommended_action = RecommendedAction.LAUNCH_DATASET else: session.readiness_state = ReadinessState.RUN_IN_PROGRESS session.recommended_action = RecommendedAction.EXPORT_OUTPUTS self.repository.db.commit() self.repository.db.refresh(session) self.repository.event_logger.log_event( SessionEventPayload( session_id=session.session_id, actor_user_id=command.user.id, event_type="dataset_launch_requested", event_summary="Dataset launch handoff persisted", current_phase=session.current_phase.value, readiness_state=session.readiness_state.value, event_details={ "run_context_id": persisted_run_context.run_context_id, "launch_status": persisted_run_context.launch_status.value, "preview_id": persisted_run_context.preview_id, "sql_lab_session_ref": persisted_run_context.sql_lab_session_ref, }, ) ) logger.reflect( "Dataset launch orchestration completed with audited run 
context", extra={ "session_id": session.session_id, "run_context_id": persisted_run_context.run_context_id, "launch_status": persisted_run_context.launch_status.value, "preview_id": persisted_run_context.preview_id, }, ) return LaunchDatasetResult( session=session, run_context=persisted_run_context, blocked_reasons=[], ) # [/DEF:DatasetReviewOrchestrator.launch_dataset:Function] # [DEF:DatasetReviewOrchestrator._parse_dataset_selection:Function] # @COMPLEXITY: 3 # @PURPOSE: Normalize dataset-selection payload into canonical session references. # @RELATION: [DEPENDS_ON] ->[DatasetReviewSession] def _parse_dataset_selection(self, source_input: str) -> tuple[str, Optional[int]]: normalized = str(source_input or "").strip() if not normalized: raise ValueError("dataset selection input must be non-empty") if normalized.isdigit(): dataset_id = int(normalized) return f"dataset:{dataset_id}", dataset_id if normalized.startswith("dataset:"): suffix = normalized.split(":", 1)[1].strip() if suffix.isdigit(): return normalized, int(suffix) return normalized, None return normalized, None # [/DEF:DatasetReviewOrchestrator._parse_dataset_selection:Function] # [DEF:DatasetReviewOrchestrator._build_initial_profile:Function] # @COMPLEXITY: 3 # @PURPOSE: Create the first profile snapshot so exports and detail views remain usable immediately after intake. # @RELATION: [DEPENDS_ON] ->[DatasetProfile] def _build_initial_profile( self, session_id: str, parsed_context: Optional[SupersetParsedContext], dataset_ref: str, ) -> DatasetProfile: dataset_name = ( dataset_ref.split(".")[-1] if dataset_ref else "Unresolved dataset" ) business_summary = ( f"Review session initialized for {dataset_ref}." if dataset_ref else "Review session initialized with unresolved dataset context." 
) confidence_state = ( ConfidenceState.MIXED if parsed_context and parsed_context.partial_recovery else ConfidenceState.MOSTLY_CONFIRMED ) return DatasetProfile( session_id=session_id, dataset_name=dataset_name or "Unresolved dataset", schema_name=dataset_ref.split(".")[0] if "." in dataset_ref else None, business_summary=business_summary, business_summary_source=BusinessSummarySource.IMPORTED, description="Initial review profile created from source intake.", dataset_type="unknown", is_sqllab_view=False, completeness_score=0.25, confidence_state=confidence_state, has_blocking_findings=False, has_warning_findings=bool( parsed_context and parsed_context.partial_recovery ), manual_summary_locked=False, ) # [/DEF:DatasetReviewOrchestrator._build_initial_profile:Function] # [DEF:DatasetReviewOrchestrator._build_partial_recovery_findings:Function] # @COMPLEXITY: 4 # @PURPOSE: Project partial Superset intake recovery into explicit findings without blocking session usability. # @RELATION: [DEPENDS_ON] ->[ValidationFinding] # @PRE: parsed_context.partial_recovery is true. # @POST: returns warning-level findings that preserve usable but incomplete state. # @SIDE_EFFECT: none beyond structured finding creation. # @DATA_CONTRACT: Input[SupersetParsedContext] -> Output[List[ValidationFinding]] def _build_partial_recovery_findings( self, parsed_context: SupersetParsedContext, ) -> List[ValidationFinding]: findings: List[ValidationFinding] = [] for unresolved_ref in parsed_context.unresolved_references: findings.append( ValidationFinding( area=FindingArea.SOURCE_INTAKE, severity=FindingSeverity.WARNING, code="PARTIAL_SUPERSET_RECOVERY", title="Superset context recovered partially", message=( "Session remains usable, but some Superset context requires review: " f"{unresolved_ref.replace('_', ' ')}." 
), resolution_state=ResolutionState.OPEN, caused_by_ref=unresolved_ref, ) ) return findings # [/DEF:DatasetReviewOrchestrator._build_partial_recovery_findings:Function] # [DEF:DatasetReviewOrchestrator._build_recovery_bootstrap:Function] # @COMPLEXITY: 4 # @PURPOSE: Recover and materialize initial imported filters, template variables, and draft execution mappings after session creation. # @RELATION: [CALLS] ->[SupersetContextExtractor.recover_imported_filters] # @RELATION: [CALLS] ->[SupersetContextExtractor.discover_template_variables] # @PRE: session belongs to the just-created review aggregate and parsed_context was produced for the same environment scope. # @POST: Returns bootstrap imported filters, template variables, execution mappings, and updated findings without persisting them directly. # @SIDE_EFFECT: Performs Superset reads through the extractor and may append warning findings for incomplete recovery. # @DATA_CONTRACT: Input[Environment, DatasetReviewSession, SupersetParsedContext, List[ValidationFinding]] -> Output[Tuple[List[ImportedFilter], List[TemplateVariable], List[ExecutionMapping], List[ValidationFinding]]] def _build_recovery_bootstrap( self, environment, session: DatasetReviewSession, parsed_context: SupersetParsedContext, findings: List[ValidationFinding], ) -> tuple[ List[ImportedFilter], List[TemplateVariable], List[ExecutionMapping], List[ValidationFinding], ]: session_record = cast(Any, session) extractor = SupersetContextExtractor(environment) imported_filters_payload = extractor.recover_imported_filters(parsed_context) if imported_filters_payload is None: imported_filters_payload = [] imported_filters = [ ImportedFilter( session_id=session_record.session_id, filter_name=str(item.get("filter_name") or f"imported_filter_{index}"), display_name=item.get("display_name"), raw_value=item.get("raw_value"), normalized_value=item.get("normalized_value"), source=FilterSource( str(item.get("source") or FilterSource.SUPERSET_URL.value) ), 
confidence_state=FilterConfidenceState( str( item.get("confidence_state") or FilterConfidenceState.UNRESOLVED.value ) ), requires_confirmation=bool(item.get("requires_confirmation", False)), recovery_status=FilterRecoveryStatus( str( item.get("recovery_status") or FilterRecoveryStatus.PARTIAL.value ) ), notes=item.get("notes"), ) for index, item in enumerate(imported_filters_payload) ] template_variables: List[TemplateVariable] = [] execution_mappings: List[ExecutionMapping] = [] if session.dataset_id is not None: try: dataset_payload = extractor.client.get_dataset_detail( session_record.dataset_id ) discovered_variables = extractor.discover_template_variables( dataset_payload ) template_variables = [ TemplateVariable( session_id=session_record.session_id, variable_name=str( item.get("variable_name") or f"variable_{index}" ), expression_source=str(item.get("expression_source") or ""), variable_kind=VariableKind( str(item.get("variable_kind") or VariableKind.UNKNOWN.value) ), is_required=bool(item.get("is_required", True)), default_value=item.get("default_value"), mapping_status=MappingStatus( str( item.get("mapping_status") or MappingStatus.UNMAPPED.value ) ), ) for index, item in enumerate(discovered_variables) ] except Exception as exc: if ( "dataset_template_variable_discovery_failed" not in parsed_context.unresolved_references ): parsed_context.unresolved_references.append( "dataset_template_variable_discovery_failed" ) if not any( finding.caused_by_ref == "dataset_template_variable_discovery_failed" for finding in findings ): findings.append( ValidationFinding( area=FindingArea.TEMPLATE_MAPPING, severity=FindingSeverity.WARNING, code="TEMPLATE_VARIABLE_DISCOVERY_FAILED", title="Template variables could not be discovered", message="Session remains usable, but dataset template variables still need review.", resolution_state=ResolutionState.OPEN, caused_by_ref="dataset_template_variable_discovery_failed", ) ) logger.explore( "Template variable discovery failed 
during session bootstrap", extra={ "session_id": session_record.session_id, "dataset_id": session_record.dataset_id, "error": str(exc), }, ) filter_lookup = { str(imported_filter.filter_name or "").strip().lower(): imported_filter for imported_filter in imported_filters if str(imported_filter.filter_name or "").strip() } for template_variable in template_variables: matched_filter = filter_lookup.get( str(template_variable.variable_name or "").strip().lower() ) if matched_filter is None: continue requires_explicit_approval = bool( matched_filter.requires_confirmation or matched_filter.recovery_status != FilterRecoveryStatus.RECOVERED ) execution_mappings.append( ExecutionMapping( session_id=session_record.session_id, filter_id=matched_filter.filter_id, variable_id=template_variable.variable_id, mapping_method=MappingMethod.DIRECT_MATCH, raw_input_value=matched_filter.raw_value, effective_value=matched_filter.normalized_value if matched_filter.normalized_value is not None else matched_filter.raw_value, transformation_note="Bootstrapped from Superset recovery context", warning_level=None if not requires_explicit_approval else None, requires_explicit_approval=requires_explicit_approval, approval_state=ApprovalState.PENDING if requires_explicit_approval else ApprovalState.NOT_REQUIRED, approved_by_user_id=None, approved_at=None, ) ) return imported_filters, template_variables, execution_mappings, findings # [/DEF:DatasetReviewOrchestrator._build_recovery_bootstrap:Function] # [DEF:DatasetReviewOrchestrator._build_execution_snapshot:Function] # @COMPLEXITY: 4 # @PURPOSE: Build effective filters, template params, approvals, and fingerprint for preview and launch gating. # @RELATION: [DEPENDS_ON] ->[DatasetReviewSession] # @PRE: Session aggregate includes imported filters, template variables, and current execution mappings. # @POST: returns deterministic execution snapshot for current session state without mutating persistence. # @SIDE_EFFECT: none. 
# @DATA_CONTRACT: Input[DatasetReviewSession] -> Output[Dict[str,Any]]
def _build_execution_snapshot(
    self, session: DatasetReviewSession
) -> Dict[str, Any]:
    """Assemble the execution snapshot used for preview and launch gating.

    Reads the session's execution mappings, imported filters, template
    variables and semantic fields; nothing is assigned back to the session
    here. The returned dict contains:

    * ``effective_filters``: one entry per mapping that resolved to a value,
      followed by one entry per imported filter that no processed mapping
      consumed but that still carries a normalized or raw value.
    * ``template_params``: variable name -> resolved value, plus defaults of
      variables that no mapping references.
    * ``approved_mapping_ids`` / ``open_warning_refs``: sorted mapping ids
      that are approved / still need explicit approval.
    * ``semantic_decision_refs``: sorted ids of semantic fields that are
      locked, do not need review, or whose provenance is not "unresolved".
    * ``preview_blockers``: sorted, de-duplicated blocker codes.
    * ``preview_fingerprint``: hash over dataset id, params and filters.

    NOTE(review): this block was re-formatted from whitespace-collapsed
    source; confirm that the ``template_params`` assignment in the mapping
    loop belongs under the ``is not None`` guard as laid out below.
    """
    record = cast(Any, session)
    filters_by_id = {entry.filter_id: entry for entry in record.imported_filters}
    variables_by_id = {
        entry.variable_id: entry for entry in record.template_variables
    }

    resolved_filters: List[Dict[str, Any]] = []
    params: Dict[str, Any] = {}
    approved_ids: List[str] = []
    pending_approval_ids: List[str] = []
    blockers: List[str] = []
    consumed_filter_ids: set[str] = set()

    for mapping in record.execution_mappings:
        source_filter = filters_by_id.get(mapping.filter_id)
        target_variable = variables_by_id.get(mapping.variable_id)
        # A dangling reference on either side blocks the preview; a missing
        # filter is reported in preference to a missing variable.
        if source_filter is None:
            blockers.append(f"mapping:{mapping.mapping_id}:missing_filter")
            continue
        if target_variable is None:
            blockers.append(f"mapping:{mapping.mapping_id}:missing_variable")
            continue

        # First non-None wins: mapping override, normalized filter value,
        # raw filter value, then the variable's default.
        candidates = (
            mapping.effective_value,
            source_filter.normalized_value,
            source_filter.raw_value,
            target_variable.default_value,
        )
        value = next(
            (candidate for candidate in candidates if candidate is not None),
            None,
        )
        if value is None and target_variable.is_required:
            blockers.append(
                f"variable:{target_variable.variable_name}:missing_required_value"
            )
            continue

        consumed_filter_ids.add(source_filter.filter_id)
        if value is not None:
            resolved_filters.append(
                {
                    "mapping_id": mapping.mapping_id,
                    "filter_id": source_filter.filter_id,
                    "filter_name": source_filter.filter_name,
                    "display_name": source_filter.display_name,
                    "variable_id": target_variable.variable_id,
                    "variable_name": target_variable.variable_name,
                    "effective_value": value,
                    "raw_input_value": mapping.raw_input_value,
                    "normalized_filter_payload": source_filter.normalized_value,
                }
            )
            params[target_variable.variable_name] = value

        if mapping.approval_state == ApprovalState.APPROVED:
            approved_ids.append(mapping.mapping_id)
        elif mapping.requires_explicit_approval:
            pending_approval_ids.append(mapping.mapping_id)

    # Imported filters that no processed mapping consumed still contribute
    # their own value, when they have one.
    for leftover in record.imported_filters:
        if leftover.filter_id in consumed_filter_ids:
            continue
        leftover_value = leftover.normalized_value
        if leftover_value is None:
            leftover_value = leftover.raw_value
        if leftover_value is not None:
            resolved_filters.append(
                {
                    "filter_id": leftover.filter_id,
                    "filter_name": leftover.filter_name,
                    "display_name": leftover.display_name,
                    "effective_value": leftover_value,
                    "raw_input_value": leftover.raw_value,
                    "normalized_filter_payload": leftover.normalized_value,
                }
            )

    # Variables referenced by any mapping (even a skipped one) are treated as
    # mapped; the rest fall back to their default or block when required.
    mapped_variable_ids = {
        mapping.variable_id for mapping in record.execution_mappings
    }
    for variable in record.template_variables:
        if variable.variable_id in mapped_variable_ids:
            continue
        if variable.default_value is not None:
            params[variable.variable_name] = variable.default_value
        elif variable.is_required:
            blockers.append(f"variable:{variable.variable_name}:unmapped")

    decided_field_ids = [
        semantic_field.field_id
        for semantic_field in record.semantic_fields
        if not (
            not semantic_field.is_locked
            and semantic_field.needs_review
            and semantic_field.provenance.value == "unresolved"
        )
    ]

    fingerprint = self._compute_preview_fingerprint(
        {
            "dataset_id": record.dataset_id,
            "template_params": params,
            "effective_filters": resolved_filters,
        }
    )

    return {
        "effective_filters": resolved_filters,
        "template_params": params,
        "approved_mapping_ids": sorted(approved_ids),
        "semantic_decision_refs": sorted(decided_field_ids),
        "open_warning_refs": sorted(pending_approval_ids),
        "preview_blockers": sorted(set(blockers)),
        "preview_fingerprint": fingerprint,
    }
[/DEF:DatasetReviewOrchestrator._build_execution_snapshot:Function] # [DEF:DatasetReviewOrchestrator._build_launch_blockers:Function] # @COMPLEXITY: 4 # @PURPOSE: Enforce launch gates from findings, approvals, and current preview truth. # @RELATION: [DEPENDS_ON] ->[CompiledPreview] # @PRE: execution_snapshot was computed from current session state and preview is the latest persisted preview or None. # @POST: returns explicit blocker codes for every unmet launch invariant. # @SIDE_EFFECT: none. # @DATA_CONTRACT: Input[DatasetReviewSession,Dict[str,Any],CompiledPreview|None] -> Output[List[str]] def _build_launch_blockers( self, session: DatasetReviewSession, execution_snapshot: Dict[str, Any], preview: Optional[CompiledPreview], ) -> List[str]: session_record = cast(Any, session) blockers = list(execution_snapshot["preview_blockers"]) for finding in session_record.findings: if ( finding.severity == FindingSeverity.BLOCKING and finding.resolution_state not in {ResolutionState.RESOLVED, ResolutionState.APPROVED} ): blockers.append(f"finding:{finding.code}:blocking") for mapping in session_record.execution_mappings: if ( mapping.requires_explicit_approval and mapping.approval_state != ApprovalState.APPROVED ): blockers.append(f"mapping:{mapping.mapping_id}:approval_required") if preview is None: blockers.append("preview:missing") else: if preview.preview_status != PreviewStatus.READY: blockers.append(f"preview:{preview.preview_status.value}") if preview.preview_fingerprint != execution_snapshot["preview_fingerprint"]: blockers.append("preview:fingerprint_mismatch") return sorted(set(blockers)) # [/DEF:DatasetReviewOrchestrator._build_launch_blockers:Function] # [DEF:DatasetReviewOrchestrator._get_latest_preview:Function] # @COMPLEXITY: 2 # @PURPOSE: Resolve the current latest preview snapshot for one session aggregate. 
def _get_latest_preview( self, session: DatasetReviewSession ) -> Optional[CompiledPreview]: session_record = cast(Any, session) if not session_record.previews: return None if session_record.last_preview_id: for preview in session_record.previews: if preview.preview_id == session_record.last_preview_id: return preview return sorted( session_record.previews, key=lambda item: (item.created_at or datetime.min, item.preview_id), reverse=True, )[0] # [/DEF:DatasetReviewOrchestrator._get_latest_preview:Function] # [DEF:DatasetReviewOrchestrator._compute_preview_fingerprint:Function] # @COMPLEXITY: 2 # @PURPOSE: Produce deterministic execution fingerprint for preview truth and staleness checks. def _compute_preview_fingerprint(self, payload: Dict[str, Any]) -> str: serialized = json.dumps(payload, sort_keys=True, default=str) return hashlib.sha256(serialized.encode("utf-8")).hexdigest() # [/DEF:DatasetReviewOrchestrator._compute_preview_fingerprint:Function] # [DEF:DatasetReviewOrchestrator._enqueue_recovery_task:Function] # @COMPLEXITY: 4 # @PURPOSE: Link session start to observable async recovery when task infrastructure is available. # @RELATION: [CALLS] ->[create_task] # @PRE: session is already persisted. # @POST: returns task identifier when a task could be enqueued, otherwise None. # @SIDE_EFFECT: may create one background task for progressive recovery. 
# @DATA_CONTRACT: Input[StartSessionCommand,DatasetReviewSession,SupersetParsedContext|None] -> Output[task_id:str|None] def _enqueue_recovery_task( self, command: StartSessionCommand, session: DatasetReviewSession, parsed_context: Optional[SupersetParsedContext], ) -> Optional[str]: session_record = cast(Any, session) if self.task_manager is None: logger.reason( "Dataset review session started without task manager; continuing synchronously", extra={"session_id": session_record.session_id}, ) return None task_params: Dict[str, Any] = { "session_id": session_record.session_id, "user_id": command.user.id, "environment_id": session_record.environment_id, "source_kind": session_record.source_kind, "source_input": session_record.source_input, "dataset_ref": session_record.dataset_ref, "dataset_id": session_record.dataset_id, "dashboard_id": session_record.dashboard_id, "partial_recovery": bool( parsed_context and parsed_context.partial_recovery ), } create_task = getattr(self.task_manager, "create_task", None) if create_task is None: logger.explore( "Task manager has no create_task method; skipping recovery enqueue" ) return None try: task_object = create_task( plugin_id="dataset-review-recovery", params=task_params, ) except TypeError: logger.explore( "Recovery task enqueue skipped because task manager create_task contract is incompatible", extra={"session_id": session_record.session_id}, ) return None task_id = getattr(task_object, "id", None) return str(task_id) if task_id else None # [/DEF:DatasetReviewOrchestrator._enqueue_recovery_task:Function] # [/DEF:DatasetReviewOrchestrator:Class] # [/DEF:DatasetReviewOrchestrator:Module]