From 023bacde39b5bc9eb7620978b6c5f1ec08093dc1 Mon Sep 17 00:00:00 2001 From: busya Date: Tue, 17 Mar 2026 10:57:49 +0300 Subject: [PATCH] feat(us1): add dataset review orchestration automatic review slice --- backend/src/api/routes/__init__.py | 15 +- .../__tests__/test_dataset_review_api.py | 349 +++++++ backend/src/api/routes/dataset_review.py | 533 +++++++++++ backend/src/app.py | 197 ++-- .../core/utils/superset_context_extractor.py | 334 +++++++ backend/src/models/dataset_review.py | 1 - backend/src/schemas/dataset_review.py | 2 +- .../services/dataset_review/orchestrator.py | 386 ++++++++ .../repositories/session_repository.py | 97 +- .../dataset_review/semantic_resolver.py | 342 +++++++ .../lib/components/dataset-review/.gitkeep | 0 .../dataset-review/SourceIntakePanel.svelte | 332 +++++++ .../ValidationFindingsPanel.svelte | 332 +++++++ .../__tests__/source_intake_panel.ux.test.js | 161 ++++ .../validation_findings_panel.ux.test.js | 141 +++ frontend/src/lib/i18n/locales/en.json | 181 ++++ frontend/src/lib/i18n/locales/ru.json | 181 ++++ .../src/lib/stores/__tests__/mocks/state.js | 16 + .../__tests__/test_datasetReviewSession.js | 7 +- .../src/lib/stores/datasetReviewSession.js | 8 +- .../routes/datasets/review/[id]/+page.svelte | 896 ++++++++++++++++++ .../dataset_review_workspace.ux.test.js | 453 +++++++++ frontend/vitest.config.js | 1 + specs/027-dataset-llm-orchestration/tasks.md | 36 +- 24 files changed, 4870 insertions(+), 131 deletions(-) create mode 100644 backend/src/api/routes/__tests__/test_dataset_review_api.py create mode 100644 backend/src/api/routes/dataset_review.py create mode 100644 backend/src/core/utils/superset_context_extractor.py create mode 100644 backend/src/services/dataset_review/orchestrator.py create mode 100644 backend/src/services/dataset_review/semantic_resolver.py create mode 100644 frontend/src/lib/components/dataset-review/.gitkeep create mode 100644 frontend/src/lib/components/dataset-review/SourceIntakePanel.svelte 
create mode 100644 frontend/src/lib/components/dataset-review/ValidationFindingsPanel.svelte create mode 100644 frontend/src/lib/components/dataset-review/__tests__/source_intake_panel.ux.test.js create mode 100644 frontend/src/lib/components/dataset-review/__tests__/validation_findings_panel.ux.test.js create mode 100644 frontend/src/lib/stores/__tests__/mocks/state.js create mode 100644 frontend/src/routes/datasets/review/[id]/+page.svelte create mode 100644 frontend/src/routes/datasets/review/[id]/__tests__/dataset_review_workspace.ux.test.js diff --git a/backend/src/api/routes/__init__.py b/backend/src/api/routes/__init__.py index e845a972..b589aca5 100755 --- a/backend/src/api/routes/__init__.py +++ b/backend/src/api/routes/__init__.py @@ -1,17 +1,18 @@ -# [DEF:backend.src.api.routes.__init__:Module] +# [DEF:ApiRoutesModule:Module] # @COMPLEXITY: 3 # @SEMANTICS: routes, lazy-import, module-registry # @PURPOSE: Provide lazy route module loading to avoid heavyweight imports during tests. # @LAYER: API -# @RELATION: DEPENDS_ON -> importlib +# @RELATION: [CALLS] ->[ApiRoutesGetAttr] # @INVARIANT: Only names listed in __all__ are importable via __getattr__. -__all__ = ['plugins', 'tasks', 'settings', 'connections', 'environments', 'mappings', 'migration', 'git', 'storage', 'admin', 'reports', 'assistant', 'clean_release', 'profile'] +__all__ = ['plugins', 'tasks', 'settings', 'connections', 'environments', 'mappings', 'migration', 'git', 'storage', 'admin', 'reports', 'assistant', 'clean_release', 'profile', 'dataset_review'] -# [DEF:__getattr__:Function] -# @COMPLEXITY: 1 +# [DEF:ApiRoutesGetAttr:Function] +# @COMPLEXITY: 3 # @PURPOSE: Lazily import route module by attribute name. +# @RELATION: [DEPENDS_ON] ->[ApiRoutesModule] # @PRE: name is module candidate exposed in __all__. # @POST: Returns imported submodule or raises AttributeError. 
def __getattr__(name): @@ -19,5 +20,5 @@ def __getattr__(name): import importlib return importlib.import_module(f".{name}", __name__) raise AttributeError(f"module {__name__!r} has no attribute {name!r}") -# [/DEF:__getattr__:Function] -# [/DEF:backend.src.api.routes.__init__:Module] +# [/DEF:ApiRoutesGetAttr:Function] +# [/DEF:ApiRoutesModule:Module] diff --git a/backend/src/api/routes/__tests__/test_dataset_review_api.py b/backend/src/api/routes/__tests__/test_dataset_review_api.py new file mode 100644 index 00000000..7c6d8977 --- /dev/null +++ b/backend/src/api/routes/__tests__/test_dataset_review_api.py @@ -0,0 +1,349 @@ +# [DEF:DatasetReviewApiTests:Module] +# @COMPLEXITY: 3 +# @SEMANTICS: dataset_review, api, tests, lifecycle, exports, orchestration +# @PURPOSE: Verify backend US1 dataset review lifecycle, export, parsing, and dictionary-resolution contracts. +# @LAYER: API +# @RELATION: [BINDS_TO] ->[DatasetReviewApi] +# @RELATION: [BINDS_TO] ->[DatasetReviewOrchestrator] + +from datetime import datetime, timezone +import json +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from fastapi.testclient import TestClient + +from src.app import app +from src.api.routes.dataset_review import _get_orchestrator, _get_repository +from src.core.config_models import Environment, GlobalSettings, AppConfig +from src.core.utils.superset_context_extractor import SupersetContextExtractor +from src.dependencies import get_config_manager, get_current_user, get_task_manager +from src.models.dataset_review import ( + BusinessSummarySource, + ConfidenceState, + DatasetReviewSession, + FindingArea, + FindingSeverity, + ReadinessState, + RecommendedAction, + ResolutionState, + SessionPhase, + SessionStatus, +) +from src.services.dataset_review.orchestrator import DatasetReviewOrchestrator, StartSessionCommand +from src.services.dataset_review.semantic_resolver import SemanticSourceResolver + + +client = TestClient(app) + + 
# [DEF:_make_user:Function]
def _make_user():
    """Build a lightweight stand-in user with id ``user-1`` and a single ``Admin`` role."""
    # The role carries an empty permission list.
    admin_role = SimpleNamespace(name="Admin", permissions=[])
    return SimpleNamespace(id="user-1", username="tester", roles=[admin_role])
# [/DEF:_make_user:Function]


# [DEF:_make_config_manager:Function]
def _make_config_manager():
    """Build a mocked config manager that knows exactly one environment, ``env-1``."""
    env = Environment(
        id="env-1",
        name="DEV",
        url="http://superset.local",
        username="demo",
        password="secret",
    )
    config = AppConfig(environments=[env], settings=GlobalSettings())
    manager = MagicMock()
    # Any environment id other than "env-1" resolves to None.
    manager.get_environment.side_effect = lambda env_id: env if env_id == "env-1" else None
    manager.get_config.return_value = config
    return manager
# [/DEF:_make_config_manager:Function]


# [DEF:_make_session:Function]
def _make_session():
    """Build an in-memory ``DatasetReviewSession`` (``sess-1``) owned by ``user-1`` in review-ready state."""
    # One timezone-aware timestamp is reused for all three time fields.
    now = datetime.now(timezone.utc)
    return DatasetReviewSession(
        session_id="sess-1",
        user_id="user-1",
        environment_id="env-1",
        source_kind="superset_link",
        source_input="http://superset.local/dashboard/10",
        dataset_ref="public.sales",
        dataset_id=42,
        dashboard_id=10,
        readiness_state=ReadinessState.REVIEW_READY,
        recommended_action=RecommendedAction.REVIEW_DOCUMENTATION,
        status=SessionStatus.ACTIVE,
        current_phase=SessionPhase.REVIEW,
        created_at=now,
        updated_at=now,
        last_activity_at=now,
    )
# [/DEF:_make_session:Function]


# [DEF:dataset_review_api_dependencies:Function]
@pytest.fixture(autouse=True)
def dataset_review_api_dependencies():
    """Install user/config/task-manager dependency overrides for every test and clear them afterwards.

    Yields a dict with the ``user``, ``config_manager`` and ``task_manager`` doubles so that
    tests can pass the same objects to code under test.
    """
    mock_user = _make_user()
    config_manager = _make_config_manager()
    task_manager = MagicMock()

    app.dependency_overrides[get_current_user] = lambda: mock_user
    app.dependency_overrides[get_config_manager] = lambda: config_manager
    app.dependency_overrides[get_task_manager] = lambda: task_manager

    yield {
        "user": mock_user,
        "config_manager": config_manager,
        "task_manager": task_manager,
    }
    # Clears ALL overrides, including any a test registered itself.
    app.dependency_overrides.clear()
# [/DEF:dataset_review_api_dependencies:Function]
# [DEF:test_parse_superset_link_dashboard_partial_recovery:Function]
# @PURPOSE: Verify dashboard links recover dataset context and preserve explicit partial-recovery markers.
def test_parse_superset_link_dashboard_partial_recovery():
    """Parse a dashboard URL whose dashboard exposes two datasets and check the partial-recovery result."""
    env = Environment(
        id="env-1",
        name="DEV",
        url="http://superset.local",
        username="demo",
        password="secret",
    )
    fake_client = MagicMock()
    # Two datasets on the dashboard: the test expects the first (42) to be chosen
    # and the ambiguity to be reported via "multiple_dashboard_datasets".
    fake_client.get_dashboard_detail.return_value = {
        "datasets": [{"id": 42}, {"id": 77}],
    }
    fake_client.get_dataset_detail.return_value = {
        "table_name": "sales",
        "schema": "public",
    }

    extractor = SupersetContextExtractor(environment=env, client=fake_client)
    # The native_filters query parameter is URL-encoded JSON: [{"name":"country","value":"DE"}]
    result = extractor.parse_superset_link(
        "http://superset.local/dashboard/10/?native_filters=%5B%7B%22name%22%3A%22country%22%2C%22value%22%3A%22DE%22%7D%5D"
    )

    assert result.dataset_id == 42
    assert result.dashboard_id == 10
    assert result.dataset_ref == "public.sales"
    assert result.partial_recovery is True
    assert "multiple_dashboard_datasets" in result.unresolved_references
    assert result.imported_filters[0]["filter_name"] == "country"
# [/DEF:test_parse_superset_link_dashboard_partial_recovery:Function]


# [DEF:test_resolve_from_dictionary_prefers_exact_match:Function]
# @PURPOSE: Verify trusted dictionary exact matches outrank fuzzy candidates and unresolved fields stay explicit.
def test_resolve_from_dictionary_prefers_exact_match():
    """Resolve two fields against a dictionary holding an exact row and a misspelled near-duplicate."""
    resolver = SemanticSourceResolver()
    result = resolver.resolve_from_dictionary(
        {
            "source_ref": "dict://finance",
            "rows": [
                {
                    "field_name": "revenue",
                    "verbose_name": "Revenue",
                    "description": "Recognized revenue amount",
                    "display_format": "$,.2f",
                },
                # Deliberately misspelled row acting as the fuzzy competitor for "revenue".
                {
                    "field_name": "revnue",
                    "verbose_name": "Revenue typo",
                    "description": "Fuzzy variant",
                },
            ],
        },
        [
            {"field_name": "revenue", "is_locked": False},
            # "margin" has no dictionary row and is expected to stay unresolved.
            {"field_name": "margin", "is_locked": False},
        ],
    )

    resolved_exact = next(item for item in result.resolved_fields if item["field_name"] == "revenue")
    unresolved = next(item for item in result.resolved_fields if item["field_name"] == "margin")

    assert resolved_exact["applied_candidate"]["match_type"] == "exact"
    assert resolved_exact["provenance"] == "dictionary_exact"
    assert unresolved["status"] == "unresolved"
    assert "margin" in result.unresolved_fields
    assert result.partial_recovery is True
# [/DEF:test_resolve_from_dictionary_prefers_exact_match:Function]


# [DEF:test_orchestrator_start_session_preserves_partial_recovery:Function]
# @PURPOSE: Verify session start persists usable recovery-required state when Superset intake is partial.
def test_orchestrator_start_session_preserves_partial_recovery(dataset_review_api_dependencies):
    """Start a session with a patched partial Superset parse and check the recovery state and persistence calls."""
    repository = MagicMock()
    created_session = _make_session()
    created_session.readiness_state = ReadinessState.RECOVERY_REQUIRED
    created_session.current_phase = SessionPhase.RECOVERY

    # Both persistence calls hand back the same pre-built session.
    repository.create_session.return_value = created_session
    repository.save_profile_and_findings.return_value = created_session
    repository.db = MagicMock()

    orchestrator = DatasetReviewOrchestrator(
        repository=repository,
        config_manager=dataset_review_api_dependencies["config_manager"],
        task_manager=None,
    )

    # Stand-in for the extractor's parse result, flagged as partially recovered.
    parsed_context = SimpleNamespace(
        dataset_ref="public.sales",
        dataset_id=42,
        dashboard_id=10,
        chart_id=None,
        partial_recovery=True,
        unresolved_references=["dashboard_dataset_binding_missing"],
    )

    # Patch the extractor method as referenced from the orchestrator module so no Superset call is made.
    with patch(
        "src.services.dataset_review.orchestrator.SupersetContextExtractor.parse_superset_link",
        return_value=parsed_context,
    ):
        result = orchestrator.start_session(
            StartSessionCommand(
                user=dataset_review_api_dependencies["user"],
                environment_id="env-1",
                source_kind="superset_link",
                source_input="http://superset.local/dashboard/10",
            )
        )

    assert result.session.readiness_state == ReadinessState.RECOVERY_REQUIRED
    assert result.findings
    assert result.findings[0].severity.value == "warning"
    repository.create_session.assert_called_once()
    repository.save_profile_and_findings.assert_called_once()
# [/DEF:test_orchestrator_start_session_preserves_partial_recovery:Function]


# [DEF:test_start_session_endpoint_returns_created_summary:Function]
# @PURPOSE: Verify POST session lifecycle endpoint returns a persisted ownership-scoped summary.
def test_start_session_endpoint_returns_created_summary(dataset_review_api_dependencies):
    """POST to the sessions endpoint with a mocked orchestrator and check the 201 summary payload."""
    session = _make_session()
    orchestrator = MagicMock()
    orchestrator.start_session.return_value = SimpleNamespace(session=session, findings=[], parsed_context=None)

    # Removed again by the autouse fixture, which clears all overrides on teardown.
    app.dependency_overrides[_get_orchestrator] = lambda: orchestrator

    response = client.post(
        "/api/dataset-orchestration/sessions",
        json={
            "source_kind": "superset_link",
            "source_input": "http://superset.local/dashboard/10",
            "environment_id": "env-1",
        },
    )

    assert response.status_code == 201
    payload = response.json()
    assert payload["session_id"] == "sess-1"
    assert payload["dataset_ref"] == "public.sales"
    assert payload["environment_id"] == "env-1"
# [/DEF:test_start_session_endpoint_returns_created_summary:Function]


# [DEF:test_get_session_detail_export_and_lifecycle_endpoints:Function]
# @PURPOSE: Verify lifecycle get/patch/delete plus documentation and validation exports remain ownership-scoped and usable.
+def test_get_session_detail_export_and_lifecycle_endpoints(dataset_review_api_dependencies): + now = datetime.now(timezone.utc) + session = MagicMock(spec=DatasetReviewSession) + session.session_id = "sess-1" + session.user_id = "user-1" + session.environment_id = "env-1" + session.source_kind = "superset_link" + session.source_input = "http://superset.local/dashboard/10" + session.dataset_ref = "public.sales" + session.dataset_id = 42 + session.dashboard_id = 10 + session.readiness_state = ReadinessState.REVIEW_READY + session.recommended_action = RecommendedAction.REVIEW_DOCUMENTATION + session.status = SessionStatus.ACTIVE + session.current_phase = SessionPhase.REVIEW + session.created_at = now + session.updated_at = now + session.last_activity_at = now + session.profile = SimpleNamespace( + dataset_name="sales", + business_summary="Summary text", + confidence_state=ConfidenceState.MOSTLY_CONFIRMED, + dataset_type="unknown", + schema_name=None, + database_name=None, + business_summary_source=BusinessSummarySource.IMPORTED, + description=None, + is_sqllab_view=False, + completeness_score=None, + has_blocking_findings=False, + has_warning_findings=True, + manual_summary_locked=False, + created_at=now, + updated_at=now, + profile_id="profile-1", + session_id="sess-1", + ) + session.findings = [ + SimpleNamespace( + finding_id="f-1", + session_id="sess-1", + area=FindingArea.SOURCE_INTAKE, + severity=FindingSeverity.WARNING, + code="PARTIAL_SUPERSET_RECOVERY", + title="Partial", + message="Some filters require review", + resolution_state=ResolutionState.OPEN, + resolution_note=None, + caused_by_ref=None, + created_at=now, + resolved_at=None, + ) + ] + session.collaborators = [] + session.semantic_sources = [] + session.semantic_fields = [] + session.imported_filters = [] + session.template_variables = [] + session.execution_mappings = [] + session.clarification_sessions = [] + session.previews = [] + session.run_contexts = [] + + repository = MagicMock() + 
repository.load_session_detail.return_value = session + repository.list_sessions_for_user.return_value = [session] + repository.db = MagicMock() + + app.dependency_overrides[_get_repository] = lambda: repository + + detail_response = client.get("/api/dataset-orchestration/sessions/sess-1") + assert detail_response.status_code == 200 + assert detail_response.json()["session_id"] == "sess-1" + + patch_response = client.patch( + "/api/dataset-orchestration/sessions/sess-1", + json={"status": "paused"}, + ) + assert patch_response.status_code == 200 + assert patch_response.json()["status"] == "paused" + + doc_response = client.get("/api/dataset-orchestration/sessions/sess-1/exports/documentation?format=json") + assert doc_response.status_code == 200 + assert doc_response.json()["artifact_type"] == "documentation" + + validation_response = client.get("/api/dataset-orchestration/sessions/sess-1/exports/validation?format=markdown") + assert validation_response.status_code == 200 + assert validation_response.json()["artifact_type"] == "validation_report" + assert "Validation Report" in validation_response.json()["content"]["markdown"] + + delete_response = client.delete("/api/dataset-orchestration/sessions/sess-1") + assert delete_response.status_code == 204 +# [/DEF:test_get_session_detail_export_and_lifecycle_endpoints:Function] + +# [/DEF:DatasetReviewApiTests:Module] \ No newline at end of file diff --git a/backend/src/api/routes/dataset_review.py b/backend/src/api/routes/dataset_review.py new file mode 100644 index 00000000..484245ee --- /dev/null +++ b/backend/src/api/routes/dataset_review.py @@ -0,0 +1,533 @@ +# [DEF:DatasetReviewApi:Module] +# @COMPLEXITY: 4 +# @SEMANTICS: dataset_review, api, session_lifecycle, exports, rbac, feature_flags +# @PURPOSE: Expose dataset review session lifecycle and export endpoints for backend US1. 
+# @LAYER: API +# @RELATION: [DEPENDS_ON] ->[AppDependencies] +# @RELATION: [DEPENDS_ON] ->[DatasetReviewSessionRepository] +# @RELATION: [DEPENDS_ON] ->[DatasetReviewOrchestrator] +# @PRE: Authenticated user and valid environment/session scope are required for all mutations and reads. +# @POST: Returns ownership-scoped session state and export payloads with feature-flag/RBAC enforcement. +# @SIDE_EFFECT: Persists session state and may enqueue recovery task. +# @DATA_CONTRACT: Input[HTTP Request] -> Output[SessionSummary | SessionDetail | ExportArtifactResponse | HTTP 204] +# @INVARIANT: No cross-user session leakage is allowed; export payloads only expose the current user's accessible session. + +from __future__ import annotations + +# [DEF:DatasetReviewApi.imports:Block] +import json +from typing import Any, Dict, List, Optional + +from fastapi import APIRouter, Depends, HTTPException, Query, Response, status +from pydantic import BaseModel, Field +from sqlalchemy.orm import Session + +from src.core.database import get_db +from src.core.logger import belief_scope, logger +from src.dependencies import get_config_manager, get_current_user, get_task_manager, has_permission +from src.models.auth import User +from src.models.dataset_review import ( + ArtifactFormat, + DatasetReviewSession, + RecommendedAction, + SessionStatus, +) +from src.schemas.dataset_review import SessionDetail, SessionSummary +from src.services.dataset_review.orchestrator import ( + DatasetReviewOrchestrator, + StartSessionCommand, +) +from src.services.dataset_review.repositories.session_repository import ( + DatasetReviewSessionRepository, +) +# [/DEF:DatasetReviewApi.imports:Block] + +router = APIRouter(prefix="/api/dataset-orchestration", tags=["Dataset Orchestration"]) + + +# [DEF:StartSessionRequest:Class] +# @COMPLEXITY: 2 +# @PURPOSE: Request DTO for starting one dataset review session from a Superset link or dataset selection. 
class StartSessionRequest(BaseModel):
    """Body of the session-start request."""

    # Only "superset_link" or "dataset_selection" pass the pattern.
    source_kind: str = Field(..., pattern="^(superset_link|dataset_selection)$")
    # Non-empty source input string.
    source_input: str = Field(..., min_length=1)
    # Non-empty identifier of the target environment.
    environment_id: str = Field(..., min_length=1)
# [/DEF:StartSessionRequest:Class]


