# [DEF:DatasetReviewApi:Module]
# @COMPLEXITY: 4
# @SEMANTICS: dataset_review, api, session_lifecycle, exports, rbac, feature_flags
# @PURPOSE: Expose dataset review session lifecycle and export endpoints for backend US1.
# @LAYER: API
# @RELATION: [DEPENDS_ON] ->[AppDependencies]
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSessionRepository]
# @RELATION: [DEPENDS_ON] ->[DatasetReviewOrchestrator]
# @PRE: Authenticated user and valid environment/session scope are required for all mutations and reads.
# @POST: Returns ownership-scoped session state and export payloads with feature-flag/RBAC enforcement.
# @SIDE_EFFECT: Persists session state and may enqueue recovery task.
# @DATA_CONTRACT: Input[HTTP Request] -> Output[SessionSummary | SessionDetail | ExportArtifactResponse | HTTP 204]
# @INVARIANT: No cross-user session leakage is allowed; export payloads only expose the current user's accessible session.
from __future__ import annotations

# [DEF:DatasetReviewApi.imports:Block]
# NOTE(review): several names (json, Union, ApprovalState, MappingMethod,
# PreviewStatus, QuestionState, ReadinessState, ClarificationAnswerDto,
# ClarificationSessionDto, CompiledPreviewDto, ClarificationAnswerCommand,
# LaunchDatasetCommand, PreparePreviewCommand) are not referenced in this
# visible chunk — presumably used by endpoints later in the file; confirm
# before pruning.
import json
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from fastapi import APIRouter, Depends, HTTPException, Query, Response, status
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session

from src.core.database import get_db
from src.core.logger import belief_scope, logger
from src.dependencies import get_config_manager, get_current_user, get_task_manager, has_permission
from src.models.auth import User
from src.models.dataset_review import (
    AnswerKind,
    ApprovalState,
    ArtifactFormat,
    CandidateStatus,
    ClarificationSession,
    DatasetReviewSession,
    ExecutionMapping,
    FieldProvenance,
    MappingMethod,
    PreviewStatus,
    QuestionState,
    ReadinessState,
    RecommendedAction,
    SemanticCandidate,
    SemanticFieldEntry,
    SessionStatus,
)
from src.schemas.dataset_review import (
    ClarificationAnswerDto,
    ClarificationQuestionDto,
    ClarificationSessionDto,
    CompiledPreviewDto,
    DatasetRunContextDto,
    ExecutionMappingDto,
    SemanticFieldEntryDto,
    SessionDetail,
    SessionSummary,
    ValidationFindingDto,
)
from src.services.dataset_review.clarification_engine import (
    ClarificationAnswerCommand,
    ClarificationEngine,
    ClarificationQuestionPayload,
    ClarificationStateResult,
)
from src.services.dataset_review.orchestrator import (
    DatasetReviewOrchestrator,
    LaunchDatasetCommand,
    PreparePreviewCommand,
    StartSessionCommand,
)
from src.services.dataset_review.repositories.session_repository import (
    DatasetReviewSessionRepository,
)
# [/DEF:DatasetReviewApi.imports:Block]

# All endpoints in this module hang off this prefix.
router = APIRouter(prefix="/api/dataset-orchestration", tags=["Dataset Orchestration"])


# [DEF:StartSessionRequest:Class]
# @COMPLEXITY: 2
# @PURPOSE: Request DTO for starting one dataset review session from a Superset link or dataset selection.
class StartSessionRequest(BaseModel):
    # Closed set enforced by regex: only the two supported source kinds.
    source_kind: str = Field(..., pattern="^(superset_link|dataset_selection)$")
    source_input: str = Field(..., min_length=1)
    environment_id: str = Field(..., min_length=1)
# [/DEF:StartSessionRequest:Class]


# [DEF:UpdateSessionRequest:Class]
# @COMPLEXITY: 2
# @PURPOSE: Request DTO for lifecycle state updates on an existing session.
class UpdateSessionRequest(BaseModel):
    status: SessionStatus
    # Optional free-text note accompanying the status change.
    note: Optional[str] = None
# [/DEF:UpdateSessionRequest:Class]


# [DEF:SessionCollectionResponse:Class]
# @COMPLEXITY: 2
# @PURPOSE: Paginated ownership-scoped dataset review session collection response.
class SessionCollectionResponse(BaseModel):
    items: List[SessionSummary]
    total: int
    page: int
    page_size: int
    has_next: bool
# [/DEF:SessionCollectionResponse:Class]


# [DEF:ExportArtifactResponse:Class]
# @COMPLEXITY: 2
# @PURPOSE: Inline export response for documentation or validation outputs without introducing unrelated persistence changes.
class ExportArtifactResponse(BaseModel):
    artifact_id: str
    session_id: str
    artifact_type: str
    format: str
    # Reference to where the artifact content notionally lives (inline:// scheme
    # is used by the export builders in this module).
    storage_ref: str
    created_by_user_id: str
    created_at: Optional[str] = None
    # Full artifact payload returned inline rather than persisted.
    content: Dict[str, Any]
# [/DEF:ExportArtifactResponse:Class]


# [DEF:FieldSemanticUpdateRequest:Class]
# @COMPLEXITY: 2
# @PURPOSE: Request DTO for field-level semantic candidate acceptance or manual override.
class FieldSemanticUpdateRequest(BaseModel):
    # Either candidate_id (acceptance) or at least one of the override fields
    # below must be provided; enforced in _update_semantic_field_state.
    candidate_id: Optional[str] = None
    verbose_name: Optional[str] = None
    description: Optional[str] = None
    display_format: Optional[str] = None
    lock_field: bool = False
    resolution_note: Optional[str] = None
# [/DEF:FieldSemanticUpdateRequest:Class]


# [DEF:FeedbackRequest:Class]
# @COMPLEXITY: 2
# @PURPOSE: Request DTO for thumbs up/down feedback persistence on AI-assisted content.
class FeedbackRequest(BaseModel):
    feedback: str = Field(..., pattern="^(up|down)$")
# [/DEF:FeedbackRequest:Class]


# [DEF:ClarificationAnswerRequest:Class]
# @COMPLEXITY: 2
# @PURPOSE: Request DTO for submitting one clarification answer.
class ClarificationAnswerRequest(BaseModel):
    question_id: str = Field(..., min_length=1)
    answer_kind: AnswerKind
    answer_value: Optional[str] = None
# [/DEF:ClarificationAnswerRequest:Class]


# [DEF:ClarificationSessionSummaryResponse:Class]
# @COMPLEXITY: 2
# @PURPOSE: Summary DTO for current clarification session state without exposing historical noise.
class ClarificationSessionSummaryResponse(BaseModel):
    clarification_session_id: str
    session_id: str
    status: str
    current_question_id: Optional[str] = None
    resolved_count: int
    remaining_count: int
    summary_delta: Optional[str] = None
# [/DEF:ClarificationSessionSummaryResponse:Class]


# [DEF:ClarificationStateResponse:Class]
# @COMPLEXITY: 2
# @PURPOSE: Response DTO for current clarification state and active question payload.
class ClarificationStateResponse(BaseModel):
    clarification_session: ClarificationSessionSummaryResponse
    current_question: Optional[ClarificationQuestionDto] = None
# [/DEF:ClarificationStateResponse:Class]


# [DEF:ClarificationAnswerResultResponse:Class]
# @COMPLEXITY: 2
# @PURPOSE: Response DTO for one clarification answer mutation result.
class ClarificationAnswerResultResponse(BaseModel):
    clarification_state: ClarificationStateResponse
    session: SessionSummary
    changed_findings: List[ValidationFindingDto]
# [/DEF:ClarificationAnswerResultResponse:Class]


# [DEF:FeedbackResponse:Class]
# @COMPLEXITY: 2
# @PURPOSE: Minimal response DTO for persisted AI feedback actions.
class FeedbackResponse(BaseModel):
    target_id: str
    feedback: str
# [/DEF:FeedbackResponse:Class]


# [DEF:ApproveMappingRequest:Class]
# @COMPLEXITY: 2
# @PURPOSE: Optional request DTO for explicit mapping approval audit notes.
class ApproveMappingRequest(BaseModel):
    approval_note: Optional[str] = None
# [/DEF:ApproveMappingRequest:Class]


# [DEF:BatchApproveSemanticItemRequest:Class]
# @COMPLEXITY: 2
# @PURPOSE: Request DTO for one batch semantic-approval item aligned with single-field acceptance semantics.
class BatchApproveSemanticItemRequest(BaseModel):
    field_id: str = Field(..., min_length=1)
    candidate_id: str = Field(..., min_length=1)
    lock_field: bool = False
# [/DEF:BatchApproveSemanticItemRequest:Class]


# [DEF:BatchApproveSemanticRequest:Class]
# @COMPLEXITY: 2
# @PURPOSE: Request DTO for explicit batch semantic approvals inside one owned session scope.
class BatchApproveSemanticRequest(BaseModel):
    # min_length=1 rejects empty batch requests at validation time.
    items: List[BatchApproveSemanticItemRequest] = Field(..., min_length=1)
