From f4416c3ebb02fa4c26cb233fb9fd816c3db6f68a Mon Sep 17 00:00:00 2001 From: busya Date: Mon, 16 Mar 2026 23:43:03 +0300 Subject: [PATCH] feat: initial dataset review orchestration flow implementation --- backend/src/core/config_models.py | 5 + backend/src/models/dataset_review.py | 661 ++++++++++++++++++ backend/src/schemas/dataset_review.py | 362 ++++++++++ backend/src/scripts/seed_permissions.py | 12 + .../src/services/dataset_review/__init__.py | 7 + .../__tests__/test_session_repository.py | 171 +++++ .../repositories/session_repository.py | 146 ++++ .../__tests__/test_datasetReviewSession.js | 112 +++ .../src/lib/stores/datasetReviewSession.js | 89 +++ 9 files changed, 1565 insertions(+) create mode 100644 backend/src/models/dataset_review.py create mode 100644 backend/src/schemas/dataset_review.py create mode 100644 backend/src/services/dataset_review/__init__.py create mode 100644 backend/src/services/dataset_review/repositories/__tests__/test_session_repository.py create mode 100644 backend/src/services/dataset_review/repositories/session_repository.py create mode 100644 frontend/src/lib/stores/__tests__/test_datasetReviewSession.js create mode 100644 frontend/src/lib/stores/datasetReviewSession.js diff --git a/backend/src/core/config_models.py b/backend/src/core/config_models.py index ca5e7d39..83115b8e 100755 --- a/backend/src/core/config_models.py +++ b/backend/src/core/config_models.py @@ -81,6 +81,11 @@ class GlobalSettings(BaseModel): # Migration sync settings migration_sync_cron: str = "0 2 * * *" + + # Dataset Review Feature Flags + ff_dataset_auto_review: bool = True + ff_dataset_clarification: bool = True + ff_dataset_execution: bool = True # [/DEF:GlobalSettings:DataClass] # [DEF:AppConfig:DataClass] diff --git a/backend/src/models/dataset_review.py b/backend/src/models/dataset_review.py new file mode 100644 index 00000000..055bbdbe --- /dev/null +++ b/backend/src/models/dataset_review.py @@ -0,0 +1,661 @@ +# [DEF:DatasetReviewModels:Module] +# 
+# @TIER: STANDARD +# @COMPLEXITY: 3 +# @SEMANTICS: dataset_review, session, profile, findings, semantics, clarification, execution, sqlalchemy +# @PURPOSE: SQLAlchemy models for the dataset review orchestration flow. +# @LAYER: Domain +# @RELATION: INHERITS_FROM -> [Base] +# @RELATION: DEPENDS_ON -> [AuthModels] +# @RELATION: DEPENDS_ON -> [MappingModels] +# +# @INVARIANT: Session and profile entities are strictly scoped to an authenticated user. + +# [SECTION: IMPORTS] +import uuid +import enum +from datetime import datetime +from typing import List, Optional +from sqlalchemy import Column, String, Integer, Boolean, DateTime, ForeignKey, Text, JSON, Float, Enum as SQLEnum, Table +from sqlalchemy.orm import relationship +from .mapping import Base +# [/SECTION] + +# [DEF:SessionStatus:Class] +class SessionStatus(str, enum.Enum): + ACTIVE = "active" + PAUSED = "paused" + COMPLETED = "completed" + ARCHIVED = "archived" + CANCELLED = "cancelled" +# [/DEF:SessionStatus:Class] + +# [DEF:SessionPhase:Class] +class SessionPhase(str, enum.Enum): + INTAKE = "intake" + RECOVERY = "recovery" + REVIEW = "review" + SEMANTIC_REVIEW = "semantic_review" + CLARIFICATION = "clarification" + MAPPING_REVIEW = "mapping_review" + PREVIEW = "preview" + LAUNCH = "launch" + POST_RUN = "post_run" +# [/DEF:SessionPhase:Class] + +# [DEF:ReadinessState:Class] +class ReadinessState(str, enum.Enum): + EMPTY = "empty" + IMPORTING = "importing" + REVIEW_READY = "review_ready" + SEMANTIC_SOURCE_REVIEW_NEEDED = "semantic_source_review_needed" + CLARIFICATION_NEEDED = "clarification_needed" + CLARIFICATION_ACTIVE = "clarification_active" + MAPPING_REVIEW_NEEDED = "mapping_review_needed" + COMPILED_PREVIEW_READY = "compiled_preview_ready" + PARTIALLY_READY = "partially_ready" + RUN_READY = "run_ready" + RUN_IN_PROGRESS = "run_in_progress" + COMPLETED = "completed" + RECOVERY_REQUIRED = "recovery_required" +# [/DEF:ReadinessState:Class] + +# [DEF:RecommendedAction:Class] +class RecommendedAction(str, 
enum.Enum): + IMPORT_FROM_SUPERSET = "import_from_superset" + REVIEW_DOCUMENTATION = "review_documentation" + APPLY_SEMANTIC_SOURCE = "apply_semantic_source" + START_CLARIFICATION = "start_clarification" + ANSWER_NEXT_QUESTION = "answer_next_question" + APPROVE_MAPPING = "approve_mapping" + GENERATE_SQL_PREVIEW = "generate_sql_preview" + COMPLETE_REQUIRED_VALUES = "complete_required_values" + LAUNCH_DATASET = "launch_dataset" + RESUME_SESSION = "resume_session" + EXPORT_OUTPUTS = "export_outputs" +# [/DEF:RecommendedAction:Class] + +# [DEF:SessionCollaboratorRole:Class] +class SessionCollaboratorRole(str, enum.Enum): + VIEWER = "viewer" + REVIEWER = "reviewer" + APPROVER = "approver" +# [/DEF:SessionCollaboratorRole:Class] + +# [DEF:SessionCollaborator:Class] +class SessionCollaborator(Base): + __tablename__ = "session_collaborators" + + id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + session_id = Column(String, ForeignKey("dataset_review_sessions.session_id"), nullable=False) + user_id = Column(String, ForeignKey("users.id"), nullable=False) + role = Column(SQLEnum(SessionCollaboratorRole), nullable=False) + added_at = Column(DateTime, default=datetime.utcnow, nullable=False) + + session = relationship("DatasetReviewSession", back_populates="collaborators") + user = relationship("User") +# [/DEF:SessionCollaborator:Class] + +# [DEF:DatasetReviewSession:Class] +class DatasetReviewSession(Base): + __tablename__ = "dataset_review_sessions" + + session_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + user_id = Column(String, ForeignKey("users.id"), nullable=False) + environment_id = Column(String, ForeignKey("environments.id"), nullable=False) + source_kind = Column(String, nullable=False) # superset_link, dataset_selection + source_input = Column(String, nullable=False) + dataset_ref = Column(String, nullable=False) + dataset_id = Column(Integer, nullable=True) + dashboard_id = Column(Integer, nullable=True) + 
readiness_state = Column(SQLEnum(ReadinessState), nullable=False, default=ReadinessState.EMPTY) + recommended_action = Column(SQLEnum(RecommendedAction), nullable=False, default=RecommendedAction.IMPORT_FROM_SUPERSET) + status = Column(SQLEnum(SessionStatus), nullable=False, default=SessionStatus.ACTIVE) + current_phase = Column(SQLEnum(SessionPhase), nullable=False, default=SessionPhase.INTAKE) + active_task_id = Column(String, nullable=True) + last_preview_id = Column(String, nullable=True) + last_run_context_id = Column(String, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) + last_activity_at = Column(DateTime, default=datetime.utcnow, nullable=False) + closed_at = Column(DateTime, nullable=True) + + owner = relationship("User") + collaborators = relationship("SessionCollaborator", back_populates="session", cascade="all, delete-orphan") + profile = relationship("DatasetProfile", back_populates="session", uselist=False, cascade="all, delete-orphan") + findings = relationship("ValidationFinding", back_populates="session", cascade="all, delete-orphan") + semantic_sources = relationship("SemanticSource", back_populates="session", cascade="all, delete-orphan") + semantic_fields = relationship("SemanticFieldEntry", back_populates="session", cascade="all, delete-orphan") + imported_filters = relationship("ImportedFilter", back_populates="session", cascade="all, delete-orphan") + template_variables = relationship("TemplateVariable", back_populates="session", cascade="all, delete-orphan") + execution_mappings = relationship("ExecutionMapping", back_populates="session", cascade="all, delete-orphan") + clarification_sessions = relationship("ClarificationSession", back_populates="session", cascade="all, delete-orphan") + previews = relationship("CompiledPreview", back_populates="session", cascade="all, delete-orphan") + run_contexts = 
relationship("DatasetRunContext", back_populates="session", cascade="all, delete-orphan") + export_artifacts = relationship("ExportArtifact", back_populates="session", cascade="all, delete-orphan") +# [/DEF:DatasetReviewSession:Class] + +# [DEF:BusinessSummarySource:Class] +class BusinessSummarySource(str, enum.Enum): + CONFIRMED = "confirmed" + IMPORTED = "imported" + INFERRED = "inferred" + AI_DRAFT = "ai_draft" + MANUAL_OVERRIDE = "manual_override" +# [/DEF:BusinessSummarySource:Class] + +# [DEF:ConfidenceState:Class] +class ConfidenceState(str, enum.Enum): + CONFIRMED = "confirmed" + MOSTLY_CONFIRMED = "mostly_confirmed" + MIXED = "mixed" + LOW_CONFIDENCE = "low_confidence" + UNRESOLVED = "unresolved" +# [/DEF:ConfidenceState:Class] + +# [DEF:DatasetProfile:Class] +class DatasetProfile(Base): + __tablename__ = "dataset_profiles" + + profile_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + session_id = Column(String, ForeignKey("dataset_review_sessions.session_id"), nullable=False, unique=True) + dataset_name = Column(String, nullable=False) + schema_name = Column(String, nullable=True) + database_name = Column(String, nullable=True) + business_summary = Column(Text, nullable=False) + business_summary_source = Column(SQLEnum(BusinessSummarySource), nullable=False) + description = Column(Text, nullable=True) + dataset_type = Column(String, nullable=True) # table, virtual, sqllab_view, unknown + is_sqllab_view = Column(Boolean, nullable=False, default=False) + completeness_score = Column(Float, nullable=True) + confidence_state = Column(SQLEnum(ConfidenceState), nullable=False) + has_blocking_findings = Column(Boolean, nullable=False, default=False) + has_warning_findings = Column(Boolean, nullable=False, default=False) + manual_summary_locked = Column(Boolean, nullable=False, default=False) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, 
onupdate=datetime.utcnow, nullable=False) + + session = relationship("DatasetReviewSession", back_populates="profile") +# [/DEF:DatasetProfile:Class] + +# [DEF:FindingArea:Class] +class FindingArea(str, enum.Enum): + SOURCE_INTAKE = "source_intake" + DATASET_PROFILE = "dataset_profile" + SEMANTIC_ENRICHMENT = "semantic_enrichment" + CLARIFICATION = "clarification" + FILTER_RECOVERY = "filter_recovery" + TEMPLATE_MAPPING = "template_mapping" + COMPILED_PREVIEW = "compiled_preview" + LAUNCH = "launch" + AUDIT = "audit" +# [/DEF:FindingArea:Class] + +# [DEF:FindingSeverity:Class] +class FindingSeverity(str, enum.Enum): + BLOCKING = "blocking" + WARNING = "warning" + INFORMATIONAL = "informational" +# [/DEF:FindingSeverity:Class] + +# [DEF:ResolutionState:Class] +class ResolutionState(str, enum.Enum): + OPEN = "open" + RESOLVED = "resolved" + APPROVED = "approved" + SKIPPED = "skipped" + DEFERRED = "deferred" + EXPERT_REVIEW = "expert_review" +# [/DEF:ResolutionState:Class] + +# [DEF:ValidationFinding:Class] +class ValidationFinding(Base): + __tablename__ = "validation_findings" + + finding_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + session_id = Column(String, ForeignKey("dataset_review_sessions.session_id"), nullable=False) + area = Column(SQLEnum(FindingArea), nullable=False) + severity = Column(SQLEnum(FindingSeverity), nullable=False) + code = Column(String, nullable=False) + title = Column(String, nullable=False) + message = Column(Text, nullable=False) + resolution_state = Column(SQLEnum(ResolutionState), nullable=False, default=ResolutionState.OPEN) + resolution_note = Column(Text, nullable=True) + caused_by_ref = Column(String, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + resolved_at = Column(DateTime, nullable=True) + + session = relationship("DatasetReviewSession", back_populates="findings") +# [/DEF:ValidationFinding:Class] + +# [DEF:SemanticSourceType:Class] +class 
SemanticSourceType(str, enum.Enum): + UPLOADED_FILE = "uploaded_file" + CONNECTED_DICTIONARY = "connected_dictionary" + REFERENCE_DATASET = "reference_dataset" + NEIGHBOR_DATASET = "neighbor_dataset" + AI_GENERATED = "ai_generated" +# [/DEF:SemanticSourceType:Class] + +# [DEF:TrustLevel:Class] +class TrustLevel(str, enum.Enum): + TRUSTED = "trusted" + RECOMMENDED = "recommended" + CANDIDATE = "candidate" + GENERATED = "generated" +# [/DEF:TrustLevel:Class] + +# [DEF:SemanticSourceStatus:Class] +class SemanticSourceStatus(str, enum.Enum): + AVAILABLE = "available" + SELECTED = "selected" + APPLIED = "applied" + REJECTED = "rejected" + PARTIAL = "partial" + FAILED = "failed" +# [/DEF:SemanticSourceStatus:Class] + +# [DEF:SemanticSource:Class] +class SemanticSource(Base): + __tablename__ = "semantic_sources" + + source_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + session_id = Column(String, ForeignKey("dataset_review_sessions.session_id"), nullable=False) + source_type = Column(SQLEnum(SemanticSourceType), nullable=False) + source_ref = Column(String, nullable=False) + source_version = Column(String, nullable=False) + display_name = Column(String, nullable=False) + trust_level = Column(SQLEnum(TrustLevel), nullable=False) + schema_overlap_score = Column(Float, nullable=True) + status = Column(SQLEnum(SemanticSourceStatus), nullable=False, default=SemanticSourceStatus.AVAILABLE) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + + session = relationship("DatasetReviewSession", back_populates="semantic_sources") +# [/DEF:SemanticSource:Class] + +# [DEF:FieldKind:Class] +class FieldKind(str, enum.Enum): + COLUMN = "column" + METRIC = "metric" + FILTER_DIMENSION = "filter_dimension" + PARAMETER = "parameter" +# [/DEF:FieldKind:Class] + +# [DEF:FieldProvenance:Class] +class FieldProvenance(str, enum.Enum): + DICTIONARY_EXACT = "dictionary_exact" + REFERENCE_IMPORTED = "reference_imported" + FUZZY_INFERRED = 
"fuzzy_inferred" + AI_GENERATED = "ai_generated" + MANUAL_OVERRIDE = "manual_override" + UNRESOLVED = "unresolved" +# [/DEF:FieldProvenance:Class] + +# [DEF:SemanticFieldEntry:Class] +class SemanticFieldEntry(Base): + __tablename__ = "semantic_field_entries" + + field_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + session_id = Column(String, ForeignKey("dataset_review_sessions.session_id"), nullable=False) + field_name = Column(String, nullable=False) + field_kind = Column(SQLEnum(FieldKind), nullable=False) + verbose_name = Column(String, nullable=True) + description = Column(Text, nullable=True) + display_format = Column(String, nullable=True) + provenance = Column(SQLEnum(FieldProvenance), nullable=False, default=FieldProvenance.UNRESOLVED) + source_id = Column(String, nullable=True) + confidence_rank = Column(Integer, nullable=True) + is_locked = Column(Boolean, nullable=False, default=False) + has_conflict = Column(Boolean, nullable=False, default=False) + needs_review = Column(Boolean, nullable=False, default=True) + last_changed_by = Column(String, nullable=False) # system, user, agent + user_feedback = Column(String, nullable=True) # up, down, null + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) + + session = relationship("DatasetReviewSession", back_populates="semantic_fields") + candidates = relationship("SemanticCandidate", back_populates="field", cascade="all, delete-orphan") +# [/DEF:SemanticFieldEntry:Class] + +# [DEF:CandidateMatchType:Class] +class CandidateMatchType(str, enum.Enum): + EXACT = "exact" + REFERENCE = "reference" + FUZZY = "fuzzy" + GENERATED = "generated" +# [/DEF:CandidateMatchType:Class] + +# [DEF:CandidateStatus:Class] +class CandidateStatus(str, enum.Enum): + PROPOSED = "proposed" + ACCEPTED = "accepted" + REJECTED = "rejected" + SUPERSEDED = "superseded" +# 
[/DEF:CandidateStatus:Class] + +# [DEF:SemanticCandidate:Class] +class SemanticCandidate(Base): + __tablename__ = "semantic_candidates" + + candidate_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + field_id = Column(String, ForeignKey("semantic_field_entries.field_id"), nullable=False) + source_id = Column(String, nullable=True) + candidate_rank = Column(Integer, nullable=False) + match_type = Column(SQLEnum(CandidateMatchType), nullable=False) + confidence_score = Column(Float, nullable=False) + proposed_verbose_name = Column(String, nullable=True) + proposed_description = Column(Text, nullable=True) + proposed_display_format = Column(String, nullable=True) + status = Column(SQLEnum(CandidateStatus), nullable=False, default=CandidateStatus.PROPOSED) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + + field = relationship("SemanticFieldEntry", back_populates="candidates") +# [/DEF:SemanticCandidate:Class] + +# [DEF:FilterSource:Class] +class FilterSource(str, enum.Enum): + SUPERSET_NATIVE = "superset_native" + SUPERSET_URL = "superset_url" + MANUAL = "manual" + INFERRED = "inferred" +# [/DEF:FilterSource:Class] + +# [DEF:FilterConfidenceState:Class] +class FilterConfidenceState(str, enum.Enum): + CONFIRMED = "confirmed" + IMPORTED = "imported" + INFERRED = "inferred" + AI_DRAFT = "ai_draft" + UNRESOLVED = "unresolved" +# [/DEF:FilterConfidenceState:Class] + +# [DEF:FilterRecoveryStatus:Class] +class FilterRecoveryStatus(str, enum.Enum): + RECOVERED = "recovered" + PARTIAL = "partial" + MISSING = "missing" + CONFLICTED = "conflicted" +# [/DEF:FilterRecoveryStatus:Class] + +# [DEF:ImportedFilter:Class] +class ImportedFilter(Base): + __tablename__ = "imported_filters" + + filter_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + session_id = Column(String, ForeignKey("dataset_review_sessions.session_id"), nullable=False) + filter_name = Column(String, nullable=False) + display_name = 
Column(String, nullable=True) + raw_value = Column(JSON, nullable=False) + normalized_value = Column(JSON, nullable=True) + source = Column(SQLEnum(FilterSource), nullable=False) + confidence_state = Column(SQLEnum(FilterConfidenceState), nullable=False) + requires_confirmation = Column(Boolean, nullable=False, default=False) + recovery_status = Column(SQLEnum(FilterRecoveryStatus), nullable=False) + notes = Column(Text, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) + + session = relationship("DatasetReviewSession", back_populates="imported_filters") +# [/DEF:ImportedFilter:Class] + +# [DEF:VariableKind:Class] +class VariableKind(str, enum.Enum): + NATIVE_FILTER = "native_filter" + PARAMETER = "parameter" + DERIVED = "derived" + UNKNOWN = "unknown" +# [/DEF:VariableKind:Class] + +# [DEF:MappingStatus:Class] +class MappingStatus(str, enum.Enum): + UNMAPPED = "unmapped" + PROPOSED = "proposed" + APPROVED = "approved" + OVERRIDDEN = "overridden" + INVALID = "invalid" +# [/DEF:MappingStatus:Class] + +# [DEF:TemplateVariable:Class] +class TemplateVariable(Base): + __tablename__ = "template_variables" + + variable_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + session_id = Column(String, ForeignKey("dataset_review_sessions.session_id"), nullable=False) + variable_name = Column(String, nullable=False) + expression_source = Column(Text, nullable=False) + variable_kind = Column(SQLEnum(VariableKind), nullable=False) + is_required = Column(Boolean, nullable=False, default=True) + default_value = Column(JSON, nullable=True) + mapping_status = Column(SQLEnum(MappingStatus), nullable=False, default=MappingStatus.UNMAPPED) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) + + session = 
relationship("DatasetReviewSession", back_populates="template_variables") +# [/DEF:TemplateVariable:Class] + +# [DEF:MappingMethod:Class] +class MappingMethod(str, enum.Enum): + DIRECT_MATCH = "direct_match" + HEURISTIC_MATCH = "heuristic_match" + SEMANTIC_MATCH = "semantic_match" + MANUAL_OVERRIDE = "manual_override" +# [/DEF:MappingMethod:Class] + +# [DEF:MappingWarningLevel:Class] +class MappingWarningLevel(str, enum.Enum): + LOW = "low" + MEDIUM = "medium" + HIGH = "high" +# [/DEF:MappingWarningLevel:Class] + +# [DEF:ApprovalState:Class] +class ApprovalState(str, enum.Enum): + PENDING = "pending" + APPROVED = "approved" + REJECTED = "rejected" + NOT_REQUIRED = "not_required" +# [/DEF:ApprovalState:Class] + +# [DEF:ExecutionMapping:Class] +class ExecutionMapping(Base): + __tablename__ = "execution_mappings" + + mapping_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + session_id = Column(String, ForeignKey("dataset_review_sessions.session_id"), nullable=False) + filter_id = Column(String, nullable=False) + variable_id = Column(String, nullable=False) + mapping_method = Column(SQLEnum(MappingMethod), nullable=False) + raw_input_value = Column(JSON, nullable=False) + effective_value = Column(JSON, nullable=True) + transformation_note = Column(Text, nullable=True) + warning_level = Column(SQLEnum(MappingWarningLevel), nullable=True) + requires_explicit_approval = Column(Boolean, nullable=False, default=False) + approval_state = Column(SQLEnum(ApprovalState), nullable=False, default=ApprovalState.NOT_REQUIRED) + approved_by_user_id = Column(String, nullable=True) + approved_at = Column(DateTime, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) + + session = relationship("DatasetReviewSession", back_populates="execution_mappings") +# [/DEF:ExecutionMapping:Class] + +# [DEF:ClarificationStatus:Class] +class 
ClarificationStatus(str, enum.Enum): + PENDING = "pending" + ACTIVE = "active" + PAUSED = "paused" + COMPLETED = "completed" + CANCELLED = "cancelled" +# [/DEF:ClarificationStatus:Class] + +# [DEF:ClarificationSession:Class] +class ClarificationSession(Base): + __tablename__ = "clarification_sessions" + + clarification_session_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + session_id = Column(String, ForeignKey("dataset_review_sessions.session_id"), nullable=False) + status = Column(SQLEnum(ClarificationStatus), nullable=False, default=ClarificationStatus.PENDING) + current_question_id = Column(String, nullable=True) + resolved_count = Column(Integer, nullable=False, default=0) + remaining_count = Column(Integer, nullable=False, default=0) + summary_delta = Column(Text, nullable=True) + started_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) + completed_at = Column(DateTime, nullable=True) + + session = relationship("DatasetReviewSession", back_populates="clarification_sessions") + questions = relationship("ClarificationQuestion", back_populates="clarification_session", cascade="all, delete-orphan") +# [/DEF:ClarificationSession:Class] + +# [DEF:QuestionState:Class] +class QuestionState(str, enum.Enum): + OPEN = "open" + ANSWERED = "answered" + SKIPPED = "skipped" + EXPERT_REVIEW = "expert_review" + SUPERSEDED = "superseded" +# [/DEF:QuestionState:Class] + +# [DEF:ClarificationQuestion:Class] +class ClarificationQuestion(Base): + __tablename__ = "clarification_questions" + + question_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + clarification_session_id = Column(String, ForeignKey("clarification_sessions.clarification_session_id"), nullable=False) + topic_ref = Column(String, nullable=False) + question_text = Column(Text, nullable=False) + why_it_matters = Column(Text, nullable=False) + 
current_guess = Column(Text, nullable=True) + priority = Column(Integer, nullable=False, default=0) + state = Column(SQLEnum(QuestionState), nullable=False, default=QuestionState.OPEN) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) + + clarification_session = relationship("ClarificationSession", back_populates="questions") + options = relationship("ClarificationOption", back_populates="question", cascade="all, delete-orphan") + answer = relationship("ClarificationAnswer", back_populates="question", uselist=False, cascade="all, delete-orphan") +# [/DEF:ClarificationQuestion:Class] + +# [DEF:ClarificationOption:Class] +class ClarificationOption(Base): + __tablename__ = "clarification_options" + + option_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + question_id = Column(String, ForeignKey("clarification_questions.question_id"), nullable=False) + label = Column(String, nullable=False) + value = Column(String, nullable=False) + is_recommended = Column(Boolean, nullable=False, default=False) + display_order = Column(Integer, nullable=False, default=0) + + question = relationship("ClarificationQuestion", back_populates="options") +# [/DEF:ClarificationOption:Class] + +# [DEF:AnswerKind:Class] +class AnswerKind(str, enum.Enum): + SELECTED = "selected" + CUSTOM = "custom" + SKIPPED = "skipped" + EXPERT_REVIEW = "expert_review" +# [/DEF:AnswerKind:Class] + +# [DEF:ClarificationAnswer:Class] +class ClarificationAnswer(Base): + __tablename__ = "clarification_answers" + + answer_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + question_id = Column(String, ForeignKey("clarification_questions.question_id"), nullable=False, unique=True) + answer_kind = Column(SQLEnum(AnswerKind), nullable=False) + answer_value = Column(Text, nullable=True) + answered_by_user_id = Column(String, nullable=False) + 
impact_summary = Column(Text, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + + question = relationship("ClarificationQuestion", back_populates="answer") +# [/DEF:ClarificationAnswer:Class] + +# [DEF:PreviewStatus:Class] +class PreviewStatus(str, enum.Enum): + PENDING = "pending" + READY = "ready" + FAILED = "failed" + STALE = "stale" +# [/DEF:PreviewStatus:Class] + +# [DEF:CompiledPreview:Class] +class CompiledPreview(Base): + __tablename__ = "compiled_previews" + + preview_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + session_id = Column(String, ForeignKey("dataset_review_sessions.session_id"), nullable=False) + preview_status = Column(SQLEnum(PreviewStatus), nullable=False, default=PreviewStatus.PENDING) + compiled_sql = Column(Text, nullable=True) + preview_fingerprint = Column(String, nullable=False) + compiled_by = Column(String, nullable=False, default="superset") + error_code = Column(String, nullable=True) + error_details = Column(Text, nullable=True) + compiled_at = Column(DateTime, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + + session = relationship("DatasetReviewSession", back_populates="previews") +# [/DEF:CompiledPreview:Class] + +# [DEF:LaunchStatus:Class] +class LaunchStatus(str, enum.Enum): + STARTED = "started" + SUCCESS = "success" + FAILED = "failed" +# [/DEF:LaunchStatus:Class] + +# [DEF:DatasetRunContext:Class] +class DatasetRunContext(Base): + __tablename__ = "dataset_run_contexts" + + run_context_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + session_id = Column(String, ForeignKey("dataset_review_sessions.session_id"), nullable=False) + dataset_ref = Column(String, nullable=False) + environment_id = Column(String, nullable=False) + preview_id = Column(String, nullable=False) + sql_lab_session_ref = Column(String, nullable=False) + effective_filters = Column(JSON, nullable=False) + template_params = 
Column(JSON, nullable=False) + approved_mapping_ids = Column(JSON, nullable=False) + semantic_decision_refs = Column(JSON, nullable=False) + open_warning_refs = Column(JSON, nullable=False) + launch_status = Column(SQLEnum(LaunchStatus), nullable=False) + launch_error = Column(Text, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + + session = relationship("DatasetReviewSession", back_populates="run_contexts") +# [/DEF:DatasetRunContext:Class] + +# [DEF:ArtifactType:Class] +class ArtifactType(str, enum.Enum): + DOCUMENTATION = "documentation" + VALIDATION_REPORT = "validation_report" + RUN_SUMMARY = "run_summary" +# [/DEF:ArtifactType:Class] + +# [DEF:ArtifactFormat:Class] +class ArtifactFormat(str, enum.Enum): + JSON = "json" + MARKDOWN = "markdown" + CSV = "csv" + PDF = "pdf" +# [/DEF:ArtifactFormat:Class] + +# [DEF:ExportArtifact:Class] +class ExportArtifact(Base): + __tablename__ = "export_artifacts" + + artifact_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + session_id = Column(String, ForeignKey("dataset_review_sessions.session_id"), nullable=False) + artifact_type = Column(SQLEnum(ArtifactType), nullable=False) + format = Column(SQLEnum(ArtifactFormat), nullable=False) + storage_ref = Column(String, nullable=False) + created_by_user_id = Column(String, nullable=False) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + + session = relationship("DatasetReviewSession", back_populates="export_artifacts") +# [/DEF:ExportArtifact:Class] + +# [/DEF:DatasetReviewModels:Module] \ No newline at end of file diff --git a/backend/src/schemas/dataset_review.py b/backend/src/schemas/dataset_review.py new file mode 100644 index 00000000..4bb39f19 --- /dev/null +++ b/backend/src/schemas/dataset_review.py @@ -0,0 +1,362 @@ +# [DEF:DatasetReviewSchemas:Module] +# +# @COMPLEXITY: 3 +# @SEMANTICS: dataset_review, schemas, pydantic, session, profile, findings +# @PURPOSE: Defines API 
schemas for the dataset review orchestration flow. +# @LAYER: API +# @RELATION: DEPENDS_ON -> pydantic + +# [SECTION: IMPORTS] +from datetime import datetime +from typing import List, Optional, Any +from pydantic import BaseModel, Field +from src.models.dataset_review import ( + SessionStatus, + SessionPhase, + ReadinessState, + RecommendedAction, + SessionCollaboratorRole, + BusinessSummarySource, + ConfidenceState, + FindingArea, + FindingSeverity, + ResolutionState, + SemanticSourceType, + TrustLevel, + SemanticSourceStatus, + FieldKind, + FieldProvenance, + CandidateMatchType, + CandidateStatus, + FilterSource, + FilterConfidenceState, + FilterRecoveryStatus, + VariableKind, + MappingStatus, + MappingMethod, + MappingWarningLevel, + ApprovalState, + ClarificationStatus, + QuestionState, + AnswerKind, + PreviewStatus, + LaunchStatus, + ArtifactType, + ArtifactFormat +) +# [/SECTION] + +# [DEF:SessionCollaboratorDto:Class] +class SessionCollaboratorDto(BaseModel): + user_id: str + role: SessionCollaboratorRole + added_at: datetime + + class Config: + from_attributes = True +# [/DEF:SessionCollaboratorDto:Class] + +# [DEF:DatasetProfileDto:Class] +class DatasetProfileDto(BaseModel): + profile_id: str + session_id: str + dataset_name: str + schema_name: Optional[str] = None + database_name: Optional[str] = None + business_summary: str + business_summary_source: BusinessSummarySource + description: Optional[str] = None + dataset_type: Optional[str] = None + is_sqllab_view: bool + completeness_score: Optional[float] = None + confidence_state: ConfidenceState + has_blocking_findings: bool + has_warning_findings: bool + manual_summary_locked: bool + created_at: datetime + updated_at: datetime + + class Config: + from_attributes = True +# [/DEF:DatasetProfileDto:Class] + +# [DEF:ValidationFindingDto:Class] +class ValidationFindingDto(BaseModel): + finding_id: str + session_id: str + area: FindingArea + severity: FindingSeverity + code: str + title: str + message: str + 
resolution_state: ResolutionState + resolution_note: Optional[str] = None + caused_by_ref: Optional[str] = None + created_at: datetime + resolved_at: Optional[datetime] = None + + class Config: + from_attributes = True +# [/DEF:ValidationFindingDto:Class] + +# [DEF:SemanticSourceDto:Class] +class SemanticSourceDto(BaseModel): + source_id: str + session_id: str + source_type: SemanticSourceType + source_ref: str + source_version: str + display_name: str + trust_level: TrustLevel + schema_overlap_score: Optional[float] = None + status: SemanticSourceStatus + created_at: datetime + + class Config: + from_attributes = True +# [/DEF:SemanticSourceDto:Class] + +# [DEF:SemanticCandidateDto:Class] +class SemanticCandidateDto(BaseModel): + candidate_id: str + field_id: str + source_id: Optional[str] = None + candidate_rank: int + match_type: CandidateMatchType + confidence_score: float + proposed_verbose_name: Optional[str] = None + proposed_description: Optional[str] = None + proposed_display_format: Optional[str] = None + status: CandidateStatus + created_at: datetime + + class Config: + from_attributes = True +# [/DEF:SemanticCandidateDto:Class] + +# [DEF:SemanticFieldEntryDto:Class] +class SemanticFieldEntryDto(BaseModel): + field_id: str + session_id: str + field_name: str + field_kind: FieldKind + verbose_name: Optional[str] = None + description: Optional[str] = None + display_format: Optional[str] = None + provenance: FieldProvenance + source_id: Optional[str] = None + confidence_rank: Optional[int] = None + is_locked: bool + has_conflict: bool + needs_review: bool + last_changed_by: str + user_feedback: Optional[str] = None + created_at: datetime + updated_at: datetime + candidates: List[SemanticCandidateDto] = [] + + class Config: + from_attributes = True +# [/DEF:SemanticFieldEntryDto:Class] + +# [DEF:ImportedFilterDto:Class] +class ImportedFilterDto(BaseModel): + filter_id: str + session_id: str + filter_name: str + display_name: Optional[str] = None + raw_value: 
Any + normalized_value: Optional[Any] = None + source: FilterSource + confidence_state: FilterConfidenceState + requires_confirmation: bool + recovery_status: FilterRecoveryStatus + notes: Optional[str] = None + created_at: datetime + updated_at: datetime + + class Config: + from_attributes = True +# [/DEF:ImportedFilterDto:Class] + +# [DEF:TemplateVariableDto:Class] +class TemplateVariableDto(BaseModel): + variable_id: str + session_id: str + variable_name: str + expression_source: str + variable_kind: VariableKind + is_required: bool + default_value: Optional[Any] = None + mapping_status: MappingStatus + created_at: datetime + updated_at: datetime + + class Config: + from_attributes = True +# [/DEF:TemplateVariableDto:Class] + +# [DEF:ExecutionMappingDto:Class] +class ExecutionMappingDto(BaseModel): + mapping_id: str + session_id: str + filter_id: str + variable_id: str + mapping_method: MappingMethod + raw_input_value: Any + effective_value: Optional[Any] = None + transformation_note: Optional[str] = None + warning_level: Optional[MappingWarningLevel] = None + requires_explicit_approval: bool + approval_state: ApprovalState + approved_by_user_id: Optional[str] = None + approved_at: Optional[datetime] = None + created_at: datetime + updated_at: datetime + + class Config: + from_attributes = True +# [/DEF:ExecutionMappingDto:Class] + +# [DEF:ClarificationOptionDto:Class] +class ClarificationOptionDto(BaseModel): + option_id: str + question_id: str + label: str + value: str + is_recommended: bool + display_order: int + + class Config: + from_attributes = True +# [/DEF:ClarificationOptionDto:Class] + +# [DEF:ClarificationAnswerDto:Class] +class ClarificationAnswerDto(BaseModel): + answer_id: str + question_id: str + answer_kind: AnswerKind + answer_value: Optional[str] = None + answered_by_user_id: str + impact_summary: Optional[str] = None + created_at: datetime + + class Config: + from_attributes = True +# [/DEF:ClarificationAnswerDto:Class] + +# 
# [DEF:ClarificationQuestionDto:Class]
class ClarificationQuestionDto(BaseModel):
    """Read model for one clarification question, its options, and its answer."""

    question_id: str
    clarification_session_id: str
    topic_ref: str
    question_text: str
    why_it_matters: str
    current_guess: Optional[str] = None
    priority: int
    state: QuestionState
    created_at: datetime
    updated_at: datetime
    # Pydantic deep-copies mutable defaults, so `[]` is safe here.
    options: List[ClarificationOptionDto] = []
    # None until the question has been answered.
    answer: Optional[ClarificationAnswerDto] = None

    # Pydantic v2: class-based `Config` is deprecated; `model_config` replaces it.
    model_config = {"from_attributes": True}
# [/DEF:ClarificationQuestionDto:Class]

# [DEF:ClarificationSessionDto:Class]
class ClarificationSessionDto(BaseModel):
    """Read model for a clarification sub-session and its question set."""

    clarification_session_id: str
    session_id: str
    status: ClarificationStatus
    current_question_id: Optional[str] = None
    resolved_count: int
    remaining_count: int
    summary_delta: Optional[str] = None
    started_at: datetime
    updated_at: datetime
    completed_at: Optional[datetime] = None
    questions: List[ClarificationQuestionDto] = []

    model_config = {"from_attributes": True}
# [/DEF:ClarificationSessionDto:Class]

# [DEF:CompiledPreviewDto:Class]
class CompiledPreviewDto(BaseModel):
    """Read model for one SQL compilation/preview attempt of the session."""

    preview_id: str
    session_id: str
    preview_status: PreviewStatus
    # None while compilation is pending or has failed.
    compiled_sql: Optional[str] = None
    preview_fingerprint: str
    compiled_by: str
    error_code: Optional[str] = None
    error_details: Optional[str] = None
    compiled_at: Optional[datetime] = None
    created_at: datetime

    model_config = {"from_attributes": True}
# [/DEF:CompiledPreviewDto:Class]

# [DEF:DatasetRunContextDto:Class]
class DatasetRunContextDto(BaseModel):
    """Read model for an immutable launch-audit snapshot of a session run."""

    run_context_id: str
    session_id: str
    dataset_ref: str
    environment_id: str
    preview_id: str
    sql_lab_session_ref: str
    effective_filters: Any
    template_params: Any
    approved_mapping_ids: List[str]
    semantic_decision_refs: List[str]
    open_warning_refs: List[str]
    launch_status: LaunchStatus
    launch_error: Optional[str] = None
    created_at: datetime

    model_config = {"from_attributes": True}
# [/DEF:DatasetRunContextDto:Class]

# [DEF:SessionSummary:Class]
+class SessionSummary(BaseModel): + session_id: str + user_id: str + environment_id: str + source_kind: str + source_input: str + dataset_ref: str + dataset_id: Optional[int] = None + readiness_state: ReadinessState + recommended_action: RecommendedAction + status: SessionStatus + current_phase: SessionPhase + created_at: datetime + updated_at: datetime + last_activity_at: datetime + + class Config: + from_attributes = True +# [/DEF:SessionSummary:Class] + +# [DEF:SessionDetail:Class] +class SessionDetail(SessionSummary): + collaborators: List[SessionCollaboratorDto] = [] + profile: Optional[DatasetProfileDto] = None + findings: List[ValidationFindingDto] = [] + semantic_sources: List[SemanticSourceDto] = [] + semantic_fields: List[SemanticFieldEntryDto] = [] + imported_filters: List[ImportedFilterDto] = [] + template_variables: List[TemplateVariableDto] = [] + execution_mappings: List[ExecutionMappingDto] = [] + clarification_sessions: List[ClarificationSessionDto] = [] + previews: List[CompiledPreviewDto] = [] + run_contexts: List[DatasetRunContextDto] = [] + + class Config: + from_attributes = True +# [/DEF:SessionDetail:Class] + +# [/DEF:DatasetReviewSchemas:Module] \ No newline at end of file diff --git a/backend/src/scripts/seed_permissions.py b/backend/src/scripts/seed_permissions.py index 63a5b469..67651f27 100644 --- a/backend/src/scripts/seed_permissions.py +++ b/backend/src/scripts/seed_permissions.py @@ -46,6 +46,14 @@ INITIAL_PERMISSIONS = [ {"resource": "plugin:storage", "action": "WRITE"}, {"resource": "plugin:debug", "action": "EXECUTE"}, {"resource": "git_config", "action": "READ"}, + + # Dataset Review Permissions + {"resource": "dataset:session", "action": "READ"}, + {"resource": "dataset:session", "action": "MANAGE"}, + {"resource": "dataset:session", "action": "APPROVE"}, + {"resource": "dataset:execution", "action": "PREVIEW"}, + {"resource": "dataset:execution", "action": "LAUNCH"}, + {"resource": "dataset:execution", "action": 
"LAUNCH_PROD"}, ] # [/DEF:INITIAL_PERMISSIONS:Constant] @@ -95,6 +103,10 @@ def seed_permissions(): ("tasks", "READ"), ("tasks", "WRITE"), ("git_config", "READ"), + ("dataset:session", "READ"), + ("dataset:session", "MANAGE"), + ("dataset:execution", "PREVIEW"), + ("dataset:execution", "LAUNCH"), ] for res, act in user_permissions: diff --git a/backend/src/services/dataset_review/__init__.py b/backend/src/services/dataset_review/__init__.py new file mode 100644 index 00000000..058c6f38 --- /dev/null +++ b/backend/src/services/dataset_review/__init__.py @@ -0,0 +1,7 @@ +# [DEF:backend.src.services.dataset_review:Module] +# +# @SEMANTICS: dataset, review, orchestration +# @PURPOSE: Provides services for dataset-centered orchestration flow. +# @LAYER: Services +# +# [/DEF:backend.src.services.dataset_review:Module] \ No newline at end of file diff --git a/backend/src/services/dataset_review/repositories/__tests__/test_session_repository.py b/backend/src/services/dataset_review/repositories/__tests__/test_session_repository.py new file mode 100644 index 00000000..9ac49b13 --- /dev/null +++ b/backend/src/services/dataset_review/repositories/__tests__/test_session_repository.py @@ -0,0 +1,171 @@ +import pytest +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from src.models.mapping import Base, Environment +from src.models.auth import User +from src.models.dataset_review import ( + DatasetReviewSession, + DatasetProfile, + ValidationFinding, + CompiledPreview, + DatasetRunContext, + BusinessSummarySource, + ConfidenceState, + FindingArea, + FindingSeverity, + ReadinessState, + RecommendedAction +) +from src.services.dataset_review.repositories.session_repository import DatasetReviewSessionRepository + +# [DEF:SessionRepositoryTests:Module] +# @COMPLEXITY: 3 +# @PURPOSE: Unit tests for DatasetReviewSessionRepository. 
+# @RELATION: TESTS -> [DatasetReviewSessionRepository] + +@pytest.fixture +def db_session(): + # [DEF:db_session:Function] + # @COMPLEXITY: 1 + # @RELATION: BINDS_TO -> [SessionRepositoryTests] + engine = create_engine("sqlite:///:memory:") + Base.metadata.create_all(engine) + Session = sessionmaker(bind=engine) + session = Session() + + # Create test data + user = User(id="user1", username="testuser", email="test@example.com", password_hash="pw") + env = Environment(id="env1", name="Prod", url="http://superset", credentials_id="cred1") + session.add_all([user, env]) + session.commit() + + yield session + session.close() + +def test_create_session(db_session): + # @PURPOSE: Verify session creation and persistence. + repo = DatasetReviewSessionRepository(db_session) + session = DatasetReviewSession( + user_id="user1", + environment_id="env1", + source_kind="superset_link", + source_input="http://link", + dataset_ref="dataset1" + ) + repo.create_session(session) + + assert session.session_id is not None + loaded = db_session.query(DatasetReviewSession).filter_by(session_id=session.session_id).first() + assert loaded.user_id == "user1" + +def test_load_session_detail_ownership(db_session): + # @PURPOSE: Verify ownership enforcement in detail loading. + repo = DatasetReviewSessionRepository(db_session) + session = DatasetReviewSession( + user_id="user1", environment_id="env1", source_kind="superset_link", + source_input="http://link", dataset_ref="dataset1" + ) + repo.create_session(session) + + # Correct user + loaded = repo.load_session_detail(session.session_id, "user1") + assert loaded is not None + + # Wrong user + loaded_wrong = repo.load_session_detail(session.session_id, "wrong_user") + assert loaded_wrong is None + +def test_save_preview_marks_stale(db_session): + # @PURPOSE: Verify that saving a new preview marks old ones as stale. 
+ repo = DatasetReviewSessionRepository(db_session) + session = DatasetReviewSession( + user_id="user1", environment_id="env1", source_kind="superset_link", + source_input="http://link", dataset_ref="dataset1" + ) + repo.create_session(session) + + p1 = CompiledPreview(session_id=session.session_id, preview_status="ready", preview_fingerprint="f1") + repo.save_preview(session.session_id, "user1", p1) + + p2 = CompiledPreview(session_id=session.session_id, preview_status="ready", preview_fingerprint="f2") + repo.save_preview(session.session_id, "user1", p2) + + db_session.refresh(p1) + assert p1.preview_status == "stale" + assert p2.preview_status == "ready" + assert session.last_preview_id == p2.preview_id + +def test_save_profile_and_findings(db_session): + # @PURPOSE: Verify persistence of profile and findings. + repo = DatasetReviewSessionRepository(db_session) + session = DatasetReviewSession( + user_id="user1", environment_id="env1", source_kind="superset_link", + source_input="http://link", dataset_ref="dataset1" + ) + repo.create_session(session) + + profile = DatasetProfile( + session_id=session.session_id, + dataset_name="Test DS", + business_summary="Summary", + business_summary_source=BusinessSummarySource.INFERRED, + confidence_state=ConfidenceState.UNRESOLVED + ) + + finding = ValidationFinding( + session_id=session.session_id, + area=FindingArea.SOURCE_INTAKE, + severity=FindingSeverity.BLOCKING, + code="ERR1", + title="Error", + message="Failure" + ) + + repo.save_profile_and_findings(session.session_id, "user1", profile, [finding]) + + updated_session = repo.load_session_detail(session.session_id, "user1") + assert updated_session.profile.dataset_name == "Test DS" + assert len(updated_session.findings) == 1 + assert updated_session.findings[0].code == "ERR1" + +def test_save_run_context(db_session): + # @PURPOSE: Verify saving of run context. 
+ repo = DatasetReviewSessionRepository(db_session) + session = DatasetReviewSession( + user_id="user1", environment_id="env1", source_kind="superset_link", + source_input="http://link", dataset_ref="dataset1" + ) + repo.create_session(session) + + rc = DatasetRunContext( + session_id=session.session_id, + dataset_ref="ds1", + environment_id="env1", + preview_id="p1", + sql_lab_session_ref="s1", + effective_filters={}, + template_params={}, + approved_mapping_ids=[], + semantic_decision_refs=[], + open_warning_refs=[], + launch_status="success" + ) + repo.save_run_context(session.session_id, "user1", rc) + + assert session.last_run_context_id == rc.run_context_id + +def test_list_sessions_for_user(db_session): + # @PURPOSE: Verify listing of sessions by user. + repo = DatasetReviewSessionRepository(db_session) + s1 = DatasetReviewSession(user_id="user1", environment_id="env1", source_kind="k", source_input="i", dataset_ref="r1") + s2 = DatasetReviewSession(user_id="user1", environment_id="env1", source_kind="k", source_input="i", dataset_ref="r2") + s3 = DatasetReviewSession(user_id="other", environment_id="env1", source_kind="k", source_input="i", dataset_ref="r3") + + db_session.add_all([s1, s2, s3]) + db_session.commit() + + sessions = repo.list_sessions_for_user("user1") + assert len(sessions) == 2 + assert all(s.user_id == "user1" for s in sessions) + +# [/DEF:SessionRepositoryTests:Module] \ No newline at end of file diff --git a/backend/src/services/dataset_review/repositories/session_repository.py b/backend/src/services/dataset_review/repositories/session_repository.py new file mode 100644 index 00000000..35116c56 --- /dev/null +++ b/backend/src/services/dataset_review/repositories/session_repository.py @@ -0,0 +1,146 @@ +# [DEF:DatasetReviewSessionRepository:Module] +# @COMPLEXITY: 5 +# @PURPOSE: Persist and retrieve dataset review session aggregates, including readiness, findings, semantic decisions, clarification state, previews, and run contexts. 
+# @LAYER: Domain +# @RELATION: [DEPENDS_ON] -> [DatasetReviewSession] +# @RELATION: [DEPENDS_ON] -> [DatasetProfile] +# @RELATION: [DEPENDS_ON] -> [ValidationFinding] +# @RELATION: [DEPENDS_ON] -> [CompiledPreview] +# @PRE: repository operations execute within authenticated request or task scope. +# @POST: session aggregate reads are structurally consistent and writes preserve ownership and version semantics. + +from typing import Optional, List +from sqlalchemy.orm import Session, joinedload +from src.models.dataset_review import ( + DatasetReviewSession, + DatasetProfile, + ValidationFinding, + CompiledPreview, + DatasetRunContext +) +from src.core.logger import belief_scope + +class DatasetReviewSessionRepository: + """ + @PURPOSE: Persist and retrieve dataset review session aggregates. + @INVARIANT: ownership_scope -> All operations must respect the session owner's user_id. + """ + def __init__(self, db: Session): + self.db = db + + def create_session(self, session: DatasetReviewSession) -> DatasetReviewSession: + """ + @PURPOSE: Persist initial session shell. + """ + with belief_scope("DatasetReviewSessionRepository.create_session"): + self.db.add(session) + self.db.commit() + self.db.refresh(session) + return session + + def load_session_detail(self, session_id: str, user_id: str) -> Optional[DatasetReviewSession]: + """ + @PURPOSE: Return the full session aggregate for API/frontend use. + @PRE: user_id must match session owner or authorized collaborator. + """ + with belief_scope("DatasetReviewSessionRepository.load_session_detail"): + # Note: We check user_id to enforce the ownership_scope invariant. 
+ return self.db.query(DatasetReviewSession)\ + .options( + joinedload(DatasetReviewSession.profile), + joinedload(DatasetReviewSession.findings), + joinedload(DatasetReviewSession.collaborators), + joinedload(DatasetReviewSession.semantic_sources), + joinedload(DatasetReviewSession.semantic_fields), + joinedload(DatasetReviewSession.imported_filters), + joinedload(DatasetReviewSession.template_variables), + joinedload(DatasetReviewSession.execution_mappings), + joinedload(DatasetReviewSession.clarification_sessions), + joinedload(DatasetReviewSession.previews), + joinedload(DatasetReviewSession.run_contexts) + )\ + .filter(DatasetReviewSession.session_id == session_id)\ + .filter(DatasetReviewSession.user_id == user_id)\ + .first() + + def save_profile_and_findings(self, session_id: str, user_id: str, profile: DatasetProfile, findings: List[ValidationFinding]) -> DatasetReviewSession: + """ + @PURPOSE: Persist profile and validation state together. + """ + with belief_scope("DatasetReviewSessionRepository.save_profile_and_findings"): + session = self.db.query(DatasetReviewSession).filter( + DatasetReviewSession.session_id == session_id, + DatasetReviewSession.user_id == user_id + ).first() + + if not session: + raise ValueError("Session not found or access denied") + + if profile: + self.db.merge(profile) + + # For findings, we might want to sync them (remove old ones if not in new list, or update) + # Simplest for now: add/merge findings + for finding in findings: + self.db.merge(finding) + + self.db.commit() + return self.load_session_detail(session_id, user_id) + + def save_preview(self, session_id: str, user_id: str, preview: CompiledPreview) -> CompiledPreview: + """ + @PURPOSE: Persist compiled preview attempt and mark older fingerprints stale. 
+ """ + with belief_scope("DatasetReviewSessionRepository.save_preview"): + session = self.db.query(DatasetReviewSession).filter( + DatasetReviewSession.session_id == session_id, + DatasetReviewSession.user_id == user_id + ).first() + + if not session: + raise ValueError("Session not found or access denied") + + # Mark existing previews for this session as stale if they are not the new one + self.db.query(CompiledPreview).filter( + CompiledPreview.session_id == session_id + ).update({"preview_status": "stale"}) + + self.db.add(preview) + self.db.flush() + session.last_preview_id = preview.preview_id + + self.db.commit() + self.db.refresh(preview) + return preview + + def save_run_context(self, session_id: str, user_id: str, run_context: DatasetRunContext) -> DatasetRunContext: + """ + @PURPOSE: Persist immutable launch audit snapshot. + """ + with belief_scope("DatasetReviewSessionRepository.save_run_context"): + session = self.db.query(DatasetReviewSession).filter( + DatasetReviewSession.session_id == session_id, + DatasetReviewSession.user_id == user_id + ).first() + + if not session: + raise ValueError("Session not found or access denied") + + self.db.add(run_context) + self.db.flush() + session.last_run_context_id = run_context.run_context_id + + self.db.commit() + self.db.refresh(run_context) + return run_context + + def list_sessions_for_user(self, user_id: str) -> List[DatasetReviewSession]: + """ + @PURPOSE: List all review sessions owned by a user. 
+ """ + with belief_scope("DatasetReviewSessionRepository.list_sessions_for_user"): + return self.db.query(DatasetReviewSession).filter( + DatasetReviewSession.user_id == user_id + ).order_by(DatasetReviewSession.updated_at.desc()).all() + +# [/DEF:DatasetReviewSessionRepository:Module] \ No newline at end of file diff --git a/frontend/src/lib/stores/__tests__/test_datasetReviewSession.js b/frontend/src/lib/stores/__tests__/test_datasetReviewSession.js new file mode 100644 index 00000000..5b818c5e --- /dev/null +++ b/frontend/src/lib/stores/__tests__/test_datasetReviewSession.js @@ -0,0 +1,112 @@ +// [DEF:frontend.src.lib.stores.__tests__.test_datasetReviewSession:Module] +// @COMPLEXITY: 3 +// @SEMANTICS: dataset-review, store, session, tests +// @PURPOSE: Unit tests for dataset review session store +// @LAYER: UI +// @RELATION: VERIFIES -> [datasetReviewSession:Store] + +import { describe, it, expect, beforeEach, vi } from 'vitest'; + +describe('datasetReviewSession store', () => { + beforeEach(async () => { + vi.resetModules(); + }); + + it('should have correct initial state', async () => { + const { datasetReviewSessionStore } = await import('../datasetReviewSession.js'); + + let state = null; + const unsubscribe = datasetReviewSessionStore.subscribe(s => { state = s; }); + unsubscribe(); + + expect(state.session).toBeNull(); + expect(state.isLoading).toBe(false); + expect(state.isSaving).toBe(false); + expect(state.error).toBeNull(); + expect(state.isDirty).toBe(false); + }); + + it('should set session data', async () => { + const { datasetReviewSessionStore, setSession } = await import('../datasetReviewSession.js'); + const mockSession = { session_id: 's1', user_id: 'u1' }; + + setSession(mockSession); + + let state = null; + const unsubscribe = datasetReviewSessionStore.subscribe(s => { state = s; }); + unsubscribe(); + + expect(state.session).toEqual(mockSession); + expect(state.isLoading).toBe(false); + expect(state.error).toBeNull(); + 
expect(state.lastUpdated).toBeInstanceOf(Date); + expect(state.isDirty).toBe(false); + }); + + it('should update loading state', async () => { + const { datasetReviewSessionStore, setLoading } = await import('../datasetReviewSession.js'); + + setLoading(true); + + let state = null; + const unsubscribe = datasetReviewSessionStore.subscribe(s => { state = s; }); + unsubscribe(); + + expect(state.isLoading).toBe(true); + }); + + it('should set error state', async () => { + const { datasetReviewSessionStore, setError } = await import('../datasetReviewSession.js'); + + setError('Failed to load'); + + let state = null; + const unsubscribe = datasetReviewSessionStore.subscribe(s => { state = s; }); + unsubscribe(); + + expect(state.error).toBe('Failed to load'); + expect(state.isLoading).toBe(false); + }); + + it('should mark as dirty', async () => { + const { datasetReviewSessionStore, setDirty } = await import('../datasetReviewSession.js'); + + setDirty(true); + + let state = null; + const unsubscribe = datasetReviewSessionStore.subscribe(s => { state = s; }); + unsubscribe(); + + expect(state.isDirty).toBe(true); + }); + + it('should patch session data and mark as dirty', async () => { + const { datasetReviewSessionStore, setSession, patchSession } = await import('../datasetReviewSession.js'); + const mockSession = { session_id: 's1', readiness_state: 'empty' }; + + setSession(mockSession); + patchSession({ readiness_state: 'ready' }); + + let state = null; + const unsubscribe = datasetReviewSessionStore.subscribe(s => { state = s; }); + unsubscribe(); + + expect(state.session.readiness_state).toBe('ready'); + expect(state.isDirty).toBe(true); + }); + + it('should reset session', async () => { + const { datasetReviewSessionStore, setSession, resetSession } = await import('../datasetReviewSession.js'); + setSession({ id: 's1' }); + + resetSession(); + + let state = null; + const unsubscribe = datasetReviewSessionStore.subscribe(s => { state = s; }); + unsubscribe(); + + 
expect(state.session).toBeNull(); + }); +}); + +// [/DEF:frontend.src.lib.stores.__tests__.test_datasetReviewSession:Module] \ No newline at end of file diff --git a/frontend/src/lib/stores/datasetReviewSession.js b/frontend/src/lib/stores/datasetReviewSession.js new file mode 100644 index 00000000..48bf1d05 --- /dev/null +++ b/frontend/src/lib/stores/datasetReviewSession.js @@ -0,0 +1,89 @@ +// [DEF:datasetReviewSession:Store] +// @COMPLEXITY: 4 +// @PURPOSE: Manage active dataset review session state, including loading, updates, and navigation guards. +// @LAYER: UI +// @RELATION: DEPENDS_ON -> api_module (requestApi/fetchApi) +// +// @UX_STATE: Loading -> Session detail is being fetched. +// @UX_STATE: Ready -> Session detail is available for UI binding. +// @UX_STATE: Saving -> Updates are being persisted. +// @UX_STATE: Error -> Failed to load or update session. +// @UX_REACTIVITY: Uses Svelte writable store for session aggregate. + +import { writable } from 'svelte/store'; + +// [SECTION: INITIAL STATE] +const initialState = { + session: null, + isLoading: false, + isSaving: false, + error: null, + lastUpdated: null, + isDirty: false +}; +// [/SECTION] + +export const datasetReviewSessionStore = writable(initialState); + +/** + * Set active session data + * @param {Object} session - Full SessionDetail aggregate + */ +export function setSession(session) { + datasetReviewSessionStore.update(state => ({ + ...state, + session, + isLoading: false, + error: null, + lastUpdated: new Date(), + isDirty: false + })); +} + +/** + * Update session loading state + * @param {boolean} isLoading + */ +export function setLoading(isLoading) { + datasetReviewSessionStore.update(state => ({ ...state, isLoading })); +} + +/** + * Set session error state + * @param {string|null} error + */ +export function setError(error) { + datasetReviewSessionStore.update(state => ({ ...state, error, isLoading: false, isSaving: false })); +} + +/** + * Mark session as dirty (unsaved changes) + 
* @param {boolean} isDirty + */ +export function setDirty(isDirty) { + datasetReviewSessionStore.update(state => ({ ...state, isDirty })); +} + +/** + * Reset store to initial state + */ +export function resetSession() { + datasetReviewSessionStore.set(initialState); +} + +/** + * Patch session data locally + * @param {Object} patch - Partial session data + */ +export function patchSession(patch) { + datasetReviewSessionStore.update(state => { + if (!state.session) return state; + return { + ...state, + session: { ...state.session, ...patch }, + isDirty: true + }; + }); +} + +// [/DEF:datasetReviewSession:Store] \ No newline at end of file