# [DEF:UpdateSessionRequest:Class]
# @COMPLEXITY: 2
# @PURPOSE: Request DTO for lifecycle state updates on an existing session.
class UpdateSessionRequest(BaseModel):
    """Body of the session lifecycle update request."""

    # Requested lifecycle status.
    status: SessionStatus
    # NOTE(review): accepted by the schema, but the update_session handler in this module does not read it.
    note: Optional[str] = None
# [/DEF:UpdateSessionRequest:Class]


# [DEF:SessionCollectionResponse:Class]
# @COMPLEXITY: 2
# @PURPOSE: Paginated ownership-scoped dataset review session collection response.
class SessionCollectionResponse(BaseModel):
    """One page of session summaries plus paging metadata."""

    items: List[SessionSummary]
    # Number of sessions across all pages.
    total: int
    # 1-based page number that was requested.
    page: int
    page_size: int
    # True when sessions exist beyond the end of this page.
    has_next: bool
# [/DEF:SessionCollectionResponse:Class]


# [DEF:ExportArtifactResponse:Class]
# @COMPLEXITY: 2
# @PURPOSE: Inline export response for documentation or validation outputs without introducing unrelated persistence changes.
class ExportArtifactResponse(BaseModel):
    """Inline export payload returned by the export endpoints."""

    artifact_id: str
    session_id: str
    artifact_type: str
    format: str
    # The export builders in this module fill this with an "inline://dataset-review/..." reference.
    storage_ref: str
    created_by_user_id: str
    # NOTE(review): the documentation export handler in this module never sets this, so it stays None there.
    created_at: Optional[str] = None
    # Export body; the builders in this module return either {"markdown": ...} or a structured dict.
    content: Dict[str, Any]
# [/DEF:ExportArtifactResponse:Class]


# [DEF:_require_auto_review_flag:Function]
# @COMPLEXITY: 3
# @PURPOSE: Guard US1 dataset review endpoints behind the configured feature flag.
# @RELATION: [DEPENDS_ON] ->[ConfigManager]
def _require_auto_review_flag(config_manager=Depends(get_config_manager)) -> bool:
    """Return True when the ``ff_dataset_auto_review`` setting is enabled.

    Raises:
        HTTPException: 404 when the flag is falsy.
    """
    with belief_scope("dataset_review.require_auto_review_flag"):
        settings = config_manager.get_config().settings
        if settings.ff_dataset_auto_review:
            return True
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Dataset auto review feature is disabled",
        )
# [/DEF:_require_auto_review_flag:Function]


# [DEF:_get_repository:Function]
# @COMPLEXITY: 2
# @PURPOSE: Build repository dependency for dataset review session aggregate access.
def _get_repository(db: Session = Depends(get_db)) -> DatasetReviewSessionRepository:
    """Wrap the request-scoped database session in a session repository."""
    repository = DatasetReviewSessionRepository(db)
    return repository
# [/DEF:_get_repository:Function]


# [DEF:_get_orchestrator:Function]
# @COMPLEXITY: 3
# @PURPOSE: Build orchestrator dependency for session lifecycle actions.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewOrchestrator]
def _get_orchestrator(
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    config_manager=Depends(get_config_manager),
    task_manager=Depends(get_task_manager),
) -> DatasetReviewOrchestrator:
    """Assemble an orchestrator from the injected repository, config manager and task manager."""
    orchestrator = DatasetReviewOrchestrator(
        repository=repository,
        config_manager=config_manager,
        task_manager=task_manager,
    )
    return orchestrator
# [/DEF:_get_orchestrator:Function]


# [DEF:_serialize_session_summary:Function]
# @COMPLEXITY: 3
# @PURPOSE: Map SQLAlchemy session aggregate root into stable API summary DTO.
# @RELATION: [DEPENDS_ON] ->[SessionSummary]
def _serialize_session_summary(session: DatasetReviewSession) -> SessionSummary:
    """Validate ``session`` into a ``SessionSummary`` by reading its attributes."""
    summary = SessionSummary.model_validate(session, from_attributes=True)
    return summary
# [/DEF:_serialize_session_summary:Function]


# [DEF:_serialize_session_detail:Function]
# @COMPLEXITY: 3
# @PURPOSE: Map SQLAlchemy session aggregate root into stable API detail DTO.
# @RELATION: [DEPENDS_ON] ->[SessionDetail]
def _serialize_session_detail(session: DatasetReviewSession) -> SessionDetail:
    """Validate ``session`` into a ``SessionDetail`` by reading its attributes."""
    detail = SessionDetail.model_validate(session, from_attributes=True)
    return detail
# [/DEF:_serialize_session_detail:Function]


# [DEF:_get_owned_session_or_404:Function]
# @COMPLEXITY: 4
# @PURPOSE: Resolve one session for current user or collaborator scope, returning 404 when inaccessible.
# @RELATION: [CALLS] ->[load_detail]
# @PRE: session_id is a non-empty identifier and current_user is authenticated.
# @POST: returns accessible session detail or raises HTTP 404 without leaking foreign-session existence.
# @SIDE_EFFECT: none.
# @DATA_CONTRACT: Input[session_id:str,current_user:User] -> Output[DatasetReviewSession|HTTPException]
def _get_owned_session_or_404(
    repository: DatasetReviewSessionRepository,
    session_id: str,
    current_user: User,
) -> DatasetReviewSession:
    """Load the session through the repository for ``current_user`` or raise 404.

    Args:
        repository: Repository used to load the session detail.
        session_id: Identifier of the requested session.
        current_user: Caller whose id is passed to the repository lookup.

    Returns:
        The session returned by ``repository.load_session_detail``.

    Raises:
        HTTPException: 404 when the repository returns ``None``.
    """
    with belief_scope("dataset_review.get_owned_session_or_404"):
        user_id = current_user.id
        found = repository.load_session_detail(session_id, user_id)
        if found is not None:
            return found

        logger.explore(
            "Dataset review session not found in current ownership scope",
            extra={"session_id": session_id, "user_id": current_user.id},
        )
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
# [/DEF:_get_owned_session_or_404:Function]


# [DEF:_build_documentation_export:Function]
# @COMPLEXITY: 3
# @PURPOSE: Produce session documentation export content from current persisted review state.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
def _build_documentation_export(session: DatasetReviewSession, export_format: ArtifactFormat) -> Dict[str, Any]:
    """Build the documentation export for ``session`` in Markdown or structured JSON form.

    Args:
        session: Session whose profile and findings are exported.
        export_format: ``ArtifactFormat.MARKDOWN`` selects Markdown; any other value
            produces the structured JSON payload.

    Returns:
        A dict with ``storage_ref`` (an ``inline://`` reference) and ``content``
        (``{"markdown": ...}`` or the structured payload).
    """
    profile = session.profile
    # NOTE(review): this orders by the enum's string value, i.e. lexicographically, which may
    # not match severity rank — confirm the intended ordering.
    findings = sorted(session.findings, key=lambda item: (item.severity.value, item.code))

    if export_format == ArtifactFormat.MARKDOWN:
        # A profile may exist without a summary. Fall back to the placeholder so that
        # "\n".join() never receives None (which would raise TypeError).
        summary_text = (profile.business_summary if profile is not None else None) or "No profile summary available."
        lines = [
            f"# Dataset Review: {session.dataset_ref}",
            "",
            f"- Session ID: {session.session_id}",
            f"- Environment: {session.environment_id}",
            f"- Readiness: {session.readiness_state.value}",
            f"- Recommended action: {session.recommended_action.value}",
            "",
            "## Business Summary",
            summary_text,
            "",
            "## Findings",
        ]
        if findings:
            lines.extend(
                f"- [{finding.severity.value}] {finding.title}: {finding.message}"
                for finding in findings
            )
        else:
            lines.append("- No findings recorded.")
        content = {"markdown": "\n".join(lines)}
        storage_ref = f"inline://dataset-review/{session.session_id}/documentation.md"
    else:
        # Explicit None check instead of the `profile and {...}` idiom, so the exported value is
        # always either a dict or None.
        profile_payload = None
        if profile is not None:
            profile_payload = {
                "dataset_name": profile.dataset_name,
                "business_summary": profile.business_summary,
                "confidence_state": profile.confidence_state.value,
                "dataset_type": profile.dataset_type,
            }
        content = {
            "session": _serialize_session_summary(session).model_dump(mode="json"),
            "profile": profile_payload,
            "findings": [
                {
                    "code": finding.code,
                    "severity": finding.severity.value,
                    "title": finding.title,
                    "message": finding.message,
                    "resolution_state": finding.resolution_state.value,
                }
                for finding in findings
            ],
        }
        storage_ref = f"inline://dataset-review/{session.session_id}/documentation.json"
    return {"storage_ref": storage_ref, "content": content}
# [/DEF:_build_documentation_export:Function]


# [DEF:_build_validation_export:Function]
# @COMPLEXITY: 3
# @PURPOSE: Produce validation-focused export content from persisted findings and readiness state.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
def _build_validation_export(session: DatasetReviewSession, export_format: ArtifactFormat) -> Dict[str, Any]:
    """Build the validation export for ``session`` in Markdown or structured JSON form.

    Args:
        session: Session whose findings and readiness state are exported.
        export_format: ``ArtifactFormat.MARKDOWN`` selects Markdown; any other value
            produces the structured JSON payload.

    Returns:
        A dict with ``storage_ref`` (an ``inline://`` reference) and ``content``.
    """
    ordered_findings = sorted(session.findings, key=lambda item: (item.severity.value, item.code))
    base_ref = f"inline://dataset-review/{session.session_id}"

    if export_format == ArtifactFormat.MARKDOWN:
        report_lines = [
            f"# Validation Report: {session.dataset_ref}",
            "",
            f"- Session ID: {session.session_id}",
            f"- Readiness: {session.readiness_state.value}",
            "",
            "## Findings",
        ]
        if not ordered_findings:
            report_lines.append("- No findings recorded.")
        else:
            for entry in ordered_findings:
                report_lines.append(
                    f"- `{entry.code}` [{entry.severity.value}] {entry.message}"
                )
        markdown_text = "\n".join(report_lines)
        return {
            "storage_ref": f"{base_ref}/validation.md",
            "content": {"markdown": markdown_text},
        }

    finding_rows = []
    for entry in ordered_findings:
        finding_rows.append(
            {
                "finding_id": entry.finding_id,
                "area": entry.area.value,
                "severity": entry.severity.value,
                "code": entry.code,
                "title": entry.title,
                "message": entry.message,
                "resolution_state": entry.resolution_state.value,
            }
        )
    structured = {
        "session_id": session.session_id,
        "dataset_ref": session.dataset_ref,
        "readiness_state": session.readiness_state.value,
        "findings": finding_rows,
    }
    return {"storage_ref": f"{base_ref}/validation.json", "content": structured}
# [/DEF:_build_validation_export:Function]


# [DEF:list_sessions:Function]
# @COMPLEXITY: 3
# @PURPOSE: List resumable dataset review sessions for the current user.
# @RELATION: [CALLS] ->[list_user_sess]
@router.get(
    "/sessions",
    response_model=SessionCollectionResponse,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "READ")),
    ],
)
async def list_sessions(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    """Return one page of the current user's sessions.

    The repository returns the user's sessions as one sequence and the requested page is
    sliced from it here.

    Args:
        page: 1-based page number.
        page_size: Number of items per page (1-100).
        repository: Session repository dependency.
        current_user: Authenticated caller.
    """
    with belief_scope("dataset_review.list_sessions"):
        all_sessions = repository.list_sessions_for_user(current_user.id)
        total_count = len(all_sessions)
        offset = (page - 1) * page_size
        stop = offset + page_size
        page_items = []
        for record in all_sessions[offset:stop]:
            page_items.append(_serialize_session_summary(record))
        return SessionCollectionResponse(
            items=page_items,
            total=total_count,
            page=page,
            page_size=page_size,
            has_next=stop < total_count,
        )
# [/DEF:list_sessions:Function]


# [DEF:start_session:Function]
# @COMPLEXITY: 4
# @PURPOSE: Start a new dataset review session from a Superset link or dataset selection.
# @RELATION: [CALLS] ->[DatasetReviewOrchestrator.start_session]
# @PRE: feature flag enabled, user authenticated, and request body valid.
# @POST: returns persisted session summary scoped to the authenticated user.
# @SIDE_EFFECT: persists session/profile/findings and may enqueue recovery task.
# @DATA_CONTRACT: Input[StartSessionRequest] -> Output[SessionSummary]
@router.post(
    "/sessions",
    response_model=SessionSummary,
    status_code=status.HTTP_201_CREATED,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def start_session(
    request: StartSessionRequest,
    orchestrator: DatasetReviewOrchestrator = Depends(_get_orchestrator),
    current_user: User = Depends(get_current_user),
):
    """Start a session through the orchestrator and return its summary.

    Raises:
        HTTPException: 404 when the orchestrator's ``ValueError`` message is exactly
            "Environment not found"; 400 for any other ``ValueError``.
    """
    with belief_scope("dataset_review.start_session"):
        try:
            # The command is built inside the try so a ValueError from its construction
            # is mapped the same way as one from the orchestrator.
            command = StartSessionCommand(
                user=current_user,
                environment_id=request.environment_id,
                source_kind=request.source_kind,
                source_input=request.source_input,
            )
            result = orchestrator.start_session(command)
        except ValueError as exc:
            logger.explore(
                "Dataset review session start rejected",
                extra={"user_id": current_user.id, "error": str(exc)},
            )
            detail = str(exc)
            if detail == "Environment not found":
                status_code = status.HTTP_404_NOT_FOUND
            else:
                status_code = status.HTTP_400_BAD_REQUEST
            raise HTTPException(status_code=status_code, detail=detail) from exc

        return _serialize_session_summary(result.session)
# [/DEF:start_session:Function]


# [DEF:get_session_detail:Function]
# @COMPLEXITY: 3
# @PURPOSE: Return the full accessible dataset review session aggregate for current user scope.
# @RELATION: [CALLS] ->[_get_owned_session_or_404]
@router.get(
    "/sessions/{session_id}",
    response_model=SessionDetail,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "READ")),
    ],
)
async def get_session_detail(
    session_id: str,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    """Return the detail DTO for one session, or 404 when the lookup helper finds none."""
    with belief_scope("dataset_review.get_session_detail"):
        owned_session = _get_owned_session_or_404(repository, session_id, current_user)
        detail = _serialize_session_detail(owned_session)
        return detail
# [/DEF:get_session_detail:Function]


# [DEF:update_session:Function]
# @COMPLEXITY: 4
# @PURPOSE: Update resumable lifecycle status for an owned dataset review session.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
# @PRE: session is accessible to current user and requested status is allowed by lifecycle policy.
# @POST: returns updated summary without changing ownership or unrelated aggregates.
# @SIDE_EFFECT: mutates session lifecycle fields in persistence.
# @DATA_CONTRACT: Input[UpdateSessionRequest] -> Output[SessionSummary]
@router.patch(
    "/sessions/{session_id}",
    response_model=SessionSummary,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def update_session(
    session_id: str,
    request: UpdateSessionRequest,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    """Set a new lifecycle status on a session owned by the caller and return its summary.

    Pausing also sets the recommended action to ``RESUME_SESSION``; archiving, cancelling or
    completing clears ``active_task_id``.

    Raises:
        HTTPException: 404 from the lookup helper; 403 when the loaded session's
            ``user_id`` differs from the caller's id.
    """
    with belief_scope("dataset_review.update_session"):
        session = _get_owned_session_or_404(repository, session_id, current_user)
        is_owner = session.user_id == current_user.id
        if not is_owner:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Only the owner can mutate session lifecycle")

        # NOTE(review): request.note is accepted by the schema but is not used here.
        requested_status = request.status
        session.status = requested_status

        closing_statuses = {SessionStatus.ARCHIVED, SessionStatus.CANCELLED, SessionStatus.COMPLETED}
        if requested_status == SessionStatus.PAUSED:
            session.recommended_action = RecommendedAction.RESUME_SESSION
        elif requested_status in closing_statuses:
            session.active_task_id = None

        db = repository.db
        db.commit()
        db.refresh(session)
        return _serialize_session_summary(session)
# [/DEF:update_session:Function]


# [DEF:delete_session:Function]
# @COMPLEXITY: 4
# @PURPOSE: Archive or hard-delete a session owned by the current user.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
# @PRE: session is owner-scoped to current user.
# @POST: session is archived or deleted and no foreign-session existence is disclosed.
# @SIDE_EFFECT: mutates or deletes persisted session aggregate.
# @DATA_CONTRACT: Input[session_id:str,hard_delete:bool] -> Output[HTTP 204]
@router.delete(
    "/sessions/{session_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def delete_session(
    session_id: str,
    hard_delete: bool = Query(False),
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    # Comments are used instead of a docstring on purpose: FastAPI would
    # publish a docstring as the OpenAPI description of the route.
    with belief_scope("dataset_review.delete_session"):
        review_session = _get_owned_session_or_404(repository, session_id, current_user)

        # Guard clause: only the owner may archive or remove the session.
        if review_session.user_id != current_user.id:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Only the owner can delete a session",
            )

        if not hard_delete:
            # Default path is a soft delete: keep the row, mark it archived and
            # detach any task reference.
            review_session.status = SessionStatus.ARCHIVED
            review_session.active_task_id = None
        else:
            # Explicit opt-in removes the persisted aggregate.
            repository.db.delete(review_session)

        # Both paths are finalized by the same commit.
        repository.db.commit()
        return Response(status_code=status.HTTP_204_NO_CONTENT)
# [/DEF:delete_session:Function]


# [DEF:export_documentation:Function]
# @COMPLEXITY: 4
# @PURPOSE: Export documentation output for the current session in JSON or Markdown form.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
# @PRE: session is accessible to current user and requested format is supported.
# @POST: returns ownership-scoped export payload without fabricating unrelated artifacts.
# @SIDE_EFFECT: none beyond response construction.
# @DATA_CONTRACT: Input[session_id:str,format:ArtifactFormat] -> Output[ExportArtifactResponse]
@router.get(
    "/sessions/{session_id}/exports/documentation",
    response_model=ExportArtifactResponse,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "READ")),
    ],
)
async def export_documentation(
    session_id: str,
    format: ArtifactFormat = Query(ArtifactFormat.JSON),
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    # Comments are used instead of a docstring on purpose: FastAPI would
    # publish a docstring as the OpenAPI description of the route.
    # `format` shadows the builtin, but it is the public query-parameter name
    # and therefore part of the HTTP contract.
    with belief_scope("dataset_review.export_documentation"):
        # Reject unsupported formats before touching persistence.
        exportable_formats = {ArtifactFormat.JSON, ArtifactFormat.MARKDOWN}
        if format not in exportable_formats:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Only json and markdown exports are supported",
            )

        review_session = _get_owned_session_or_404(repository, session_id, current_user)
        rendered = _build_documentation_export(review_session, format)

        # The artifact id is derived from session id and format; it is built
        # per request rather than read from storage.
        artifact_id = f"documentation-{review_session.session_id}-{format.value}"
        return ExportArtifactResponse(
            artifact_id=artifact_id,
            session_id=review_session.session_id,
            artifact_type="documentation",
            format=format.value,
            storage_ref=rendered["storage_ref"],
            created_by_user_id=current_user.id,
            content=rendered["content"],
        )
# [/DEF:export_documentation:Function]