# [/DEF:BatchApproveSemanticRequest:Class]


# [DEF:BatchApproveMappingRequest:Class]
# @COMPLEXITY: 2
# @PURPOSE: Request DTO for explicit batch mapping approvals aligned with single-item approval semantics.
class BatchApproveMappingRequest(BaseModel):
    mapping_ids: List[str] = Field(..., min_length=1)
    approval_note: Optional[str] = None
# [/DEF:BatchApproveMappingRequest:Class]


# [DEF:PreviewEnqueueResultResponse:Class]
# @COMPLEXITY: 2
# @PURPOSE: Contract-compliant async preview trigger response exposing only enqueue state.
class PreviewEnqueueResultResponse(BaseModel):
    session_id: str
    preview_status: str
    task_id: Optional[str] = None
# [/DEF:PreviewEnqueueResultResponse:Class]


# [DEF:MappingCollectionResponse:Class]
# @COMPLEXITY: 2
# @PURPOSE: Contract-compliant wrapper for execution mapping list responses.
class MappingCollectionResponse(BaseModel):
    items: List[ExecutionMappingDto]
# [/DEF:MappingCollectionResponse:Class]


# [DEF:UpdateExecutionMappingRequest:Class]
# @COMPLEXITY: 2
# @PURPOSE: Request DTO for one manual execution-mapping override update without introducing unrelated bulk mutation semantics.
class UpdateExecutionMappingRequest(BaseModel):
    effective_value: Optional[Any] = None
    # Regex mirrors the MappingMethod value set; kept as str presumably to keep
    # the wire contract decoupled from the enum — TODO confirm.
    mapping_method: Optional[str] = Field(default=None, pattern="^(manual_override|direct_match|heuristic_match|semantic_match)$")
    transformation_note: Optional[str] = None
# [/DEF:UpdateExecutionMappingRequest:Class]


# [DEF:LaunchDatasetResponse:Class]
# @COMPLEXITY: 2
# @PURPOSE: Contract-compliant launch result exposing audited run context and SQL Lab redirect target.
class LaunchDatasetResponse(BaseModel):
    run_context: DatasetRunContextDto
    redirect_url: str
# [/DEF:LaunchDatasetResponse:Class]


# [DEF:_require_auto_review_flag:Function]
# @COMPLEXITY: 3
# @PURPOSE: Guard US1 dataset review endpoints behind the configured feature flag.
# @RELATION: [DEPENDS_ON] ->[ConfigManager] def _require_auto_review_flag(config_manager=Depends(get_config_manager)) -> bool: with belief_scope("dataset_review.require_auto_review_flag"): if not config_manager.get_config().settings.ff_dataset_auto_review: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Dataset auto review feature is disabled", ) return True # [/DEF:_require_auto_review_flag:Function] # [DEF:_require_clarification_flag:Function] # @COMPLEXITY: 3 # @PURPOSE: Guard clarification-specific US2 endpoints behind the configured feature flag. # @RELATION: [DEPENDS_ON] ->[ConfigManager] def _require_clarification_flag(config_manager=Depends(get_config_manager)) -> bool: with belief_scope("dataset_review.require_clarification_flag"): if not config_manager.get_config().settings.ff_dataset_clarification: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Dataset clarification feature is disabled", ) return True # [/DEF:_require_clarification_flag:Function] # [DEF:_require_execution_flag:Function] # @COMPLEXITY: 3 # @PURPOSE: Guard US3 execution endpoints behind the configured feature flag. # @RELATION: [DEPENDS_ON] ->[ConfigManager] def _require_execution_flag(config_manager=Depends(get_config_manager)) -> bool: with belief_scope("dataset_review.require_execution_flag"): if not config_manager.get_config().settings.ff_dataset_execution: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Dataset execution feature is disabled", ) return True # [/DEF:_require_execution_flag:Function] # [DEF:_get_repository:Function] # @COMPLEXITY: 2 # @PURPOSE: Build repository dependency for dataset review session aggregate access. def _get_repository(db: Session = Depends(get_db)) -> DatasetReviewSessionRepository: return DatasetReviewSessionRepository(db) # [/DEF:_get_repository:Function] # [DEF:_get_orchestrator:Function] # @COMPLEXITY: 3 # @PURPOSE: Build orchestrator dependency for session lifecycle actions. 
# @RELATION: [DEPENDS_ON] ->[DatasetReviewOrchestrator] def _get_orchestrator( repository: DatasetReviewSessionRepository = Depends(_get_repository), config_manager=Depends(get_config_manager), task_manager=Depends(get_task_manager), ) -> DatasetReviewOrchestrator: return DatasetReviewOrchestrator( repository=repository, config_manager=config_manager, task_manager=task_manager, ) # [/DEF:_get_orchestrator:Function] # [DEF:_get_clarification_engine:Function] # @COMPLEXITY: 3 # @PURPOSE: Build clarification engine dependency for one-question-at-a-time guided clarification mutations. # @RELATION: [DEPENDS_ON] ->[ClarificationEngine] def _get_clarification_engine( repository: DatasetReviewSessionRepository = Depends(_get_repository), ) -> ClarificationEngine: return ClarificationEngine(repository=repository) # [/DEF:_get_clarification_engine:Function] # [DEF:_serialize_session_summary:Function] # @COMPLEXITY: 3 # @PURPOSE: Map SQLAlchemy session aggregate root into stable API summary DTO. # @RELATION: [DEPENDS_ON] ->[SessionSummary] def _serialize_session_summary(session: DatasetReviewSession) -> SessionSummary: return SessionSummary.model_validate(session, from_attributes=True) # [/DEF:_serialize_session_summary:Function] # [DEF:_serialize_session_detail:Function] # @COMPLEXITY: 3 # @PURPOSE: Map SQLAlchemy session aggregate root into stable API detail DTO. # @RELATION: [DEPENDS_ON] ->[SessionDetail] def _serialize_session_detail(session: DatasetReviewSession) -> SessionDetail: return SessionDetail.model_validate(session, from_attributes=True) # [/DEF:_serialize_session_detail:Function] # [DEF:_serialize_semantic_field:Function] # @COMPLEXITY: 3 # @PURPOSE: Map one semantic field aggregate into stable field-level DTO output. 
# @RELATION: [DEPENDS_ON] ->[SemanticFieldEntryDto]
def _serialize_semantic_field(field: SemanticFieldEntry) -> SemanticFieldEntryDto:
    return SemanticFieldEntryDto.model_validate(field, from_attributes=True)
# [/DEF:_serialize_semantic_field:Function]


# [DEF:_serialize_clarification_question_payload:Function]
# @COMPLEXITY: 3
# @PURPOSE: Convert clarification engine payload into API DTO aligned with the clarification contract.
# @RELATION: [DEPENDS_ON] ->[ClarificationQuestionDto]
def _serialize_clarification_question_payload(
    payload: Optional[ClarificationQuestionPayload],
) -> Optional[ClarificationQuestionDto]:
    if payload is None:
        return None
    # Take one timestamp so created_at and updated_at are guaranteed equal for
    # this synthesized DTO (two separate utcnow() calls could differ by
    # microseconds).
    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12 and
    # returns a naive datetime; switching to datetime.now(timezone.utc) would
    # make the value timezone-aware — confirm the DTO contract before changing.
    now = datetime.utcnow()
    return ClarificationQuestionDto.model_validate(
        {
            "question_id": payload.question_id,
            "clarification_session_id": payload.clarification_session_id,
            "topic_ref": payload.topic_ref,
            "question_text": payload.question_text,
            "why_it_matters": payload.why_it_matters,
            "current_guess": payload.current_guess,
            "priority": payload.priority,
            "state": payload.state,
            "options": payload.options,
            # The engine payload carries no answer; the DTO field is explicit.
            "answer": None,
            "created_at": now,
            "updated_at": now,
        }
    )
# [/DEF:_serialize_clarification_question_payload:Function]


# [DEF:_serialize_clarification_state:Function]
# @COMPLEXITY: 3
# @PURPOSE: Convert clarification engine state into stable API response payload.
# @RELATION: [DEPENDS_ON] ->[ClarificationStateResponse]
def _serialize_clarification_state(
    state: ClarificationStateResult,
) -> ClarificationStateResponse:
    return ClarificationStateResponse(
        clarification_session=ClarificationSessionSummaryResponse(
            clarification_session_id=state.clarification_session.clarification_session_id,
            session_id=state.clarification_session.session_id,
            status=state.clarification_session.status.value,
            current_question_id=state.clarification_session.current_question_id,
            resolved_count=state.clarification_session.resolved_count,
            remaining_count=state.clarification_session.remaining_count,
            summary_delta=state.clarification_session.summary_delta,
        ),
        current_question=_serialize_clarification_question_payload(state.current_question),
    )
# [/DEF:_serialize_clarification_state:Function]