# [DEF:export_validation:Function]
# @COMPLEXITY: 4
# @PURPOSE: Export validation findings for the current session in JSON or Markdown form.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
# @PRE: session is accessible to current user and requested format is supported.
# @POST: returns explicit validation export payload scoped to current user session access.
# @SIDE_EFFECT: none beyond response construction.
+# @DATA_CONTRACT: Input[session_id:str,format:ArtifactFormat] -> Output[ExportArtifactResponse] +@router.get( + "/sessions/{session_id}/exports/validation", + response_model=ExportArtifactResponse, + dependencies=[ + Depends(_require_auto_review_flag), + Depends(has_permission("dataset:session", "READ")), + ], +) +async def export_validation( + session_id: str, + format: ArtifactFormat = Query(ArtifactFormat.JSON), + repository: DatasetReviewSessionRepository = Depends(_get_repository), + current_user: User = Depends(get_current_user), +): + with belief_scope("dataset_review.export_validation"): + if format not in {ArtifactFormat.JSON, ArtifactFormat.MARKDOWN}: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Only json and markdown exports are supported") + session = _get_owned_session_or_404(repository, session_id, current_user) + export_payload = _build_validation_export(session, format) + return ExportArtifactResponse( + artifact_id=f"validation-{session.session_id}-{format.value}", + session_id=session.session_id, + artifact_type="validation_report", + format=format.value, + storage_ref=export_payload["storage_ref"], + created_by_user_id=current_user.id, + content=export_payload["content"], + ) +# [/DEF:export_validation:Function] + +# [/DEF:DatasetReviewApi:Module] \ No newline at end of file diff --git a/backend/src/app.py b/backend/src/app.py index eecaf96d..8a423855 100755 --- a/backend/src/app.py +++ b/backend/src/app.py @@ -3,8 +3,8 @@ # @SEMANTICS: app, main, entrypoint, fastapi # @PURPOSE: The main entry point for the FastAPI application. It initializes the app, configures CORS, sets up dependencies, includes API routers, and defines the WebSocket endpoint for log streaming. 
# @LAYER: UI (API) -# @RELATION: DEPENDS_ON ->[AppDependencies] -# @RELATION: DEPENDS_ON ->[backend.src.api.routes] +# @RELATION: [DEPENDS_ON] ->[AppDependencies] +# @RELATION: [DEPENDS_ON] ->[ApiRoutesModule] # @INVARIANT: Only one FastAPI app instance exists per process. # @INVARIANT: All WebSocket connections must be properly cleaned up on disconnect. # @PRE: Python environment and dependencies installed; configuration database available. @@ -28,7 +28,7 @@ from .dependencies import get_task_manager, get_scheduler_service from .core.encryption_key import ensure_encryption_key from .core.utils.network import NetworkError from .core.logger import logger, belief_scope -from .api.routes import plugins, tasks, settings, environments, mappings, migration, connections, git, storage, admin, llm, dashboards, datasets, reports, assistant, clean_release, clean_release_v2, profile, health +from .api.routes import plugins, tasks, settings, environments, mappings, migration, connections, git, storage, admin, llm, dashboards, datasets, reports, assistant, clean_release, clean_release_v2, profile, health, dataset_review from .api import auth # [DEF:App:Global] @@ -45,6 +45,7 @@ app = FastAPI( # [DEF:startup_event:Function] # @COMPLEXITY: 3 # @PURPOSE: Handles application startup tasks, such as starting the scheduler. +# @RELATION: [CALLS] ->[AppDependencies] # @PRE: None. # @POST: Scheduler is started. # Startup event @@ -59,6 +60,7 @@ async def startup_event(): # [DEF:shutdown_event:Function] # @COMPLEXITY: 3 # @PURPOSE: Handles application shutdown tasks, such as stopping the scheduler. +# @RELATION: [CALLS] ->[AppDependencies] # @PRE: None. # @POST: Scheduler is stopped. # Shutdown event @@ -106,6 +108,7 @@ async def network_error_handler(request: Request, exc: NetworkError): # [DEF:log_requests:Function] # @COMPLEXITY: 3 # @PURPOSE: Middleware to log incoming HTTP requests and their response status. 
+# @RELATION: [DEPENDS_ON] ->[LoggerModule] # @PRE: request is a FastAPI Request object. # @POST: Logs request and response details. # @PARAM: request (Request) - The incoming request object. @@ -154,6 +157,7 @@ app.include_router(assistant.router, prefix="/api/assistant", tags=["Assistant"] app.include_router(clean_release.router) app.include_router(clean_release_v2.router) app.include_router(profile.router) +app.include_router(dataset_review.router) app.include_router(health.router) # [/DEF:api_routes:Block] @@ -168,10 +172,13 @@ app.include_router(health.router) # [DEF:websocket_endpoint:Function] # @COMPLEXITY: 5 # @PURPOSE: Provides a WebSocket endpoint for real-time log streaming of a task with server-side filtering. +# @RELATION: [CALLS] ->[TaskManagerPackage] +# @RELATION: [DEPENDS_ON] ->[LoggerModule] # @PRE: task_id must be a valid task ID. # @POST: WebSocket connection is managed and logs are streamed until disconnect. # @SIDE_EFFECT: Subscribes to TaskManager log queue and broadcasts messages over network. # @DATA_CONTRACT: [task_id: str, source: str, level: str] -> [JSON log entry objects] +# @INVARIANT: Every accepted WebSocket subscription is unsubscribed exactly once even when streaming fails or the client disconnects. 
# @UX_STATE: Connecting -> Streaming -> (Disconnected) # # @TEST_CONTRACT: WebSocketLogStreamApi -> @@ -204,85 +211,121 @@ async def websocket_endpoint( """ with belief_scope("websocket_endpoint", f"task_id={task_id}"): await websocket.accept() - - # Normalize filter parameters - source_filter = source.lower() if source else None - level_filter = level.upper() if level else None - - # Level hierarchy for filtering - level_hierarchy = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3} - min_level = level_hierarchy.get(level_filter, 0) if level_filter else 0 - - logger.info(f"WebSocket connection accepted for task {task_id} (source={source_filter}, level={level_filter})") - task_manager = get_task_manager() - queue = await task_manager.subscribe_logs(task_id) - - def matches_filters(log_entry) -> bool: - """Check if log entry matches the filter criteria.""" - # Check source filter - if source_filter and log_entry.source.lower() != source_filter: - return False - - # Check level filter - if level_filter: - log_level = level_hierarchy.get(log_entry.level.upper(), 0) - if log_level < min_level: + + source_filter = source.lower() if source else None + level_filter = level.upper() if level else None + level_hierarchy = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3} + min_level = level_hierarchy.get(level_filter, 0) if level_filter else 0 + + logger.reason( + "Accepted WebSocket log stream connection", + extra={ + "task_id": task_id, + "source_filter": source_filter, + "level_filter": level_filter, + "min_level": min_level, + }, + ) + + task_manager = get_task_manager() + queue = await task_manager.subscribe_logs(task_id) + logger.reason( + "Subscribed WebSocket client to task log queue", + extra={"task_id": task_id}, + ) + + def matches_filters(log_entry) -> bool: + """Check if log entry matches the filter criteria.""" + log_source = getattr(log_entry, "source", None) + if source_filter and str(log_source or "").lower() != source_filter: return False - - return True - - 
try: - # Stream new logs - logger.info(f"Starting log stream for task {task_id}") - - # Send initial logs first to build context (apply filters) - initial_logs = task_manager.get_task_logs(task_id) - for log_entry in initial_logs: - if matches_filters(log_entry): + + if level_filter: + log_level = level_hierarchy.get(str(log_entry.level).upper(), 0) + if log_level < min_level: + return False + + return True + + try: + logger.reason( + "Starting task log stream replay and live forwarding", + extra={"task_id": task_id}, + ) + + initial_logs = task_manager.get_task_logs(task_id) + initial_sent = 0 + for log_entry in initial_logs: + if matches_filters(log_entry): + log_dict = log_entry.dict() + log_dict["timestamp"] = log_dict["timestamp"].isoformat() + await websocket.send_json(log_dict) + initial_sent += 1 + + logger.reflect( + "Initial task log replay completed", + extra={ + "task_id": task_id, + "replayed_logs": initial_sent, + "total_available_logs": len(initial_logs), + }, + ) + + task = task_manager.get_task(task_id) + if task and task.status == "AWAITING_INPUT" and task.input_request: + synthetic_log = { + "timestamp": task.logs[-1].timestamp.isoformat() if task.logs else "2024-01-01T00:00:00", + "level": "INFO", + "message": "Task paused for user input (Connection Re-established)", + "context": {"input_request": task.input_request}, + } + await websocket.send_json(synthetic_log) + logger.reason( + "Replayed awaiting-input prompt to restored WebSocket client", + extra={"task_id": task_id, "task_status": task.status}, + ) + + while True: + log_entry = await queue.get() + + if not matches_filters(log_entry): + continue + log_dict = log_entry.dict() - log_dict['timestamp'] = log_dict['timestamp'].isoformat() + log_dict["timestamp"] = log_dict["timestamp"].isoformat() await websocket.send_json(log_dict) + logger.reflect( + "Forwarded task log entry to WebSocket client", + extra={ + "task_id": task_id, + "level": log_dict.get("level"), + }, + ) - # Force a check for 
AWAITING_INPUT status immediately upon connection - # This ensures that if the task is already waiting when the user connects, they get the prompt. - task = task_manager.get_task(task_id) - if task and task.status == "AWAITING_INPUT" and task.input_request: - # Construct a synthetic log entry to trigger the frontend handler - # This is a bit of a hack but avoids changing the websocket protocol significantly - synthetic_log = { - "timestamp": task.logs[-1].timestamp.isoformat() if task.logs else "2024-01-01T00:00:00", - "level": "INFO", - "message": "Task paused for user input (Connection Re-established)", - "context": {"input_request": task.input_request} - } - await websocket.send_json(synthetic_log) + if "Task completed successfully" in log_entry.message or "Task failed" in log_entry.message: + logger.reason( + "Observed terminal task log entry; delaying to preserve client visibility", + extra={"task_id": task_id, "message": log_entry.message}, + ) + await asyncio.sleep(2) - while True: - log_entry = await queue.get() - - # Apply server-side filtering - if not matches_filters(log_entry): - continue - - log_dict = log_entry.dict() - log_dict['timestamp'] = log_dict['timestamp'].isoformat() - await websocket.send_json(log_dict) - - # If task is finished, we could potentially close the connection - # but let's keep it open for a bit or until the client disconnects - if "Task completed successfully" in log_entry.message or "Task failed" in log_entry.message: - # Wait a bit to ensure client receives the last message - await asyncio.sleep(2) - # DO NOT BREAK here - allow client to keep connection open if they want to review logs - # or until they disconnect. Breaking closes the socket immediately. 
- # break - - except WebSocketDisconnect: - logger.info(f"WebSocket connection disconnected for task {task_id}") - except Exception as e: - logger.error(f"WebSocket error for task {task_id}: {e}") - finally: - task_manager.unsubscribe_logs(task_id, queue) + except WebSocketDisconnect: + logger.reason( + "WebSocket client disconnected from task log stream", + extra={"task_id": task_id}, + ) + except Exception as exc: + logger.explore( + "WebSocket log streaming encountered an unexpected failure", + extra={"task_id": task_id, "error": str(exc)}, + ) + raise + finally: + task_manager.unsubscribe_logs(task_id, queue) + logger.reflect( + "Released WebSocket log queue subscription", + extra={"task_id": task_id}, + ) # [/DEF:websocket_endpoint:Function] # [DEF:StaticFiles:Mount] diff --git a/backend/src/core/utils/superset_context_extractor.py b/backend/src/core/utils/superset_context_extractor.py new file mode 100644 index 00000000..f0991bbb --- /dev/null +++ b/backend/src/core/utils/superset_context_extractor.py @@ -0,0 +1,334 @@ +# [DEF:SupersetContextExtractor:Module] +# @COMPLEXITY: 4 +# @SEMANTICS: dataset_review, superset, link_parsing, context_recovery, partial_recovery +# @PURPOSE: Recover dataset and dashboard context from Superset links while preserving explicit partial-recovery markers. +# @LAYER: Infra +# @RELATION: [CALLS] ->[backend.src.core.superset_client.SupersetClient:Class] +# @RELATION: [DEPENDS_ON] ->[ImportedFilter] +# @RELATION: [DEPENDS_ON] ->[TemplateVariable] +# @PRE: Superset link or dataset reference must be parseable enough to resolve an environment-scoped target resource. +# @POST: Returns the best available recovered context with explicit provenance and partial-recovery markers when necessary. +# @SIDE_EFFECT: Performs upstream Superset API reads. +# @INVARIANT: Partial recovery is surfaced explicitly and never misrepresented as fully confirmed context. 

from __future__ import annotations

# [DEF:SupersetContextExtractor.imports:Block]
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from urllib.parse import parse_qs, unquote, urlparse

from src.core.config_models import Environment
from src.core.logger import belief_scope, logger
from src.core.superset_client import SupersetClient
# [/DEF:SupersetContextExtractor.imports:Block]


# [DEF:SupersetParsedContext:Class]
# @COMPLEXITY: 2
# @PURPOSE: Normalized output of Superset link parsing for session intake and recovery.
@dataclass
class SupersetParsedContext:
    # The stripped link exactly as it was accepted by the parser.
    source_url: str
    # Best available dataset reference. The parser produces "schema.table" or
    # "table" when dataset detail was fetched, "dataset:<id>" or "chart:<id>"
    # as fallbacks, and "unresolved" when nothing could be derived.
    dataset_ref: str
    # Numeric identifiers taken from the URL path (dataset_id may instead come
    # from the dashboard payload); None when not present.
    dataset_id: Optional[int] = None
    dashboard_id: Optional[int] = None
    chart_id: Optional[int] = None
    # One of "dataset", "dashboard", "chart"; "unknown" is only the default.
    resource_type: str = "unknown"
    # Decoded query-string state keyed by query parameter name.
    query_state: Dict[str, Any] = field(default_factory=dict)
    # Normalized filter entries derived from query_state.
    imported_filters: List[Dict[str, Any]] = field(default_factory=list)
    # Machine-readable markers naming what could not be confirmed,
    # e.g. "multiple_dashboard_datasets" or "dataset_detail_lookup_failed".
    unresolved_references: List[str] = field(default_factory=list)
    # True whenever the context is usable but not fully confirmed.
    partial_recovery: bool = False
# [/DEF:SupersetParsedContext:Class]


# [DEF:SupersetContextExtractor:Class]
# @COMPLEXITY: 4
# @PURPOSE: Parse supported Superset URLs and recover canonical dataset/dashboard references for review-session intake.
# @RELATION: [CALLS] ->[backend.src.core.superset_client.SupersetClient]
# @PRE: constructor receives a configured environment with a usable Superset base URL.
# @POST: extractor instance is ready to parse links against one Superset environment.
# @SIDE_EFFECT: downstream parse operations may call Superset APIs through SupersetClient.
class SupersetContextExtractor:
    # [DEF:SupersetContextExtractor.__init__:Function]
    # @COMPLEXITY: 2
    # @PURPOSE: Bind extractor to one Superset environment and client instance.
    def __init__(self, environment: Environment, client: Optional[SupersetClient] = None) -> None:
        self.environment = environment
        # An injected client (e.g. a test double) wins; otherwise one is built
        # for the given environment.
        self.client = client or SupersetClient(environment)
    # [/DEF:SupersetContextExtractor.__init__:Function]

    # [DEF:SupersetContextExtractor.parse_superset_link:Function]
    # @COMPLEXITY: 4
    # @PURPOSE: Extract candidate identifiers and query state from supported Superset URLs.
    # @RELATION: [CALLS] ->[backend.src.core.superset_client.SupersetClient]
    # @PRE: link is a non-empty Superset URL compatible with the configured environment.
    # @POST: returns resolved dataset/dashboard context, preserving explicit partial-recovery state if some identifiers cannot be confirmed.
    # @SIDE_EFFECT: may issue Superset API reads to resolve dataset references from dashboard or chart URLs.
    # @DATA_CONTRACT: Input[link:str] -> Output[SupersetParsedContext]
    def parse_superset_link(self, link: str) -> SupersetParsedContext:
        """Parse a Superset URL into a ``SupersetParsedContext``.

        The path is searched for ``dataset/<id>``, ``dashboard/<id>`` and
        ``chart/<id>`` segments, in that priority order. Dashboard links are
        resolved to their first bound dataset through the Superset client;
        chart links are accepted but always flagged as partial recovery, and
        no chart lookup is performed here. Whenever a dataset id is known, the
        dataset detail is fetched to turn the reference into ``schema.table``.

        Raises:
            ValueError: if the link is empty, is not an absolute http(s) URL,
                or contains none of the supported resource segments.
        """
        with belief_scope("SupersetContextExtractor.parse_superset_link"):
            # None and other falsy inputs collapse to "" and are rejected below.
            normalized_link = str(link or "").strip()
            if not normalized_link:
                logger.explore("Rejected empty Superset link during intake")
                raise ValueError("Superset link must be non-empty")

            parsed_url = urlparse(normalized_link)
            if parsed_url.scheme not in {"http", "https"} or not parsed_url.netloc:
                logger.explore(
                    "Superset link is not a parseable absolute URL",
                    extra={"link": normalized_link},
                )
                raise ValueError("Superset link must be an absolute http(s) URL")

            # NOTE(review): the host is not compared with the configured
            # environment here, and the raw query string is logged; confirm
            # both are acceptable for links carrying tokens.
            logger.reason(
                "Parsing Superset link for dataset review intake",
                extra={"path": parsed_url.path, "query": parsed_url.query},
            )

            # Drop empty segments produced by leading/trailing/double slashes.
            path_parts = [part for part in parsed_url.path.split("/") if part]
            query_params = parse_qs(parsed_url.query, keep_blank_values=True)
            query_state = self._decode_query_state(query_params)

            dataset_id = self._extract_numeric_identifier(path_parts, "dataset")
            dashboard_id = self._extract_numeric_identifier(path_parts, "dashboard")
            chart_id = self._extract_numeric_identifier(path_parts, "chart")

            resource_type = "unknown"
            dataset_ref: Optional[str] = None
            partial_recovery = False
            unresolved_references: List[str] = []

            # Priority: a direct dataset link wins over dashboard, then chart.
            if dataset_id is not None:
                resource_type = "dataset"
                dataset_ref = f"dataset:{dataset_id}"
                logger.reason(
                    "Resolved direct dataset link",
                    extra={"dataset_id": dataset_id},
                )
            elif dashboard_id is not None:
                resource_type = "dashboard"
                logger.reason(
                    "Resolving dashboard-bound dataset from Superset",
                    extra={"dashboard_id": dashboard_id},
                )
                # NOTE(review): unlike the dataset detail lookup further down,
                # a failure of this call is not caught and propagates to the
                # caller; confirm that asymmetry is intended.
                dashboard_detail = self.client.get_dashboard_detail(dashboard_id)
                datasets = dashboard_detail.get("datasets") or []
                if datasets:
                    # Only the first dataset is adopted; additional ones are
                    # reported through the partial-recovery marker below.
                    first_dataset = datasets[0]
                    resolved_dataset_id = first_dataset.get("id")
                    if resolved_dataset_id is not None:
                        # NOTE(review): int() raises on a non-numeric id;
                        # assumes Superset always returns numeric ids here.
                        dataset_id = int(resolved_dataset_id)
                        dataset_ref = f"dataset:{dataset_id}"
                        logger.reason(
                            "Recovered dataset reference from dashboard context",
                            extra={
                                "dashboard_id": dashboard_id,
                                "dataset_id": dataset_id,
                                "dataset_count": len(datasets),
                            },
                        )
                        if len(datasets) > 1:
                            partial_recovery = True
                            unresolved_references.append("multiple_dashboard_datasets")
                    else:
                        partial_recovery = True
                        unresolved_references.append("dashboard_dataset_id_missing")
                else:
                    partial_recovery = True
                    unresolved_references.append("dashboard_dataset_binding_missing")
            elif chart_id is not None:
                # Chart links are accepted without any upstream lookup; the
                # dataset binding stays explicitly unresolved.
                resource_type = "chart"
                partial_recovery = True
                unresolved_references.append("chart_dataset_binding_unresolved")
                dataset_ref = f"chart:{chart_id}"
                logger.reason(
                    "Accepted chart link with explicit partial recovery",
                    extra={"chart_id": chart_id},
                )
            else:
                logger.explore(
                    "Unsupported Superset link shape encountered",
                    extra={"path": parsed_url.path},
                )
                raise ValueError("Unsupported Superset link shape")

            # Canonicalization is best-effort: any failure keeps the
            # "dataset:<id>" reference and is recorded as partial recovery.
            if dataset_id is not None:
                try:
                    dataset_detail = self.client.get_dataset_detail(dataset_id)
                    table_name = str(dataset_detail.get("table_name") or "").strip()
                    schema_name = str(dataset_detail.get("schema") or "").strip()
                    if table_name:
                        dataset_ref = (
                            f"{schema_name}.{table_name}" if schema_name else table_name
                        )
                        logger.reason(
                            "Canonicalized dataset reference from dataset detail",
                            extra={"dataset_ref": dataset_ref, "dataset_id": dataset_id},
                        )
                except Exception as exc:
                    partial_recovery = True
                    unresolved_references.append("dataset_detail_lookup_failed")
                    logger.explore(
                        "Dataset detail lookup failed during link parsing; keeping session usable",
                        extra={"dataset_id": dataset_id, "error": str(exc)},
                    )

            imported_filters = self._extract_imported_filters(query_state)
            result = SupersetParsedContext(
                source_url=normalized_link,
                # "unresolved" is the sentinel for dashboards without a usable
                # dataset binding.
                dataset_ref=dataset_ref or "unresolved",
                dataset_id=dataset_id,
                dashboard_id=dashboard_id,
                chart_id=chart_id,
                resource_type=resource_type,
                query_state=query_state,
                imported_filters=imported_filters,
                unresolved_references=unresolved_references,
                partial_recovery=partial_recovery,
            )
            logger.reflect(
                "Superset link parsing completed",
                extra={
                    "dataset_ref": result.dataset_ref,
                    "dataset_id": result.dataset_id,
                    "dashboard_id": result.dashboard_id,
                    "chart_id": result.chart_id,
                    "partial_recovery": result.partial_recovery,
                    "unresolved_references": result.unresolved_references,
                    "imported_filters": len(result.imported_filters),
                },
            )
            return result
    # [/DEF:SupersetContextExtractor.parse_superset_link:Function]

    # [DEF:SupersetContextExtractor.recover_imported_filters:Function]
    # @COMPLEXITY: 2
    # @PURPOSE: Build imported filter entries from URL state and Superset-side saved context.
+ def recover_imported_filters(self, parsed_context: SupersetParsedContext) -> List[Dict[str, Any]]: + return list(parsed_context.imported_filters) + # [/DEF:SupersetContextExtractor.recover_imported_filters:Function] + + # [DEF:SupersetContextExtractor.discover_template_variables:Function] + # @COMPLEXITY: 2 + # @PURPOSE: Detect runtime variables and Jinja references from dataset query-bearing fields. + def discover_template_variables(self, dataset_payload: Dict[str, Any]) -> List[Dict[str, Any]]: + return [] + # [/DEF:SupersetContextExtractor.discover_template_variables:Function] + + # [DEF:SupersetContextExtractor.build_recovery_summary:Function] + # @COMPLEXITY: 2 + # @PURPOSE: Summarize recovered, partial, and unresolved context for session state and UX. + def build_recovery_summary(self, parsed_context: SupersetParsedContext) -> Dict[str, Any]: + return { + "dataset_ref": parsed_context.dataset_ref, + "dataset_id": parsed_context.dataset_id, + "dashboard_id": parsed_context.dashboard_id, + "chart_id": parsed_context.chart_id, + "partial_recovery": parsed_context.partial_recovery, + "unresolved_references": list(parsed_context.unresolved_references), + "imported_filter_count": len(parsed_context.imported_filters), + } + # [/DEF:SupersetContextExtractor.build_recovery_summary:Function] + + # [DEF:SupersetContextExtractor._extract_numeric_identifier:Function] + # @COMPLEXITY: 2 + # @PURPOSE: Extract a numeric identifier from a REST-like Superset URL path. 
+ def _extract_numeric_identifier(self, path_parts: List[str], resource_name: str) -> Optional[int]: + if resource_name not in path_parts: + return None + try: + resource_index = path_parts.index(resource_name) + except ValueError: + return None + + if resource_index + 1 >= len(path_parts): + return None + + candidate = str(path_parts[resource_index + 1]).strip() + if not candidate.isdigit(): + return None + return int(candidate) + # [/DEF:SupersetContextExtractor._extract_numeric_identifier:Function] + + # [DEF:SupersetContextExtractor._decode_query_state:Function] + # @COMPLEXITY: 2 + # @PURPOSE: Decode query-string structures used by Superset URL state transport. + def _decode_query_state(self, query_params: Dict[str, List[str]]) -> Dict[str, Any]: + query_state: Dict[str, Any] = {} + for key, values in query_params.items(): + if not values: + continue + raw_value = values[-1] + decoded_value = unquote(raw_value) + if key in {"native_filters", "native_filters_key", "form_data", "q"}: + try: + query_state[key] = json.loads(decoded_value) + continue + except Exception: + logger.explore( + "Failed to decode structured Superset query state; preserving raw value", + extra={"key": key}, + ) + query_state[key] = decoded_value + return query_state + # [/DEF:SupersetContextExtractor._decode_query_state:Function] + + # [DEF:SupersetContextExtractor._extract_imported_filters:Function] + # @COMPLEXITY: 2 + # @PURPOSE: Normalize imported filters from decoded query state without fabricating missing values. 
+ def _extract_imported_filters(self, query_state: Dict[str, Any]) -> List[Dict[str, Any]]: + imported_filters: List[Dict[str, Any]] = [] + + native_filters_payload = query_state.get("native_filters") + if isinstance(native_filters_payload, list): + for index, item in enumerate(native_filters_payload): + if not isinstance(item, dict): + continue + filter_name = ( + item.get("filter_name") + or item.get("column") + or item.get("name") + or f"native_filter_{index}" + ) + imported_filters.append( + { + "filter_name": str(filter_name), + "raw_value": item.get("value"), + "display_name": item.get("label") or item.get("name"), + "source": "superset_url", + "recovery_status": "recovered" + if item.get("value") is not None + else "partial", + "requires_confirmation": item.get("value") is None, + "notes": "Recovered from Superset native filter URL state", + } + ) + + form_data_payload = query_state.get("form_data") + if isinstance(form_data_payload, dict): + extra_filters = form_data_payload.get("extra_filters") or [] + for index, item in enumerate(extra_filters): + if not isinstance(item, dict): + continue + filter_name = item.get("col") or item.get("column") or f"extra_filter_{index}" + imported_filters.append( + { + "filter_name": str(filter_name), + "raw_value": item.get("val"), + "display_name": item.get("label"), + "source": "superset_url", + "recovery_status": "recovered" + if item.get("val") is not None + else "partial", + "requires_confirmation": item.get("val") is None, + "notes": "Recovered from Superset form_data extra_filters", + } + ) + + return imported_filters + # [/DEF:SupersetContextExtractor._extract_imported_filters:Function] +# [/DEF:SupersetContextExtractor:Class] + +# [/DEF:SupersetContextExtractor:Module] \ No newline at end of file diff --git a/backend/src/models/dataset_review.py b/backend/src/models/dataset_review.py index 055bbdbe..5504d059 100644 --- a/backend/src/models/dataset_review.py +++ b/backend/src/models/dataset_review.py @@ -5,7 +5,6 
@@ # @SEMANTICS: dataset_review, session, profile, findings, semantics, clarification, execution, sqlalchemy # @PURPOSE: SQLAlchemy models for the dataset review orchestration flow. # @LAYER: Domain -# @RELATION: INHERITS_FROM -> [Base] # @RELATION: DEPENDS_ON -> [AuthModels] # @RELATION: DEPENDS_ON -> [MappingModels] # diff --git a/backend/src/schemas/dataset_review.py b/backend/src/schemas/dataset_review.py index 4bb39f19..ec83aca5 100644 --- a/backend/src/schemas/dataset_review.py +++ b/backend/src/schemas/dataset_review.py @@ -4,7 +4,7 @@ # @SEMANTICS: dataset_review, schemas, pydantic, session, profile, findings # @PURPOSE: Defines API schemas for the dataset review orchestration flow. # @LAYER: API -# @RELATION: DEPENDS_ON -> pydantic +# @RELATION: DEPENDS_ON -> [DatasetReviewModels] # [SECTION: IMPORTS] from datetime import datetime diff --git a/backend/src/services/dataset_review/orchestrator.py b/backend/src/services/dataset_review/orchestrator.py new file mode 100644 index 00000000..54abcffc --- /dev/null +++ b/backend/src/services/dataset_review/orchestrator.py @@ -0,0 +1,386 @@ +# [DEF:DatasetReviewOrchestrator:Module] +# @COMPLEXITY: 5 +# @SEMANTICS: dataset_review, orchestration, session_lifecycle, intake, recovery +# @PURPOSE: Coordinate dataset review session startup and lifecycle-safe intake recovery for one authenticated user. +# @LAYER: Domain +# @RELATION: [DEPENDS_ON] ->[DatasetReviewSessionRepository] +# @RELATION: [DEPENDS_ON] ->[SemanticSourceResolver] +# @RELATION: [DEPENDS_ON] ->[ClarificationEngine] +# @RELATION: [DEPENDS_ON] ->[SupersetContextExtractor] +# @RELATION: [DEPENDS_ON] ->[SupersetCompilationAdapter] +# @RELATION: [DEPENDS_ON] ->[TaskManager] +# @PRE: session mutations must execute inside a persisted session boundary scoped to one authenticated user. +# @POST: state transitions are persisted atomically and emit observable progress for long-running steps. 
+# @SIDE_EFFECT: creates task records, updates session aggregates, triggers upstream Superset calls, persists audit artifacts. +# @DATA_CONTRACT: Input[SessionCommand] -> Output[DatasetReviewSession | CompiledPreview | DatasetRunContext] +# @INVARIANT: Launch is blocked unless a current session has no open blocking findings, all launch-sensitive mappings are approved, and a non-stale Superset-generated compiled preview matches the current input fingerprint. + +from __future__ import annotations + +# [DEF:DatasetReviewOrchestrator.imports:Block] +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + +from src.core.config_manager import ConfigManager +from src.core.logger import belief_scope, logger +from src.core.task_manager import TaskManager +from src.core.utils.superset_context_extractor import ( + SupersetContextExtractor, + SupersetParsedContext, +) +from src.models.auth import User +from src.models.dataset_review import ( + BusinessSummarySource, + ConfidenceState, + DatasetProfile, + DatasetReviewSession, + FindingArea, + FindingSeverity, + RecommendedAction, + ReadinessState, + ResolutionState, + SessionPhase, + SessionStatus, + ValidationFinding, +) +from src.services.dataset_review.repositories.session_repository import ( + DatasetReviewSessionRepository, +) +from src.services.dataset_review.semantic_resolver import SemanticSourceResolver +# [/DEF:DatasetReviewOrchestrator.imports:Block] + + +# [DEF:StartSessionCommand:Class] +# @COMPLEXITY: 2 +# @PURPOSE: Typed input contract for starting a dataset review session. +@dataclass +class StartSessionCommand: + user: User + environment_id: str + source_kind: str + source_input: str +# [/DEF:StartSessionCommand:Class] + + +# [DEF:StartSessionResult:Class] +# @COMPLEXITY: 2 +# @PURPOSE: Session-start result carrying the persisted session and intake recovery metadata. 
+@dataclass +class StartSessionResult: + session: DatasetReviewSession + parsed_context: Optional[SupersetParsedContext] = None + findings: List[ValidationFinding] = field(default_factory=list) +# [/DEF:StartSessionResult:Class] + + +# [DEF:DatasetReviewOrchestrator:Class] +# @COMPLEXITY: 5 +# @PURPOSE: Coordinate safe session startup while preserving cross-user isolation and explicit partial recovery. +# @RELATION: [DEPENDS_ON] ->[DatasetReviewSessionRepository] +# @RELATION: [DEPENDS_ON] ->[SupersetContextExtractor] +# @RELATION: [DEPENDS_ON] ->[TaskManager] +# @RELATION: [DEPENDS_ON] ->[SessionRepo] +# @RELATION: [DEPENDS_ON] ->[ConfigManager] +# @PRE: constructor dependencies are valid and tied to the current request/task scope. +# @POST: orchestrator instance can execute session-scoped mutations for one authenticated user. +# @SIDE_EFFECT: downstream operations may persist session/profile/finding state and enqueue background tasks. +# @DATA_CONTRACT: Input[StartSessionCommand] -> Output[StartSessionResult] +# @INVARIANT: session ownership is preserved on every mutation and recovery remains explicit when partial. +class DatasetReviewOrchestrator: + # [DEF:DatasetReviewOrchestrator.__init__:Function] + # @COMPLEXITY: 3 + # @PURPOSE: Bind repository, config, and task dependencies required by the orchestration boundary. 
+ # @RELATION: [DEPENDS_ON] ->[SessionRepo] + # @RELATION: [DEPENDS_ON] ->[ConfigManager] + def __init__( + self, + repository: DatasetReviewSessionRepository, + config_manager: ConfigManager, + task_manager: Optional[TaskManager] = None, + semantic_resolver: Optional[SemanticSourceResolver] = None, + ) -> None: + self.repository = repository + self.config_manager = config_manager + self.task_manager = task_manager + self.semantic_resolver = semantic_resolver or SemanticSourceResolver() + # [/DEF:DatasetReviewOrchestrator.__init__:Function] + + # [DEF:DatasetReviewOrchestrator.start_session:Function] + # @COMPLEXITY: 5 + # @PURPOSE: Initialize a new session from a Superset link or dataset selection and trigger context recovery. + # @RELATION: [DEPENDS_ON] ->[SessionRepo] + # @RELATION: [CALLS] ->[SupersetContextExtractor.parse_superset_link] + # @RELATION: [CALLS] ->[create_task] + # @PRE: source input is non-empty and environment is accessible. + # @POST: session exists in persisted storage with intake/recovery state and task linkage when async work is required. + # @SIDE_EFFECT: persists session and may enqueue recovery task. + # @DATA_CONTRACT: Input[StartSessionCommand] -> Output[StartSessionResult] + # @INVARIANT: no cross-user session leakage occurs; session and follow-up task remain owned by the authenticated user. 
+ def start_session(self, command: StartSessionCommand) -> StartSessionResult: + with belief_scope("DatasetReviewOrchestrator.start_session"): + normalized_source_kind = str(command.source_kind or "").strip() + normalized_source_input = str(command.source_input or "").strip() + normalized_environment_id = str(command.environment_id or "").strip() + + if not normalized_source_input: + logger.explore("Blocked dataset review session start due to empty source input") + raise ValueError("source_input must be non-empty") + + if normalized_source_kind not in {"superset_link", "dataset_selection"}: + logger.explore( + "Blocked dataset review session start due to unsupported source kind", + extra={"source_kind": normalized_source_kind}, + ) + raise ValueError("source_kind must be 'superset_link' or 'dataset_selection'") + + environment = self.config_manager.get_environment(normalized_environment_id) + if environment is None: + logger.explore( + "Blocked dataset review session start because environment was not found", + extra={"environment_id": normalized_environment_id}, + ) + raise ValueError("Environment not found") + + logger.reason( + "Starting dataset review session", + extra={ + "user_id": command.user.id, + "environment_id": normalized_environment_id, + "source_kind": normalized_source_kind, + }, + ) + + parsed_context: Optional[SupersetParsedContext] = None + findings: List[ValidationFinding] = [] + dataset_ref = normalized_source_input + dataset_id: Optional[int] = None + dashboard_id: Optional[int] = None + readiness_state = ReadinessState.IMPORTING + recommended_action = RecommendedAction.REVIEW_DOCUMENTATION + current_phase = SessionPhase.RECOVERY + + if normalized_source_kind == "superset_link": + extractor = SupersetContextExtractor(environment) + parsed_context = extractor.parse_superset_link(normalized_source_input) + dataset_ref = parsed_context.dataset_ref + dataset_id = parsed_context.dataset_id + dashboard_id = parsed_context.dashboard_id + + if 
parsed_context.partial_recovery: + readiness_state = ReadinessState.RECOVERY_REQUIRED + recommended_action = RecommendedAction.REVIEW_DOCUMENTATION + findings.extend(self._build_partial_recovery_findings(parsed_context)) + else: + readiness_state = ReadinessState.REVIEW_READY + else: + dataset_ref, dataset_id = self._parse_dataset_selection(normalized_source_input) + readiness_state = ReadinessState.REVIEW_READY + current_phase = SessionPhase.REVIEW + + session = DatasetReviewSession( + user_id=command.user.id, + environment_id=normalized_environment_id, + source_kind=normalized_source_kind, + source_input=normalized_source_input, + dataset_ref=dataset_ref, + dataset_id=dataset_id, + dashboard_id=dashboard_id, + readiness_state=readiness_state, + recommended_action=recommended_action, + status=SessionStatus.ACTIVE, + current_phase=current_phase, + ) + persisted_session = self.repository.create_session(session) + + profile = self._build_initial_profile( + session_id=persisted_session.session_id, + parsed_context=parsed_context, + dataset_ref=dataset_ref, + ) + persisted_session = self.repository.save_profile_and_findings( + persisted_session.session_id, + command.user.id, + profile, + findings, + ) + + active_task_id = self._enqueue_recovery_task( + command=command, + session=persisted_session, + parsed_context=parsed_context, + ) + if active_task_id: + persisted_session.active_task_id = active_task_id + self.repository.db.commit() + self.repository.db.refresh(persisted_session) + logger.reason( + "Linked recovery task to started dataset review session", + extra={"session_id": persisted_session.session_id, "task_id": active_task_id}, + ) + + logger.reflect( + "Dataset review session start completed", + extra={ + "session_id": persisted_session.session_id, + "dataset_ref": persisted_session.dataset_ref, + "dataset_id": persisted_session.dataset_id, + "dashboard_id": persisted_session.dashboard_id, + "readiness_state": persisted_session.readiness_state.value, + 
"active_task_id": persisted_session.active_task_id, + "finding_count": len(findings), + }, + ) + return StartSessionResult( + session=persisted_session, + parsed_context=parsed_context, + findings=findings, + ) + # [/DEF:DatasetReviewOrchestrator.start_session:Function] + + # [DEF:DatasetReviewOrchestrator._parse_dataset_selection:Function] + # @COMPLEXITY: 3 + # @PURPOSE: Normalize dataset-selection payload into canonical session references. + # @RELATION: [DEPENDS_ON] ->[DatasetReviewSession] + def _parse_dataset_selection(self, source_input: str) -> tuple[str, Optional[int]]: + normalized = str(source_input or "").strip() + if not normalized: + raise ValueError("dataset selection input must be non-empty") + + if normalized.isdigit(): + dataset_id = int(normalized) + return f"dataset:{dataset_id}", dataset_id + + if normalized.startswith("dataset:"): + suffix = normalized.split(":", 1)[1].strip() + if suffix.isdigit(): + return normalized, int(suffix) + return normalized, None + + return normalized, None + # [/DEF:DatasetReviewOrchestrator._parse_dataset_selection:Function] + + # [DEF:DatasetReviewOrchestrator._build_initial_profile:Function] + # @COMPLEXITY: 3 + # @PURPOSE: Create the first profile snapshot so exports and detail views remain usable immediately after intake. + # @RELATION: [DEPENDS_ON] ->[DatasetProfile] + def _build_initial_profile( + self, + session_id: str, + parsed_context: Optional[SupersetParsedContext], + dataset_ref: str, + ) -> DatasetProfile: + dataset_name = dataset_ref.split(".")[-1] if dataset_ref else "Unresolved dataset" + business_summary = ( + f"Review session initialized for {dataset_ref}." + if dataset_ref + else "Review session initialized with unresolved dataset context." 
+ ) + confidence_state = ( + ConfidenceState.MIXED + if parsed_context and parsed_context.partial_recovery + else ConfidenceState.MOSTLY_CONFIRMED + ) + return DatasetProfile( + session_id=session_id, + dataset_name=dataset_name or "Unresolved dataset", + schema_name=dataset_ref.split(".")[0] if "." in dataset_ref else None, + business_summary=business_summary, + business_summary_source=BusinessSummarySource.IMPORTED, + description="Initial review profile created from source intake.", + dataset_type="unknown", + is_sqllab_view=False, + completeness_score=0.25, + confidence_state=confidence_state, + has_blocking_findings=False, + has_warning_findings=bool(parsed_context and parsed_context.partial_recovery), + manual_summary_locked=False, + ) + # [/DEF:DatasetReviewOrchestrator._build_initial_profile:Function] + + # [DEF:DatasetReviewOrchestrator._build_partial_recovery_findings:Function] + # @COMPLEXITY: 4 + # @PURPOSE: Project partial Superset intake recovery into explicit findings without blocking session usability. + # @RELATION: [DEPENDS_ON] ->[ValidationFinding] + # @PRE: parsed_context.partial_recovery is true. + # @POST: returns warning-level findings that preserve usable but incomplete state. + # @SIDE_EFFECT: none beyond structured finding creation. + # @DATA_CONTRACT: Input[SupersetParsedContext] -> Output[List[ValidationFinding]] + def _build_partial_recovery_findings( + self, + parsed_context: SupersetParsedContext, + ) -> List[ValidationFinding]: + findings: List[ValidationFinding] = [] + for unresolved_ref in parsed_context.unresolved_references: + findings.append( + ValidationFinding( + area=FindingArea.SOURCE_INTAKE, + severity=FindingSeverity.WARNING, + code="PARTIAL_SUPERSET_RECOVERY", + title="Superset context recovered partially", + message=( + "Session remains usable, but some Superset context requires review: " + f"{unresolved_ref.replace('_', ' ')}." 
+ ), + resolution_state=ResolutionState.OPEN, + caused_by_ref=unresolved_ref, + ) + ) + return findings + # [/DEF:DatasetReviewOrchestrator._build_partial_recovery_findings:Function] + + # [DEF:DatasetReviewOrchestrator._enqueue_recovery_task:Function] + # @COMPLEXITY: 4 + # @PURPOSE: Link session start to observable async recovery when task infrastructure is available. + # @RELATION: [CALLS] ->[create_task] + # @PRE: session is already persisted. + # @POST: returns task identifier when a task could be enqueued, otherwise None. + # @SIDE_EFFECT: may create one background task for progressive recovery. + # @DATA_CONTRACT: Input[StartSessionCommand,DatasetReviewSession,SupersetParsedContext|None] -> Output[task_id:str|None] + def _enqueue_recovery_task( + self, + command: StartSessionCommand, + session: DatasetReviewSession, + parsed_context: Optional[SupersetParsedContext], + ) -> Optional[str]: + if self.task_manager is None: + logger.reason( + "Dataset review session started without task manager; continuing synchronously", + extra={"session_id": session.session_id}, + ) + return None + + task_params: Dict[str, Any] = { + "session_id": session.session_id, + "user_id": command.user.id, + "environment_id": session.environment_id, + "source_kind": session.source_kind, + "source_input": session.source_input, + "dataset_ref": session.dataset_ref, + "dataset_id": session.dataset_id, + "dashboard_id": session.dashboard_id, + "partial_recovery": bool(parsed_context and parsed_context.partial_recovery), + } + + create_task = getattr(self.task_manager, "create_task", None) + if create_task is None: + logger.explore("Task manager has no create_task method; skipping recovery enqueue") + return None + + try: + task_object = create_task( + plugin_id="dataset-review-recovery", + params=task_params, + ) + except TypeError: + logger.explore( + "Recovery task enqueue skipped because task manager create_task contract is incompatible", + extra={"session_id": session.session_id}, + ) + 
return None + + task_id = getattr(task_object, "id", None) + return str(task_id) if task_id else None + # [/DEF:DatasetReviewOrchestrator._enqueue_recovery_task:Function] +# [/DEF:DatasetReviewOrchestrator:Class] + +# [/DEF:DatasetReviewOrchestrator:Module] \ No newline at end of file diff --git a/backend/src/services/dataset_review/repositories/session_repository.py b/backend/src/services/dataset_review/repositories/session_repository.py index 74b7af14..706afc6c 100644 --- a/backend/src/services/dataset_review/repositories/session_repository.py +++ b/backend/src/services/dataset_review/repositories/session_repository.py @@ -8,6 +8,9 @@ # @RELATION: [DEPENDS_ON] -> [CompiledPreview] # @PRE: repository operations execute within authenticated request or task scope. # @POST: session aggregate reads are structurally consistent and writes preserve ownership and version semantics. +# @SIDE_EFFECT: reads and writes SQLAlchemy-backed session aggregates. +# @DATA_CONTRACT: Input[SessionMutation] -> Output[PersistedSessionAggregate] +# @INVARIANT: answers, mapping approvals, preview artifacts, and launch snapshots are never attributed to the wrong user or session. from typing import Optional, List from sqlalchemy import or_ @@ -22,27 +25,51 @@ from src.models.dataset_review import ( ) from src.core.logger import belief_scope +# [DEF:SessionRepo:Class] +# @COMPLEXITY: 4 +# @PURPOSE: Enforce ownership-scoped persistence and retrieval for dataset review session aggregates. +# @RELATION: [DEPENDS_ON] -> [DatasetReviewSession] +# @RELATION: [DEPENDS_ON] -> [DatasetProfile] +# @RELATION: [DEPENDS_ON] -> [ValidationFinding] +# @RELATION: [DEPENDS_ON] -> [CompiledPreview] +# @PRE: constructor receives a live SQLAlchemy session and callers provide authenticated user scope for guarded reads and writes. +# @POST: repository methods return ownership-scoped aggregates or persisted child records without changing domain meaning. 
+# @SIDE_EFFECT: mutates and queries the persistence layer through the injected database session. +# @DATA_CONTRACT: Input[OwnedSessionQuery|SessionMutation] -> Output[PersistedSessionAggregate|PersistedChildRecord] class DatasetReviewSessionRepository: """ @PURPOSE: Persist and retrieve dataset review session aggregates. @INVARIANT: ownership_scope -> All operations must respect the session owner's user_id. """ + + # [DEF:init_repo:Function] def __init__(self, db: Session): self.db = db + # [/DEF:init_repo:Function] + # [DEF:create_sess:Function] + # @COMPLEXITY: 4 + # @PURPOSE: Persist an initial dataset review session shell. + # @RELATION: [DEPENDS_ON] -> [DatasetReviewSession] + # @PRE: session is a new aggregate root bound to the current ownership scope. + # @POST: session is committed, refreshed, and returned with persisted identifiers. + # @SIDE_EFFECT: inserts a session row and commits the active transaction. + # @DATA_CONTRACT: Input[DatasetReviewSession] -> Output[DatasetReviewSession] def create_session(self, session: DatasetReviewSession) -> DatasetReviewSession: - """ - @PURPOSE: Persist initial session shell. - """ with belief_scope("DatasetReviewSessionRepository.create_session"): self.db.add(session) self.db.commit() self.db.refresh(session) return session + # [/DEF:create_sess:Function] + # [DEF:load_detail:Function] + # @COMPLEXITY: 3 + # @PURPOSE: Return the full session aggregate for API and frontend resume flows. + # @RELATION: [DEPENDS_ON] -> [DatasetReviewSession] + # @RELATION: [DEPENDS_ON] -> [SessionCollaborator] def load_session_detail(self, session_id: str, user_id: str) -> Optional[DatasetReviewSession]: """ - @PURPOSE: Return the full session aggregate for API/frontend use. @PRE: user_id must match session owner or authorized collaborator. 
""" with belief_scope("DatasetReviewSessionRepository.load_session_detail"): @@ -70,17 +97,25 @@ class DatasetReviewSessionRepository: ) )\ .first() + # [/DEF:load_detail:Function] + # [DEF:save_prof_find:Function] + # @COMPLEXITY: 4 + # @PURPOSE: Persist profile state and replace validation findings for an owned session in one transaction. + # @RELATION: [DEPENDS_ON] -> [DatasetReviewSession] + # @RELATION: [DEPENDS_ON] -> [DatasetProfile] + # @RELATION: [DEPENDS_ON] -> [ValidationFinding] + # @PRE: session_id belongs to user_id and the supplied profile/findings belong to the same aggregate scope. + # @POST: stored profile matches the current session and findings are replaced by the supplied collection. + # @SIDE_EFFECT: updates profile rows, deletes stale findings, inserts current findings, and commits the transaction. + # @DATA_CONTRACT: Input[ProfileAndFindingsMutation] -> Output[DatasetReviewSession] def save_profile_and_findings(self, session_id: str, user_id: str, profile: DatasetProfile, findings: List[ValidationFinding]) -> DatasetReviewSession: - """ - @PURPOSE: Persist profile and validation state together. 
- """ with belief_scope("DatasetReviewSessionRepository.save_profile_and_findings"): session = self.db.query(DatasetReviewSession).filter( DatasetReviewSession.session_id == session_id, DatasetReviewSession.user_id == user_id ).first() - + if not session: raise ValueError("Session not found or access denied") @@ -90,24 +125,31 @@ class DatasetReviewSessionRepository: if existing_profile: profile.profile_id = existing_profile.profile_id self.db.merge(profile) - + # Remove old findings for this session to avoid stale data self.db.query(ValidationFinding).filter( ValidationFinding.session_id == session_id ).delete() - + # Add new findings for finding in findings: finding.session_id = session_id self.db.add(finding) - + self.db.commit() return self.load_session_detail(session_id, user_id) + # [/DEF:save_prof_find:Function] + # [DEF:save_prev:Function] + # @COMPLEXITY: 4 + # @PURPOSE: Persist a preview snapshot and mark prior session previews stale. + # @RELATION: [DEPENDS_ON] -> [DatasetReviewSession] + # @RELATION: [DEPENDS_ON] -> [CompiledPreview] + # @PRE: session_id belongs to user_id and preview is prepared for the same session aggregate. + # @POST: preview is persisted and the session points to the latest preview identifier. + # @SIDE_EFFECT: updates prior preview statuses, inserts a preview row, mutates the parent session, and commits. + # @DATA_CONTRACT: Input[PreviewMutation] -> Output[CompiledPreview] def save_preview(self, session_id: str, user_id: str, preview: CompiledPreview) -> CompiledPreview: - """ - @PURPOSE: Persist compiled preview attempt and mark older fingerprints stale. 
- """ with belief_scope("DatasetReviewSessionRepository.save_preview"): session = self.db.query(DatasetReviewSession).filter( DatasetReviewSession.session_id == session_id, @@ -125,15 +167,22 @@ class DatasetReviewSessionRepository: self.db.add(preview) self.db.flush() session.last_preview_id = preview.preview_id - + self.db.commit() self.db.refresh(preview) return preview + # [/DEF:save_prev:Function] + # [DEF:save_run_ctx:Function] + # @COMPLEXITY: 4 + # @PURPOSE: Persist an immutable launch audit snapshot for an owned session. + # @RELATION: [DEPENDS_ON] -> [DatasetReviewSession] + # @RELATION: [DEPENDS_ON] -> [DatasetRunContext] + # @PRE: session_id belongs to user_id and run_context targets the same aggregate. + # @POST: run context is persisted and linked as the latest launch snapshot for the session. + # @SIDE_EFFECT: inserts a run-context row, mutates the parent session pointer, and commits. + # @DATA_CONTRACT: Input[RunContextMutation] -> Output[DatasetRunContext] def save_run_context(self, session_id: str, user_id: str, run_context: DatasetRunContext) -> DatasetRunContext: - """ - @PURPOSE: Persist immutable launch audit snapshot. - """ with belief_scope("DatasetReviewSessionRepository.save_run_context"): session = self.db.query(DatasetReviewSession).filter( DatasetReviewSession.session_id == session_id, @@ -146,18 +195,22 @@ class DatasetReviewSessionRepository: self.db.add(run_context) self.db.flush() session.last_run_context_id = run_context.run_context_id - + self.db.commit() self.db.refresh(run_context) return run_context + # [/DEF:save_run_ctx:Function] + # [DEF:list_user_sess:Function] + # @COMPLEXITY: 3 + # @PURPOSE: List review sessions owned by a specific user ordered by most recent update. + # @RELATION: [DEPENDS_ON] -> [DatasetReviewSession] def list_sessions_for_user(self, user_id: str) -> List[DatasetReviewSession]: - """ - @PURPOSE: List all review sessions owned by a user. 
- """ with belief_scope("DatasetReviewSessionRepository.list_sessions_for_user"): return self.db.query(DatasetReviewSession).filter( DatasetReviewSession.user_id == user_id ).order_by(DatasetReviewSession.updated_at.desc()).all() + # [/DEF:list_user_sess:Function] +# [/DEF:SessionRepo:Class] # [/DEF:DatasetReviewSessionRepository:Module] \ No newline at end of file diff --git a/backend/src/services/dataset_review/semantic_resolver.py b/backend/src/services/dataset_review/semantic_resolver.py new file mode 100644 index 00000000..9bc06287 --- /dev/null +++ b/backend/src/services/dataset_review/semantic_resolver.py @@ -0,0 +1,342 @@ +# [DEF:SemanticSourceResolver:Module] +# @COMPLEXITY: 4 +# @SEMANTICS: dataset_review, semantic_resolution, dictionary, trusted_sources, ranking +# @PURPOSE: Resolve and rank semantic candidates from trusted dictionary-like sources before any inferred fallback. +# @LAYER: Domain +# @RELATION: [DEPENDS_ON] ->[LLMProviderService] +# @RELATION: [DEPENDS_ON] ->[SemanticSource] +# @RELATION: [DEPENDS_ON] ->[SemanticFieldEntry] +# @RELATION: [DEPENDS_ON] ->[SemanticCandidate] +# @PRE: selected source and target field set must be known. +# @POST: candidate ranking follows the configured confidence hierarchy and unresolved fuzzy matches remain reviewable. +# @SIDE_EFFECT: may create conflict findings and semantic candidate records. +# @INVARIANT: Manual overrides are never silently replaced by imported, inferred, or AI-generated values. 
+ +from __future__ import annotations + +# [DEF:SemanticSourceResolver.imports:Block] +from dataclasses import dataclass, field +from difflib import SequenceMatcher +from typing import Any, Dict, Iterable, List, Mapping, Optional + +from src.core.logger import belief_scope, logger +from src.models.dataset_review import ( + CandidateMatchType, + CandidateStatus, + FieldProvenance, +) +# [/DEF:SemanticSourceResolver.imports:Block] + + +# [DEF:DictionaryResolutionResult:Class] +# @COMPLEXITY: 2 +# @PURPOSE: Carries field-level dictionary resolution output with explicit review and partial-recovery state. +@dataclass +class DictionaryResolutionResult: + source_ref: str + resolved_fields: List[Dict[str, Any]] = field(default_factory=list) + unresolved_fields: List[str] = field(default_factory=list) + partial_recovery: bool = False +# [/DEF:DictionaryResolutionResult:Class] + + +# [DEF:SemanticSourceResolver:Class] +# @COMPLEXITY: 4 +# @PURPOSE: Resolve semantic candidates from trusted sources while preserving manual locks and confidence ordering. +# @RELATION: [DEPENDS_ON] ->[SemanticFieldEntry] +# @RELATION: [DEPENDS_ON] ->[SemanticCandidate] +# @PRE: source payload and target field collection are provided by the caller. +# @POST: result contains confidence-ranked candidates and does not overwrite manual locks implicitly. +# @SIDE_EFFECT: emits semantic trace logs for ranking and fallback decisions. +class SemanticSourceResolver: + # [DEF:SemanticSourceResolver.resolve_from_file:Function] + # @COMPLEXITY: 2 + # @PURPOSE: Normalize uploaded semantic file records into field-level candidates. 
+ def resolve_from_file(self, source_payload: Mapping[str, Any], fields: Iterable[Mapping[str, Any]]) -> DictionaryResolutionResult: + return DictionaryResolutionResult(source_ref=str(source_payload.get("source_ref") or "uploaded_file")) + # [/DEF:SemanticSourceResolver.resolve_from_file:Function] + + # [DEF:SemanticSourceResolver.resolve_from_dictionary:Function] + # @COMPLEXITY: 4 + # @PURPOSE: Resolve candidates from connected tabular dictionary sources. + # @RELATION: [DEPENDS_ON] ->[SemanticFieldEntry] + # @RELATION: [DEPENDS_ON] ->[SemanticCandidate] + # @PRE: dictionary source exists and fields contain stable field_name values. + # @POST: returns confidence-ranked candidates where exact dictionary matches outrank fuzzy matches and unresolved fields stay explicit. + # @SIDE_EFFECT: emits belief-state logs describing trusted-match and partial-recovery outcomes. + # @DATA_CONTRACT: Input[source_payload:Mapping,fields:Iterable] -> Output[DictionaryResolutionResult] + def resolve_from_dictionary( + self, + source_payload: Mapping[str, Any], + fields: Iterable[Mapping[str, Any]], + ) -> DictionaryResolutionResult: + with belief_scope("SemanticSourceResolver.resolve_from_dictionary"): + source_ref = str(source_payload.get("source_ref") or "").strip() + dictionary_rows = source_payload.get("rows") + + if not source_ref: + logger.explore("Dictionary semantic source is missing source_ref") + raise ValueError("Dictionary semantic source must include source_ref") + + if not isinstance(dictionary_rows, list) or not dictionary_rows: + logger.explore( + "Dictionary semantic source has no usable rows", + extra={"source_ref": source_ref}, + ) + raise ValueError("Dictionary semantic source must include non-empty rows") + + logger.reason( + "Resolving semantics from trusted dictionary source", + extra={"source_ref": source_ref, "row_count": len(dictionary_rows)}, + ) + + normalized_rows = [self._normalize_dictionary_row(row) for row in dictionary_rows if isinstance(row, 
Mapping)] + row_index = { + row["field_key"]: row + for row in normalized_rows + if row.get("field_key") + } + + resolved_fields: List[Dict[str, Any]] = [] + unresolved_fields: List[str] = [] + + for raw_field in fields: + field_name = str(raw_field.get("field_name") or "").strip() + if not field_name: + continue + + is_locked = bool(raw_field.get("is_locked")) + if is_locked: + logger.reason( + "Preserving manual lock during dictionary resolution", + extra={"field_name": field_name}, + ) + resolved_fields.append( + { + "field_name": field_name, + "applied_candidate": None, + "candidates": [], + "provenance": FieldProvenance.MANUAL_OVERRIDE.value, + "needs_review": False, + "has_conflict": False, + "is_locked": True, + "status": "preserved_manual", + } + ) + continue + + exact_match = row_index.get(self._normalize_key(field_name)) + candidates: List[Dict[str, Any]] = [] + + if exact_match is not None: + logger.reason( + "Resolved exact dictionary match", + extra={"field_name": field_name, "source_ref": source_ref}, + ) + candidates.append( + self._build_candidate_payload( + rank=1, + match_type=CandidateMatchType.EXACT, + confidence_score=1.0, + row=exact_match, + ) + ) + else: + fuzzy_matches = self._find_fuzzy_matches(field_name, normalized_rows) + for rank_offset, fuzzy_match in enumerate(fuzzy_matches, start=1): + candidates.append( + self._build_candidate_payload( + rank=rank_offset, + match_type=CandidateMatchType.FUZZY, + confidence_score=float(fuzzy_match["score"]), + row=fuzzy_match["row"], + ) + ) + + if not candidates: + unresolved_fields.append(field_name) + resolved_fields.append( + { + "field_name": field_name, + "applied_candidate": None, + "candidates": [], + "provenance": FieldProvenance.UNRESOLVED.value, + "needs_review": True, + "has_conflict": False, + "is_locked": False, + "status": "unresolved", + } + ) + logger.explore( + "No trusted dictionary match found for field", + extra={"field_name": field_name, "source_ref": source_ref}, + ) + 
continue + + ranked_candidates = self.rank_candidates(candidates) + applied_candidate = ranked_candidates[0] + has_conflict = len(ranked_candidates) > 1 + provenance = ( + FieldProvenance.DICTIONARY_EXACT.value + if applied_candidate["match_type"] == CandidateMatchType.EXACT.value + else FieldProvenance.FUZZY_INFERRED.value + ) + needs_review = applied_candidate["match_type"] != CandidateMatchType.EXACT.value + + resolved_fields.append( + { + "field_name": field_name, + "applied_candidate": applied_candidate, + "candidates": ranked_candidates, + "provenance": provenance, + "needs_review": needs_review, + "has_conflict": has_conflict, + "is_locked": False, + "status": "resolved", + } + ) + + result = DictionaryResolutionResult( + source_ref=source_ref, + resolved_fields=resolved_fields, + unresolved_fields=unresolved_fields, + partial_recovery=bool(unresolved_fields), + ) + logger.reflect( + "Dictionary resolution completed", + extra={ + "source_ref": source_ref, + "resolved_fields": len(resolved_fields), + "unresolved_fields": len(unresolved_fields), + "partial_recovery": result.partial_recovery, + }, + ) + return result + # [/DEF:SemanticSourceResolver.resolve_from_dictionary:Function] + + # [DEF:SemanticSourceResolver.resolve_from_reference_dataset:Function] + # @COMPLEXITY: 2 + # @PURPOSE: Reuse semantic metadata from trusted Superset datasets. + def resolve_from_reference_dataset( + self, + source_payload: Mapping[str, Any], + fields: Iterable[Mapping[str, Any]], + ) -> DictionaryResolutionResult: + return DictionaryResolutionResult(source_ref=str(source_payload.get("source_ref") or "reference_dataset")) + # [/DEF:SemanticSourceResolver.resolve_from_reference_dataset:Function] + + # [DEF:SemanticSourceResolver.rank_candidates:Function] + # @COMPLEXITY: 3 + # @PURPOSE: Apply confidence ordering and determine best candidate per field. 
# @RELATION: [DEPENDS_ON] ->[SemanticCandidate]
def rank_candidates(self, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Order candidates best-first and renumber their ranks.

    Sort keys, in order: the match-type priority reported by
    ``self._match_priority``, the confidence score (higher first), and the
    rank the candidate already carried. Ties keep their incoming order
    because ``sorted`` is stable.

    Args:
        candidates: Candidate payload dicts. Each dict is updated in place:
            its ``candidate_rank`` is overwritten with the new 1-based
            position.

    Returns:
        A new list holding the same dict objects, best candidate first.
    """

    def sort_key(candidate: Dict[str, Any]) -> tuple:
        # ``dict.get(key, default)`` only falls back when the key is absent,
        # so an explicit ``None`` would reach float()/int() and raise
        # TypeError. Treat ``None`` exactly like a missing value.
        score = candidate.get("confidence_score")
        prior_rank = candidate.get("candidate_rank")
        return (
            self._match_priority(candidate.get("match_type")),
            -float(0.0 if score is None else score),
            int(999 if prior_rank is None else prior_rank),
        )

    ranked = sorted(candidates, key=sort_key)
    for position, candidate in enumerate(ranked, start=1):
        candidate["candidate_rank"] = position
    return ranked