# [DEF:_get_owned_session_or_404:Function]
# @COMPLEXITY: 4
# @PURPOSE: Resolve one session for current user or collaborator scope, returning 404 when inaccessible.
# @RELATION: [CALLS] ->[load_session_detail]
# @PRE: session_id is a non-empty identifier and current_user is authenticated.
# @POST: returns accessible session detail or raises HTTP 404 without leaking foreign-session existence.
# @SIDE_EFFECT: none.
# @DATA_CONTRACT: Input[session_id:str,current_user:User] -> Output[DatasetReviewSession|HTTPException] def _get_owned_session_or_404( repository: DatasetReviewSessionRepository, session_id: str, current_user: User, ) -> DatasetReviewSession: with belief_scope("dataset_review.get_owned_session_or_404"): session = repository.load_session_detail(session_id, current_user.id) if session is None: logger.explore( "Dataset review session not found in current ownership scope", extra={"session_id": session_id, "user_id": current_user.id}, ) raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found") return session # [/DEF:_get_owned_session_or_404:Function] # [DEF:_require_owner_mutation_scope:Function] # @COMPLEXITY: 4 # @PURPOSE: Enforce owner-only mutation scope for dataset review write endpoints. # @RELATION: [DEPENDS_ON] ->[DatasetReviewSession] # @PRE: Session is already ownership-visible to the current user. # @POST: Returns the session when current user is owner, otherwise raises HTTP 403. # @SIDE_EFFECT: none. # @DATA_CONTRACT: Input[DatasetReviewSession,User] -> Output[DatasetReviewSession|HTTPException] def _require_owner_mutation_scope( session: DatasetReviewSession, current_user: User, ) -> DatasetReviewSession: if session.user_id != current_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only the owner can mutate dataset review state", ) return session # [/DEF:_require_owner_mutation_scope:Function] # [DEF:_record_session_event:Function] # @COMPLEXITY: 3 # @PURPOSE: Persist one explicit audit event for an owned dataset-review mutation endpoint. 
# @RELATION: [CALLS] ->[SessionEventLogger.log_for_session] def _record_session_event( repository: DatasetReviewSessionRepository, session: DatasetReviewSession, current_user: User, *, event_type: str, event_summary: str, event_details: Optional[Dict[str, Any]] = None, ) -> None: repository.event_logger.log_for_session( session, actor_user_id=current_user.id, event_type=event_type, event_summary=event_summary, event_details=event_details or {}, ) # [/DEF:_record_session_event:Function] # [DEF:_get_owned_mapping_or_404:Function] # @COMPLEXITY: 4 # @PURPOSE: Resolve one execution mapping inside one owned session aggregate without leaking foreign-mapping existence. # @RELATION: [DEPENDS_ON] ->[ExecutionMapping] # @PRE: Session is accessible to current user. # @POST: Returns the requested mapping or raises HTTP 404. # @SIDE_EFFECT: none. # @DATA_CONTRACT: Input[DatasetReviewSession,mapping_id:str] -> Output[ExecutionMapping|HTTPException] def _get_owned_mapping_or_404( session: DatasetReviewSession, mapping_id: str, ) -> ExecutionMapping: for mapping in session.execution_mappings: if mapping.mapping_id == mapping_id: return mapping raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Execution mapping not found") # [/DEF:_get_owned_mapping_or_404:Function] # [DEF:_get_owned_field_or_404:Function] # @COMPLEXITY: 4 # @PURPOSE: Resolve a semantic field inside one owned session aggregate without leaking foreign-field existence. # @RELATION: [DEPENDS_ON] ->[SemanticFieldEntry] # @PRE: Session is accessible to current user. # @POST: Returns the requested field or raises HTTP 404. # @SIDE_EFFECT: none. 
# @DATA_CONTRACT: Input[DatasetReviewSession,field_id:str] -> Output[SemanticFieldEntry|HTTPException]
def _get_owned_field_or_404(
    session: DatasetReviewSession,
    field_id: str,
) -> SemanticFieldEntry:
    # Linear scan over the loaded aggregate keeps lookups session-scoped.
    for field in session.semantic_fields:
        if field.field_id == field_id:
            return field
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Semantic field not found")
# [/DEF:_get_owned_field_or_404:Function]


# [DEF:_get_latest_clarification_session_or_404:Function]
# @COMPLEXITY: 3
# @PURPOSE: Resolve the latest clarification aggregate for one session or raise when clarification is unavailable.
# @RELATION: [DEPENDS_ON] ->[ClarificationSession]
def _get_latest_clarification_session_or_404(
    session: DatasetReviewSession,
) -> ClarificationSession:
    if not session.clarification_sessions:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Clarification session not found")
    # max() with the same ordering key replaces sorted(..., reverse=True)[0]:
    # one O(n) pass instead of a full sort, identical selection (id tie-break
    # makes equal keys effectively impossible).
    return max(
        session.clarification_sessions,
        key=lambda item: (item.started_at, item.clarification_session_id),
    )
# [/DEF:_get_latest_clarification_session_or_404:Function]


# [DEF:_map_candidate_provenance:Function]
# @COMPLEXITY: 2
# @PURPOSE: Translate accepted semantic candidate type into stable field provenance.
# Module-level lookup table; unknown match types deliberately fall back to
# FUZZY_INFERRED, matching the original if-chain's default branch.
_PROVENANCE_BY_MATCH_TYPE: Dict[str, FieldProvenance] = {
    "exact": FieldProvenance.DICTIONARY_EXACT,
    "reference": FieldProvenance.REFERENCE_IMPORTED,
    "generated": FieldProvenance.AI_GENERATED,
}


def _map_candidate_provenance(candidate: SemanticCandidate) -> FieldProvenance:
    # Evaluate candidate.match_type.value once instead of up to three times.
    return _PROVENANCE_BY_MATCH_TYPE.get(
        str(candidate.match_type.value), FieldProvenance.FUZZY_INFERRED
    )
# [/DEF:_map_candidate_provenance:Function]


# [DEF:_resolve_candidate_source_version:Function]
# @COMPLEXITY: 3
# @PURPOSE: Resolve the semantic source version for one accepted candidate from the loaded session aggregate.
# @RELATION: [DEPENDS_ON] ->[SemanticFieldEntry]
# @RELATION: [DEPENDS_ON] ->[SemanticSource]
def _resolve_candidate_source_version(field: SemanticFieldEntry, source_id: Optional[str]) -> Optional[str]:
    if not source_id:
        return None
    # getattr guards: presumably field.session is an ORM backref that may be
    # unset on detached instances — TODO confirm.
    session = getattr(field, "session", None)
    if session is None:
        return None
    for source in getattr(session, "semantic_sources", []) or []:
        if source.source_id == source_id:
            return source.source_version
    return None
# [/DEF:_resolve_candidate_source_version:Function]


# [DEF:_update_semantic_field_state:Function]
# @COMPLEXITY: 4
# @PURPOSE: Apply field-level semantic manual override or candidate acceptance while preserving lock/provenance invariants.
# @RELATION: [DEPENDS_ON] ->[SemanticFieldEntry]
# @RELATION: [DEPENDS_ON] ->[SemanticCandidate]
# @PRE: Field belongs to the owned session and request is owner-authorized.
# @POST: Manual overrides always set manual provenance plus lock; explicit field edits may lock accepted candidate state but later imports cannot silently replace locked values.
# @SIDE_EFFECT: Mutates field state and candidate statuses in persistence.
# @DATA_CONTRACT: Input[SemanticFieldEntry,FieldSemanticUpdateRequest,changed_by:str] -> Output[SemanticFieldEntry]
def _update_semantic_field_state(
    field: SemanticFieldEntry,
    request: FieldSemanticUpdateRequest,
    changed_by: str,
) -> SemanticFieldEntry:
    has_manual_override = any(
        value is not None for value in [request.verbose_name, request.description, request.display_format]
    )
    selected_candidate = None
    if request.candidate_id:
        # Candidate id is validated even when a manual override is also given,
        # so a bogus id always yields 404 regardless of which branch wins.
        selected_candidate = next(
            (candidate for candidate in field.candidates if candidate.candidate_id == request.candidate_id),
            None,
        )
        if selected_candidate is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Semantic candidate not found")
    if has_manual_override:
        # Manual override wins over candidate acceptance: all three display
        # fields are overwritten (including with None for omitted ones), the
        # field is force-locked, and every candidate is superseded.
        field.verbose_name = request.verbose_name
        field.description = request.description
        field.display_format = request.display_format
        field.provenance = FieldProvenance.MANUAL_OVERRIDE
        field.source_id = None
        field.source_version = None
        field.confidence_rank = None
        field.is_locked = True
        field.has_conflict = False
        field.needs_review = False
        field.last_changed_by = changed_by
        for candidate in field.candidates:
            candidate.status = CandidateStatus.SUPERSEDED
        return field
    if selected_candidate is not None:
        field.verbose_name = selected_candidate.proposed_verbose_name
        field.description = selected_candidate.proposed_description
        field.display_format = selected_candidate.proposed_display_format
        field.provenance = _map_candidate_provenance(selected_candidate)
        field.source_id = selected_candidate.source_id
        field.source_version = _resolve_candidate_source_version(field, selected_candidate.source_id)
        field.confidence_rank = selected_candidate.candidate_rank
        # Lock is sticky: lock_field can set it but an already-locked field
        # stays locked even when lock_field is False.
        field.is_locked = bool(request.lock_field or field.is_locked)
        # NOTE(review): has_conflict is set whenever more than one candidate
        # exists, even after one was explicitly accepted — confirm intended.
        field.has_conflict = len(field.candidates) > 1
        field.needs_review = False
        field.last_changed_by = changed_by
        for candidate in field.candidates:
            candidate.status = (
                CandidateStatus.ACCEPTED
                if candidate.candidate_id == selected_candidate.candidate_id
                else CandidateStatus.SUPERSEDED
            )
        return field
    # Neither a candidate acceptance nor any manual override field: reject.
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Provide candidate_id or at least one manual override field",
    )