# [/DEF:SemanticSourceResolver.rank_candidates:Function]

# [DEF:SemanticSourceResolver.detect_conflicts:Function]
# @COMPLEXITY: 2
# @PURPOSE: Mark competing candidate sets that require explicit user review.
def detect_conflicts(self, candidates: List[Dict[str, Any]]) -> bool:
    """Return True when more than one candidate competes for the field."""
    return len(candidates) > 1
# [/DEF:SemanticSourceResolver.detect_conflicts:Function]

# [DEF:SemanticSourceResolver.apply_field_decision:Function]
# @COMPLEXITY: 2
# @PURPOSE: Accept, reject, or manually override a field-level semantic value.
def apply_field_decision(self, field_state: Mapping[str, Any], decision: Mapping[str, Any]) -> Dict[str, Any]:
    """Overlay a decision on top of the current field state.

    Args:
        field_state: Current values for the field; not modified.
        decision: Values to apply; on a key clash the decision wins.

    Returns:
        A new dict: a shallow copy of ``field_state`` updated with
        ``decision``.
    """
    merged: Dict[str, Any] = dict(field_state)
    merged.update(decision)
    return merged
# [/DEF:SemanticSourceResolver.apply_field_decision:Function]

# [DEF:SemanticSourceResolver._normalize_dictionary_row:Function]
# @COMPLEXITY: 2
# @PURPOSE: Normalize one dictionary row into a consistent lookup structure.
def _normalize_dictionary_row(self, row: Mapping[str, Any]) -> Dict[str, Any]:
    """Project one raw dictionary row onto the resolver's lookup shape.

    The field name is taken from the first truthy value among the
    ``field_name``, ``column_name``, ``name`` and ``field`` keys, then
    stringified and stripped. ``verbose_name`` falls back to ``label`` and
    ``display_format`` falls back to ``format``.
    """
    raw_name = None
    for alias in ("field_name", "column_name", "name", "field"):
        raw_name = row.get(alias)
        if raw_name:
            break
    clean_name = str(raw_name or "").strip()

    normalized: Dict[str, Any] = {"field_name": clean_name}
    normalized["field_key"] = self._normalize_key(clean_name)
    normalized["verbose_name"] = row.get("verbose_name") or row.get("label")
    normalized["description"] = row.get("description")
    normalized["display_format"] = row.get("display_format") or row.get("format")
    return normalized
# [/DEF:SemanticSourceResolver._normalize_dictionary_row:Function]

# [DEF:SemanticSourceResolver._find_fuzzy_matches:Function]
# @COMPLEXITY: 2
# @PURPOSE: Produce confidence-scored fuzzy matches while keeping them reviewable.
def _find_fuzzy_matches(self, field_name: str, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return up to three rows whose key resembles ``field_name``.

    Each row's ``field_key`` is compared with the normalized field name via
    ``SequenceMatcher.ratio()``. Rows without a key, or scoring below 0.72,
    are dropped. Survivors are returned as ``{"row", "score"}`` dicts with
    the score rounded to three decimals, highest score first.
    """
    target_key = self._normalize_key(field_name)
    scored: List[Dict[str, Any]] = []
    for entry in rows:
        entry_key = str(entry.get("field_key") or "")
        if entry_key:
            similarity = SequenceMatcher(None, target_key, entry_key).ratio()
            if similarity >= 0.72:
                scored.append({"row": entry, "score": round(similarity, 3)})
    best_first = sorted(scored, key=lambda hit: hit["score"], reverse=True)
    return best_first[:3]
# [/DEF:SemanticSourceResolver._find_fuzzy_matches:Function]

# [DEF:SemanticSourceResolver._build_candidate_payload:Function]
# @COMPLEXITY: 2
# @PURPOSE: Project normalized dictionary rows into semantic candidate payloads.
def _build_candidate_payload(
    self,
    rank: int,
    match_type: CandidateMatchType,
    confidence_score: float,
    row: Mapping[str, Any],
) -> Dict[str, Any]:
    """Build the candidate dict for one matched dictionary row.

    The payload carries the given rank, the match type's ``value``, the
    confidence score, the row's ``verbose_name`` / ``description`` /
    ``display_format`` under ``proposed_*`` keys, and the ``PROPOSED``
    status value.
    """
    payload: Dict[str, Any] = {}
    payload["candidate_rank"] = rank
    payload["match_type"] = match_type.value
    payload["confidence_score"] = confidence_score
    payload["proposed_verbose_name"] = row.get("verbose_name")
    payload["proposed_description"] = row.get("description")
    payload["proposed_display_format"] = row.get("display_format")
    payload["status"] = CandidateStatus.PROPOSED.value
    return payload
# [/DEF:SemanticSourceResolver._build_candidate_payload:Function]

# [DEF:SemanticSourceResolver._match_priority:Function]
# @COMPLEXITY: 2
# @PURPOSE: Encode trusted-confidence ordering so exact dictionary reuse beats fuzzy invention.
def _match_priority(self, match_type: Optional[str]) -> int:
    """Map a match-type value to its sort priority (lower sorts first).

    EXACT is 0, REFERENCE 1, FUZZY 2, GENERATED 3. ``None``, an empty
    value, or anything unrecognized yields 99.
    """
    label = str(match_type or "")
    # Position in this tuple is the priority.
    ordering = (
        CandidateMatchType.EXACT,
        CandidateMatchType.REFERENCE,
        CandidateMatchType.FUZZY,
        CandidateMatchType.GENERATED,
    )
    for position, member in enumerate(ordering):
        if member.value == label:
            return position
    return 99
# [/DEF:SemanticSourceResolver._match_priority:Function]

# [DEF:SemanticSourceResolver._normalize_key:Function]
# @COMPLEXITY: 1
# @PURPOSE: Normalize field identifiers for stable exact/fuzzy comparisons.
+ def _normalize_key(self, value: str) -> str: + return "".join(ch for ch in str(value or "").strip().lower() if ch.isalnum() or ch == "_") + # [/DEF:SemanticSourceResolver._normalize_key:Function] +# [/DEF:SemanticSourceResolver:Class] + +# [/DEF:SemanticSourceResolver:Module] \ No newline at end of file diff --git a/frontend/src/lib/components/dataset-review/.gitkeep b/frontend/src/lib/components/dataset-review/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/frontend/src/lib/components/dataset-review/SourceIntakePanel.svelte b/frontend/src/lib/components/dataset-review/SourceIntakePanel.svelte new file mode 100644 index 00000000..65db4acc --- /dev/null +++ b/frontend/src/lib/components/dataset-review/SourceIntakePanel.svelte @@ -0,0 +1,332 @@ + + + + + + + + + + + + + + +
+
+
+

+ {$t.dataset_review?.source?.eyebrow} +

+

+ {$t.dataset_review?.source?.title} +

+

+ {$t.dataset_review?.source?.description} +

+
+ +
+ {#if intakeState === "Rejected"} + {$t.dataset_review?.source?.state_rejected} + {:else if intakeState === "Validating"} + {$t.dataset_review?.source?.state_validating} + {:else} + {$t.dataset_review?.source?.state_idle} + {/if} +
+
+ +
+ + + +
+ +
+
+ + + +
+ +
+ {getInlineHint()} +
+ +
+
+ {#if isSupersetLinkMode} + {$t.dataset_review?.source?.superset_link_recovery_note} + {:else} + {$t.dataset_review?.source?.dataset_selection_recovery_note} + {/if} +
+ + +
+
+
+ + \ No newline at end of file diff --git a/frontend/src/lib/components/dataset-review/ValidationFindingsPanel.svelte b/frontend/src/lib/components/dataset-review/ValidationFindingsPanel.svelte new file mode 100644 index 00000000..a8e05030 --- /dev/null +++ b/frontend/src/lib/components/dataset-review/ValidationFindingsPanel.svelte @@ -0,0 +1,332 @@ + + + + + + + + + + + + + + + +
+
+
+

+ {$t.dataset_review?.findings?.eyebrow} +

+

+ {$t.dataset_review?.findings?.title} +

+

+ {$t.dataset_review?.findings?.description} +

+
+ +
+
+ {$t.dataset_review?.findings?.next_action_label} +
+
+ {getRecommendedActionLabel(recommendedAction)} +
+
+
+ + {#if totalFindings === 0} +
+ {$t.dataset_review?.findings?.empty} +
+ {:else} +
+
+
+

+ {$t.dataset_review?.findings?.blocking_title} +

+ + {blockingFindings.length} + +
+ + {#if blockingFindings.length === 0} +

+ {$t.dataset_review?.findings?.blocking_empty} +

+ {:else} +
+ {#each blockingFindings as finding} +
+
+
+
+

+ {finding.title} +

+ + {finding.code} + + + {getAreaLabel(finding.area)} + +
+ +

+ {finding.message} +

+ +
+ + {$t.dataset_review?.findings?.resolution_label}: + {getResolutionLabel(finding.resolution_state)} + + + {#if finding.caused_by_ref} + + {$t.dataset_review?.findings?.reference_label}: + {finding.caused_by_ref} + + {/if} +
+ + {#if finding.resolution_note} +

{finding.resolution_note}

+ {/if} +
+ +
+ +
+
+
+ {/each} +
+ {/if} +
+ +
+
+

+ {$t.dataset_review?.findings?.warning_title} +

+ + {warningFindings.length} + +
+ + {#if warningFindings.length === 0} +

+ {$t.dataset_review?.findings?.warning_empty} +

+ {:else} +
+ {#each warningFindings as finding} +
+
+
+
+

+ {finding.title} +

+ + {finding.code} + + + {getAreaLabel(finding.area)} + +
+ +

+ {finding.message} +

+ +
+ + {$t.dataset_review?.findings?.resolution_label}: + {getResolutionLabel(finding.resolution_state)} + + + {#if finding.caused_by_ref} + + {$t.dataset_review?.findings?.reference_label}: + {finding.caused_by_ref} + + {/if} +
+ + {#if finding.resolution_note} +

{finding.resolution_note}

+ {/if} +
+ +
+ +
+
+
+ {/each} +
+ {/if} +
+ +
+
+

+ {$t.dataset_review?.findings?.informational_title} +

+ + {informationalFindings.length} + +
+ + {#if informationalFindings.length === 0} +

+ {$t.dataset_review?.findings?.informational_empty} +

+ {:else} +
+ {#each informationalFindings as finding} +
+
+
+
+

+ {finding.title} +

+ + {finding.code} + + + {getAreaLabel(finding.area)} + +
+ +

+ {finding.message} +

+ +
+ + {$t.dataset_review?.findings?.resolution_label}: + {getResolutionLabel(finding.resolution_state)} + + + {#if finding.caused_by_ref} + + {$t.dataset_review?.findings?.reference_label}: + {finding.caused_by_ref} + + {/if} +
+ + {#if finding.resolution_note} +

{finding.resolution_note}

+ {/if} +
+ +
+ +
+
+
+ {/each} +
+ {/if} +
+
+ {/if} +
+ + \ No newline at end of file diff --git a/frontend/src/lib/components/dataset-review/__tests__/source_intake_panel.ux.test.js b/frontend/src/lib/components/dataset-review/__tests__/source_intake_panel.ux.test.js new file mode 100644 index 00000000..980ce7dc --- /dev/null +++ b/frontend/src/lib/components/dataset-review/__tests__/source_intake_panel.ux.test.js @@ -0,0 +1,161 @@ +/** + * @vitest-environment jsdom + */ +// [DEF:SourceIntakePanelUxTests:Module] +// @COMPLEXITY: 3 +// @SEMANTICS: dataset-review, source-intake, ux-tests, validation, recovery +// @PURPOSE: Verify source intake entry paths, validation feedback, and submit payload behavior for US1. +// @LAYER: UI +// @RELATION: [VERIFIES] ->[SourceIntakePanel] +// @UX_STATE: Idle -> Default intake renders both entry paths and waits for user input. +// @UX_STATE: Validating -> Inline feedback confirms recognized links and local payload validation. +// @UX_STATE: Rejected -> Invalid source input remains local and exposes recovery guidance. +// @TEST_CONTRACT: SourceIntakePanelProps -> ObservableIntakeUX +// @TEST_SCENARIO: invalid_superset_link_shows_rejected_state -> Invalid URL input keeps submit local and shows rejection feedback. +// @TEST_SCENARIO: recognized_superset_link_submits_payload -> Recognized link path submits normalized payload. +// @TEST_SCENARIO: dataset_selection_mode_changes_cta -> Dataset selection path switches CTA and payload source kind. +// @TEST_EDGE: missing_environment -> Required environment guard blocks submit. +// @TEST_EDGE: invalid_type -> Invalid Superset URL shows recovery hint. +// @TEST_EDGE: external_fail -> Submit callback failure is rendered inline. 
+// @TEST_INVARIANT: intake_contract_remains_observable -> VERIFIED_BY: [invalid_superset_link_shows_rejected_state, recognized_superset_link_submits_payload, dataset_selection_mode_changes_cta] + +import { describe, it, expect, vi } from "vitest"; +import { fireEvent, render, screen } from "@testing-library/svelte"; +import SourceIntakePanel from "../SourceIntakePanel.svelte"; + +vi.mock("$lib/i18n", () => ({ + t: { + subscribe: (fn) => { + fn({ + common: { + choose_environment: "Choose environment", + error: "Common error", + }, + dataset_review: { + source: { + eyebrow: "Source intake", + title: "Start dataset review", + description: "Paste link or provide dataset reference.", + state_idle: "Idle", + state_validating: "Validating", + state_rejected: "Rejected", + environment_label: "Environment", + environment_required: "Environment is required", + superset_link_tab: "Superset link", + superset_link_tab_hint: "Paste dashboard or explore URL", + dataset_selection_tab: "Dataset selection", + dataset_selection_tab_hint: "Enter dataset ref", + superset_link_label: "Superset link", + dataset_selection_label: "Dataset reference", + superset_link_placeholder: "https://superset.local/dashboard/10", + dataset_selection_placeholder: "public.sales", + superset_link_hint: "Paste a full Superset URL", + dataset_selection_hint: "Provide schema.dataset reference", + recognized_link_hint: "Recognized Superset link", + superset_link_required: "Superset link is required", + dataset_selection_required: "Dataset reference is required", + superset_link_invalid: "Superset link must start with http", + submit_failed: "Submit failed", + superset_link_recovery_note: "You can fix the link inline", + dataset_selection_recovery_note: "You can fix the dataset inline", + submitting: "Submitting", + submit_superset_link: "Start from link", + submit_dataset_selection: "Start from dataset", + dataset_selection_acknowledged: "Dataset selection acknowledged", + }, + }, + }); + return () => {}; + 
}, + }, +})); + +describe("SourceIntakePanel UX Contract", () => { + const environments = [{ id: "env-1", name: "DEV" }]; + + it("invalid_superset_link_shows_rejected_state", async () => { + const onsubmit = vi.fn(); + const { container } = render(SourceIntakePanel, { + environments, + onsubmit, + }); + + await fireEvent.change(screen.getByRole("combobox"), { + target: { value: "env-1" }, + }); + await fireEvent.input(screen.getByPlaceholderText("https://superset.local/dashboard/10"), { + target: { value: "not-a-url" }, + }); + await fireEvent.submit(container.querySelector("form")); + + expect(onsubmit).not.toHaveBeenCalled(); + expect(screen.getByText("Rejected")).toBeDefined(); + expect(screen.getByText("Superset link must start with http")).toBeDefined(); + }); + + it("recognized_superset_link_submits_payload", async () => { + const onsubmit = vi.fn().mockResolvedValue(undefined); + const { container } = render(SourceIntakePanel, { + environments, + selectedEnvironmentId: "env-1", + acknowledgment: "Recognized Superset link", + onsubmit, + }); + + await fireEvent.input(screen.getByPlaceholderText("https://superset.local/dashboard/10"), { + target: { value: "https://demo.local/superset/dashboard/42 " }, + }); + await fireEvent.submit(container.querySelector("form")); + + expect(onsubmit).toHaveBeenCalledWith({ + environment_id: "env-1", + source_input: "https://demo.local/superset/dashboard/42", + source_kind: "superset_link", + }); + expect(screen.getByText("Recognized Superset link")).toBeDefined(); + }); + + it("dataset_selection_mode_changes_cta", async () => { + const onsubmit = vi.fn().mockResolvedValue(undefined); + const { container } = render(SourceIntakePanel, { + environments, + onsubmit, + }); + + await fireEvent.click( + screen.getByRole("button", { + name: "Dataset selection Enter dataset ref", + }), + ); + await fireEvent.change(screen.getByRole("combobox"), { + target: { value: "env-1" }, + }); + await 
fireEvent.input(screen.getByPlaceholderText("public.sales"), { + target: { value: " public.sales " }, + }); + await fireEvent.submit(container.querySelector("form")); + + expect(onsubmit).toHaveBeenCalledWith({ + environment_id: "env-1", + source_input: "public.sales", + source_kind: "dataset_selection", + }); + }); + + it("external_fail_renders_inline_error", async () => { + const onsubmit = vi.fn().mockRejectedValue(new Error("Backend rejected source")); + const { container } = render(SourceIntakePanel, { + environments, + selectedEnvironmentId: "env-1", + onsubmit, + }); + + await fireEvent.input(screen.getByPlaceholderText("https://superset.local/dashboard/10"), { + target: { value: "https://demo.local/dashboard/42" }, + }); + await fireEvent.submit(container.querySelector("form")); + + expect(screen.getByText("Backend rejected source")).toBeDefined(); + }); +}); +// [/DEF:SourceIntakePanelUxTests:Module] \ No newline at end of file diff --git a/frontend/src/lib/components/dataset-review/__tests__/validation_findings_panel.ux.test.js b/frontend/src/lib/components/dataset-review/__tests__/validation_findings_panel.ux.test.js new file mode 100644 index 00000000..91baa31b --- /dev/null +++ b/frontend/src/lib/components/dataset-review/__tests__/validation_findings_panel.ux.test.js @@ -0,0 +1,141 @@ +/** + * @vitest-environment jsdom + */ +// [DEF:ValidationFindingsPanelUxTests:Module] +// @COMPLEXITY: 3 +// @SEMANTICS: dataset-review, findings, severity, jump, ux-tests +// @PURPOSE: Verify grouped findings visibility, empty state, and remediation jump behavior for US1. +// @LAYER: UI +// @RELATION: [VERIFIES] ->[ValidationFindingsPanel] +// @UX_STATE: Blocking -> Blocking findings dominate the panel and expose remediation jumps. +// @UX_STATE: Warning -> Warning findings remain visible with explicit resolution state. +// @UX_STATE: Informational -> Informational notes stay readable without competing with blockers. 
+// @TEST_CONTRACT: FindingsPanelProps -> ObservableFindingsUX +// @TEST_SCENARIO: blocking_warning_info_groups_render -> Findings render in the proper severity groups with counts. +// @TEST_SCENARIO: jump_action_maps_area_to_workspace_target -> Jump action emits normalized remediation target. +// @TEST_SCENARIO: empty_findings_show_success_state -> Empty list shows ready feedback. +// @TEST_EDGE: missing_field -> Missing optional fields do not crash rendering. +// @TEST_EDGE: invalid_type -> Unknown severity falls back to informational grouping. +// @TEST_EDGE: external_fail -> Jump callback remains optional. +// @TEST_INVARIANT: findings_groups_remain_actionable -> VERIFIED_BY: [blocking_warning_info_groups_render, jump_action_maps_area_to_workspace_target, empty_findings_show_success_state] + +import { describe, it, expect, vi } from "vitest"; +import { fireEvent, render, screen } from "@testing-library/svelte"; +import ValidationFindingsPanel from "../ValidationFindingsPanel.svelte"; + +vi.mock("$lib/i18n", () => ({ + t: { + subscribe: (fn) => { + fn({ + dataset_review: { + findings: { + eyebrow: "Validation findings", + title: "Findings", + description: "Review validation results", + next_action_label: "Next action", + empty: "No findings", + blocking_title: "Blocking", + blocking_empty: "No blocking findings", + warning_title: "Warnings", + warning_empty: "No warning findings", + informational_title: "Info", + informational_empty: "No informational findings", + resolution_label: "Resolution", + reference_label: "Reference", + jump_action: "Jump to area", + resolution: { + open: "Open", + approved: "Approved", + }, + areas: { + source_intake: "Source intake", + dataset_profile: "Dataset summary", + semantic_enrichment: "Semantics", + }, + }, + workspace: { + actions: { + review_documentation: "Review documentation", + import_from_superset: "Import from Superset", + }, + }, + }, + }); + return () => {}; + }, + }, +})); + +describe("ValidationFindingsPanel UX 
Contract", () => { + const findings = [ + { + finding_id: "f-1", + severity: "blocking", + code: "REQ_ENV", + area: "source_intake", + title: "Environment required", + message: "Select environment", + resolution_state: "open", + }, + { + finding_id: "f-2", + severity: "warning", + code: "PARTIAL_RECOVERY", + area: "dataset_profile", + title: "Partial recovery", + message: "Some metadata needs review", + resolution_state: "approved", + caused_by_ref: "dashboard:42", + }, + { + finding_id: "f-3", + severity: "unexpected", + code: "INFO_NOTE", + area: "semantic_enrichment", + title: "Dictionary note", + message: "Trusted source used", + resolution_state: "open", + }, + ]; + + it("blocking_warning_info_groups_render", () => { + render(ValidationFindingsPanel, { + findings, + recommendedAction: "review_documentation", + }); + + expect(screen.getByText("Blocking")).toBeDefined(); + expect(screen.getByText("Warnings")).toBeDefined(); + expect(screen.getByText("Info")).toBeDefined(); + expect(screen.getByText("Environment required")).toBeDefined(); + expect(screen.getByText("Partial recovery")).toBeDefined(); + expect(screen.getByText("Dictionary note")).toBeDefined(); + expect(screen.getByText("Review documentation")).toBeDefined(); + }); + + it("jump_action_maps_area_to_workspace_target", async () => { + const onjump = vi.fn(); + render(ValidationFindingsPanel, { + findings: [findings[0]], + recommendedAction: "import_from_superset", + onjump, + }); + + await fireEvent.click(screen.getByRole("button", { name: "Jump to area" })); + + expect(onjump).toHaveBeenCalledWith({ + target: "intake", + finding: findings[0], + }); + }); + + it("empty_findings_show_success_state", () => { + render(ValidationFindingsPanel, { + findings: [], + recommendedAction: "review_documentation", + }); + + expect(screen.getByText("No findings")).toBeDefined(); + }); +}); +// [/DEF:ValidationFindingsPanelUxTests:Module] \ No newline at end of file diff --git a/frontend/src/lib/i18n/locales/en.json 
b/frontend/src/lib/i18n/locales/en.json index 68c3d320..40d8ea76 100644 --- a/frontend/src/lib/i18n/locales/en.json +++ b/frontend/src/lib/i18n/locales/en.json @@ -586,6 +586,187 @@ "task_failed": "Failed", "task_waiting": "Waiting" }, + "dataset_review": { + "source": { + "eyebrow": "Source intake", + "title": "Start dataset review", + "description": "Paste a Superset link or provide a dataset reference to begin a resumable review session.", + "state_idle": "Idle", + "state_validating": "Validating", + "state_rejected": "Rejected", + "environment_label": "Environment", + "environment_required": "Select the Superset environment before starting review.", + "superset_link_tab": "Superset link", + "superset_link_tab_hint": "Import dashboard or explore context and recover filters progressively.", + "dataset_selection_tab": "Dataset selection", + "dataset_selection_tab_hint": "Start from a known dataset reference when you already know the target.", + "superset_link_label": "Superset URL", + "superset_link_placeholder": "https://superset.example.com/superset/dashboard/42/?native_filters=...", + "superset_link_hint": "Recognized Superset links create a session immediately, then continue partial recovery in place.", + "superset_link_required": "Paste a Superset link to continue.", + "superset_link_invalid": "Enter a full Superset URL starting with http:// or https://.", + "superset_link_recovery_note": "Partial recovery stays visible. Missing filters or unresolved context are disclosed instead of hidden.", + "recognized_link_hint": "Superset link recognized. 
Session recovery will continue after the shell is created.", + "dataset_selection_label": "Dataset reference", + "dataset_selection_placeholder": "dataset_id:123 or sales.daily_revenue", + "dataset_selection_hint": "Use a dataset id or stable dataset reference from the selected environment.", + "dataset_selection_required": "Enter a dataset reference to continue.", + "dataset_selection_recovery_note": "Dataset-based intake still surfaces confidence and unresolved context explicitly.", + "dataset_selection_acknowledged": "Dataset reference accepted. Loading the resumable review shell.", + "submit_superset_link": "Start from link", + "submit_dataset_selection": "Start from dataset", + "submitting": "Starting review...", + "submit_failed": "Unable to start the dataset review session." + }, + "findings": { + "eyebrow": "Validation findings", + "title": "Review blockers, warnings, and informational notes", + "description": "Findings stay grouped by severity so the highest-risk issues remain visible while partial recovery stays usable.", + "next_action_label": "Recommended next step", + "empty": "No findings recorded yet. 
The session can continue with the current recovered context.", + "blocking_title": "Blocking", + "blocking_empty": "No blocking findings are currently open.", + "warning_title": "Warnings", + "warning_empty": "No warning-level findings need attention right now.", + "informational_title": "Informational", + "informational_empty": "No informational findings are currently recorded.", + "resolution_label": "Resolution", + "reference_label": "Reference", + "jump_action": "Open related area", + "resolution": { + "open": "Open", + "resolved": "Resolved", + "approved": "Approved", + "skipped": "Skipped", + "deferred": "Deferred", + "expert_review": "Expert review" + }, + "areas": { + "source_intake": "Source intake", + "dataset_profile": "Dataset profile", + "semantic_enrichment": "Semantic enrichment", + "clarification": "Clarification", + "filter_recovery": "Filter recovery", + "template_mapping": "Template mapping", + "compiled_preview": "Compiled preview", + "launch": "Launch", + "audit": "Audit" + } + }, + "workspace": { + "eyebrow": "Dataset orchestration", + "title": "Dataset review workspace", + "description": "Resume a prior session or inspect recovered context, findings, provenance, and exports in one place.", + "loading": "Loading review workspace...", + "load_failed": "Failed to load the dataset review session.", + "resume_failed": "Failed to resume the current review session.", + "save_failed": "Failed to update the review session.", + "export_failed": "Failed to export the requested review artifact.", + "empty_state_title": "No session loaded", + "state_label": "Workspace", + "readiness_label": "Readiness", + "source_badge_fallback": "review session", + "save_session_action": "Save session", + "export_summary_action": "Export summary", + "save_summary_action": "Save summary", + "summary_edit_saved": "Summary draft updated for the current review workspace.", + "import_progress_eyebrow": "Import progress", + "import_progress_title": "Recovering dataset context 
progressively", + "import_progress_body": "The workspace reveals milestones as source, filters, variables, and semantic candidates become available.", + "import_milestones": { + "recognized": "Dataset recognized", + "filters": "Saved native filters recovered", + "variables": "Dataset template variables detected", + "semantic_sources": "Nearby semantic sources identified", + "summary": "Preliminary summary prepared" + }, + "source_session_title": "Source & session", + "source_label": "Dataset source", + "import_status_title": "Superset import status", + "recent_actions_title": "Recent actions timeline", + "timeline": { + "source": "Source accepted", + "status": "Session state updated", + "filters": "Recovered filters", + "findings": "Validation findings recorded", + "exports": "Latest export generated" + }, + "next_action_label": "Next action", + "next_action_card_eyebrow": "Primary action", + "next_action_card_body": "The main action always reflects the highest-value next step for the current session state.", + "session_label": "Session summary", + "session_id_label": "Session ID", + "summary_source_label": "Summary source", + "confidence_label": "Confidence", + "business_summary_title": "Business summary", + "summary_missing": "No readable business summary is available yet.", + "partial_recovery_badge": "Partial recovery", + "phase_label": "Phase", + "status_label": "Status", + "active_task_label": "Active task", + "recovery_title": "Recovered filters and provenance", + "recovery_empty": "No imported filters have been recovered yet.", + "provenance_label": "Provenance", + "recovered_value_label": "Recovered value", + "health_title": "Findings overview", + "open_findings_label": "Open findings", + "exports_title": "Exports", + "exports_description": "Generate inline documentation and validation artifacts from the current reviewed state.", + "export_documentation_json": "Export documentation (JSON)", + "export_documentation_markdown": "Export documentation 
(Markdown)", + "export_validation_json": "Export validation (JSON)", + "export_validation_markdown": "Export validation (Markdown)", + "preview_title": "Preview status", + "preview_status_label": "Status", + "preview_compiler_label": "Compiled by", + "preview_pending_note": "Compiled preview is not part of this US1 batch yet, but the workspace keeps the state visible when present.", + "jump_target_label": "Focused area", + "resume_action": "Resume session", + "pause_action": "Pause session", + "actions": { + "import_from_superset": "Import from Superset", + "review_documentation": "Review documentation", + "apply_semantic_source": "Apply semantic source", + "start_clarification": "Start clarification", + "answer_next_question": "Answer next question", + "approve_mapping": "Approve mapping", + "generate_sql_preview": "Generate SQL preview", + "complete_required_values": "Complete required values", + "launch_dataset": "Launch dataset", + "resume_session": "Resume session", + "export_outputs": "Export outputs" + }, + "readiness": { + "empty": "Empty", + "importing": "Importing", + "review_ready": "Review ready", + "semantic_source_review_needed": "Semantic source review needed", + "clarification_needed": "Clarification needed", + "clarification_active": "Clarification active", + "mapping_review_needed": "Mapping review needed", + "compiled_preview_ready": "Compiled preview ready", + "partially_ready": "Partially ready", + "run_ready": "Run ready", + "run_in_progress": "Run in progress", + "completed": "Completed", + "recovery_required": "Recovery required" + }, + "summary_sources": { + "confirmed": "Confirmed", + "imported": "Imported", + "inferred": "Inferred", + "ai_draft": "AI draft", + "manual_override": "Manual override" + }, + "confidence": { + "confirmed": "Confirmed", + "mostly_confirmed": "Mostly confirmed", + "mixed": "Mixed", + "low_confidence": "Low confidence", + "unresolved": "Unresolved" + } + } + }, "tasks": { "management": "Task Management", 
"run_backup": "Run Backup", diff --git a/frontend/src/lib/i18n/locales/ru.json b/frontend/src/lib/i18n/locales/ru.json index df34b423..7a869a4e 100644 --- a/frontend/src/lib/i18n/locales/ru.json +++ b/frontend/src/lib/i18n/locales/ru.json @@ -584,6 +584,187 @@ "task_failed": "Ошибка", "task_waiting": "Ожидание" }, + "dataset_review": { + "source": { + "eyebrow": "Источник", + "title": "Запуск review датасета", + "description": "Вставьте ссылку Superset или укажите ссылку на датасет, чтобы начать возобновляемую review-сессию.", + "state_idle": "Ожидание", + "state_validating": "Проверка", + "state_rejected": "Отклонено", + "environment_label": "Окружение", + "environment_required": "Перед запуском review выберите окружение Superset.", + "superset_link_tab": "Ссылка Superset", + "superset_link_tab_hint": "Импортирует контекст dashboard/explore и поэтапно восстанавливает фильтры.", + "dataset_selection_tab": "Выбор датасета", + "dataset_selection_tab_hint": "Используйте известную ссылку на датасет, если цель уже определена.", + "superset_link_label": "URL Superset", + "superset_link_placeholder": "https://superset.example.com/superset/dashboard/42/?native_filters=...", + "superset_link_hint": "Распознанная ссылка Superset сразу создает сессию, а затем продолжает частичное восстановление в фоне.", + "superset_link_required": "Вставьте ссылку Superset, чтобы продолжить.", + "superset_link_invalid": "Введите полный URL Superset, начинающийся с http:// или https://.", + "superset_link_recovery_note": "Частичное восстановление остается видимым. Отсутствующие фильтры и неразрешенный контекст явно показываются.", + "recognized_link_hint": "Ссылка Superset распознана. 
Восстановление продолжится после создания оболочки сессии.", + "dataset_selection_label": "Ссылка на датасет", + "dataset_selection_placeholder": "dataset_id:123 или sales.daily_revenue", + "dataset_selection_hint": "Используйте id датасета или стабильную ссылку на датасет из выбранного окружения.", + "dataset_selection_required": "Введите ссылку на датасет, чтобы продолжить.", + "dataset_selection_recovery_note": "Для intake по датасету также явно показываются confidence и неразрешенный контекст.", + "dataset_selection_acknowledged": "Ссылка на датасет принята. Загружается возобновляемая review-оболочка.", + "submit_superset_link": "Старт по ссылке", + "submit_dataset_selection": "Старт по датасету", + "submitting": "Запуск review...", + "submit_failed": "Не удалось запустить review-сессию датасета." + }, + "findings": { + "eyebrow": "Результаты валидации", + "title": "Проверьте блокеры, предупреждения и информационные заметки", + "description": "Результаты сгруппированы по severity, чтобы самые важные риски оставались видимыми, а частичное восстановление оставалось пригодным к использованию.", + "next_action_label": "Рекомендованный следующий шаг", + "empty": "Результаты пока не записаны. 
Сессию можно продолжать с текущим восстановленным контекстом.", + "blocking_title": "Блокеры", + "blocking_empty": "Открытых блокирующих результатов сейчас нет.", + "warning_title": "Предупреждения", + "warning_empty": "Сейчас нет warning-результатов, требующих внимания.", + "informational_title": "Информационные", + "informational_empty": "Информационные результаты сейчас отсутствуют.", + "resolution_label": "Статус", + "reference_label": "Ссылка", + "jump_action": "Открыть связанный раздел", + "resolution": { + "open": "Открыто", + "resolved": "Решено", + "approved": "Подтверждено", + "skipped": "Пропущено", + "deferred": "Отложено", + "expert_review": "Экспертная проверка" + }, + "areas": { + "source_intake": "Источник", + "dataset_profile": "Профиль датасета", + "semantic_enrichment": "Семантическое обогащение", + "clarification": "Уточнение", + "filter_recovery": "Восстановление фильтров", + "template_mapping": "Маппинг шаблонов", + "compiled_preview": "Скомпилированный preview", + "launch": "Запуск", + "audit": "Аудит" + } + }, + "workspace": { + "eyebrow": "Оркестрация датасета", + "title": "Workspace review датасета", + "description": "Возобновляйте предыдущую сессию или просматривайте восстановленный контекст, findings, provenance и exports в одном месте.", + "loading": "Загрузка review workspace...", + "load_failed": "Не удалось загрузить review-сессию датасета.", + "resume_failed": "Не удалось возобновить текущую review-сессию.", + "save_failed": "Не удалось обновить review-сессию.", + "export_failed": "Не удалось экспортировать выбранный артефакт review.", + "empty_state_title": "Сессия не загружена", + "state_label": "Workspace", + "readiness_label": "Готовность", + "source_badge_fallback": "review-сессия", + "save_session_action": "Сохранить сессию", + "export_summary_action": "Экспортировать summary", + "save_summary_action": "Сохранить summary", + "summary_edit_saved": "Черновик summary обновлен для текущего review workspace.", + 
"import_progress_eyebrow": "Прогресс импорта", + "import_progress_title": "Контекст датасета восстанавливается поэтапно", + "import_progress_body": "Workspace последовательно показывает этапы распознавания источника, фильтров, переменных и семантических кандидатов.", + "import_milestones": { + "recognized": "Датасет распознан", + "filters": "Сохраненные native filters восстановлены", + "variables": "Переменные шаблона датасета обнаружены", + "semantic_sources": "Найдены ближайшие семантические источники", + "summary": "Подготовлено предварительное summary" + }, + "source_session_title": "Источник и сессия", + "source_label": "Источник датасета", + "import_status_title": "Статус импорта Superset", + "recent_actions_title": "Таймлайн последних действий", + "timeline": { + "source": "Источник принят", + "status": "Состояние сессии обновлено", + "filters": "Фильтры восстановлены", + "findings": "Findings записаны", + "exports": "Последний экспорт создан" + }, + "next_action_label": "Следующее действие", + "next_action_card_eyebrow": "Основное действие", + "next_action_card_body": "Основное действие всегда соответствует самому ценному следующему шагу для текущего состояния сессии.", + "session_label": "Сводка сессии", + "session_id_label": "ID сессии", + "summary_source_label": "Источник summary", + "confidence_label": "Уверенность", + "business_summary_title": "Бизнес-summary", + "summary_missing": "Читаемое бизнес-summary пока недоступно.", + "partial_recovery_badge": "Частичное восстановление", + "phase_label": "Фаза", + "status_label": "Статус", + "active_task_label": "Активная задача", + "recovery_title": "Восстановленные фильтры и provenance", + "recovery_empty": "Импортированные фильтры пока не восстановлены.", + "provenance_label": "Provenance", + "recovered_value_label": "Восстановленное значение", + "health_title": "Обзор findings", + "open_findings_label": "Открытые findings", + "exports_title": "Экспорт", + "exports_description": "Генерируйте 
inline-артефакты документации и валидации из текущего review-состояния.", + "export_documentation_json": "Экспорт документации (JSON)", + "export_documentation_markdown": "Экспорт документации (Markdown)", + "export_validation_json": "Экспорт валидации (JSON)", + "export_validation_markdown": "Экспорт валидации (Markdown)", + "preview_title": "Статус preview", + "preview_status_label": "Статус", + "preview_compiler_label": "Скомпилировано", + "preview_pending_note": "Скомпилированный preview не входит в этот пакет US1, но workspace сохраняет его видимым, если он уже существует.", + "jump_target_label": "Выбранная зона", + "resume_action": "Возобновить сессию", + "pause_action": "Поставить сессию на паузу", + "actions": { + "import_from_superset": "Импорт из Superset", + "review_documentation": "Проверить документацию", + "apply_semantic_source": "Применить семантический источник", + "start_clarification": "Начать уточнение", + "answer_next_question": "Ответить на следующий вопрос", + "approve_mapping": "Подтвердить маппинг", + "generate_sql_preview": "Сгенерировать SQL preview", + "complete_required_values": "Заполнить обязательные значения", + "launch_dataset": "Запустить датасет", + "resume_session": "Возобновить сессию", + "export_outputs": "Экспортировать результаты" + }, + "readiness": { + "empty": "Пусто", + "importing": "Импорт", + "review_ready": "Готово к review", + "semantic_source_review_needed": "Нужен review источника семантики", + "clarification_needed": "Нужно уточнение", + "clarification_active": "Уточнение активно", + "mapping_review_needed": "Нужен review маппинга", + "compiled_preview_ready": "Скомпилированный preview готов", + "partially_ready": "Частично готово", + "run_ready": "Готово к запуску", + "run_in_progress": "Запуск выполняется", + "completed": "Завершено", + "recovery_required": "Требуется восстановление" + }, + "summary_sources": { + "confirmed": "Подтверждено", + "imported": "Импортировано", + "inferred": "Выведено", + "ai_draft": 
"Черновик AI", + "manual_override": "Ручное переопределение" + }, + "confidence": { + "confirmed": "Подтверждено", + "mostly_confirmed": "Почти подтверждено", + "mixed": "Смешанное", + "low_confidence": "Низкая уверенность", + "unresolved": "Не разрешено" + } + } + }, "tasks": { "management": "Управление задачами", "run_backup": "Запустить бэкап", diff --git a/frontend/src/lib/stores/__tests__/mocks/state.js b/frontend/src/lib/stores/__tests__/mocks/state.js new file mode 100644 index 00000000..9ea13e8a --- /dev/null +++ b/frontend/src/lib/stores/__tests__/mocks/state.js @@ -0,0 +1,16 @@ +// [DEF:state_mock:Module] +// @COMPLEXITY: 2 +// @PURPOSE: Mock for $app/state in vitest route/component tests. +// @LAYER: UI Tests + +export const page = { + params: {}, + route: { id: "test" }, + url: new URL("http://localhost"), + status: 200, + error: null, + data: {}, + form: null, +}; + +// [/DEF:state_mock:Module] \ No newline at end of file diff --git a/frontend/src/lib/stores/__tests__/test_datasetReviewSession.js b/frontend/src/lib/stores/__tests__/test_datasetReviewSession.js index 5b818c5e..136bf82f 100644 --- a/frontend/src/lib/stores/__tests__/test_datasetReviewSession.js +++ b/frontend/src/lib/stores/__tests__/test_datasetReviewSession.js @@ -1,9 +1,10 @@ -// [DEF:frontend.src.lib.stores.__tests__.test_datasetReviewSession:Module] +// [DEF:DatasetReviewSessionStoreTests:Module] // @COMPLEXITY: 3 // @SEMANTICS: dataset-review, store, session, tests -// @PURPOSE: Unit tests for dataset review session store +// @PURPOSE: Unit tests for dataset review session store. // @LAYER: UI // @RELATION: VERIFIES -> [datasetReviewSession:Store] +// @UX_STATE: Idle -> Store helpers are exercised without asynchronous UI transitions. 
import { describe, it, expect, beforeEach, vi } from 'vitest'; @@ -109,4 +110,4 @@ describe('datasetReviewSession store', () => { }); }); -// [/DEF:frontend.src.lib.stores.__tests__.test_datasetReviewSession:Module] \ No newline at end of file +// [/DEF:DatasetReviewSessionStoreTests:Module] \ No newline at end of file diff --git a/frontend/src/lib/stores/datasetReviewSession.js b/frontend/src/lib/stores/datasetReviewSession.js index 48bf1d05..5ba104e9 100644 --- a/frontend/src/lib/stores/datasetReviewSession.js +++ b/frontend/src/lib/stores/datasetReviewSession.js @@ -1,8 +1,12 @@ // [DEF:datasetReviewSession:Store] // @COMPLEXITY: 4 -// @PURPOSE: Manage active dataset review session state, including loading, updates, and navigation guards. +// @PURPOSE: Manage active dataset review session state, including loading, local edits, error capture, and reset semantics for the active review workspace. // @LAYER: UI -// @RELATION: DEPENDS_ON -> api_module (requestApi/fetchApi) +// @RELATION: DEPENDS_ON -> [api_module] +// @PRE: Consumers provide session-shaped payloads when setting or patching state and initialize the store before reading derived session fields. +// @POST: Store transitions keep session, loading, error, and dirty flags internally consistent after each helper call. +// @SIDE_EFFECT: Mutates the writable dataset review session store in frontend memory. +// @DATA_CONTRACT: Input[SessionDetail | Partial | boolean | string | null] -> Output[DatasetReviewSessionStoreState] // // @UX_STATE: Loading -> Session detail is being fetched. // @UX_STATE: Ready -> Session detail is available for UI binding. diff --git a/frontend/src/routes/datasets/review/[id]/+page.svelte b/frontend/src/routes/datasets/review/[id]/+page.svelte new file mode 100644 index 00000000..49997698 --- /dev/null +++ b/frontend/src/routes/datasets/review/[id]/+page.svelte @@ -0,0 +1,896 @@ + + + + + + + + + + + + + + + + + + + + + + + +
+
+
+

+ {$t.dataset_review?.workspace?.eyebrow} +

+

+ {$t.dataset_review?.workspace?.title} +

+

+ {$t.dataset_review?.workspace?.description} +

+
+ +
+ {#if session} + + {session.source_kind || $t.dataset_review?.workspace?.source_badge_fallback} + + + {profile?.dataset_name || session.dataset_ref} + + {/if} + + {$t.dataset_review?.workspace?.state_label}: {currentWorkspaceState} + + {#if session} + + {$t.dataset_review?.workspace?.readiness_label}: {readinessLabel} + + + + {/if} +
+
+ + {#if isBootstrapping} +
+ {$t.dataset_review?.workspace?.loading} +
+ {:else} + {#if loadError} +
+ {loadError} +
+ {/if} + + {#if !session} + + + {#if currentWorkspaceState === "Importing"} +
+
+
+

+ {$t.dataset_review?.workspace?.import_progress_eyebrow} +

+

+ {$t.dataset_review?.workspace?.import_progress_title} +

+

+ {$t.dataset_review?.workspace?.import_progress_body} +

+
+ + {currentWorkspaceState} + +
+ +
    + {#each importMilestones as milestone} +
  1. +
    + + {milestone.label} +
    +
  2. + {/each} +
+
+ {/if} + + {#if submitError} +
+ {submitError} +
+ {/if} + {:else} +
+
+
+

+ {$t.dataset_review?.workspace?.source_session_title} +

+ +
+
+ {$t.dataset_review?.workspace?.source_label} +
+
+ {session.source_input || session.dataset_ref} +
+
+ +
+
+
+
+ {$t.dataset_review?.workspace?.import_status_title} +
+
+ {currentWorkspaceState} +
+
+ + {readinessLabel} + +
+ +
    + {#each importMilestones as milestone} +
  1. + + {milestone.label} +
  2. + {/each} +
+
+ +
+ + +
+ +
+

+ {$t.dataset_review?.workspace?.recent_actions_title} +

+
    + {#each recentActions as action} +
  1. +
    {action.title}
    +
    {action.detail}
    +
  2. + {/each} +
+
+
+
+ +
+
+
+
+

+ {$t.dataset_review?.workspace?.session_label} +

+

+ {profile?.dataset_name || session.dataset_ref} +

+

+ {profile?.schema_name || "—"} • {profile?.database_name || "—"} +

+
+ +
+
+ {$t.dataset_review?.workspace?.next_action_label} +
+
+ {nextActionLabel} +
+
+
+ +
+
+
+ {$t.dataset_review?.workspace?.summary_source_label} +
+
+ {summarySourceLabel} +
+
+ +
+
+ {$t.dataset_review?.workspace?.confidence_label} +
+
+ {confidenceLabel} +
+
+ +
+
+ {$t.dataset_review?.workspace?.session_id_label} +
+
+ {session.session_id} +
+
+
+ +
+
+

+ {$t.dataset_review?.workspace?.business_summary_title} +

+
+ + {#if session.readiness_state === "recovery_required" || session.readiness_state === "partially_ready"} + + {$t.dataset_review?.workspace?.partial_recovery_badge} + + {/if} +
+
+ + {#if isEditingSummary} + +
+ + +
+ {:else} +

+ {summaryDraft || profile?.business_summary || $t.dataset_review?.workspace?.summary_missing} +

+ {/if} + + {#if summaryFeedback} +
+ {summaryFeedback} +
+ {/if} + +
+ + {$t.dataset_review?.workspace?.phase_label}: {session.current_phase} + + + {$t.dataset_review?.workspace?.status_label}: {session.status} + + {#if session.active_task_id} + + {$t.dataset_review?.workspace?.active_task_label}: {session.active_task_id} + + {/if} +
+
+ +
+
+

+ {$t.dataset_review?.workspace?.recovery_title} +

+ + {importedFilters.length} + +
+ + {#if importedFilters.length === 0} +

+ {$t.dataset_review?.workspace?.recovery_empty} +

+ {:else} +
+ {#each importedFilters as filterItem} +
+
+
+ {filterItem.display_name || filterItem.filter_name} +
+ + {$t.dataset_review?.workspace?.provenance_label}: + {filterItem.source} + + + {$t.dataset_review?.workspace?.confidence_label}: + {filterItem.confidence_state} + + + {filterItem.recovery_status} + +
+ +

+ {$t.dataset_review?.workspace?.recovered_value_label}: + {JSON.stringify(filterItem.normalized_value ?? filterItem.raw_value)} +

+ + {#if filterItem.notes} +

{filterItem.notes}

+ {/if} +
+ {/each} +
+ {/if} +
+ + +
+
+ + +
+ {/if} + {/if} +
+ + \ No newline at end of file diff --git a/frontend/src/routes/datasets/review/[id]/__tests__/dataset_review_workspace.ux.test.js b/frontend/src/routes/datasets/review/[id]/__tests__/dataset_review_workspace.ux.test.js new file mode 100644 index 00000000..6a4e1678 --- /dev/null +++ b/frontend/src/routes/datasets/review/[id]/__tests__/dataset_review_workspace.ux.test.js @@ -0,0 +1,453 @@ +/** + * @vitest-environment jsdom + */ +// @ts-nocheck +// [DEF:DatasetReviewWorkspaceUxTests:Module] +// @COMPLEXITY: 3 +// @SEMANTICS: dataset-review, workspace, route, ux-tests, review-state, exports, recovery +// @PURPOSE: Verify US1 dataset review workspace flow evidence for empty, load, import, recovery, and export behaviors. +// @LAYER: UI +// @RELATION: [VERIFIES] ->[DatasetReviewWorkspace] +// @UX_STATE: Empty -> Intake-first workspace renders clear starting actions. +// @UX_STATE: Importing -> Progressive milestones remain visible while review context is assembled. +// @UX_STATE: Review -> Workspace exposes summary controls, timeline, findings, and export affordances. +// @TEST_CONTRACT: DatasetReviewWorkspaceDependencies -> ObservableWorkspaceUX +// @TEST_SCENARIO: empty_state_renders_source_intake -> No session route data shows intake-first workspace. +// @TEST_SCENARIO: session_load_surfaces_partial_recovery_state -> Existing session route shows review summary, findings, and partial recovery evidence. +// @TEST_SCENARIO: import_submission_transitions_to_review_state -> Intake submit creates a session, navigates, and renders review state. +// @TEST_SCENARIO: export_feedback_surfaces_success_and_error -> Export action exposes both artifact success detail and failure feedback. +// @TEST_EDGE: missing_field -> Empty session context still renders stable workspace copy. +// @TEST_EDGE: invalid_type -> Unknown route params default to empty-state handling. +// @TEST_EDGE: external_fail -> API failures surface explicit feedback without collapsing the workspace. 
+// @TEST_INVARIANT: us1_workspace_remains_readable_and_actionable -> VERIFIED_BY: [empty_state_renders_source_intake, session_load_surfaces_partial_recovery_state, import_submission_transitions_to_review_state, export_feedback_surfaces_success_and_error] + +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { render, screen, waitFor, fireEvent } from "@testing-library/svelte"; +import DatasetReviewWorkspace from "../+page.svelte"; +import { api } from "$lib/api.js"; + +const mockedGoto = vi.mocked(await import("$app/navigation")).goto; +const mockedPage = vi.mocked(await import("$app/state")).page; +const routeState = mockedPage.params; + +function createSessionDetail(overrides = {}) { + return { + session_id: "session-1", + environment_id: "env-1", + source_kind: "superset_link", + source_input: "https://superset.local/dashboard/10", + dataset_ref: "public.sales", + readiness_state: "partially_ready", + recommended_action: "review_documentation", + status: "active", + current_phase: "review", + active_task_id: "task-77", + findings: [ + { + title: "Missing semantic description", + code: "SEM-1", + area: "dataset_profile", + severity: "warning", + message: "Business summary needs confirmation", + resolution_state: "open", + caused_by_ref: "profile.summary", + resolution_note: "Confirm with reviewer", + }, + ], + imported_filters: [ + { + filter_name: "region", + display_name: "Region", + source: "superset", + confidence_state: "mostly_confirmed", + recovery_status: "partial", + normalized_value: ["EMEA"], + raw_value: ["EMEA"], + notes: "Recovered from dashboard state", + }, + ], + previews: [ + { + preview_status: "compiled", + compiled_by: "semantic-review", + }, + ], + profile: { + dataset_name: "Sales Dataset", + schema_name: "public", + database_name: "analytics", + business_summary_source: "imported", + confidence_state: "mostly_confirmed", + business_summary: "Imported business description", + }, + ...overrides, + }; +} + 
+vi.mock("$app/navigation", () => ({ + goto: vi.fn(), +})); + +vi.mock("$app/state", () => ({ + page: { + params: { + id: "", + }, + }, +})); + +vi.mock("$lib/i18n", () => ({ + t: { + subscribe: (fn) => { + fn({ + common: { + error: "Common error", + choose_environment: "Choose environment", + edit: "Edit", + cancel: "Cancel", + }, + dataset_review: { + source: { + eyebrow: "Source intake", + title: "Start dataset review", + description: "Paste link or provide dataset reference.", + state_idle: "Idle", + state_validating: "Validating", + state_rejected: "Rejected", + environment_label: "Environment", + environment_required: "Environment is required", + superset_link_tab: "Superset link", + superset_link_tab_hint: "Paste dashboard or explore URL", + dataset_selection_tab: "Dataset selection", + dataset_selection_tab_hint: "Enter dataset ref", + superset_link_label: "Superset link", + dataset_selection_label: "Dataset reference", + superset_link_placeholder: "https://superset.local/dashboard/10", + dataset_selection_placeholder: "public.sales", + superset_link_hint: "Paste a full Superset URL", + dataset_selection_hint: "Provide schema.dataset reference", + recognized_link_hint: "Recognized Superset link", + superset_link_required: "Superset link is required", + dataset_selection_required: "Dataset reference is required", + superset_link_invalid: "Superset link must start with http", + submit_failed: "Submit failed", + superset_link_recovery_note: "You can fix the link inline", + dataset_selection_recovery_note: "You can fix the dataset inline", + submitting: "Submitting", + submit_superset_link: "Start from link", + submit_dataset_selection: "Start from dataset", + dataset_selection_acknowledged: "Dataset selection acknowledged", + }, + findings: { + eyebrow: "Validation findings", + title: "Findings", + description: "Review validation results", + next_action_label: "Next action", + empty: "No findings", + blocking_title: "Blocking", + blocking_empty: "No blocking 
findings", + warning_title: "Warnings", + warning_empty: "No warning findings", + informational_title: "Info", + informational_empty: "No informational findings", + resolution_label: "Resolution", + reference_label: "Reference", + jump_action: "Jump to area", + resolution: { + open: "Open", + }, + areas: { + source_intake: "Source intake", + dataset_profile: "Dataset summary", + filter_recovery: "Recovered filters", + }, + }, + workspace: { + eyebrow: "Dataset review", + title: "Dataset review workspace", + description: "Review imported dataset context.", + state_label: "Workspace state", + readiness_label: "Readiness", + loading: "Loading workspace", + load_failed: "Failed to load review session", + resume_failed: "Failed to resume session", + save_failed: "Failed to save session", + export_failed: "Failed to export artifact", + empty_state_title: "No session loaded", + source_badge_fallback: "review session", + save_session_action: "Save session", + export_summary_action: "Export summary", + save_summary_action: "Save summary", + summary_edit_saved: "Summary draft updated for the current review workspace.", + import_progress_eyebrow: "Import progress", + import_progress_title: "Recovering dataset context progressively", + import_progress_body: + "The workspace reveals milestones as source, filters, variables, and semantic candidates become available.", + import_milestones: { + recognized: "Dataset recognized", + filters: "Saved native filters recovered", + variables: "Dataset template variables detected", + semantic_sources: "Nearby semantic sources identified", + summary: "Preliminary summary prepared", + }, + source_session_title: "Source & session", + source_label: "Dataset source", + import_status_title: "Superset import status", + recent_actions_title: "Recent actions timeline", + timeline: { + source: "Source accepted", + status: "Session state updated", + filters: "Recovered filters", + findings: "Validation findings recorded", + exports: "Latest export 
generated", + }, + summary_sources: { + ai_draft: "AI draft", + imported: "Imported", + }, + confidence: { + unresolved: "Unresolved", + mostly_confirmed: "Mostly confirmed", + }, + actions: { + import_from_superset: "Import from Superset", + review_documentation: "Review documentation", + resume_session: "Resume session", + }, + readiness: { + empty: "Empty", + partially_ready: "Partially ready", + review_ready: "Review ready", + }, + session_label: "Session", + next_action_label: "Next action", + summary_source_label: "Summary source", + confidence_label: "Confidence", + session_id_label: "Session ID", + business_summary_title: "Business summary", + partial_recovery_badge: "Partial recovery", + summary_missing: "Summary unavailable", + phase_label: "Phase", + status_label: "Status", + active_task_label: "Active task", + recovery_title: "Recovered filters", + recovery_empty: "No recovered filters", + provenance_label: "Provenance", + recovered_value_label: "Recovered value", + next_action_card_eyebrow: "Recommended next step", + next_action_card_body: "Use the recommended action to continue review.", + resume_action: "Resume session", + pause_action: "Pause session", + jump_target_label: "Jump target", + health_title: "Review health", + open_findings_label: "Open findings", + exports_title: "Exports", + exports_description: "Export review artifacts.", + export_documentation_json: "Documentation JSON", + export_documentation_markdown: "Documentation Markdown", + export_validation_json: "Validation JSON", + export_validation_markdown: "Validation Markdown", + preview_title: "Preview", + preview_pending_note: "Preview not generated yet", + preview_status_label: "Preview status", + preview_compiler_label: "Compiled by", + }, + }, + }); + return () => {}; + }, + }, +})); + +vi.mock("$lib/api.js", () => ({ + api: { + fetchApi: vi.fn(), + postApi: vi.fn(), + requestApi: vi.fn(), + }, +})); + +vi.mock("$lib/stores/datasetReviewSession.js", () => ({ + setError: vi.fn(), + 
setLoading: vi.fn(), + setSession: vi.fn(), + resetSession: vi.fn(), +})); + +vi.mock("$lib/stores/environmentContext.js", () => ({ + environmentContextStore: { + subscribe: (run) => { + run({ + environments: [{ id: "env-1", name: "DEV" }], + selectedEnvId: "env-1", + }); + return () => {}; + }, + }, + initializeEnvironmentContext: vi.fn().mockResolvedValue(undefined), +})); + +describe("DatasetReviewWorkspace UX Contract", () => { + beforeEach(() => { + routeState.id = ""; + vi.clearAllMocks(); + api.fetchApi.mockReset(); + api.postApi.mockReset(); + api.requestApi.mockReset(); + mockedGoto.mockReset(); + }); + + it("empty_state_renders_source_intake", async () => { + api.fetchApi.mockResolvedValue(undefined); + + render(DatasetReviewWorkspace); + + await waitFor(() => { + expect(screen.getByText("Start dataset review")).toBeDefined(); + }); + expect(screen.getByText("Workspace state: Empty")).toBeDefined(); + expect(screen.getByText("Paste link or provide dataset reference.")).toBeDefined(); + }); + + it("session_load_surfaces_partial_recovery_state", async () => { + routeState.id = "session-1"; + api.fetchApi.mockResolvedValue(createSessionDetail()); + + render(DatasetReviewWorkspace); + + await waitFor(() => { + expect(api.fetchApi).toHaveBeenCalledWith("/dataset-orchestration/sessions/session-1"); + }); + + expect(screen.getByText("Workspace state: Review")).toBeDefined(); + expect(screen.getByText("Readiness: Partially ready")).toBeDefined(); + expect(screen.getAllByText("Sales Dataset").length).toBeGreaterThan(0); + expect(screen.getByText("Partial recovery")).toBeDefined(); + expect(screen.getAllByText("Recovered filters").length).toBeGreaterThan(0); + expect(screen.getByText("Imported business description")).toBeDefined(); + expect(screen.getAllByRole("button", { name: "Save session" }).length).toBeGreaterThan(0); + expect(screen.getAllByRole("button", { name: "Export summary" }).length).toBeGreaterThan(0); + expect(screen.getByText("Source & 
session")).toBeDefined(); + expect(screen.getByText("Recent actions timeline")).toBeDefined(); + expect(screen.getByText("Source accepted")).toBeDefined(); + }); + + it("summary_edit_affordance_updates_visible_draft", async () => { + routeState.id = "session-1"; + api.fetchApi.mockResolvedValue(createSessionDetail()); + + render(DatasetReviewWorkspace); + + await waitFor(() => { + expect(screen.getByText("Business summary")).toBeDefined(); + }); + + await fireEvent.click(screen.getByRole("button", { name: "[Edit]" })); + + const summaryEditor = screen.getByRole("textbox"); + await fireEvent.input(summaryEditor, { + target: { value: "Updated analyst-facing summary" }, + }); + + await fireEvent.click(screen.getByRole("button", { name: "Save summary" })); + + expect(screen.getByText("Updated analyst-facing summary")).toBeDefined(); + expect( + screen.getByText("Summary draft updated for the current review workspace."), + ).toBeDefined(); + }); + + it("import_submission_transitions_to_review_state", async () => { + const createdSession = createSessionDetail(); + let resolveSessionLoad; + api.postApi.mockResolvedValue({ session_id: "session-1" }); + api.fetchApi.mockImplementation( + () => + new Promise((resolve) => { + resolveSessionLoad = resolve; + }), + ); + + render(DatasetReviewWorkspace); + + await waitFor(() => { + expect(screen.getByText("Start dataset review")).toBeDefined(); + }); + + const environmentSelect = screen.getByRole("combobox"); + const sourceInput = screen.getByPlaceholderText("https://superset.local/dashboard/10"); + const submitButton = screen.getByRole("button", { name: "Start from link" }); + + await fireEvent.change(environmentSelect, { target: { value: "env-1" } }); + await fireEvent.input(sourceInput, { + target: { value: "https://superset.local/dashboard/10" }, + }); + await fireEvent.click(submitButton); + + await waitFor(() => { + expect(api.postApi).toHaveBeenCalledWith("/dataset-orchestration/sessions", { + source_kind: 
"superset_link", + source_input: "https://superset.local/dashboard/10", + environment_id: "env-1", + }); + }); + + await waitFor(() => { + expect(screen.getByText("Workspace state: Importing")).toBeDefined(); + }); + expect(screen.getByText("Import progress")).toBeDefined(); + expect(screen.getByText("Recovering dataset context progressively")).toBeDefined(); + expect(screen.getByText("Dataset recognized")).toBeDefined(); + expect(screen.getByText("Saved native filters recovered")).toBeDefined(); + + resolveSessionLoad(createdSession); + + await waitFor(() => { + expect(mockedGoto).toHaveBeenCalledWith("/datasets/review/session-1"); + }); + + await waitFor(() => { + expect(screen.getByText("Workspace state: Review")).toBeDefined(); + }); + expect(screen.getByText("Readiness: Partially ready")).toBeDefined(); + expect(screen.getAllByText("Review documentation").length).toBeGreaterThan(0); + }); + + it("export_feedback_surfaces_success_and_error", async () => { + routeState.id = "session-1"; + api.fetchApi + .mockResolvedValueOnce(createSessionDetail()) + .mockResolvedValueOnce({ + artifact_type: "documentation", + format: "json", + storage_ref: "artifacts/doc.json", + }) + .mockRejectedValueOnce(new Error("Export unavailable")); + + render(DatasetReviewWorkspace); + + await waitFor(() => { + expect(screen.getByText("Exports")).toBeDefined(); + }); + + const documentationJsonButton = screen.getByRole("button", { + name: "Documentation JSON", + }); + const documentationMarkdownButton = screen.getByRole("button", { + name: "Documentation Markdown", + }); + + await fireEvent.click(documentationJsonButton); + + await waitFor(() => { + expect(screen.getAllByText("documentation • json • artifacts/doc.json").length).toBeGreaterThan(0); + }); + + await fireEvent.click(documentationMarkdownButton); + + await waitFor(() => { + expect(screen.getAllByText("Export unavailable").length).toBeGreaterThan(0); + }); + }); +}); +// [/DEF:DatasetReviewWorkspaceUxTests:Module] \ No 
newline at end of file diff --git a/frontend/vitest.config.js b/frontend/vitest.config.js index 803cca21..25b972d6 100644 --- a/frontend/vitest.config.js +++ b/frontend/vitest.config.js @@ -31,6 +31,7 @@ export default defineConfig({ alias: [ { find: '$app/environment', replacement: path.resolve(__dirname, './src/lib/stores/__tests__/mocks/environment.js') }, { find: '$app/stores', replacement: path.resolve(__dirname, './src/lib/stores/__tests__/mocks/stores.js') }, + { find: '$app/state', replacement: path.resolve(__dirname, './src/lib/stores/__tests__/mocks/state.js') }, { find: '$app/navigation', replacement: path.resolve(__dirname, './src/lib/stores/__tests__/mocks/navigation.js') }, { find: '$env/static/public', replacement: path.resolve(__dirname, './src/lib/stores/__tests__/mocks/env_public.js') }, { find: '$components', replacement: path.resolve(__dirname, './src/components') } diff --git a/specs/027-dataset-llm-orchestration/tasks.md b/specs/027-dataset-llm-orchestration/tasks.md index 9a82b46f..71011a21 100644 --- a/specs/027-dataset-llm-orchestration/tasks.md +++ b/specs/027-dataset-llm-orchestration/tasks.md @@ -7,21 +7,21 @@ ## Phase 1: Setup -- [ ] T001 Initialize backend service directory structure for `dataset_review` in `backend/src/services/dataset_review/` -- [ ] T002 Initialize frontend component directory for `dataset-review` in `frontend/src/lib/components/dataset-review/` -- [ ] T003 Register `ff_dataset_auto_review`, `ff_dataset_clarification`, and `ff_dataset_execution` feature flags in configuration -- [ ] T004 [P] Seed new `DATASET_REVIEW_*` permissions in `backend/src/scripts/seed_permissions.py` +- [x] T001 Initialize backend service directory structure for `dataset_review` in `backend/src/services/dataset_review/` +- [x] T002 Initialize frontend component directory for `dataset-review` in `frontend/src/lib/components/dataset-review/` +- [x] T003 Register `ff_dataset_auto_review`, `ff_dataset_clarification`, and `ff_dataset_execution` 
feature flags in configuration +- [x] T004 [P] Seed new `DATASET_REVIEW_*` permissions in `backend/src/scripts/seed_permissions.py` --- ## Phase 2: Foundational Layer -- [ ] T005 [P] Implement Core SQLAlchemy models for session, profile, and findings in `backend/src/models/dataset_review.py` -- [ ] T006 [P] Implement Semantic, Mapping, and Clarification models in `backend/src/models/dataset_review.py` -- [ ] T007 [P] Implement Preview and Launch Audit models in `backend/src/models/dataset_review.py` -- [ ] T008 [P] Implement `DatasetReviewSessionRepository` (CRITICAL: C5, PRE: auth scope, POST: consistent aggregates, INVARIANTS: ownership scope) in `backend/src/services/dataset_review/repositories/session_repository.py` -- [ ] T009 [P] Create Pydantic schemas for Session Summary and Detail in `backend/src/schemas/dataset_review.py` -- [ ] T010 [P] Create Svelte store for session management in `frontend/src/lib/stores/datasetReviewSession.js` +- [x] T005 [P] Implement Core SQLAlchemy models for session, profile, and findings in `backend/src/models/dataset_review.py` +- [x] T006 [P] Implement Semantic, Mapping, and Clarification models in `backend/src/models/dataset_review.py` +- [x] T007 [P] Implement Preview and Launch Audit models in `backend/src/models/dataset_review.py` +- [x] T008 [P] Implement `DatasetReviewSessionRepository` (CRITICAL: C5, PRE: auth scope, POST: consistent aggregates, INVARIANTS: ownership scope) in `backend/src/services/dataset_review/repositories/session_repository.py` +- [x] T009 [P] Create Pydantic schemas for Session Summary and Detail in `backend/src/schemas/dataset_review.py` +- [x] T010 [P] Create Svelte store for session management in `frontend/src/lib/stores/datasetReviewSession.js` --- @@ -31,14 +31,14 @@ **Independent Test**: Submit a Superset link; verify session created, summary generated, and findings populated without manual intervention. 
-- [ ] T011 [P] [US1] Implement `StartSessionRequest` and lifecycle endpoints in `backend/src/api/routes/dataset_review.py` -- [ ] T012 [US1] Implement `DatasetReviewOrchestrator.start_session` (CRITICAL: C5, PRE: non-empty input, POST: enqueued recovery, BELIEF: uses `belief_scope`) in `backend/src/services/dataset_review/orchestrator.py` -- [ ] T013 [P] [US1] Implement `SupersetContextExtractor.parse_superset_link` (CRITICAL: C4, PRE: parseable link, POST: resolved target, REL: uses `SupersetClient`) in `backend/src/core/utils/superset_context_extractor.py` -- [ ] T014 [US1] Implement `SemanticSourceResolver.resolve_from_dictionary` (CRITICAL: C4, PRE: source exists, POST: confidence-ranked candidates) in `backend/src/services/dataset_review/semantic_resolver.py` -- [ ] T015 [US1] Implement Documentation and Validation export endpoints (JSON/Markdown) in `backend/src/api/routes/dataset_review.py` -- [ ] T016 [P] [US1] Implement `SourceIntakePanel` (C3, UX_STATE: Idle/Validating/Rejected) in `frontend/src/lib/components/dataset-review/SourceIntakePanel.svelte` -- [ ] T017 [P] [US1] Implement `ValidationFindingsPanel` (C3, UX_STATE: Blocking/Warning/Info) in `frontend/src/lib/components/dataset-review/ValidationFindingsPanel.svelte` -- [ ] T018 [US1] Create main `DatasetReviewWorkspace` (CRITICAL: C5, UX_STATE: Empty/Importing/Review) in `frontend/src/routes/datasets/review/[id]/+page.svelte` +- [X] T011 [P] [US1] Implement `StartSessionRequest` and lifecycle endpoints in `backend/src/api/routes/dataset_review.py` +- [X] T012 [US1] Implement `DatasetReviewOrchestrator.start_session` (CRITICAL: C5, PRE: non-empty input, POST: enqueued recovery, BELIEF: uses `belief_scope`) in `backend/src/services/dataset_review/orchestrator.py` +- [X] T013 [P] [US1] Implement `SupersetContextExtractor.parse_superset_link` (CRITICAL: C4, PRE: parseable link, POST: resolved target, REL: uses `SupersetClient`) in `backend/src/core/utils/superset_context_extractor.py` +- [X] T014 [US1] 
Implement `SemanticSourceResolver.resolve_from_dictionary` (CRITICAL: C4, PRE: source exists, POST: confidence-ranked candidates) in `backend/src/services/dataset_review/semantic_resolver.py` +- [X] T015 [US1] Implement Documentation and Validation export endpoints (JSON/Markdown) in `backend/src/api/routes/dataset_review.py` +- [X] T016 [P] [US1] Implement `SourceIntakePanel` (C3, UX_STATE: Idle/Validating/Rejected) in `frontend/src/lib/components/dataset-review/SourceIntakePanel.svelte` +- [X] T017 [P] [US1] Implement `ValidationFindingsPanel` (C3, UX_STATE: Blocking/Warning/Info) in `frontend/src/lib/components/dataset-review/ValidationFindingsPanel.svelte` +- [X] T018 [US1] Create main `DatasetReviewWorkspace` (CRITICAL: C5, UX_STATE: Empty/Importing/Review) in `frontend/src/routes/datasets/review/[id]/+page.svelte` - [ ] T019 [US1] Verify implementation matches ux_reference.md (Happy Path & Errors) - [ ] T020 [US1] Acceptance: Perform semantic audit & algorithm emulation by Tester