# [/DEF:_update_semantic_field_state:Function]


# [DEF:_serialize_execution_mapping:Function]
# @COMPLEXITY: 3
# @PURPOSE: Map one persisted execution mapping into stable API DTO output.
# @RELATION: [DEPENDS_ON] ->[ExecutionMappingDto]
def _serialize_execution_mapping(mapping: ExecutionMapping) -> ExecutionMappingDto:
    return ExecutionMappingDto.model_validate(mapping, from_attributes=True)
# [/DEF:_serialize_execution_mapping:Function]


# [DEF:_serialize_run_context:Function]
# @COMPLEXITY: 3
# @PURPOSE: Map one persisted launch run context into stable API DTO output for SQL Lab handoff confirmation.
# @RELATION: [DEPENDS_ON] ->[DatasetRunContextDto]
def _serialize_run_context(run_context) -> DatasetRunContextDto:
    # run_context is intentionally untyped here; presumably the ORM run-context
    # model defined elsewhere in the project — confirm when touching.
    return DatasetRunContextDto.model_validate(run_context, from_attributes=True)
# [/DEF:_serialize_run_context:Function]


# [DEF:_build_sql_lab_redirect_url:Function]
# @COMPLEXITY: 3
# @PURPOSE: Build a stable SQL Lab redirect URL from the configured Superset environment and persisted run context reference.
# @RELATION: [DEPENDS_ON] ->[DatasetRunContextDto]
def _build_sql_lab_redirect_url(environment_url: str, sql_lab_session_ref: str) -> str:
    # Normalize so the join below never produces a double slash.
    base_url = str(environment_url or "").rstrip("/")
    session_ref = str(sql_lab_session_ref or "").strip()
    # Both failure modes are 500s: these are server-side configuration or
    # persistence gaps, not client errors.
    if not base_url:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Superset environment URL is not configured",
        )
    if not session_ref:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="SQL Lab session reference is missing",
        )
    return f"{base_url}/superset/sqllab?queryId={session_ref}"
# [/DEF:_build_sql_lab_redirect_url:Function]


# [DEF:_build_documentation_export:Function]
# @COMPLEXITY: 3
# @PURPOSE: Produce session documentation export content from current persisted review state.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession] def _build_documentation_export(session: DatasetReviewSession, export_format: ArtifactFormat) -> Dict[str, Any]: profile = session.profile findings = sorted(session.findings, key=lambda item: (item.severity.value, item.code)) if export_format == ArtifactFormat.MARKDOWN: lines = [ f"# Dataset Review: {session.dataset_ref}", "", f"- Session ID: {session.session_id}", f"- Environment: {session.environment_id}", f"- Readiness: {session.readiness_state.value}", f"- Recommended action: {session.recommended_action.value}", "", "## Business Summary", profile.business_summary if profile else "No profile summary available.", "", "## Findings", ] if findings: for finding in findings: lines.append( f"- [{finding.severity.value}] {finding.title}: {finding.message}" ) else: lines.append("- No findings recorded.") content = {"markdown": "\n".join(lines)} storage_ref = f"inline://dataset-review/{session.session_id}/documentation.md" else: content = { "session": _serialize_session_summary(session).model_dump(mode="json"), "profile": profile and { "dataset_name": profile.dataset_name, "business_summary": profile.business_summary, "confidence_state": profile.confidence_state.value, "dataset_type": profile.dataset_type, }, "findings": [ { "code": finding.code, "severity": finding.severity.value, "title": finding.title, "message": finding.message, "resolution_state": finding.resolution_state.value, } for finding in findings ], } storage_ref = f"inline://dataset-review/{session.session_id}/documentation.json" return {"storage_ref": storage_ref, "content": content} # [/DEF:_build_documentation_export:Function] # [DEF:_build_validation_export:Function] # @COMPLEXITY: 3 # @PURPOSE: Produce validation-focused export content from persisted findings and readiness state. 
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession] def _build_validation_export(session: DatasetReviewSession, export_format: ArtifactFormat) -> Dict[str, Any]: findings = sorted(session.findings, key=lambda item: (item.severity.value, item.code)) if export_format == ArtifactFormat.MARKDOWN: lines = [ f"# Validation Report: {session.dataset_ref}", "", f"- Session ID: {session.session_id}", f"- Readiness: {session.readiness_state.value}", "", "## Findings", ] if findings: for finding in findings: lines.append( f"- `{finding.code}` [{finding.severity.value}] {finding.message}" ) else: lines.append("- No findings recorded.") content = {"markdown": "\n".join(lines)} storage_ref = f"inline://dataset-review/{session.session_id}/validation.md" else: content = { "session_id": session.session_id, "dataset_ref": session.dataset_ref, "readiness_state": session.readiness_state.value, "findings": [ { "finding_id": finding.finding_id, "area": finding.area.value, "severity": finding.severity.value, "code": finding.code, "title": finding.title, "message": finding.message, "resolution_state": finding.resolution_state.value, } for finding in findings ], } storage_ref = f"inline://dataset-review/{session.session_id}/validation.json" return {"storage_ref": storage_ref, "content": content} # [/DEF:_build_validation_export:Function] # [DEF:list_sessions:Function] # @COMPLEXITY: 3 # @PURPOSE: List resumable dataset review sessions for the current user. 
# @RELATION: [CALLS] ->[list_user_sess] @router.get( "/sessions", response_model=SessionCollectionResponse, dependencies=[ Depends(_require_auto_review_flag), Depends(has_permission("dataset:session", "READ")), ], ) async def list_sessions( page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), repository: DatasetReviewSessionRepository = Depends(_get_repository), current_user: User = Depends(get_current_user), ): with belief_scope("dataset_review.list_sessions"): sessions = repository.list_sessions_for_user(current_user.id) start = (page - 1) * page_size end = start + page_size items = [_serialize_session_summary(session) for session in sessions[start:end]] return SessionCollectionResponse( items=items, total=len(sessions), page=page, page_size=page_size, has_next=end < len(sessions), ) # [/DEF:list_sessions:Function] # [DEF:start_session:Function] # @COMPLEXITY: 4 # @PURPOSE: Start a new dataset review session from a Superset link or dataset selection. # @RELATION: [CALLS] ->[DatasetReviewOrchestrator.start_session] # @PRE: feature flag enabled, user authenticated, and request body valid. # @POST: returns persisted session summary scoped to the authenticated user. # @SIDE_EFFECT: persists session/profile/findings and may enqueue recovery task. 
# @DATA_CONTRACT: Input[StartSessionRequest] -> Output[SessionSummary] @router.post( "/sessions", response_model=SessionSummary, status_code=status.HTTP_201_CREATED, dependencies=[ Depends(_require_auto_review_flag), Depends(has_permission("dataset:session", "MANAGE")), ], ) async def start_session( request: StartSessionRequest, orchestrator: DatasetReviewOrchestrator = Depends(_get_orchestrator), current_user: User = Depends(get_current_user), ): with belief_scope("dataset_review.start_session"): try: result = orchestrator.start_session( StartSessionCommand( user=current_user, environment_id=request.environment_id, source_kind=request.source_kind, source_input=request.source_input, ) ) except ValueError as exc: logger.explore( "Dataset review session start rejected", extra={"user_id": current_user.id, "error": str(exc)}, ) detail = str(exc) status_code = status.HTTP_404_NOT_FOUND if detail == "Environment not found" else status.HTTP_400_BAD_REQUEST raise HTTPException(status_code=status_code, detail=detail) from exc return _serialize_session_summary(result.session) # [/DEF:start_session:Function] # [DEF:get_session_detail:Function] # @COMPLEXITY: 3 # @PURPOSE: Return the full accessible dataset review session aggregate for current user scope. 
# @RELATION: [CALLS] ->[_get_owned_session_or_404] @router.get( "/sessions/{session_id}", response_model=SessionDetail, dependencies=[ Depends(_require_auto_review_flag), Depends(has_permission("dataset:session", "READ")), ], ) async def get_session_detail( session_id: str, repository: DatasetReviewSessionRepository = Depends(_get_repository), current_user: User = Depends(get_current_user), ): with belief_scope("dataset_review.get_session_detail"): session = _get_owned_session_or_404(repository, session_id, current_user) return _serialize_session_detail(session) # [/DEF:get_session_detail:Function] # [DEF:update_session:Function] # @COMPLEXITY: 4 # @PURPOSE: Update resumable lifecycle status for an owned dataset review session. # @RELATION: [DEPENDS_ON] ->[DatasetReviewSession] # @PRE: session is accessible to current user and requested status is allowed by lifecycle policy. # @POST: returns updated summary without changing ownership or unrelated aggregates. # @SIDE_EFFECT: mutates session lifecycle fields in persistence. 
# @DATA_CONTRACT: Input[UpdateSessionRequest] -> Output[SessionSummary]
@router.patch(
    "/sessions/{session_id}",
    response_model=SessionSummary,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def update_session(
    session_id: str,
    request: UpdateSessionRequest,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    """Apply a lifecycle status change to an owned session and return its summary."""
    with belief_scope("dataset_review.update_session"):
        session = _get_owned_session_or_404(repository, session_id, current_user)
        _require_owner_mutation_scope(session, current_user)
        new_status = request.status
        session.status = new_status
        terminal_statuses = {
            SessionStatus.ARCHIVED,
            SessionStatus.CANCELLED,
            SessionStatus.COMPLETED,
        }
        if new_status == SessionStatus.PAUSED:
            # A paused session should steer the user back toward resuming it.
            session.recommended_action = RecommendedAction.RESUME_SESSION
        elif new_status in terminal_statuses:
            # Terminal states must not keep a recovery task attached.
            session.active_task_id = None
        repository.db.commit()
        repository.db.refresh(session)
        _record_session_event(
            repository,
            session,
            current_user,
            event_type="session_status_updated",
            event_summary="Dataset review session lifecycle updated",
            event_details={"status": session.status.value},
        )
        return _serialize_session_summary(session)
# [/DEF:update_session:Function]


# [DEF:delete_session:Function]
# @COMPLEXITY: 4
# @PURPOSE: Archive or hard-delete a session owned by the current user.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
# @PRE: session is owner-scoped to current user.
# @POST: session is archived or deleted and no foreign-session existence is disclosed.
# @SIDE_EFFECT: mutates or deletes persisted session aggregate.
# @DATA_CONTRACT: Input[session_id:str,hard_delete:bool] -> Output[HTTP 204]
@router.delete(
    "/sessions/{session_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def delete_session(
    session_id: str,
    hard_delete: bool = Query(False),
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    """Archive (default) or permanently delete an owned session; always 204."""
    with belief_scope("dataset_review.delete_session"):
        session = _get_owned_session_or_404(repository, session_id, current_user)
        _require_owner_mutation_scope(session, current_user)
        if hard_delete:
            # Record the audit event before the row disappears from persistence.
            _record_session_event(
                repository,
                session,
                current_user,
                event_type="session_deleted",
                event_summary="Dataset review session hard-deleted",
                event_details={"hard_delete": True},
            )
            repository.db.delete(session)
            repository.db.commit()
        else:
            # Soft delete: archive and detach any recovery task.
            session.status = SessionStatus.ARCHIVED
            session.active_task_id = None
            repository.db.commit()
            repository.db.refresh(session)
            _record_session_event(
                repository,
                session,
                current_user,
                event_type="session_archived",
                event_summary="Dataset review session archived",
                event_details={"hard_delete": False},
            )
        return Response(status_code=status.HTTP_204_NO_CONTENT)
# [/DEF:delete_session:Function]


# [DEF:export_documentation:Function]
# @COMPLEXITY: 4
# @PURPOSE: Export documentation output for the current session in JSON or Markdown form.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
# @PRE: session is accessible to current user and requested format is supported.
# @POST: returns ownership-scoped export payload without fabricating unrelated artifacts.
# @SIDE_EFFECT: none beyond response construction.
# @DATA_CONTRACT: Input[session_id:str,format:ArtifactFormat] -> Output[ExportArtifactResponse]
@router.get(
    "/sessions/{session_id}/exports/documentation",
    response_model=ExportArtifactResponse,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "READ")),
    ],
)
async def export_documentation(
    session_id: str,
    format: ArtifactFormat = Query(ArtifactFormat.JSON),
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    """Build an inline documentation export (JSON or Markdown) for an accessible session."""
    with belief_scope("dataset_review.export_documentation"):
        supported_formats = {ArtifactFormat.JSON, ArtifactFormat.MARKDOWN}
        if format not in supported_formats:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Only json and markdown exports are supported",
            )
        session = _get_owned_session_or_404(repository, session_id, current_user)
        payload = _build_documentation_export(session, format)
        return ExportArtifactResponse(
            artifact_id=f"documentation-{session.session_id}-{format.value}",
            session_id=session.session_id,
            artifact_type="documentation",
            format=format.value,
            storage_ref=payload["storage_ref"],
            created_by_user_id=current_user.id,
            content=payload["content"],
        )
# [/DEF:export_documentation:Function]


# [DEF:export_validation:Function]
# @COMPLEXITY: 4
# @PURPOSE: Export validation findings for the current session in JSON or Markdown form.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
# @PRE: session is accessible to current user and requested format is supported.
# @POST: returns explicit validation export payload scoped to current user session access.
# @SIDE_EFFECT: none beyond response construction.
# @DATA_CONTRACT: Input[session_id:str,format:ArtifactFormat] -> Output[ExportArtifactResponse]
@router.get(
    "/sessions/{session_id}/exports/validation",
    response_model=ExportArtifactResponse,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "READ")),
    ],
)
async def export_validation(
    session_id: str,
    format: ArtifactFormat = Query(ArtifactFormat.JSON),
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    """Build an inline validation-findings export (JSON or Markdown) for an accessible session."""
    with belief_scope("dataset_review.export_validation"):
        supported_formats = {ArtifactFormat.JSON, ArtifactFormat.MARKDOWN}
        if format not in supported_formats:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Only json and markdown exports are supported",
            )
        session = _get_owned_session_or_404(repository, session_id, current_user)
        payload = _build_validation_export(session, format)
        return ExportArtifactResponse(
            artifact_id=f"validation-{session.session_id}-{format.value}",
            session_id=session.session_id,
            artifact_type="validation_report",
            format=format.value,
            storage_ref=payload["storage_ref"],
            created_by_user_id=current_user.id,
            content=payload["content"],
        )
# [/DEF:export_validation:Function]


# [DEF:get_clarification_state:Function]
# @COMPLEXITY: 4
# @PURPOSE: Return the current clarification session summary and one active question payload.
# @RELATION: [CALLS] ->[ClarificationEngine.build_question_payload]
# @PRE: Session is accessible to current user and clarification feature is enabled.
# @POST: Returns at most one active clarification question with why_it_matters, current_guess, and ordered options.
# @SIDE_EFFECT: May normalize clarification pointer and readiness state in persistence.
# @DATA_CONTRACT: Input[session_id:str] -> Output[ClarificationStateResponse]
@router.get(
    "/sessions/{session_id}/clarification",
    response_model=ClarificationStateResponse,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(_require_clarification_flag),
        Depends(has_permission("dataset:session", "READ")),
    ],
)
async def get_clarification_state(
    session_id: str,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    clarification_engine: ClarificationEngine = Depends(_get_clarification_engine),
    current_user: User = Depends(get_current_user),
):
    """Return the latest clarification session plus at most one active question payload."""
    with belief_scope("dataset_review.get_clarification_state"):
        session = _get_owned_session_or_404(repository, session_id, current_user)
        clarification_session = _get_latest_clarification_session_or_404(session)
        active_question = clarification_engine.build_question_payload(session)
        state = ClarificationStateResult(
            clarification_session=clarification_session,
            current_question=active_question,
            session=session,
            changed_findings=[],
        )
        return _serialize_clarification_state(state)
# [/DEF:get_clarification_state:Function]


# [DEF:resume_clarification:Function]
# @COMPLEXITY: 4
# @PURPOSE: Resume clarification mode on the highest-priority unresolved question for an owned session.
# @RELATION: [CALLS] ->[ClarificationEngine.build_question_payload]
# @PRE: Session belongs to the current owner and clarification feature is enabled.
# @POST: Clarification session enters active state with one current question or completes deterministically when no unresolved items remain.
# @SIDE_EFFECT: Mutates clarification pointer, readiness, and recommended action.
# @DATA_CONTRACT: Input[session_id:str] -> Output[ClarificationStateResponse]
@router.post(
    "/sessions/{session_id}/clarification/resume",
    response_model=ClarificationStateResponse,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(_require_clarification_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def resume_clarification(
    session_id: str,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    clarification_engine: ClarificationEngine = Depends(_get_clarification_engine),
    current_user: User = Depends(get_current_user),
):
    """Re-enter clarification mode and surface the current question for an owned session.

    NOTE(review): beyond the owner-scope check, this handler does not itself
    persist any pointer/readiness change; any such mutation presumably happens
    inside ``build_question_payload`` — confirm against the engine.
    """
    with belief_scope("dataset_review.resume_clarification"):
        session = _get_owned_session_or_404(repository, session_id, current_user)
        _require_owner_mutation_scope(session, current_user)
        clarification_session = _get_latest_clarification_session_or_404(session)
        active_question = clarification_engine.build_question_payload(session)
        state = ClarificationStateResult(
            clarification_session=clarification_session,
            current_question=active_question,
            session=session,
            changed_findings=[],
        )
        return _serialize_clarification_state(state)
# [/DEF:resume_clarification:Function]


# [DEF:record_clarification_answer:Function]
# @COMPLEXITY: 4
# @PURPOSE: Persist one clarification answer before advancing the active pointer or readiness state.
# @RELATION: [CALLS] ->[ClarificationEngine.record_answer]
# @PRE: Target question is the session's active clarification question and current user owns the session.
# @POST: Answer is persisted, changed findings are returned, and unresolved skipped/expert-review questions remain visible.
# @SIDE_EFFECT: Inserts answer row and mutates clarification/session state.
# @DATA_CONTRACT: Input[ClarificationAnswerRequest] -> Output[ClarificationAnswerResultResponse]
@router.post(
    "/sessions/{session_id}/clarification/answers",
    response_model=ClarificationAnswerResultResponse,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(_require_clarification_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def record_clarification_answer(
    session_id: str,
    request: ClarificationAnswerRequest,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    clarification_engine: ClarificationEngine = Depends(_get_clarification_engine),
    current_user: User = Depends(get_current_user),
):
    """Persist one clarification answer and return the advanced state plus changed findings."""
    with belief_scope("dataset_review.record_clarification_answer"):
        session = _get_owned_session_or_404(repository, session_id, current_user)
        _require_owner_mutation_scope(session, current_user)
        command = ClarificationAnswerCommand(
            session=session,
            question_id=request.question_id,
            answer_kind=request.answer_kind,
            answer_value=request.answer_value,
            user=current_user,
        )
        try:
            result = clarification_engine.record_answer(command)
        except ValueError as exc:
            # Engine rejections (inactive question, malformed answer) are client errors.
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
        changed = [
            ValidationFindingDto.model_validate(finding, from_attributes=True)
            for finding in result.changed_findings
        ]
        return ClarificationAnswerResultResponse(
            clarification_state=_serialize_clarification_state(result),
            session=_serialize_session_summary(result.session),
            changed_findings=changed,
        )
# [/DEF:record_clarification_answer:Function]


# [DEF:update_field_semantic:Function]
# @COMPLEXITY: 4
# @PURPOSE: Apply one field-level semantic candidate decision or manual override with lock/provenance safeguards.
# @RELATION: [CALLS] ->[_update_semantic_field_state]
# @PRE: Session and field belong to the current owner, and request contains a candidate selection or manual override values.
# @POST: Manual overrides set manual provenance plus lock; explicit lock state prevents later silent replacement.
# @SIDE_EFFECT: Mutates field state and accepted/superseded candidate statuses in persistence.
# @DATA_CONTRACT: Input[FieldSemanticUpdateRequest] -> Output[SemanticFieldEntryDto]
@router.patch(
    "/sessions/{session_id}/fields/{field_id}/semantic",
    response_model=SemanticFieldEntryDto,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def update_field_semantic(
    session_id: str,
    field_id: str,
    request: FieldSemanticUpdateRequest,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    """Apply one semantic candidate decision or manual override to an owned field."""
    with belief_scope("dataset_review.update_field_semantic"):
        session = _get_owned_session_or_404(repository, session_id, current_user)
        _require_owner_mutation_scope(session, current_user)
        field = _get_owned_field_or_404(session, field_id)
        _update_semantic_field_state(field, request, changed_by="user")
        repository.db.commit()
        repository.db.refresh(field)
        # Audit the decision after commit so event details reflect persisted state.
        _record_session_event(
            repository,
            session,
            current_user,
            event_type="semantic_field_updated",
            event_summary="Semantic field decision persisted",
            event_details={
                "field_id": field.field_id,
                "candidate_id": request.candidate_id,
                "is_locked": field.is_locked,
                "source_id": field.source_id,
                "source_version": field.source_version,
            },
        )
        return _serialize_semantic_field(field)
# [/DEF:update_field_semantic:Function]


# [DEF:lock_field_semantic:Function]
# @COMPLEXITY: 4
# @PURPOSE: Lock one semantic field against later automatic overwrite while preserving the current active value.
# @RELATION: [DEPENDS_ON] ->[SemanticFieldEntry]
# @PRE: Field belongs to the current owner.
# @POST: Field remains active and locked; later imports may add candidates but cannot replace the locked value implicitly.
# @SIDE_EFFECT: Mutates field lock state in persistence.
# @DATA_CONTRACT: Input[session_id:str,field_id:str] -> Output[SemanticFieldEntryDto]
@router.post(
    "/sessions/{session_id}/fields/{field_id}/lock",
    response_model=SemanticFieldEntryDto,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def lock_field_semantic(
    session_id: str,
    field_id: str,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    """Mark an owned semantic field as locked so automation cannot overwrite it."""
    with belief_scope("dataset_review.lock_field_semantic"):
        session = _get_owned_session_or_404(repository, session_id, current_user)
        _require_owner_mutation_scope(session, current_user)
        field = _get_owned_field_or_404(session, field_id)
        field.is_locked = True
        field.last_changed_by = "user"
        repository.db.commit()
        repository.db.refresh(field)
        _record_session_event(
            repository,
            session,
            current_user,
            event_type="semantic_field_locked",
            event_summary="Semantic field lock persisted",
            event_details={"field_id": field.field_id},
        )
        return _serialize_semantic_field(field)
# [/DEF:lock_field_semantic:Function]


# [DEF:unlock_field_semantic:Function]
# @COMPLEXITY: 4
# @PURPOSE: Unlock one semantic field so later automated candidate application may replace it explicitly.
# @RELATION: [DEPENDS_ON] ->[SemanticFieldEntry]
# @PRE: Field belongs to the current owner.
# @POST: Field becomes unlocked; manual-override provenance is downgraded to unresolved to preserve the lock/provenance invariant.
# @SIDE_EFFECT: Mutates field lock/provenance state in persistence.
# @DATA_CONTRACT: Input[session_id:str,field_id:str] -> Output[SemanticFieldEntryDto]
@router.post(
    "/sessions/{session_id}/fields/{field_id}/unlock",
    response_model=SemanticFieldEntryDto,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def unlock_field_semantic(
    session_id: str,
    field_id: str,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    """Clear the lock on an owned semantic field, downgrading manual-override provenance."""
    with belief_scope("dataset_review.unlock_field_semantic"):
        session = _get_owned_session_or_404(repository, session_id, current_user)
        _require_owner_mutation_scope(session, current_user)
        field = _get_owned_field_or_404(session, field_id)
        field.is_locked = False
        field.last_changed_by = "user"
        # A manual override only stays authoritative while locked; once unlocked
        # it reverts to unresolved and is flagged for review again.
        if field.provenance == FieldProvenance.MANUAL_OVERRIDE:
            field.provenance = FieldProvenance.UNRESOLVED
            field.needs_review = True
        repository.db.commit()
        repository.db.refresh(field)
        _record_session_event(
            repository,
            session,
            current_user,
            event_type="semantic_field_unlocked",
            event_summary="Semantic field unlock persisted",
            event_details={"field_id": field.field_id},
        )
        return _serialize_semantic_field(field)
# [/DEF:unlock_field_semantic:Function]


# [DEF:approve_batch_semantic_fields:Function]
# @COMPLEXITY: 4
# @PURPOSE: Approve multiple semantic candidate decisions in one owner-authorized batch without bypassing single-field semantics.
# @RELATION: [CALLS] ->[_update_semantic_field_state]
# @PRE: Session belongs to the current owner and each requested field/candidate pair is contained in the session aggregate.
# @POST: Returns updated semantic fields after applying the same candidate/lock invariants as the single-field endpoint.
# @SIDE_EFFECT: Persists multiple semantic field decisions in one transaction and records one explicit session audit event.
# @DATA_CONTRACT: Input[BatchApproveSemanticRequest] -> Output[List[SemanticFieldEntryDto]]
@router.post(
    "/sessions/{session_id}/fields/semantic/approve-batch",
    response_model=List[SemanticFieldEntryDto],
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def approve_batch_semantic_fields(
    session_id: str,
    request: BatchApproveSemanticRequest,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    """Apply several semantic candidate decisions in one owner-authorized transaction.

    Each item goes through the same `_update_semantic_field_state` path as the
    single-field endpoint, so lock/provenance invariants are identical.
    """
    with belief_scope("dataset_review.approve_batch_semantic_fields"):
        session = _get_owned_session_or_404(repository, session_id, current_user)
        _require_owner_mutation_scope(session, current_user)
        updated_fields: List[SemanticFieldEntry] = []
        for item in request.items:
            target = _get_owned_field_or_404(session, item.field_id)
            single_update = FieldSemanticUpdateRequest(
                candidate_id=item.candidate_id,
                lock_field=item.lock_field,
            )
            updated_fields.append(
                _update_semantic_field_state(target, single_update, changed_by="user")
            )
        # One commit for the whole batch, then refresh each persisted row.
        repository.db.commit()
        for updated in updated_fields:
            repository.db.refresh(updated)
        _record_session_event(
            repository,
            session,
            current_user,
            event_type="semantic_fields_batch_approved",
            event_summary="Batch semantic approval persisted",
            event_details={
                "field_ids": [updated.field_id for updated in updated_fields],
                "count": len(updated_fields),
            },
        )
        return [_serialize_semantic_field(updated) for updated in updated_fields]
# [/DEF:approve_batch_semantic_fields:Function]


# [DEF:list_execution_mappings:Function]
# @COMPLEXITY: 4
# @PURPOSE: Return the current mapping-review set for one accessible session.
# @RELATION: [CALLS] ->[_get_owned_session_or_404]
# @PRE: Session is ownership-accessible to the authenticated user and execution feature is enabled.
# @POST: Returns the persisted mapping review set for the requested session wrapped in the contract collection shape without mutating approval state.
# @SIDE_EFFECT: none.
# @DATA_CONTRACT: Input[session_id:str] -> Output[MappingCollectionResponse]
@router.get(
    "/sessions/{session_id}/mappings",
    response_model=MappingCollectionResponse,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(_require_execution_flag),
        Depends(has_permission("dataset:session", "READ")),
    ],
)
async def list_execution_mappings(
    session_id: str,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    """Return every execution mapping of an accessible session, read-only."""
    with belief_scope("dataset_review.list_execution_mappings"):
        session = _get_owned_session_or_404(repository, session_id, current_user)
        serialized = [
            _serialize_execution_mapping(mapping)
            for mapping in session.execution_mappings
        ]
        return MappingCollectionResponse(items=serialized)
# [/DEF:list_execution_mappings:Function]


# [DEF:update_execution_mapping:Function]
# @COMPLEXITY: 4
# @PURPOSE: Persist one owner-authorized execution-mapping effective value override and invalidate stale preview truth.
# @RELATION: [DEPENDS_ON] ->[ExecutionMapping]
# @PRE: Mapping belongs to the current owner session and request carries an explicit effective value decision.
# @POST: Mapping effective value and override metadata are persisted and any prior preview truth is marked stale for safe relaunch.
# @SIDE_EFFECT: Mutates mapping value/approval state, may mark latest preview stale, and updates session readiness cues.
# @DATA_CONTRACT: Input[UpdateExecutionMappingRequest] -> Output[ExecutionMappingDto]
@router.patch(
    "/sessions/{session_id}/mappings/{mapping_id}",
    response_model=ExecutionMappingDto,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(_require_execution_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def update_execution_mapping(
    session_id: str,
    mapping_id: str,
    request: UpdateExecutionMappingRequest,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    """Persist an owner-authorized effective-value override on one execution mapping.

    Raises 400 when ``effective_value`` is missing or ``mapping_method`` is not a
    valid ``MappingMethod`` member. Marks any READY preview STALE so the caller
    must regenerate the preview before launch.
    """
    with belief_scope("dataset_review.update_execution_mapping"):
        session = _get_owned_session_or_404(repository, session_id, current_user)
        _require_owner_mutation_scope(session, current_user)
        mapping = _get_owned_mapping_or_404(session, mapping_id)
        if request.effective_value is None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="effective_value is required for execution mapping updates",
            )
        # FIX: an invalid mapping_method string previously raised an unhandled
        # ValueError from the enum constructor and surfaced as HTTP 500; it is
        # now rejected deterministically as a client error.
        raw_method = request.mapping_method or MappingMethod.MANUAL_OVERRIDE.value
        try:
            mapping.mapping_method = MappingMethod(raw_method)
        except ValueError as exc:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Unsupported mapping_method: {raw_method}",
            ) from exc
        mapping.effective_value = request.effective_value
        mapping.transformation_note = request.transformation_note
        # A manual override counts as an explicit approval by the current user.
        mapping.approval_state = ApprovalState.APPROVED
        mapping.approved_by_user_id = current_user.id
        mapping.approved_at = datetime.utcnow()
        session.last_activity_at = datetime.utcnow()
        session.recommended_action = RecommendedAction.GENERATE_SQL_PREVIEW
        # Any post-mapping-review readiness collapses back to preview-ready,
        # since the changed value invalidates downstream run truth.
        if session.readiness_state in {
            ReadinessState.MAPPING_REVIEW_NEEDED,
            ReadinessState.COMPILED_PREVIEW_READY,
            ReadinessState.RUN_READY,
            ReadinessState.RUN_IN_PROGRESS,
        }:
            session.readiness_state = ReadinessState.COMPILED_PREVIEW_READY
        # Invalidate every ready preview; the override makes them stale.
        for preview in session.previews:
            if preview.preview_status == PreviewStatus.READY:
                preview.preview_status = PreviewStatus.STALE
        repository.db.commit()
        repository.db.refresh(mapping)
        _record_session_event(
            repository,
            session,
            current_user,
            event_type="execution_mapping_updated",
            event_summary="Execution mapping override persisted",
            event_details={
                "mapping_id": mapping.mapping_id,
                "approval_state": mapping.approval_state.value,
                "preview_state": "stale",
            },
        )
        return _serialize_execution_mapping(mapping)
# [/DEF:update_execution_mapping:Function]


# [DEF:approve_execution_mapping:Function]
# @COMPLEXITY: 4
# @PURPOSE: Explicitly approve a warning-sensitive mapping transformation and preserve audit note state.
# @RELATION: [DEPENDS_ON] ->[ExecutionMapping]
# @PRE: Mapping belongs to the current owner session and execution feature is enabled.
# @POST: Mapping approval state becomes approved and owner-scoped audit markers are updated.
# @SIDE_EFFECT: Mutates persisted mapping approval state and session readiness cues.
# @DATA_CONTRACT: Input[ApproveMappingRequest] -> Output[ExecutionMappingDto]
@router.post(
    "/sessions/{session_id}/mappings/{mapping_id}/approve",
    response_model=ExecutionMappingDto,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(_require_execution_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def approve_execution_mapping(
    session_id: str,
    mapping_id: str,
    request: ApproveMappingRequest,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    """Approve one execution mapping, recording approver identity and optional note."""
    with belief_scope("dataset_review.approve_execution_mapping"):
        session = _get_owned_session_or_404(repository, session_id, current_user)
        _require_owner_mutation_scope(session, current_user)
        mapping = _get_owned_mapping_or_404(session, mapping_id)
        mapping.approval_state = ApprovalState.APPROVED
        mapping.approved_by_user_id = current_user.id
        mapping.approved_at = datetime.utcnow()
        # An explicit approval note replaces the transformation note for audit.
        if request.approval_note:
            mapping.transformation_note = request.approval_note
        session.last_activity_at = datetime.utcnow()
        if session.readiness_state == ReadinessState.MAPPING_REVIEW_NEEDED:
            session.recommended_action = RecommendedAction.GENERATE_SQL_PREVIEW
        repository.db.commit()
        repository.db.refresh(mapping)
        _record_session_event(
            repository,
            session,
            current_user,
            event_type="execution_mapping_approved",
            event_summary="Execution mapping approval persisted",
            event_details={
                "mapping_id": mapping.mapping_id,
                "approval_state": mapping.approval_state.value,
            },
        )
        return _serialize_execution_mapping(mapping)
# [/DEF:approve_execution_mapping:Function]


# [DEF:approve_batch_execution_mappings:Function]
# @COMPLEXITY: 4
# @PURPOSE: Approve multiple warning-sensitive execution mappings in one owner-authorized batch.
# @RELATION: [DEPENDS_ON] ->[ExecutionMapping]
# @PRE: Session belongs to the current owner and every requested mapping belongs to the same session aggregate.
# @POST: Returns updated mappings after applying the same approval semantics as the single mapping endpoint.
# @SIDE_EFFECT: Persists multiple approvals and records one explicit audit event.
# @DATA_CONTRACT: Input[BatchApproveMappingRequest] -> Output[List[ExecutionMappingDto]]
@router.post(
    "/sessions/{session_id}/mappings/approve-batch",
    response_model=List[ExecutionMappingDto],
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(_require_execution_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def approve_batch_execution_mappings(
    session_id: str,
    request: BatchApproveMappingRequest,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    """Approve several mappings in one transaction with the same semantics as the single endpoint."""
    with belief_scope("dataset_review.approve_batch_execution_mappings"):
        session = _get_owned_session_or_404(repository, session_id, current_user)
        _require_owner_mutation_scope(session, current_user)
        updated_mappings: List[ExecutionMapping] = []
        # dict.fromkeys de-duplicates mapping ids while preserving request order.
        for mapping_id in list(dict.fromkeys(request.mapping_ids)):
            mapping = _get_owned_mapping_or_404(session, mapping_id)
            mapping.approval_state = ApprovalState.APPROVED
            mapping.approved_by_user_id = current_user.id
            mapping.approved_at = datetime.utcnow()
            if request.approval_note:
                mapping.transformation_note = request.approval_note
            updated_mappings.append(mapping)
        session.last_activity_at = datetime.utcnow()
        if session.readiness_state == ReadinessState.MAPPING_REVIEW_NEEDED:
            session.recommended_action = RecommendedAction.GENERATE_SQL_PREVIEW
        repository.db.commit()
        for mapping in updated_mappings:
            repository.db.refresh(mapping)
        _record_session_event(
            repository,
            session,
            current_user,
            event_type="execution_mappings_batch_approved",
            event_summary="Batch mapping approval persisted",
            event_details={
                "mapping_ids": [mapping.mapping_id for mapping in updated_mappings],
                "count": len(updated_mappings),
            },
        )
        return [_serialize_execution_mapping(mapping) for mapping in updated_mappings]
# [/DEF:approve_batch_execution_mappings:Function]


# [DEF:trigger_preview_generation:Function]
# @COMPLEXITY: 4
# @PURPOSE: Trigger Superset-side preview compilation for the current owned execution context.
# @RELATION: [CALLS] ->[DatasetReviewOrchestrator.prepare_launch_preview]
# @PRE: Session belongs to the current owner and required mapping inputs are available.
# @POST: Returns the compiled preview directly for synchronous success or enqueue-state shape when preview generation remains pending.
# @SIDE_EFFECT: Persists preview attempt and updates readiness state.
# @DATA_CONTRACT: Input[session_id:str] -> Output[CompiledPreviewDto | PreviewEnqueueResultResponse]
@router.post(
    "/sessions/{session_id}/preview",
    response_model=Union[CompiledPreviewDto, PreviewEnqueueResultResponse],
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(_require_execution_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def trigger_preview_generation(
    session_id: str,
    response: Response,
    orchestrator: DatasetReviewOrchestrator = Depends(_get_orchestrator),
    current_user: User = Depends(get_current_user),
):
    """Request preview compilation; 200 with the preview when ready, 202 while pending."""
    with belief_scope("dataset_review.trigger_preview_generation"):
        try:
            result = orchestrator.prepare_launch_preview(
                PreparePreviewCommand(user=current_user, session_id=session_id)
            )
        except ValueError as exc:
            detail = str(exc)
            # Map orchestrator rejections onto HTTP semantics deterministically.
            if detail in {"Session not found", "Environment not found"}:
                code = status.HTTP_404_NOT_FOUND
            elif detail.startswith("Preview blocked:"):
                code = status.HTTP_409_CONFLICT
            else:
                code = status.HTTP_400_BAD_REQUEST
            raise HTTPException(status_code=code, detail=detail) from exc
        if result.preview.preview_status == PreviewStatus.PENDING:
            # Asynchronous path: report the enqueue state with 202.
            response.status_code = status.HTTP_202_ACCEPTED
            return PreviewEnqueueResultResponse(
                session_id=result.session.session_id,
                preview_status=result.preview.preview_status.value,
                task_id=None,
            )
        response.status_code = status.HTTP_200_OK
        return CompiledPreviewDto.model_validate(result.preview, from_attributes=True)
# [/DEF:trigger_preview_generation:Function]


# [DEF:launch_dataset:Function]
# @COMPLEXITY: 4
# @PURPOSE: Execute the current owned session launch handoff through the orchestrator and return audited SQL Lab run context.
# @RELATION: [CALLS] ->[DatasetReviewOrchestrator.launch_dataset]
# @PRE: Session belongs to the current owner, execution feature is enabled, and launch gates are satisfied or a deterministic conflict is returned.
# @POST: Returns persisted run context plus redirect URL when launch handoff is accepted.
# @SIDE_EFFECT: Persists launch audit snapshot and may trigger SQL Lab session creation.
# @DATA_CONTRACT: Input[session_id:str] -> Output[LaunchDatasetResponse]
@router.post(
    "/sessions/{session_id}/launch",
    response_model=LaunchDatasetResponse,
    status_code=status.HTTP_201_CREATED,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(_require_execution_flag),
        Depends(has_permission("dataset:execution:launch", "EXECUTE")),
    ],
)
async def launch_dataset(
    session_id: str,
    orchestrator: DatasetReviewOrchestrator = Depends(_get_orchestrator),
    config_manager=Depends(get_config_manager),
    current_user: User = Depends(get_current_user),
):
    """Execute the launch handoff and return the run context plus a SQL Lab redirect URL."""
    with belief_scope("dataset_review.launch_dataset"):
        try:
            result = orchestrator.launch_dataset(
                LaunchDatasetCommand(user=current_user, session_id=session_id)
            )
        except ValueError as exc:
            detail = str(exc)
            # Same deterministic mapping as the preview endpoint: 404 for missing
            # scope, 409 for blocked launch gates, 400 otherwise.
            if detail in {"Session not found", "Environment not found"}:
                code = status.HTTP_404_NOT_FOUND
            elif detail.startswith("Launch blocked:"):
                code = status.HTTP_409_CONFLICT
            else:
                code = status.HTTP_400_BAD_REQUEST
            raise HTTPException(status_code=code, detail=detail) from exc
        environment = config_manager.get_environment(result.session.environment_id)
        # Tolerate missing environments/urls so the redirect degrades to relative.
        environment_url = "" if environment is None else getattr(environment, "url", "")
        redirect_url = _build_sql_lab_redirect_url(
            environment_url=environment_url,
            sql_lab_session_ref=result.run_context.sql_lab_session_ref,
        )
        return LaunchDatasetResponse(
            run_context=_serialize_run_context(result.run_context),
            redirect_url=redirect_url,
        )
# [/DEF:launch_dataset:Function]


# [DEF:record_field_feedback:Function]
# @COMPLEXITY: 4
# @PURPOSE: Persist thumbs up/down feedback for AI-assisted semantic field content.
# @RELATION: [DEPENDS_ON] ->[SemanticFieldEntry]
# @PRE: Field belongs to the current owner and feedback value is valid.
# @POST: Field feedback is stored without altering lock or active semantic value.
# @SIDE_EFFECT: Updates one persisted semantic field feedback marker.
# @DATA_CONTRACT: Input[FeedbackRequest] -> Output[FeedbackResponse]
@router.post(
    "/sessions/{session_id}/fields/{field_id}/feedback",
    response_model=FeedbackResponse,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def record_field_feedback(
    session_id: str,
    field_id: str,
    request: FeedbackRequest,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    """Store thumbs up/down feedback on one owned semantic field; value is untouched."""
    with belief_scope("dataset_review.record_field_feedback"):
        session = _get_owned_session_or_404(repository, session_id, current_user)
        _require_owner_mutation_scope(session, current_user)
        field = _get_owned_field_or_404(session, field_id)
        field.user_feedback = request.feedback
        repository.db.commit()
        _record_session_event(
            repository,
            session,
            current_user,
            event_type="semantic_field_feedback_recorded",
            event_summary="Semantic field feedback persisted",
            event_details={"field_id": field.field_id, "feedback": request.feedback},
        )
        return FeedbackResponse(target_id=field.field_id, feedback=request.feedback)
# [/DEF:record_field_feedback:Function]


# [DEF:record_clarification_feedback:Function]
# @COMPLEXITY: 4
# @PURPOSE: Persist thumbs up/down feedback for clarification question/answer content used in guided review.
# @RELATION: [DEPENDS_ON] ->[ClarificationAnswer]
# @PRE: Clarification question belongs to the current owner session and already has a persisted answer.
# @POST: Feedback is stored on the clarification answer audit record.
# @SIDE_EFFECT: Updates one clarification answer feedback marker in persistence.
# @DATA_CONTRACT: Input[FeedbackRequest] -> Output[FeedbackResponse]
@router.post(
    "/sessions/{session_id}/clarification/questions/{question_id}/feedback",
    response_model=FeedbackResponse,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(_require_clarification_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def record_clarification_feedback(
    session_id: str,
    question_id: str,
    request: FeedbackRequest,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    """Store feedback on an answered clarification question's persisted answer record."""
    with belief_scope("dataset_review.record_clarification_feedback"):
        session = _get_owned_session_or_404(repository, session_id, current_user)
        _require_owner_mutation_scope(session, current_user)
        clarification_session = _get_latest_clarification_session_or_404(session)
        matching = [
            candidate
            for candidate in clarification_session.questions
            if candidate.question_id == question_id
        ]
        if not matching:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Clarification question not found",
            )
        question = matching[0]
        # Feedback attaches to the persisted answer, so an unanswered question is rejected.
        if question.answer is None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Clarification answer not found",
            )
        question.answer.user_feedback = request.feedback
        repository.db.commit()
        _record_session_event(
            repository,
            session,
            current_user,
            event_type="clarification_feedback_recorded",
            event_summary="Clarification feedback persisted",
            event_details={"question_id": question.question_id, "feedback": request.feedback},
        )
        return FeedbackResponse(target_id=question.question_id, feedback=request.feedback)
# [/DEF:record_clarification_feedback:Function]
# [/DEF:DatasetReviewApi:Module]