feat(us1): add dataset review orchestration automatic review slice

This commit is contained in:
2026-03-17 10:57:49 +03:00
parent e916cb1f17
commit 023bacde39
24 changed files with 4870 additions and 131 deletions

View File

@@ -1,17 +1,18 @@
# [DEF:backend.src.api.routes.__init__:Module]
# [DEF:ApiRoutesModule:Module]
# @COMPLEXITY: 3
# @SEMANTICS: routes, lazy-import, module-registry
# @PURPOSE: Provide lazy route module loading to avoid heavyweight imports during tests.
# @LAYER: API
# @RELATION: DEPENDS_ON -> importlib
# @RELATION: [CALLS] ->[ApiRoutesGetAttr]
# @INVARIANT: Only names listed in __all__ are importable via __getattr__.
__all__ = ['plugins', 'tasks', 'settings', 'connections', 'environments', 'mappings', 'migration', 'git', 'storage', 'admin', 'reports', 'assistant', 'clean_release', 'profile']
__all__ = ['plugins', 'tasks', 'settings', 'connections', 'environments', 'mappings', 'migration', 'git', 'storage', 'admin', 'reports', 'assistant', 'clean_release', 'profile', 'dataset_review']
# [DEF:__getattr__:Function]
# @COMPLEXITY: 1
# [DEF:ApiRoutesGetAttr:Function]
# @COMPLEXITY: 3
# @PURPOSE: Lazily import route module by attribute name.
# @RELATION: [DEPENDS_ON] ->[ApiRoutesModule]
# @PRE: name is module candidate exposed in __all__.
# @POST: Returns imported submodule or raises AttributeError.
def __getattr__(name):
@@ -19,5 +20,5 @@ def __getattr__(name):
import importlib
return importlib.import_module(f".{name}", __name__)
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
# [/DEF:__getattr__:Function]
# [/DEF:backend.src.api.routes.__init__:Module]
# [/DEF:ApiRoutesGetAttr:Function]
# [/DEF:ApiRoutesModule:Module]

View File

@@ -0,0 +1,349 @@
# [DEF:DatasetReviewApiTests:Module]
# @COMPLEXITY: 3
# @SEMANTICS: dataset_review, api, tests, lifecycle, exports, orchestration
# @PURPOSE: Verify backend US1 dataset review lifecycle, export, parsing, and dictionary-resolution contracts.
# @LAYER: API
# @RELATION: [BINDS_TO] ->[DatasetReviewApi]
# @RELATION: [BINDS_TO] ->[DatasetReviewOrchestrator]
from datetime import datetime, timezone
import json
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastapi.testclient import TestClient
from src.app import app
from src.api.routes.dataset_review import _get_orchestrator, _get_repository
from src.core.config_models import Environment, GlobalSettings, AppConfig
from src.core.utils.superset_context_extractor import SupersetContextExtractor
from src.dependencies import get_config_manager, get_current_user, get_task_manager
from src.models.dataset_review import (
BusinessSummarySource,
ConfidenceState,
DatasetReviewSession,
FindingArea,
FindingSeverity,
ReadinessState,
RecommendedAction,
ResolutionState,
SessionPhase,
SessionStatus,
)
from src.services.dataset_review.orchestrator import DatasetReviewOrchestrator, StartSessionCommand
from src.services.dataset_review.semantic_resolver import SemanticSourceResolver
client = TestClient(app)
# [DEF:_make_user:Function]
def _make_user():
admin_role = SimpleNamespace(name="Admin", permissions=[])
return SimpleNamespace(id="user-1", username="tester", roles=[admin_role])
# [/DEF:_make_user:Function]
# [DEF:_make_config_manager:Function]
def _make_config_manager():
    """Return a MagicMock config manager that knows exactly one environment ("env-1")."""
    environment = Environment(
        id="env-1",
        name="DEV",
        url="http://superset.local",
        username="demo",
        password="secret",
    )
    app_config = AppConfig(environments=[environment], settings=GlobalSettings())
    mock_manager = MagicMock()
    # Unknown environment ids resolve to None, mirroring the real manager contract.
    mock_manager.get_environment.side_effect = (
        lambda env_id: environment if env_id == "env-1" else None
    )
    mock_manager.get_config.return_value = app_config
    return mock_manager
# [/DEF:_make_config_manager:Function]
# [DEF:_make_session:Function]
def _make_session():
    """Construct a review-ready active session fixture with one shared timestamp."""
    timestamp = datetime.now(timezone.utc)
    fields = dict(
        session_id="sess-1",
        user_id="user-1",
        environment_id="env-1",
        source_kind="superset_link",
        source_input="http://superset.local/dashboard/10",
        dataset_ref="public.sales",
        dataset_id=42,
        dashboard_id=10,
        readiness_state=ReadinessState.REVIEW_READY,
        recommended_action=RecommendedAction.REVIEW_DOCUMENTATION,
        status=SessionStatus.ACTIVE,
        current_phase=SessionPhase.REVIEW,
        created_at=timestamp,
        updated_at=timestamp,
        last_activity_at=timestamp,
    )
    return DatasetReviewSession(**fields)
# [/DEF:_make_session:Function]
# [DEF:dataset_review_api_dependencies:Function]
@pytest.fixture(autouse=True)
def dataset_review_api_dependencies():
    """Install auth/config/task dependency overrides for every test, then clean up."""
    user = _make_user()
    manager = _make_config_manager()
    tasks = MagicMock()
    overrides = {
        get_current_user: lambda: user,
        get_config_manager: lambda: manager,
        get_task_manager: lambda: tasks,
    }
    app.dependency_overrides.update(overrides)
    yield {"user": user, "config_manager": manager, "task_manager": tasks}
    # Always restore the app to its pristine dependency graph after each test.
    app.dependency_overrides.clear()
# [/DEF:dataset_review_api_dependencies:Function]
# [DEF:test_parse_superset_link_dashboard_partial_recovery:Function]
# @PURPOSE: Verify dashboard links recover dataset context and preserve explicit partial-recovery markers.
def test_parse_superset_link_dashboard_partial_recovery():
    env = Environment(
        id="env-1",
        name="DEV",
        url="http://superset.local",
        username="demo",
        password="secret",
    )
    # Fake Superset client: the dashboard reports TWO bound datasets, which should
    # force an explicit partial-recovery marker rather than a silent guess.
    fake_client = MagicMock()
    fake_client.get_dashboard_detail.return_value = {
        "datasets": [{"id": 42}, {"id": 77}],
    }
    fake_client.get_dataset_detail.return_value = {
        "table_name": "sales",
        "schema": "public",
    }
    extractor = SupersetContextExtractor(environment=env, client=fake_client)
    # Query string carries a URL-encoded native_filters payload for country=DE.
    result = extractor.parse_superset_link(
        "http://superset.local/dashboard/10/?native_filters=%5B%7B%22name%22%3A%22country%22%2C%22value%22%3A%22DE%22%7D%5D"
    )
    assert result.dataset_id == 42
    assert result.dashboard_id == 10
    assert result.dataset_ref == "public.sales"
    # Ambiguity must be surfaced explicitly, never resolved silently.
    assert result.partial_recovery is True
    assert "multiple_dashboard_datasets" in result.unresolved_references
    assert result.imported_filters[0]["filter_name"] == "country"
# [/DEF:test_parse_superset_link_dashboard_partial_recovery:Function]
# [DEF:test_resolve_from_dictionary_prefers_exact_match:Function]
# @PURPOSE: Verify trusted dictionary exact matches outrank fuzzy candidates and unresolved fields stay explicit.
def test_resolve_from_dictionary_prefers_exact_match():
    resolver = SemanticSourceResolver()
    # The dictionary offers an exact "revenue" row plus a near-miss "revnue" row;
    # the target fields also include "margin", which has no dictionary coverage.
    result = resolver.resolve_from_dictionary(
        {
            "source_ref": "dict://finance",
            "rows": [
                {
                    "field_name": "revenue",
                    "verbose_name": "Revenue",
                    "description": "Recognized revenue amount",
                    "display_format": "$,.2f",
                },
                {
                    "field_name": "revnue",
                    "verbose_name": "Revenue typo",
                    "description": "Fuzzy variant",
                },
            ],
        },
        [
            {"field_name": "revenue", "is_locked": False},
            {"field_name": "margin", "is_locked": False},
        ],
    )
    resolved_exact = next(item for item in result.resolved_fields if item["field_name"] == "revenue")
    unresolved = next(item for item in result.resolved_fields if item["field_name"] == "margin")
    # Exact matches must win over fuzzy candidates and carry exact provenance.
    assert resolved_exact["applied_candidate"]["match_type"] == "exact"
    assert resolved_exact["provenance"] == "dictionary_exact"
    # Uncovered fields stay explicitly unresolved instead of being guessed.
    assert unresolved["status"] == "unresolved"
    assert "margin" in result.unresolved_fields
    assert result.partial_recovery is True
# [/DEF:test_resolve_from_dictionary_prefers_exact_match:Function]
# [DEF:test_orchestrator_start_session_preserves_partial_recovery:Function]
# @PURPOSE: Verify session start persists usable recovery-required state when Superset intake is partial.
def test_orchestrator_start_session_preserves_partial_recovery(dataset_review_api_dependencies):
    repository = MagicMock()
    # Repository echoes back a session pre-flagged as recovery-required.
    created_session = _make_session()
    created_session.readiness_state = ReadinessState.RECOVERY_REQUIRED
    created_session.current_phase = SessionPhase.RECOVERY
    repository.create_session.return_value = created_session
    repository.save_profile_and_findings.return_value = created_session
    repository.db = MagicMock()
    orchestrator = DatasetReviewOrchestrator(
        repository=repository,
        config_manager=dataset_review_api_dependencies["config_manager"],
        task_manager=None,
    )
    # Parsed Superset context simulates partial recovery with one unresolved reference.
    parsed_context = SimpleNamespace(
        dataset_ref="public.sales",
        dataset_id=42,
        dashboard_id=10,
        chart_id=None,
        partial_recovery=True,
        unresolved_references=["dashboard_dataset_binding_missing"],
    )
    with patch(
        "src.services.dataset_review.orchestrator.SupersetContextExtractor.parse_superset_link",
        return_value=parsed_context,
    ):
        result = orchestrator.start_session(
            StartSessionCommand(
                user=dataset_review_api_dependencies["user"],
                environment_id="env-1",
                source_kind="superset_link",
                source_input="http://superset.local/dashboard/10",
            )
        )
    # Partial recovery must survive persistence as a warning finding, not be dropped.
    assert result.session.readiness_state == ReadinessState.RECOVERY_REQUIRED
    assert result.findings
    assert result.findings[0].severity.value == "warning"
    repository.create_session.assert_called_once()
    repository.save_profile_and_findings.assert_called_once()
# [/DEF:test_orchestrator_start_session_preserves_partial_recovery:Function]
# [DEF:test_start_session_endpoint_returns_created_summary:Function]
# @PURPOSE: Verify POST session lifecycle endpoint returns a persisted ownership-scoped summary.
def test_start_session_endpoint_returns_created_summary(dataset_review_api_dependencies):
    session = _make_session()
    # Stub the orchestrator dependency so the route only exercises HTTP wiring.
    orchestrator = MagicMock()
    orchestrator.start_session.return_value = SimpleNamespace(session=session, findings=[], parsed_context=None)
    app.dependency_overrides[_get_orchestrator] = lambda: orchestrator
    response = client.post(
        "/api/dataset-orchestration/sessions",
        json={
            "source_kind": "superset_link",
            "source_input": "http://superset.local/dashboard/10",
            "environment_id": "env-1",
        },
    )
    # 201 plus the summary fields proves the serialized session round-trips the API.
    assert response.status_code == 201
    payload = response.json()
    assert payload["session_id"] == "sess-1"
    assert payload["dataset_ref"] == "public.sales"
    assert payload["environment_id"] == "env-1"
# [/DEF:test_start_session_endpoint_returns_created_summary:Function]
# [DEF:test_get_session_detail_export_and_lifecycle_endpoints:Function]
# @PURPOSE: Verify lifecycle get/patch/delete plus documentation and validation exports remain ownership-scoped and usable.
def test_get_session_detail_export_and_lifecycle_endpoints(dataset_review_api_dependencies):
    now = datetime.now(timezone.utc)
    # A spec'd MagicMock session: attribute writes (e.g. status updates from PATCH)
    # are observable while still satisfying the DatasetReviewSession interface.
    session = MagicMock(spec=DatasetReviewSession)
    session.session_id = "sess-1"
    session.user_id = "user-1"
    session.environment_id = "env-1"
    session.source_kind = "superset_link"
    session.source_input = "http://superset.local/dashboard/10"
    session.dataset_ref = "public.sales"
    session.dataset_id = 42
    session.dashboard_id = 10
    session.readiness_state = ReadinessState.REVIEW_READY
    session.recommended_action = RecommendedAction.REVIEW_DOCUMENTATION
    session.status = SessionStatus.ACTIVE
    session.current_phase = SessionPhase.REVIEW
    session.created_at = now
    session.updated_at = now
    session.last_activity_at = now
    # Profile and findings feed both the detail serializer and the export builders.
    session.profile = SimpleNamespace(
        dataset_name="sales",
        business_summary="Summary text",
        confidence_state=ConfidenceState.MOSTLY_CONFIRMED,
        dataset_type="unknown",
        schema_name=None,
        database_name=None,
        business_summary_source=BusinessSummarySource.IMPORTED,
        description=None,
        is_sqllab_view=False,
        completeness_score=None,
        has_blocking_findings=False,
        has_warning_findings=True,
        manual_summary_locked=False,
        created_at=now,
        updated_at=now,
        profile_id="profile-1",
        session_id="sess-1",
    )
    session.findings = [
        SimpleNamespace(
            finding_id="f-1",
            session_id="sess-1",
            area=FindingArea.SOURCE_INTAKE,
            severity=FindingSeverity.WARNING,
            code="PARTIAL_SUPERSET_RECOVERY",
            title="Partial",
            message="Some filters require review",
            resolution_state=ResolutionState.OPEN,
            resolution_note=None,
            caused_by_ref=None,
            created_at=now,
            resolved_at=None,
        )
    ]
    # Empty child collections keep the detail serializer exercised without noise.
    session.collaborators = []
    session.semantic_sources = []
    session.semantic_fields = []
    session.imported_filters = []
    session.template_variables = []
    session.execution_mappings = []
    session.clarification_sessions = []
    session.previews = []
    session.run_contexts = []
    repository = MagicMock()
    repository.load_session_detail.return_value = session
    repository.list_sessions_for_user.return_value = [session]
    repository.db = MagicMock()
    app.dependency_overrides[_get_repository] = lambda: repository
    # GET detail: ownership-scoped read succeeds.
    detail_response = client.get("/api/dataset-orchestration/sessions/sess-1")
    assert detail_response.status_code == 200
    assert detail_response.json()["session_id"] == "sess-1"
    # PATCH lifecycle: owner may pause, and the new status is echoed back.
    patch_response = client.patch(
        "/api/dataset-orchestration/sessions/sess-1",
        json={"status": "paused"},
    )
    assert patch_response.status_code == 200
    assert patch_response.json()["status"] == "paused"
    # Exports: JSON documentation and Markdown validation report are both usable.
    doc_response = client.get("/api/dataset-orchestration/sessions/sess-1/exports/documentation?format=json")
    assert doc_response.status_code == 200
    assert doc_response.json()["artifact_type"] == "documentation"
    validation_response = client.get("/api/dataset-orchestration/sessions/sess-1/exports/validation?format=markdown")
    assert validation_response.status_code == 200
    assert validation_response.json()["artifact_type"] == "validation_report"
    assert "Validation Report" in validation_response.json()["content"]["markdown"]
    # DELETE: soft-delete path returns 204 with no body.
    delete_response = client.delete("/api/dataset-orchestration/sessions/sess-1")
    assert delete_response.status_code == 204
# [/DEF:test_get_session_detail_export_and_lifecycle_endpoints:Function]
# [/DEF:DatasetReviewApiTests:Module]

View File

@@ -0,0 +1,533 @@
# [DEF:DatasetReviewApi:Module]
# @COMPLEXITY: 4
# @SEMANTICS: dataset_review, api, session_lifecycle, exports, rbac, feature_flags
# @PURPOSE: Expose dataset review session lifecycle and export endpoints for backend US1.
# @LAYER: API
# @RELATION: [DEPENDS_ON] ->[AppDependencies]
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSessionRepository]
# @RELATION: [DEPENDS_ON] ->[DatasetReviewOrchestrator]
# @PRE: Authenticated user and valid environment/session scope are required for all mutations and reads.
# @POST: Returns ownership-scoped session state and export payloads with feature-flag/RBAC enforcement.
# @SIDE_EFFECT: Persists session state and may enqueue recovery task.
# @DATA_CONTRACT: Input[HTTP Request] -> Output[SessionSummary | SessionDetail | ExportArtifactResponse | HTTP 204]
# @INVARIANT: No cross-user session leakage is allowed; export payloads only expose the current user's accessible session.
from __future__ import annotations
# [DEF:DatasetReviewApi.imports:Block]
import json
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Response, status
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from src.core.database import get_db
from src.core.logger import belief_scope, logger
from src.dependencies import get_config_manager, get_current_user, get_task_manager, has_permission
from src.models.auth import User
from src.models.dataset_review import (
ArtifactFormat,
DatasetReviewSession,
RecommendedAction,
SessionStatus,
)
from src.schemas.dataset_review import SessionDetail, SessionSummary
from src.services.dataset_review.orchestrator import (
DatasetReviewOrchestrator,
StartSessionCommand,
)
from src.services.dataset_review.repositories.session_repository import (
DatasetReviewSessionRepository,
)
# [/DEF:DatasetReviewApi.imports:Block]
router = APIRouter(prefix="/api/dataset-orchestration", tags=["Dataset Orchestration"])
# [DEF:StartSessionRequest:Class]
# @COMPLEXITY: 2
# @PURPOSE: Request DTO for starting one dataset review session from a Superset link or dataset selection.
class StartSessionRequest(BaseModel):
    # Intake mode; the regex restricts it to the two supported source kinds.
    source_kind: str = Field(..., pattern="^(superset_link|dataset_selection)$")
    # Raw Superset link or dataset reference supplied by the user; must be non-empty.
    source_input: str = Field(..., min_length=1)
    # Target environment identifier; resolved against the config manager downstream.
    environment_id: str = Field(..., min_length=1)
# [/DEF:StartSessionRequest:Class]
# [DEF:UpdateSessionRequest:Class]
# @COMPLEXITY: 2
# @PURPOSE: Request DTO for lifecycle state updates on an existing session.
class UpdateSessionRequest(BaseModel):
    # Requested lifecycle state; validated against the SessionStatus enum.
    status: SessionStatus
    # Optional free-text note; accepted but not applied by the current PATCH handler.
    note: Optional[str] = None
# [/DEF:UpdateSessionRequest:Class]
# [DEF:SessionCollectionResponse:Class]
# @COMPLEXITY: 2
# @PURPOSE: Paginated ownership-scoped dataset review session collection response.
class SessionCollectionResponse(BaseModel):
    # Current page of session summaries.
    items: List[SessionSummary]
    # Total sessions accessible to the user across all pages.
    total: int
    # 1-based page index echoed back from the request.
    page: int
    page_size: int
    # True when more sessions exist beyond this page.
    has_next: bool
# [/DEF:SessionCollectionResponse:Class]
# [DEF:ExportArtifactResponse:Class]
# @COMPLEXITY: 2
# @PURPOSE: Inline export response for documentation or validation outputs without introducing unrelated persistence changes.
class ExportArtifactResponse(BaseModel):
    # Deterministic identifier derived from session id and format (no DB row backs it).
    artifact_id: str
    session_id: str
    # Either "documentation" or "validation_report".
    artifact_type: str
    format: str
    # Pseudo-URI (inline://...) signalling the payload is embedded, not stored.
    storage_ref: str
    created_by_user_id: str
    created_at: Optional[str] = None
    # The export payload itself: {"markdown": ...} or a structured JSON dict.
    content: Dict[str, Any]
# [/DEF:ExportArtifactResponse:Class]
# [DEF:_require_auto_review_flag:Function]
# @COMPLEXITY: 3
# @PURPOSE: Guard US1 dataset review endpoints behind the configured feature flag.
# @RELATION: [DEPENDS_ON] ->[ConfigManager]
def _require_auto_review_flag(config_manager=Depends(get_config_manager)) -> bool:
    with belief_scope("dataset_review.require_auto_review_flag"):
        flag_enabled = config_manager.get_config().settings.ff_dataset_auto_review
        if flag_enabled:
            return True
        # 404 (not 403) so a disabled feature is indistinguishable from an absent route.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Dataset auto review feature is disabled",
        )
# [/DEF:_require_auto_review_flag:Function]
# [DEF:_get_repository:Function]
# @COMPLEXITY: 2
# @PURPOSE: Build repository dependency for dataset review session aggregate access.
def _get_repository(db: Session = Depends(get_db)) -> DatasetReviewSessionRepository:
    # One repository per request, bound to the request-scoped DB session.
    return DatasetReviewSessionRepository(db)
# [/DEF:_get_repository:Function]
# [DEF:_get_orchestrator:Function]
# @COMPLEXITY: 3
# @PURPOSE: Build orchestrator dependency for session lifecycle actions.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewOrchestrator]
def _get_orchestrator(
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    config_manager=Depends(get_config_manager),
    task_manager=Depends(get_task_manager),
) -> DatasetReviewOrchestrator:
    # Assembled per request so the orchestrator shares the request's repository/DB scope.
    return DatasetReviewOrchestrator(
        repository=repository,
        config_manager=config_manager,
        task_manager=task_manager,
    )
# [/DEF:_get_orchestrator:Function]
# [DEF:_serialize_session_summary:Function]
# @COMPLEXITY: 3
# @PURPOSE: Map SQLAlchemy session aggregate root into stable API summary DTO.
# @RELATION: [DEPENDS_ON] ->[SessionSummary]
def _serialize_session_summary(session: DatasetReviewSession) -> SessionSummary:
    # from_attributes=True lets Pydantic read ORM attributes instead of dict keys.
    return SessionSummary.model_validate(session, from_attributes=True)
# [/DEF:_serialize_session_summary:Function]
# [DEF:_serialize_session_detail:Function]
# @COMPLEXITY: 3
# @PURPOSE: Map SQLAlchemy session aggregate root into stable API detail DTO.
# @RELATION: [DEPENDS_ON] ->[SessionDetail]
def _serialize_session_detail(session: DatasetReviewSession) -> SessionDetail:
    # from_attributes=True lets Pydantic read ORM attributes instead of dict keys.
    return SessionDetail.model_validate(session, from_attributes=True)
# [/DEF:_serialize_session_detail:Function]
# [DEF:_get_owned_session_or_404:Function]
# @COMPLEXITY: 4
# @PURPOSE: Resolve one session for current user or collaborator scope, returning 404 when inaccessible.
# @RELATION: [CALLS] ->[load_detail]
# @PRE: session_id is a non-empty identifier and current_user is authenticated.
# @POST: returns accessible session detail or raises HTTP 404 without leaking foreign-session existence.
# @SIDE_EFFECT: none.
# @DATA_CONTRACT: Input[session_id:str,current_user:User] -> Output[DatasetReviewSession|HTTPException]
def _get_owned_session_or_404(
    repository: DatasetReviewSessionRepository,
    session_id: str,
    current_user: User,
) -> DatasetReviewSession:
    with belief_scope("dataset_review.get_owned_session_or_404"):
        found = repository.load_session_detail(session_id, current_user.id)
        if found is not None:
            return found
        # Same 404 for "missing" and "not yours": no foreign-session existence leak.
        logger.explore(
            "Dataset review session not found in current ownership scope",
            extra={"session_id": session_id, "user_id": current_user.id},
        )
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
# [/DEF:_get_owned_session_or_404:Function]
# [DEF:_build_documentation_export:Function]
# @COMPLEXITY: 3
# @PURPOSE: Produce session documentation export content from current persisted review state.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
def _build_documentation_export(session: DatasetReviewSession, export_format: ArtifactFormat) -> Dict[str, Any]:
    profile = session.profile
    # Deterministic ordering keeps exports stable across identical review states.
    ordered_findings = sorted(session.findings, key=lambda f: (f.severity.value, f.code))
    if export_format == ArtifactFormat.MARKDOWN:
        summary_text = profile.business_summary if profile else "No profile summary available."
        md_lines = [
            f"# Dataset Review: {session.dataset_ref}",
            "",
            f"- Session ID: {session.session_id}",
            f"- Environment: {session.environment_id}",
            f"- Readiness: {session.readiness_state.value}",
            f"- Recommended action: {session.recommended_action.value}",
            "",
            "## Business Summary",
            summary_text,
            "",
            "## Findings",
        ]
        if not ordered_findings:
            md_lines.append("- No findings recorded.")
        else:
            md_lines.extend(
                f"- [{f.severity.value}] {f.title}: {f.message}" for f in ordered_findings
            )
        return {
            "storage_ref": f"inline://dataset-review/{session.session_id}/documentation.md",
            "content": {"markdown": "\n".join(md_lines)},
        }
    # JSON variant: structured summary/profile/findings payload.
    json_content = {
        "session": _serialize_session_summary(session).model_dump(mode="json"),
        "profile": profile and {
            "dataset_name": profile.dataset_name,
            "business_summary": profile.business_summary,
            "confidence_state": profile.confidence_state.value,
            "dataset_type": profile.dataset_type,
        },
        "findings": [
            {
                "code": f.code,
                "severity": f.severity.value,
                "title": f.title,
                "message": f.message,
                "resolution_state": f.resolution_state.value,
            }
            for f in ordered_findings
        ],
    }
    return {
        "storage_ref": f"inline://dataset-review/{session.session_id}/documentation.json",
        "content": json_content,
    }
# [/DEF:_build_documentation_export:Function]
# [DEF:_build_validation_export:Function]
# @COMPLEXITY: 3
# @PURPOSE: Produce validation-focused export content from persisted findings and readiness state.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
def _build_validation_export(session: DatasetReviewSession, export_format: ArtifactFormat) -> Dict[str, Any]:
    # Deterministic ordering keeps exports stable across identical review states.
    ordered = sorted(session.findings, key=lambda f: (f.severity.value, f.code))
    if export_format != ArtifactFormat.MARKDOWN:
        # Structured JSON variant with full finding identity for machine consumption.
        return {
            "storage_ref": f"inline://dataset-review/{session.session_id}/validation.json",
            "content": {
                "session_id": session.session_id,
                "dataset_ref": session.dataset_ref,
                "readiness_state": session.readiness_state.value,
                "findings": [
                    {
                        "finding_id": f.finding_id,
                        "area": f.area.value,
                        "severity": f.severity.value,
                        "code": f.code,
                        "title": f.title,
                        "message": f.message,
                        "resolution_state": f.resolution_state.value,
                    }
                    for f in ordered
                ],
            },
        }
    report_lines = [
        f"# Validation Report: {session.dataset_ref}",
        "",
        f"- Session ID: {session.session_id}",
        f"- Readiness: {session.readiness_state.value}",
        "",
        "## Findings",
    ]
    if ordered:
        report_lines.extend(
            f"- `{f.code}` [{f.severity.value}] {f.message}" for f in ordered
        )
    else:
        report_lines.append("- No findings recorded.")
    return {
        "storage_ref": f"inline://dataset-review/{session.session_id}/validation.md",
        "content": {"markdown": "\n".join(report_lines)},
    }
# [/DEF:_build_validation_export:Function]
# [DEF:list_sessions:Function]
# @COMPLEXITY: 3
# @PURPOSE: List resumable dataset review sessions for the current user.
# @RELATION: [CALLS] ->[list_user_sess]
@router.get(
    "/sessions",
    response_model=SessionCollectionResponse,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "READ")),
    ],
)
async def list_sessions(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    with belief_scope("dataset_review.list_sessions"):
        all_sessions = repository.list_sessions_for_user(current_user.id)
        # In-memory pagination over the ownership-scoped result set.
        offset = (page - 1) * page_size
        window = all_sessions[offset:offset + page_size]
        return SessionCollectionResponse(
            items=[_serialize_session_summary(item) for item in window],
            total=len(all_sessions),
            page=page,
            page_size=page_size,
            has_next=offset + page_size < len(all_sessions),
        )
# [/DEF:list_sessions:Function]
# [DEF:start_session:Function]
# @COMPLEXITY: 4
# @PURPOSE: Start a new dataset review session from a Superset link or dataset selection.
# @RELATION: [CALLS] ->[DatasetReviewOrchestrator.start_session]
# @PRE: feature flag enabled, user authenticated, and request body valid.
# @POST: returns persisted session summary scoped to the authenticated user.
# @SIDE_EFFECT: persists session/profile/findings and may enqueue recovery task.
# @DATA_CONTRACT: Input[StartSessionRequest] -> Output[SessionSummary]
@router.post(
    "/sessions",
    response_model=SessionSummary,
    status_code=status.HTTP_201_CREATED,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def start_session(
    request: StartSessionRequest,
    orchestrator: DatasetReviewOrchestrator = Depends(_get_orchestrator),
    current_user: User = Depends(get_current_user),
):
    with belief_scope("dataset_review.start_session"):
        try:
            outcome = orchestrator.start_session(
                StartSessionCommand(
                    user=current_user,
                    environment_id=request.environment_id,
                    source_kind=request.source_kind,
                    source_input=request.source_input,
                )
            )
        except ValueError as exc:
            message = str(exc)
            logger.explore(
                "Dataset review session start rejected",
                extra={"user_id": current_user.id, "error": message},
            )
            # Unknown environment maps to 404; every other domain rejection is a 400.
            http_status = (
                status.HTTP_404_NOT_FOUND
                if message == "Environment not found"
                else status.HTTP_400_BAD_REQUEST
            )
            raise HTTPException(status_code=http_status, detail=message) from exc
        return _serialize_session_summary(outcome.session)
# [/DEF:start_session:Function]
# [DEF:get_session_detail:Function]
# @COMPLEXITY: 3
# @PURPOSE: Return the full accessible dataset review session aggregate for current user scope.
# @RELATION: [CALLS] ->[_get_owned_session_or_404]
@router.get(
    "/sessions/{session_id}",
    response_model=SessionDetail,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "READ")),
    ],
)
async def get_session_detail(
    session_id: str,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    with belief_scope("dataset_review.get_session_detail"):
        # Ownership/collaborator scoping (and 404 on miss) is delegated to the helper.
        session = _get_owned_session_or_404(repository, session_id, current_user)
        return _serialize_session_detail(session)
# [/DEF:get_session_detail:Function]
# [DEF:update_session:Function]
# @COMPLEXITY: 4
# @PURPOSE: Update resumable lifecycle status for an owned dataset review session.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
# @PRE: session is accessible to current user and requested status is allowed by lifecycle policy.
# @POST: returns updated summary without changing ownership or unrelated aggregates.
# @SIDE_EFFECT: mutates session lifecycle fields in persistence.
# @DATA_CONTRACT: Input[UpdateSessionRequest] -> Output[SessionSummary]
@router.patch(
    "/sessions/{session_id}",
    response_model=SessionSummary,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def update_session(
    session_id: str,
    request: UpdateSessionRequest,
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    with belief_scope("dataset_review.update_session"):
        session = _get_owned_session_or_404(repository, session_id, current_user)
        # Collaborators may read, but lifecycle mutation stays owner-only.
        if current_user.id != session.user_id:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Only the owner can mutate session lifecycle")
        session.status = request.status
        terminal_states = {SessionStatus.ARCHIVED, SessionStatus.CANCELLED, SessionStatus.COMPLETED}
        if request.status == SessionStatus.PAUSED:
            # Paused sessions are steered back toward resumption.
            session.recommended_action = RecommendedAction.RESUME_SESSION
        elif request.status in terminal_states:
            # Terminal states detach any in-flight background task.
            session.active_task_id = None
        repository.db.commit()
        repository.db.refresh(session)
        return _serialize_session_summary(session)
# [/DEF:update_session:Function]
# [DEF:delete_session:Function]
# @COMPLEXITY: 4
# @PURPOSE: Archive or hard-delete a session owned by the current user.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
# @PRE: session is owner-scoped to current user.
# @POST: session is archived or deleted and no foreign-session existence is disclosed.
# @SIDE_EFFECT: mutates or deletes persisted session aggregate.
# @DATA_CONTRACT: Input[session_id:str,hard_delete:bool] -> Output[HTTP 204]
@router.delete(
    "/sessions/{session_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "MANAGE")),
    ],
)
async def delete_session(
    session_id: str,
    hard_delete: bool = Query(False),
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    with belief_scope("dataset_review.delete_session"):
        target = _get_owned_session_or_404(repository, session_id, current_user)
        # Deletion is strictly owner-only, even for collaborators with read access.
        if current_user.id != target.user_id:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Only the owner can delete a session")
        if not hard_delete:
            # Soft delete keeps the row for auditability while detaching any task.
            target.status = SessionStatus.ARCHIVED
            target.active_task_id = None
        else:
            repository.db.delete(target)
        repository.db.commit()
        return Response(status_code=status.HTTP_204_NO_CONTENT)
# [/DEF:delete_session:Function]
# [DEF:export_documentation:Function]
# @COMPLEXITY: 4
# @PURPOSE: Export documentation output for the current session in JSON or Markdown form.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
# @PRE: session is accessible to current user and requested format is supported.
# @POST: returns ownership-scoped export payload without fabricating unrelated artifacts.
# @SIDE_EFFECT: none beyond response construction.
# @DATA_CONTRACT: Input[session_id:str,format:ArtifactFormat] -> Output[ExportArtifactResponse]
@router.get(
    "/sessions/{session_id}/exports/documentation",
    response_model=ExportArtifactResponse,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "READ")),
    ],
)
async def export_documentation(
    session_id: str,
    format: ArtifactFormat = Query(ArtifactFormat.JSON),
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    with belief_scope("dataset_review.export_documentation"):
        supported_formats = {ArtifactFormat.JSON, ArtifactFormat.MARKDOWN}
        if format not in supported_formats:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Only json and markdown exports are supported")
        session = _get_owned_session_or_404(repository, session_id, current_user)
        payload = _build_documentation_export(session, format)
        # Inline artifact: id is deterministic, content is embedded, nothing persisted.
        return ExportArtifactResponse(
            artifact_id=f"documentation-{session.session_id}-{format.value}",
            session_id=session.session_id,
            artifact_type="documentation",
            format=format.value,
            storage_ref=payload["storage_ref"],
            created_by_user_id=current_user.id,
            content=payload["content"],
        )
# [/DEF:export_documentation:Function]
# [DEF:export_validation:Function]
# @COMPLEXITY: 4
# @PURPOSE: Export validation findings for the current session in JSON or Markdown form.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
# @PRE: session is accessible to current user and requested format is supported.
# @POST: returns explicit validation export payload scoped to current user session access.
# @SIDE_EFFECT: none beyond response construction.
# @DATA_CONTRACT: Input[session_id:str,format:ArtifactFormat] -> Output[ExportArtifactResponse]
@router.get(
    "/sessions/{session_id}/exports/validation",
    response_model=ExportArtifactResponse,
    dependencies=[
        Depends(_require_auto_review_flag),
        Depends(has_permission("dataset:session", "READ")),
    ],
)
async def export_validation(
    session_id: str,
    format: ArtifactFormat = Query(ArtifactFormat.JSON),
    repository: DatasetReviewSessionRepository = Depends(_get_repository),
    current_user: User = Depends(get_current_user),
):
    with belief_scope("dataset_review.export_validation"):
        supported_formats = {ArtifactFormat.JSON, ArtifactFormat.MARKDOWN}
        if format not in supported_formats:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Only json and markdown exports are supported")
        session = _get_owned_session_or_404(repository, session_id, current_user)
        payload = _build_validation_export(session, format)
        # Inline artifact: id is deterministic, content is embedded, nothing persisted.
        return ExportArtifactResponse(
            artifact_id=f"validation-{session.session_id}-{format.value}",
            session_id=session.session_id,
            artifact_type="validation_report",
            format=format.value,
            storage_ref=payload["storage_ref"],
            created_by_user_id=current_user.id,
            content=payload["content"],
        )
# [/DEF:export_validation:Function]
# [/DEF:DatasetReviewApi:Module]

View File

@@ -3,8 +3,8 @@
# @SEMANTICS: app, main, entrypoint, fastapi
# @PURPOSE: The main entry point for the FastAPI application. It initializes the app, configures CORS, sets up dependencies, includes API routers, and defines the WebSocket endpoint for log streaming.
# @LAYER: UI (API)
# @RELATION: DEPENDS_ON ->[AppDependencies]
# @RELATION: DEPENDS_ON ->[backend.src.api.routes]
# @RELATION: [DEPENDS_ON] ->[AppDependencies]
# @RELATION: [DEPENDS_ON] ->[ApiRoutesModule]
# @INVARIANT: Only one FastAPI app instance exists per process.
# @INVARIANT: All WebSocket connections must be properly cleaned up on disconnect.
# @PRE: Python environment and dependencies installed; configuration database available.
@@ -28,7 +28,7 @@ from .dependencies import get_task_manager, get_scheduler_service
from .core.encryption_key import ensure_encryption_key
from .core.utils.network import NetworkError
from .core.logger import logger, belief_scope
from .api.routes import plugins, tasks, settings, environments, mappings, migration, connections, git, storage, admin, llm, dashboards, datasets, reports, assistant, clean_release, clean_release_v2, profile, health
from .api.routes import plugins, tasks, settings, environments, mappings, migration, connections, git, storage, admin, llm, dashboards, datasets, reports, assistant, clean_release, clean_release_v2, profile, health, dataset_review
from .api import auth
# [DEF:App:Global]
@@ -45,6 +45,7 @@ app = FastAPI(
# [DEF:startup_event:Function]
# @COMPLEXITY: 3
# @PURPOSE: Handles application startup tasks, such as starting the scheduler.
# @RELATION: [CALLS] ->[AppDependencies]
# @PRE: None.
# @POST: Scheduler is started.
# Startup event
@@ -59,6 +60,7 @@ async def startup_event():
# [DEF:shutdown_event:Function]
# @COMPLEXITY: 3
# @PURPOSE: Handles application shutdown tasks, such as stopping the scheduler.
# @RELATION: [CALLS] ->[AppDependencies]
# @PRE: None.
# @POST: Scheduler is stopped.
# Shutdown event
@@ -106,6 +108,7 @@ async def network_error_handler(request: Request, exc: NetworkError):
# [DEF:log_requests:Function]
# @COMPLEXITY: 3
# @PURPOSE: Middleware to log incoming HTTP requests and their response status.
# @RELATION: [DEPENDS_ON] ->[LoggerModule]
# @PRE: request is a FastAPI Request object.
# @POST: Logs request and response details.
# @PARAM: request (Request) - The incoming request object.
@@ -154,6 +157,7 @@ app.include_router(assistant.router, prefix="/api/assistant", tags=["Assistant"]
app.include_router(clean_release.router)
app.include_router(clean_release_v2.router)
app.include_router(profile.router)
app.include_router(dataset_review.router)
app.include_router(health.router)
# [/DEF:api_routes:Block]
@@ -168,10 +172,13 @@ app.include_router(health.router)
# [DEF:websocket_endpoint:Function]
# @COMPLEXITY: 5
# @PURPOSE: Provides a WebSocket endpoint for real-time log streaming of a task with server-side filtering.
# @RELATION: [CALLS] ->[TaskManagerPackage]
# @RELATION: [DEPENDS_ON] ->[LoggerModule]
# @PRE: task_id must be a valid task ID.
# @POST: WebSocket connection is managed and logs are streamed until disconnect.
# @SIDE_EFFECT: Subscribes to TaskManager log queue and broadcasts messages over network.
# @DATA_CONTRACT: [task_id: str, source: str, level: str] -> [JSON log entry objects]
# @INVARIANT: Every accepted WebSocket subscription is unsubscribed exactly once even when streaming fails or the client disconnects.
# @UX_STATE: Connecting -> Streaming -> (Disconnected)
#
# @TEST_CONTRACT: WebSocketLogStreamApi ->
@@ -204,85 +211,121 @@ async def websocket_endpoint(
"""
with belief_scope("websocket_endpoint", f"task_id={task_id}"):
await websocket.accept()
# Normalize filter parameters
source_filter = source.lower() if source else None
level_filter = level.upper() if level else None
# Level hierarchy for filtering
level_hierarchy = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3}
min_level = level_hierarchy.get(level_filter, 0) if level_filter else 0
logger.info(f"WebSocket connection accepted for task {task_id} (source={source_filter}, level={level_filter})")
task_manager = get_task_manager()
queue = await task_manager.subscribe_logs(task_id)
def matches_filters(log_entry) -> bool:
"""Check if log entry matches the filter criteria."""
# Check source filter
if source_filter and log_entry.source.lower() != source_filter:
return False
# Check level filter
if level_filter:
log_level = level_hierarchy.get(log_entry.level.upper(), 0)
if log_level < min_level:
source_filter = source.lower() if source else None
level_filter = level.upper() if level else None
level_hierarchy = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3}
min_level = level_hierarchy.get(level_filter, 0) if level_filter else 0
logger.reason(
"Accepted WebSocket log stream connection",
extra={
"task_id": task_id,
"source_filter": source_filter,
"level_filter": level_filter,
"min_level": min_level,
},
)
task_manager = get_task_manager()
queue = await task_manager.subscribe_logs(task_id)
logger.reason(
"Subscribed WebSocket client to task log queue",
extra={"task_id": task_id},
)
def matches_filters(log_entry) -> bool:
"""Check if log entry matches the filter criteria."""
log_source = getattr(log_entry, "source", None)
if source_filter and str(log_source or "").lower() != source_filter:
return False
return True
try:
# Stream new logs
logger.info(f"Starting log stream for task {task_id}")
# Send initial logs first to build context (apply filters)
initial_logs = task_manager.get_task_logs(task_id)
for log_entry in initial_logs:
if matches_filters(log_entry):
if level_filter:
log_level = level_hierarchy.get(str(log_entry.level).upper(), 0)
if log_level < min_level:
return False
return True
try:
logger.reason(
"Starting task log stream replay and live forwarding",
extra={"task_id": task_id},
)
initial_logs = task_manager.get_task_logs(task_id)
initial_sent = 0
for log_entry in initial_logs:
if matches_filters(log_entry):
log_dict = log_entry.dict()
log_dict["timestamp"] = log_dict["timestamp"].isoformat()
await websocket.send_json(log_dict)
initial_sent += 1
logger.reflect(
"Initial task log replay completed",
extra={
"task_id": task_id,
"replayed_logs": initial_sent,
"total_available_logs": len(initial_logs),
},
)
task = task_manager.get_task(task_id)
if task and task.status == "AWAITING_INPUT" and task.input_request:
synthetic_log = {
"timestamp": task.logs[-1].timestamp.isoformat() if task.logs else "2024-01-01T00:00:00",
"level": "INFO",
"message": "Task paused for user input (Connection Re-established)",
"context": {"input_request": task.input_request},
}
await websocket.send_json(synthetic_log)
logger.reason(
"Replayed awaiting-input prompt to restored WebSocket client",
extra={"task_id": task_id, "task_status": task.status},
)
while True:
log_entry = await queue.get()
if not matches_filters(log_entry):
continue
log_dict = log_entry.dict()
log_dict['timestamp'] = log_dict['timestamp'].isoformat()
log_dict["timestamp"] = log_dict["timestamp"].isoformat()
await websocket.send_json(log_dict)
logger.reflect(
"Forwarded task log entry to WebSocket client",
extra={
"task_id": task_id,
"level": log_dict.get("level"),
},
)
# Force a check for AWAITING_INPUT status immediately upon connection
# This ensures that if the task is already waiting when the user connects, they get the prompt.
task = task_manager.get_task(task_id)
if task and task.status == "AWAITING_INPUT" and task.input_request:
# Construct a synthetic log entry to trigger the frontend handler
# This is a bit of a hack but avoids changing the websocket protocol significantly
synthetic_log = {
"timestamp": task.logs[-1].timestamp.isoformat() if task.logs else "2024-01-01T00:00:00",
"level": "INFO",
"message": "Task paused for user input (Connection Re-established)",
"context": {"input_request": task.input_request}
}
await websocket.send_json(synthetic_log)
if "Task completed successfully" in log_entry.message or "Task failed" in log_entry.message:
logger.reason(
"Observed terminal task log entry; delaying to preserve client visibility",
extra={"task_id": task_id, "message": log_entry.message},
)
await asyncio.sleep(2)
while True:
log_entry = await queue.get()
# Apply server-side filtering
if not matches_filters(log_entry):
continue
log_dict = log_entry.dict()
log_dict['timestamp'] = log_dict['timestamp'].isoformat()
await websocket.send_json(log_dict)
# If task is finished, we could potentially close the connection
# but let's keep it open for a bit or until the client disconnects
if "Task completed successfully" in log_entry.message or "Task failed" in log_entry.message:
# Wait a bit to ensure client receives the last message
await asyncio.sleep(2)
# DO NOT BREAK here - allow client to keep connection open if they want to review logs
# or until they disconnect. Breaking closes the socket immediately.
# break
except WebSocketDisconnect:
logger.info(f"WebSocket connection disconnected for task {task_id}")
except Exception as e:
logger.error(f"WebSocket error for task {task_id}: {e}")
finally:
task_manager.unsubscribe_logs(task_id, queue)
except WebSocketDisconnect:
logger.reason(
"WebSocket client disconnected from task log stream",
extra={"task_id": task_id},
)
except Exception as exc:
logger.explore(
"WebSocket log streaming encountered an unexpected failure",
extra={"task_id": task_id, "error": str(exc)},
)
raise
finally:
task_manager.unsubscribe_logs(task_id, queue)
logger.reflect(
"Released WebSocket log queue subscription",
extra={"task_id": task_id},
)
# [/DEF:websocket_endpoint:Function]
# [DEF:StaticFiles:Mount]

View File

@@ -0,0 +1,334 @@
# [DEF:SupersetContextExtractor:Module]
# @COMPLEXITY: 4
# @SEMANTICS: dataset_review, superset, link_parsing, context_recovery, partial_recovery
# @PURPOSE: Recover dataset and dashboard context from Superset links while preserving explicit partial-recovery markers.
# @LAYER: Infra
# @RELATION: [CALLS] ->[backend.src.core.superset_client.SupersetClient:Class]
# @RELATION: [DEPENDS_ON] ->[ImportedFilter]
# @RELATION: [DEPENDS_ON] ->[TemplateVariable]
# @PRE: Superset link or dataset reference must be parseable enough to resolve an environment-scoped target resource.
# @POST: Returns the best available recovered context with explicit provenance and partial-recovery markers when necessary.
# @SIDE_EFFECT: Performs upstream Superset API reads.
# @INVARIANT: Partial recovery is surfaced explicitly and never misrepresented as fully confirmed context.
from __future__ import annotations
# [DEF:SupersetContextExtractor.imports:Block]
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from urllib.parse import parse_qs, unquote, urlparse
from src.core.config_models import Environment
from src.core.logger import belief_scope, logger
from src.core.superset_client import SupersetClient
# [/DEF:SupersetContextExtractor.imports:Block]
# [DEF:SupersetParsedContext:Class]
# @COMPLEXITY: 2
# @PURPOSE: Normalized output of Superset link parsing for session intake and recovery.
@dataclass
class SupersetParsedContext:
    # Original link as received at intake (whitespace-stripped by the parser).
    source_url: str
    # Canonical reference: "schema.table" when dataset-detail lookup succeeds,
    # otherwise "dataset:<id>", "chart:<id>", or "unresolved".
    dataset_ref: str
    # Numeric Superset identifiers; None when not recoverable from the link.
    dataset_id: Optional[int] = None
    dashboard_id: Optional[int] = None
    chart_id: Optional[int] = None
    # One of "dataset", "dashboard", "chart", or "unknown".
    resource_type: str = "unknown"
    # Decoded URL query state; known structured keys are JSON-parsed when possible.
    query_state: Dict[str, Any] = field(default_factory=dict)
    # Filter entries recovered from native_filters / form_data URL state.
    imported_filters: List[Dict[str, Any]] = field(default_factory=list)
    # Marker strings naming what could not be confirmed
    # (e.g. "multiple_dashboard_datasets", "dataset_detail_lookup_failed").
    unresolved_references: List[str] = field(default_factory=list)
    # True when any part of the context is ambiguous or unresolved.
    partial_recovery: bool = False
# [/DEF:SupersetParsedContext:Class]
# [DEF:SupersetContextExtractor:Class]
# @COMPLEXITY: 4
# @PURPOSE: Parse supported Superset URLs and recover canonical dataset/dashboard references for review-session intake.
# @RELATION: [CALLS] ->[backend.src.core.superset_client.SupersetClient]
# @PRE: constructor receives a configured environment with a usable Superset base URL.
# @POST: extractor instance is ready to parse links against one Superset environment.
# @SIDE_EFFECT: downstream parse operations may call Superset APIs through SupersetClient.
class SupersetContextExtractor:
    # [DEF:SupersetContextExtractor.__init__:Function]
    # @COMPLEXITY: 2
    # @PURPOSE: Bind extractor to one Superset environment and client instance.
    def __init__(self, environment: Environment, client: Optional[SupersetClient] = None) -> None:
        """Bind the extractor to one environment, creating a client when none is injected."""
        self.environment = environment
        self.client = client or SupersetClient(environment)
    # [/DEF:SupersetContextExtractor.__init__:Function]

    # [DEF:SupersetContextExtractor.parse_superset_link:Function]
    # @COMPLEXITY: 4
    # @PURPOSE: Extract candidate identifiers and query state from supported Superset URLs.
    # @RELATION: [CALLS] ->[backend.src.core.superset_client.SupersetClient]
    # @PRE: link is a non-empty Superset URL compatible with the configured environment.
    # @POST: returns resolved dataset/dashboard context, preserving explicit partial-recovery state if some identifiers cannot be confirmed.
    # @SIDE_EFFECT: may issue Superset API reads to resolve dataset references from dashboard or chart URLs.
    # @DATA_CONTRACT: Input[link:str] -> Output[SupersetParsedContext]
    def parse_superset_link(self, link: str) -> SupersetParsedContext:
        """Parse a dataset/dashboard/chart Superset URL into a SupersetParsedContext.

        Raises:
            ValueError: for empty input, non-absolute URLs, or unsupported link shapes.
        """
        with belief_scope("SupersetContextExtractor.parse_superset_link"):
            normalized_link = str(link or "").strip()
            if not normalized_link:
                logger.explore("Rejected empty Superset link during intake")
                raise ValueError("Superset link must be non-empty")
            parsed_url = urlparse(normalized_link)
            if parsed_url.scheme not in {"http", "https"} or not parsed_url.netloc:
                logger.explore(
                    "Superset link is not a parseable absolute URL",
                    extra={"link": normalized_link},
                )
                raise ValueError("Superset link must be an absolute http(s) URL")
            logger.reason(
                "Parsing Superset link for dataset review intake",
                extra={"path": parsed_url.path, "query": parsed_url.query},
            )
            path_parts = [part for part in parsed_url.path.split("/") if part]
            query_params = parse_qs(parsed_url.query, keep_blank_values=True)
            query_state = self._decode_query_state(query_params)
            dataset_id = self._extract_numeric_identifier(path_parts, "dataset")
            dashboard_id = self._extract_numeric_identifier(path_parts, "dashboard")
            chart_id = self._extract_numeric_identifier(path_parts, "chart")
            resource_type = "unknown"
            dataset_ref: Optional[str] = None
            partial_recovery = False
            unresolved_references: List[str] = []
            if dataset_id is not None:
                resource_type = "dataset"
                dataset_ref = f"dataset:{dataset_id}"
                logger.reason(
                    "Resolved direct dataset link",
                    extra={"dataset_id": dataset_id},
                )
            elif dashboard_id is not None:
                resource_type = "dashboard"
                logger.reason(
                    "Resolving dashboard-bound dataset from Superset",
                    extra={"dashboard_id": dashboard_id},
                )
                dashboard_detail = self.client.get_dashboard_detail(dashboard_id)
                datasets = dashboard_detail.get("datasets") or []
                if datasets:
                    first_dataset = datasets[0]
                    resolved_dataset_id = first_dataset.get("id")
                    if resolved_dataset_id is not None:
                        dataset_id = int(resolved_dataset_id)
                        dataset_ref = f"dataset:{dataset_id}"
                        logger.reason(
                            "Recovered dataset reference from dashboard context",
                            extra={
                                "dashboard_id": dashboard_id,
                                "dataset_id": dataset_id,
                                "dataset_count": len(datasets),
                            },
                        )
                        if len(datasets) > 1:
                            # The first bound dataset is chosen, but the ambiguity
                            # is surfaced as explicit partial recovery.
                            partial_recovery = True
                            unresolved_references.append("multiple_dashboard_datasets")
                    else:
                        partial_recovery = True
                        unresolved_references.append("dashboard_dataset_id_missing")
                else:
                    partial_recovery = True
                    unresolved_references.append("dashboard_dataset_binding_missing")
            elif chart_id is not None:
                resource_type = "chart"
                partial_recovery = True
                unresolved_references.append("chart_dataset_binding_unresolved")
                dataset_ref = f"chart:{chart_id}"
                logger.reason(
                    "Accepted chart link with explicit partial recovery",
                    extra={"chart_id": chart_id},
                )
            else:
                logger.explore(
                    "Unsupported Superset link shape encountered",
                    extra={"path": parsed_url.path},
                )
                raise ValueError("Unsupported Superset link shape")
            if dataset_id is not None:
                # Best-effort canonicalization to "schema.table"; a failed lookup
                # keeps the session usable and is recorded as partial recovery.
                try:
                    dataset_detail = self.client.get_dataset_detail(dataset_id)
                    table_name = str(dataset_detail.get("table_name") or "").strip()
                    schema_name = str(dataset_detail.get("schema") or "").strip()
                    if table_name:
                        dataset_ref = (
                            f"{schema_name}.{table_name}" if schema_name else table_name
                        )
                        logger.reason(
                            "Canonicalized dataset reference from dataset detail",
                            extra={"dataset_ref": dataset_ref, "dataset_id": dataset_id},
                        )
                except Exception as exc:
                    partial_recovery = True
                    unresolved_references.append("dataset_detail_lookup_failed")
                    logger.explore(
                        "Dataset detail lookup failed during link parsing; keeping session usable",
                        extra={"dataset_id": dataset_id, "error": str(exc)},
                    )
            imported_filters = self._extract_imported_filters(query_state)
            result = SupersetParsedContext(
                source_url=normalized_link,
                dataset_ref=dataset_ref or "unresolved",
                dataset_id=dataset_id,
                dashboard_id=dashboard_id,
                chart_id=chart_id,
                resource_type=resource_type,
                query_state=query_state,
                imported_filters=imported_filters,
                unresolved_references=unresolved_references,
                partial_recovery=partial_recovery,
            )
            logger.reflect(
                "Superset link parsing completed",
                extra={
                    "dataset_ref": result.dataset_ref,
                    "dataset_id": result.dataset_id,
                    "dashboard_id": result.dashboard_id,
                    "chart_id": result.chart_id,
                    "partial_recovery": result.partial_recovery,
                    "unresolved_references": result.unresolved_references,
                    "imported_filters": len(result.imported_filters),
                },
            )
            return result
    # [/DEF:SupersetContextExtractor.parse_superset_link:Function]

    # [DEF:SupersetContextExtractor.recover_imported_filters:Function]
    # @COMPLEXITY: 2
    # @PURPOSE: Build imported filter entries from URL state and Superset-side saved context.
    def recover_imported_filters(self, parsed_context: SupersetParsedContext) -> List[Dict[str, Any]]:
        """Return a defensive copy of the filters already recovered during parsing."""
        return list(parsed_context.imported_filters)
    # [/DEF:SupersetContextExtractor.recover_imported_filters:Function]

    # [DEF:SupersetContextExtractor.discover_template_variables:Function]
    # @COMPLEXITY: 2
    # @PURPOSE: Detect runtime variables and Jinja references from dataset query-bearing fields.
    def discover_template_variables(self, dataset_payload: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Placeholder: discovery is not implemented yet and always returns an empty list."""
        return []
    # [/DEF:SupersetContextExtractor.discover_template_variables:Function]

    # [DEF:SupersetContextExtractor.build_recovery_summary:Function]
    # @COMPLEXITY: 2
    # @PURPOSE: Summarize recovered, partial, and unresolved context for session state and UX.
    def build_recovery_summary(self, parsed_context: SupersetParsedContext) -> Dict[str, Any]:
        """Flatten a parsed context into the summary dict consumed by session state/UX."""
        return {
            "dataset_ref": parsed_context.dataset_ref,
            "dataset_id": parsed_context.dataset_id,
            "dashboard_id": parsed_context.dashboard_id,
            "chart_id": parsed_context.chart_id,
            "partial_recovery": parsed_context.partial_recovery,
            "unresolved_references": list(parsed_context.unresolved_references),
            "imported_filter_count": len(parsed_context.imported_filters),
        }
    # [/DEF:SupersetContextExtractor.build_recovery_summary:Function]

    # [DEF:SupersetContextExtractor._extract_numeric_identifier:Function]
    # @COMPLEXITY: 2
    # @PURPOSE: Extract a numeric identifier from a REST-like Superset URL path.
    def _extract_numeric_identifier(self, path_parts: List[str], resource_name: str) -> Optional[int]:
        """Return the integer that directly follows resource_name in the path, else None."""
        # Single index() lookup replaces the previous `in` membership test plus
        # index() double scan; ValueError already covers the missing-name case.
        try:
            resource_index = path_parts.index(resource_name)
        except ValueError:
            return None
        if resource_index + 1 >= len(path_parts):
            return None
        candidate = str(path_parts[resource_index + 1]).strip()
        return int(candidate) if candidate.isdigit() else None
    # [/DEF:SupersetContextExtractor._extract_numeric_identifier:Function]

    # [DEF:SupersetContextExtractor._decode_query_state:Function]
    # @COMPLEXITY: 2
    # @PURPOSE: Decode query-string structures used by Superset URL state transport.
    def _decode_query_state(self, query_params: Dict[str, List[str]]) -> Dict[str, Any]:
        """Decode URL query params; known structured keys are JSON-parsed when possible."""
        query_state: Dict[str, Any] = {}
        for key, values in query_params.items():
            if not values:
                continue
            # parse_qs collects repeated keys; the last occurrence wins.
            raw_value = values[-1]
            decoded_value = unquote(raw_value)
            if key in {"native_filters", "native_filters_key", "form_data", "q"}:
                try:
                    query_state[key] = json.loads(decoded_value)
                    continue
                except (TypeError, ValueError):
                    # Narrowed from bare Exception: json.loads raises only these.
                    logger.explore(
                        "Failed to decode structured Superset query state; preserving raw value",
                        extra={"key": key},
                    )
            query_state[key] = decoded_value
        return query_state
    # [/DEF:SupersetContextExtractor._decode_query_state:Function]

    # [DEF:SupersetContextExtractor._extract_imported_filters:Function]
    # @COMPLEXITY: 2
    # @PURPOSE: Normalize imported filters from decoded query state without fabricating missing values.
    def _extract_imported_filters(self, query_state: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Build imported-filter entries from native_filters and form_data URL state."""
        imported_filters: List[Dict[str, Any]] = []
        native_filters_payload = query_state.get("native_filters")
        if isinstance(native_filters_payload, list):
            for index, item in enumerate(native_filters_payload):
                if not isinstance(item, dict):
                    continue
                filter_name = (
                    item.get("filter_name")
                    or item.get("column")
                    or item.get("name")
                    or f"native_filter_{index}"
                )
                imported_filters.append(
                    {
                        "filter_name": str(filter_name),
                        "raw_value": item.get("value"),
                        "display_name": item.get("label") or item.get("name"),
                        "source": "superset_url",
                        # A missing value downgrades the entry to partial recovery
                        # and forces user confirmation instead of fabricating data.
                        "recovery_status": "recovered"
                        if item.get("value") is not None
                        else "partial",
                        "requires_confirmation": item.get("value") is None,
                        "notes": "Recovered from Superset native filter URL state",
                    }
                )
        form_data_payload = query_state.get("form_data")
        if isinstance(form_data_payload, dict):
            extra_filters = form_data_payload.get("extra_filters") or []
            for index, item in enumerate(extra_filters):
                if not isinstance(item, dict):
                    continue
                filter_name = item.get("col") or item.get("column") or f"extra_filter_{index}"
                imported_filters.append(
                    {
                        "filter_name": str(filter_name),
                        "raw_value": item.get("val"),
                        "display_name": item.get("label"),
                        "source": "superset_url",
                        "recovery_status": "recovered"
                        if item.get("val") is not None
                        else "partial",
                        "requires_confirmation": item.get("val") is None,
                        "notes": "Recovered from Superset form_data extra_filters",
                    }
                )
        return imported_filters
    # [/DEF:SupersetContextExtractor._extract_imported_filters:Function]
# [/DEF:SupersetContextExtractor:Class]
# [/DEF:SupersetContextExtractor:Module]

View File

@@ -5,7 +5,6 @@
# @SEMANTICS: dataset_review, session, profile, findings, semantics, clarification, execution, sqlalchemy
# @PURPOSE: SQLAlchemy models for the dataset review orchestration flow.
# @LAYER: Domain
# @RELATION: INHERITS_FROM -> [Base]
# @RELATION: DEPENDS_ON -> [AuthModels]
# @RELATION: DEPENDS_ON -> [MappingModels]
#

View File

@@ -4,7 +4,7 @@
# @SEMANTICS: dataset_review, schemas, pydantic, session, profile, findings
# @PURPOSE: Defines API schemas for the dataset review orchestration flow.
# @LAYER: API
# @RELATION: DEPENDS_ON -> pydantic
# @RELATION: DEPENDS_ON -> [DatasetReviewModels]
# [SECTION: IMPORTS]
from datetime import datetime

View File

@@ -0,0 +1,386 @@
# [DEF:DatasetReviewOrchestrator:Module]
# @COMPLEXITY: 5
# @SEMANTICS: dataset_review, orchestration, session_lifecycle, intake, recovery
# @PURPOSE: Coordinate dataset review session startup and lifecycle-safe intake recovery for one authenticated user.
# @LAYER: Domain
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSessionRepository]
# @RELATION: [DEPENDS_ON] ->[SemanticSourceResolver]
# @RELATION: [DEPENDS_ON] ->[ClarificationEngine]
# @RELATION: [DEPENDS_ON] ->[SupersetContextExtractor]
# @RELATION: [DEPENDS_ON] ->[SupersetCompilationAdapter]
# @RELATION: [DEPENDS_ON] ->[TaskManager]
# @PRE: session mutations must execute inside a persisted session boundary scoped to one authenticated user.
# @POST: state transitions are persisted atomically and emit observable progress for long-running steps.
# @SIDE_EFFECT: creates task records, updates session aggregates, triggers upstream Superset calls, persists audit artifacts.
# @DATA_CONTRACT: Input[SessionCommand] -> Output[DatasetReviewSession | CompiledPreview | DatasetRunContext]
# @INVARIANT: Launch is blocked unless a current session has no open blocking findings, all launch-sensitive mappings are approved, and a non-stale Superset-generated compiled preview matches the current input fingerprint.
from __future__ import annotations
# [DEF:DatasetReviewOrchestrator.imports:Block]
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from src.core.config_manager import ConfigManager
from src.core.logger import belief_scope, logger
from src.core.task_manager import TaskManager
from src.core.utils.superset_context_extractor import (
SupersetContextExtractor,
SupersetParsedContext,
)
from src.models.auth import User
from src.models.dataset_review import (
BusinessSummarySource,
ConfidenceState,
DatasetProfile,
DatasetReviewSession,
FindingArea,
FindingSeverity,
RecommendedAction,
ReadinessState,
ResolutionState,
SessionPhase,
SessionStatus,
ValidationFinding,
)
from src.services.dataset_review.repositories.session_repository import (
DatasetReviewSessionRepository,
)
from src.services.dataset_review.semantic_resolver import SemanticSourceResolver
# [/DEF:DatasetReviewOrchestrator.imports:Block]
# [DEF:StartSessionCommand:Class]
# @COMPLEXITY: 2
# @PURPOSE: Typed input contract for starting a dataset review session.
@dataclass
class StartSessionCommand:
    # Authenticated user who will own the created session.
    user: User
    # Target environment identifier; resolved via ConfigManager before use.
    environment_id: str
    # Either "superset_link" or "dataset_selection" (validated in start_session).
    source_kind: str
    # Raw Superset link or dataset reference supplied by the user.
    source_input: str
# [/DEF:StartSessionCommand:Class]
# [DEF:StartSessionResult:Class]
# @COMPLEXITY: 2
# @PURPOSE: Session-start result carrying the persisted session and intake recovery metadata.
@dataclass
class StartSessionResult:
    # Persisted session aggregate returned by the repository.
    session: DatasetReviewSession
    # Parsed Superset context; None for direct dataset selections.
    parsed_context: Optional[SupersetParsedContext] = None
    # Partial-recovery findings created during intake (empty when fully recovered).
    findings: List[ValidationFinding] = field(default_factory=list)
# [/DEF:StartSessionResult:Class]
# [DEF:DatasetReviewOrchestrator:Class]
# @COMPLEXITY: 5
# @PURPOSE: Coordinate safe session startup while preserving cross-user isolation and explicit partial recovery.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSessionRepository]
# @RELATION: [DEPENDS_ON] ->[SupersetContextExtractor]
# @RELATION: [DEPENDS_ON] ->[TaskManager]
# @RELATION: [DEPENDS_ON] ->[SessionRepo]
# @RELATION: [DEPENDS_ON] ->[ConfigManager]
# @PRE: constructor dependencies are valid and tied to the current request/task scope.
# @POST: orchestrator instance can execute session-scoped mutations for one authenticated user.
# @SIDE_EFFECT: downstream operations may persist session/profile/finding state and enqueue background tasks.
# @DATA_CONTRACT: Input[StartSessionCommand] -> Output[StartSessionResult]
# @INVARIANT: session ownership is preserved on every mutation and recovery remains explicit when partial.
class DatasetReviewOrchestrator:
# [DEF:DatasetReviewOrchestrator.__init__:Function]
# @COMPLEXITY: 3
# @PURPOSE: Bind repository, config, and task dependencies required by the orchestration boundary.
# @RELATION: [DEPENDS_ON] ->[SessionRepo]
# @RELATION: [DEPENDS_ON] ->[ConfigManager]
def __init__(
    self,
    repository: DatasetReviewSessionRepository,
    config_manager: ConfigManager,
    task_manager: Optional[TaskManager] = None,
    semantic_resolver: Optional[SemanticSourceResolver] = None,
) -> None:
    """Bind the collaborators used by session-scoped orchestration operations."""
    self.repository = repository
    self.config_manager = config_manager
    # NOTE(review): task_manager may be None — presumably recovery-task enqueueing
    # degrades gracefully when absent; confirm in _enqueue_recovery_task.
    self.task_manager = task_manager
    # Default resolver lets callers omit the dependency in simple contexts.
    self.semantic_resolver = semantic_resolver or SemanticSourceResolver()
# [/DEF:DatasetReviewOrchestrator.__init__:Function]
# [DEF:DatasetReviewOrchestrator.start_session:Function]
# @COMPLEXITY: 5
# @PURPOSE: Initialize a new session from a Superset link or dataset selection and trigger context recovery.
# @RELATION: [DEPENDS_ON] ->[SessionRepo]
# @RELATION: [CALLS] ->[SupersetContextExtractor.parse_superset_link]
# @RELATION: [CALLS] ->[create_task]
# @PRE: source input is non-empty and environment is accessible.
# @POST: session exists in persisted storage with intake/recovery state and task linkage when async work is required.
# @SIDE_EFFECT: persists session and may enqueue recovery task.
# @DATA_CONTRACT: Input[StartSessionCommand] -> Output[StartSessionResult]
# @INVARIANT: no cross-user session leakage occurs; session and follow-up task remain owned by the authenticated user.
def start_session(self, command: StartSessionCommand) -> StartSessionResult:
    """Validate intake, persist a new review session, and link optional recovery work.

    Raises:
        ValueError: for empty source input, an unsupported source kind, or an
            unknown environment id.
    """
    with belief_scope("DatasetReviewOrchestrator.start_session"):
        # Normalize user-provided fields first so whitespace-only input is
        # rejected the same way as empty input.
        normalized_source_kind = str(command.source_kind or "").strip()
        normalized_source_input = str(command.source_input or "").strip()
        normalized_environment_id = str(command.environment_id or "").strip()
        if not normalized_source_input:
            logger.explore("Blocked dataset review session start due to empty source input")
            raise ValueError("source_input must be non-empty")
        if normalized_source_kind not in {"superset_link", "dataset_selection"}:
            logger.explore(
                "Blocked dataset review session start due to unsupported source kind",
                extra={"source_kind": normalized_source_kind},
            )
            raise ValueError("source_kind must be 'superset_link' or 'dataset_selection'")
        environment = self.config_manager.get_environment(normalized_environment_id)
        if environment is None:
            logger.explore(
                "Blocked dataset review session start because environment was not found",
                extra={"environment_id": normalized_environment_id},
            )
            raise ValueError("Environment not found")
        logger.reason(
            "Starting dataset review session",
            extra={
                "user_id": command.user.id,
                "environment_id": normalized_environment_id,
                "source_kind": normalized_source_kind,
            },
        )
        parsed_context: Optional[SupersetParsedContext] = None
        findings: List[ValidationFinding] = []
        dataset_ref = normalized_source_input
        dataset_id: Optional[int] = None
        dashboard_id: Optional[int] = None
        readiness_state = ReadinessState.IMPORTING
        recommended_action = RecommendedAction.REVIEW_DOCUMENTATION
        current_phase = SessionPhase.RECOVERY
        if normalized_source_kind == "superset_link":
            # Link intake: recover dataset/dashboard context from the URL.
            # Partial recovery downgrades readiness and adds explicit findings.
            extractor = SupersetContextExtractor(environment)
            parsed_context = extractor.parse_superset_link(normalized_source_input)
            dataset_ref = parsed_context.dataset_ref
            dataset_id = parsed_context.dataset_id
            dashboard_id = parsed_context.dashboard_id
            if parsed_context.partial_recovery:
                readiness_state = ReadinessState.RECOVERY_REQUIRED
                recommended_action = RecommendedAction.REVIEW_DOCUMENTATION
                findings.extend(self._build_partial_recovery_findings(parsed_context))
            else:
                readiness_state = ReadinessState.REVIEW_READY
        else:
            # Direct dataset selection needs no upstream recovery and can enter
            # the review phase immediately.
            dataset_ref, dataset_id = self._parse_dataset_selection(normalized_source_input)
            readiness_state = ReadinessState.REVIEW_READY
            current_phase = SessionPhase.REVIEW
        session = DatasetReviewSession(
            user_id=command.user.id,
            environment_id=normalized_environment_id,
            source_kind=normalized_source_kind,
            source_input=normalized_source_input,
            dataset_ref=dataset_ref,
            dataset_id=dataset_id,
            dashboard_id=dashboard_id,
            readiness_state=readiness_state,
            recommended_action=recommended_action,
            status=SessionStatus.ACTIVE,
            current_phase=current_phase,
        )
        persisted_session = self.repository.create_session(session)
        # Persist an immediately-usable profile plus any intake findings so
        # exports and detail views work before async recovery completes.
        profile = self._build_initial_profile(
            session_id=persisted_session.session_id,
            parsed_context=parsed_context,
            dataset_ref=dataset_ref,
        )
        persisted_session = self.repository.save_profile_and_findings(
            persisted_session.session_id,
            command.user.id,
            profile,
            findings,
        )
        active_task_id = self._enqueue_recovery_task(
            command=command,
            session=persisted_session,
            parsed_context=parsed_context,
        )
        if active_task_id:
            persisted_session.active_task_id = active_task_id
            # NOTE(review): commits through repository.db directly; presumably the
            # repository exposes its SQLAlchemy session here — confirm ownership.
            self.repository.db.commit()
            self.repository.db.refresh(persisted_session)
            logger.reason(
                "Linked recovery task to started dataset review session",
                extra={"session_id": persisted_session.session_id, "task_id": active_task_id},
            )
        logger.reflect(
            "Dataset review session start completed",
            extra={
                "session_id": persisted_session.session_id,
                "dataset_ref": persisted_session.dataset_ref,
                "dataset_id": persisted_session.dataset_id,
                "dashboard_id": persisted_session.dashboard_id,
                "readiness_state": persisted_session.readiness_state.value,
                "active_task_id": persisted_session.active_task_id,
                "finding_count": len(findings),
            },
        )
        return StartSessionResult(
            session=persisted_session,
            parsed_context=parsed_context,
            findings=findings,
        )
# [/DEF:DatasetReviewOrchestrator.start_session:Function]
# [DEF:DatasetReviewOrchestrator._parse_dataset_selection:Function]
# @COMPLEXITY: 3
# @PURPOSE: Normalize dataset-selection payload into canonical session references.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
def _parse_dataset_selection(self, source_input: str) -> tuple[str, Optional[int]]:
normalized = str(source_input or "").strip()
if not normalized:
raise ValueError("dataset selection input must be non-empty")
if normalized.isdigit():
dataset_id = int(normalized)
return f"dataset:{dataset_id}", dataset_id
if normalized.startswith("dataset:"):
suffix = normalized.split(":", 1)[1].strip()
if suffix.isdigit():
return normalized, int(suffix)
return normalized, None
return normalized, None
# [/DEF:DatasetReviewOrchestrator._parse_dataset_selection:Function]
# [DEF:DatasetReviewOrchestrator._build_initial_profile:Function]
# @COMPLEXITY: 3
# @PURPOSE: Create the first profile snapshot so exports and detail views remain usable immediately after intake.
# @RELATION: [DEPENDS_ON] ->[DatasetProfile]
def _build_initial_profile(
self,
session_id: str,
parsed_context: Optional[SupersetParsedContext],
dataset_ref: str,
) -> DatasetProfile:
dataset_name = dataset_ref.split(".")[-1] if dataset_ref else "Unresolved dataset"
business_summary = (
f"Review session initialized for {dataset_ref}."
if dataset_ref
else "Review session initialized with unresolved dataset context."
)
confidence_state = (
ConfidenceState.MIXED
if parsed_context and parsed_context.partial_recovery
else ConfidenceState.MOSTLY_CONFIRMED
)
return DatasetProfile(
session_id=session_id,
dataset_name=dataset_name or "Unresolved dataset",
schema_name=dataset_ref.split(".")[0] if "." in dataset_ref else None,
business_summary=business_summary,
business_summary_source=BusinessSummarySource.IMPORTED,
description="Initial review profile created from source intake.",
dataset_type="unknown",
is_sqllab_view=False,
completeness_score=0.25,
confidence_state=confidence_state,
has_blocking_findings=False,
has_warning_findings=bool(parsed_context and parsed_context.partial_recovery),
manual_summary_locked=False,
)
# [/DEF:DatasetReviewOrchestrator._build_initial_profile:Function]
# [DEF:DatasetReviewOrchestrator._build_partial_recovery_findings:Function]
# @COMPLEXITY: 4
# @PURPOSE: Project partial Superset intake recovery into explicit findings without blocking session usability.
# @RELATION: [DEPENDS_ON] ->[ValidationFinding]
# @PRE: parsed_context.partial_recovery is true.
# @POST: returns warning-level findings that preserve usable but incomplete state.
# @SIDE_EFFECT: none beyond structured finding creation.
# @DATA_CONTRACT: Input[SupersetParsedContext] -> Output[List[ValidationFinding]]
def _build_partial_recovery_findings(
self,
parsed_context: SupersetParsedContext,
) -> List[ValidationFinding]:
findings: List[ValidationFinding] = []
for unresolved_ref in parsed_context.unresolved_references:
findings.append(
ValidationFinding(
area=FindingArea.SOURCE_INTAKE,
severity=FindingSeverity.WARNING,
code="PARTIAL_SUPERSET_RECOVERY",
title="Superset context recovered partially",
message=(
"Session remains usable, but some Superset context requires review: "
f"{unresolved_ref.replace('_', ' ')}."
),
resolution_state=ResolutionState.OPEN,
caused_by_ref=unresolved_ref,
)
)
return findings
# [/DEF:DatasetReviewOrchestrator._build_partial_recovery_findings:Function]
# [DEF:DatasetReviewOrchestrator._enqueue_recovery_task:Function]
# @COMPLEXITY: 4
# @PURPOSE: Link session start to observable async recovery when task infrastructure is available.
# @RELATION: [CALLS] ->[create_task]
# @PRE: session is already persisted.
# @POST: returns task identifier when a task could be enqueued, otherwise None.
# @SIDE_EFFECT: may create one background task for progressive recovery.
# @DATA_CONTRACT: Input[StartSessionCommand,DatasetReviewSession,SupersetParsedContext|None] -> Output[task_id:str|None]
def _enqueue_recovery_task(
self,
command: StartSessionCommand,
session: DatasetReviewSession,
parsed_context: Optional[SupersetParsedContext],
) -> Optional[str]:
if self.task_manager is None:
logger.reason(
"Dataset review session started without task manager; continuing synchronously",
extra={"session_id": session.session_id},
)
return None
task_params: Dict[str, Any] = {
"session_id": session.session_id,
"user_id": command.user.id,
"environment_id": session.environment_id,
"source_kind": session.source_kind,
"source_input": session.source_input,
"dataset_ref": session.dataset_ref,
"dataset_id": session.dataset_id,
"dashboard_id": session.dashboard_id,
"partial_recovery": bool(parsed_context and parsed_context.partial_recovery),
}
create_task = getattr(self.task_manager, "create_task", None)
if create_task is None:
logger.explore("Task manager has no create_task method; skipping recovery enqueue")
return None
try:
task_object = create_task(
plugin_id="dataset-review-recovery",
params=task_params,
)
except TypeError:
logger.explore(
"Recovery task enqueue skipped because task manager create_task contract is incompatible",
extra={"session_id": session.session_id},
)
return None
task_id = getattr(task_object, "id", None)
return str(task_id) if task_id else None
# [/DEF:DatasetReviewOrchestrator._enqueue_recovery_task:Function]
# [/DEF:DatasetReviewOrchestrator:Class]
# [/DEF:DatasetReviewOrchestrator:Module]

View File

@@ -8,6 +8,9 @@
# @RELATION: [DEPENDS_ON] -> [CompiledPreview]
# @PRE: repository operations execute within authenticated request or task scope.
# @POST: session aggregate reads are structurally consistent and writes preserve ownership and version semantics.
# @SIDE_EFFECT: reads and writes SQLAlchemy-backed session aggregates.
# @DATA_CONTRACT: Input[SessionMutation] -> Output[PersistedSessionAggregate]
# @INVARIANT: answers, mapping approvals, preview artifacts, and launch snapshots are never attributed to the wrong user or session.
from typing import Optional, List
from sqlalchemy import or_
@@ -22,27 +25,51 @@ from src.models.dataset_review import (
)
from src.core.logger import belief_scope
# [DEF:SessionRepo:Class]
# @COMPLEXITY: 4
# @PURPOSE: Enforce ownership-scoped persistence and retrieval for dataset review session aggregates.
# @RELATION: [DEPENDS_ON] -> [DatasetReviewSession]
# @RELATION: [DEPENDS_ON] -> [DatasetProfile]
# @RELATION: [DEPENDS_ON] -> [ValidationFinding]
# @RELATION: [DEPENDS_ON] -> [CompiledPreview]
# @PRE: constructor receives a live SQLAlchemy session and callers provide authenticated user scope for guarded reads and writes.
# @POST: repository methods return ownership-scoped aggregates or persisted child records without changing domain meaning.
# @SIDE_EFFECT: mutates and queries the persistence layer through the injected database session.
# @DATA_CONTRACT: Input[OwnedSessionQuery|SessionMutation] -> Output[PersistedSessionAggregate|PersistedChildRecord]
class DatasetReviewSessionRepository:
"""
@PURPOSE: Persist and retrieve dataset review session aggregates.
@INVARIANT: ownership_scope -> All operations must respect the session owner's user_id.
"""
    # [DEF:init_repo:Function]
    # @COMPLEXITY: 1
    # @PURPOSE: Bind the repository to an externally managed SQLAlchemy session.
    def __init__(self, db: Session):
        # The caller owns the session lifecycle; the repository only issues
        # queries and commits through this handle.
        self.db = db
    # [/DEF:init_repo:Function]
# [DEF:create_sess:Function]
# @COMPLEXITY: 4
# @PURPOSE: Persist an initial dataset review session shell.
# @RELATION: [DEPENDS_ON] -> [DatasetReviewSession]
# @PRE: session is a new aggregate root bound to the current ownership scope.
# @POST: session is committed, refreshed, and returned with persisted identifiers.
# @SIDE_EFFECT: inserts a session row and commits the active transaction.
# @DATA_CONTRACT: Input[DatasetReviewSession] -> Output[DatasetReviewSession]
    def create_session(self, session: DatasetReviewSession) -> DatasetReviewSession:
        """
        @PURPOSE: Persist initial session shell.
        @PRE: session is a transient aggregate root not yet attached to the database.
        @POST: returned session carries database-generated identifiers and defaults.
        """
        with belief_scope("DatasetReviewSessionRepository.create_session"):
            self.db.add(session)
            # Commit immediately so the generated session identifier is durable
            # and usable by follow-up writes (profile, findings) in the request.
            self.db.commit()
            # Refresh to load server-side defaults (ids, timestamps) onto the instance.
            self.db.refresh(session)
            return session
# [/DEF:create_sess:Function]
# [DEF:load_detail:Function]
# @COMPLEXITY: 3
# @PURPOSE: Return the full session aggregate for API and frontend resume flows.
# @RELATION: [DEPENDS_ON] -> [DatasetReviewSession]
# @RELATION: [DEPENDS_ON] -> [SessionCollaborator]
def load_session_detail(self, session_id: str, user_id: str) -> Optional[DatasetReviewSession]:
"""
@PURPOSE: Return the full session aggregate for API/frontend use.
@PRE: user_id must match session owner or authorized collaborator.
"""
with belief_scope("DatasetReviewSessionRepository.load_session_detail"):
@@ -70,17 +97,25 @@ class DatasetReviewSessionRepository:
)
)\
.first()
# [/DEF:load_detail:Function]
# [DEF:save_prof_find:Function]
# @COMPLEXITY: 4
# @PURPOSE: Persist profile state and replace validation findings for an owned session in one transaction.
# @RELATION: [DEPENDS_ON] -> [DatasetReviewSession]
# @RELATION: [DEPENDS_ON] -> [DatasetProfile]
# @RELATION: [DEPENDS_ON] -> [ValidationFinding]
# @PRE: session_id belongs to user_id and the supplied profile/findings belong to the same aggregate scope.
# @POST: stored profile matches the current session and findings are replaced by the supplied collection.
# @SIDE_EFFECT: updates profile rows, deletes stale findings, inserts current findings, and commits the transaction.
# @DATA_CONTRACT: Input[ProfileAndFindingsMutation] -> Output[DatasetReviewSession]
def save_profile_and_findings(self, session_id: str, user_id: str, profile: DatasetProfile, findings: List[ValidationFinding]) -> DatasetReviewSession:
"""
@PURPOSE: Persist profile and validation state together.
"""
with belief_scope("DatasetReviewSessionRepository.save_profile_and_findings"):
session = self.db.query(DatasetReviewSession).filter(
DatasetReviewSession.session_id == session_id,
DatasetReviewSession.user_id == user_id
).first()
if not session:
raise ValueError("Session not found or access denied")
@@ -90,24 +125,31 @@ class DatasetReviewSessionRepository:
if existing_profile:
profile.profile_id = existing_profile.profile_id
self.db.merge(profile)
# Remove old findings for this session to avoid stale data
self.db.query(ValidationFinding).filter(
ValidationFinding.session_id == session_id
).delete()
# Add new findings
for finding in findings:
finding.session_id = session_id
self.db.add(finding)
self.db.commit()
return self.load_session_detail(session_id, user_id)
# [/DEF:save_prof_find:Function]
# [DEF:save_prev:Function]
# @COMPLEXITY: 4
# @PURPOSE: Persist a preview snapshot and mark prior session previews stale.
# @RELATION: [DEPENDS_ON] -> [DatasetReviewSession]
# @RELATION: [DEPENDS_ON] -> [CompiledPreview]
# @PRE: session_id belongs to user_id and preview is prepared for the same session aggregate.
# @POST: preview is persisted and the session points to the latest preview identifier.
# @SIDE_EFFECT: updates prior preview statuses, inserts a preview row, mutates the parent session, and commits.
# @DATA_CONTRACT: Input[PreviewMutation] -> Output[CompiledPreview]
def save_preview(self, session_id: str, user_id: str, preview: CompiledPreview) -> CompiledPreview:
"""
@PURPOSE: Persist compiled preview attempt and mark older fingerprints stale.
"""
with belief_scope("DatasetReviewSessionRepository.save_preview"):
session = self.db.query(DatasetReviewSession).filter(
DatasetReviewSession.session_id == session_id,
@@ -125,15 +167,22 @@ class DatasetReviewSessionRepository:
self.db.add(preview)
self.db.flush()
session.last_preview_id = preview.preview_id
self.db.commit()
self.db.refresh(preview)
return preview
# [/DEF:save_prev:Function]
# [DEF:save_run_ctx:Function]
# @COMPLEXITY: 4
# @PURPOSE: Persist an immutable launch audit snapshot for an owned session.
# @RELATION: [DEPENDS_ON] -> [DatasetReviewSession]
# @RELATION: [DEPENDS_ON] -> [DatasetRunContext]
# @PRE: session_id belongs to user_id and run_context targets the same aggregate.
# @POST: run context is persisted and linked as the latest launch snapshot for the session.
# @SIDE_EFFECT: inserts a run-context row, mutates the parent session pointer, and commits.
# @DATA_CONTRACT: Input[RunContextMutation] -> Output[DatasetRunContext]
def save_run_context(self, session_id: str, user_id: str, run_context: DatasetRunContext) -> DatasetRunContext:
"""
@PURPOSE: Persist immutable launch audit snapshot.
"""
with belief_scope("DatasetReviewSessionRepository.save_run_context"):
session = self.db.query(DatasetReviewSession).filter(
DatasetReviewSession.session_id == session_id,
@@ -146,18 +195,22 @@ class DatasetReviewSessionRepository:
self.db.add(run_context)
self.db.flush()
session.last_run_context_id = run_context.run_context_id
self.db.commit()
self.db.refresh(run_context)
return run_context
# [/DEF:save_run_ctx:Function]
# [DEF:list_user_sess:Function]
# @COMPLEXITY: 3
# @PURPOSE: List review sessions owned by a specific user ordered by most recent update.
# @RELATION: [DEPENDS_ON] -> [DatasetReviewSession]
def list_sessions_for_user(self, user_id: str) -> List[DatasetReviewSession]:
"""
@PURPOSE: List all review sessions owned by a user.
"""
with belief_scope("DatasetReviewSessionRepository.list_sessions_for_user"):
return self.db.query(DatasetReviewSession).filter(
DatasetReviewSession.user_id == user_id
).order_by(DatasetReviewSession.updated_at.desc()).all()
# [/DEF:list_user_sess:Function]
# [/DEF:SessionRepo:Class]
# [/DEF:DatasetReviewSessionRepository:Module]

View File

@@ -0,0 +1,342 @@
# [DEF:SemanticSourceResolver:Module]
# @COMPLEXITY: 4
# @SEMANTICS: dataset_review, semantic_resolution, dictionary, trusted_sources, ranking
# @PURPOSE: Resolve and rank semantic candidates from trusted dictionary-like sources before any inferred fallback.
# @LAYER: Domain
# @RELATION: [DEPENDS_ON] ->[LLMProviderService]
# @RELATION: [DEPENDS_ON] ->[SemanticSource]
# @RELATION: [DEPENDS_ON] ->[SemanticFieldEntry]
# @RELATION: [DEPENDS_ON] ->[SemanticCandidate]
# @PRE: selected source and target field set must be known.
# @POST: candidate ranking follows the configured confidence hierarchy and unresolved fuzzy matches remain reviewable.
# @SIDE_EFFECT: may create conflict findings and semantic candidate records.
# @INVARIANT: Manual overrides are never silently replaced by imported, inferred, or AI-generated values.
from __future__ import annotations
# [DEF:SemanticSourceResolver.imports:Block]
from dataclasses import dataclass, field
from difflib import SequenceMatcher
from typing import Any, Dict, Iterable, List, Mapping, Optional
from src.core.logger import belief_scope, logger
from src.models.dataset_review import (
CandidateMatchType,
CandidateStatus,
FieldProvenance,
)
# [/DEF:SemanticSourceResolver.imports:Block]
# [DEF:DictionaryResolutionResult:Class]
# @COMPLEXITY: 2
# @PURPOSE: Carries field-level dictionary resolution output with explicit review and partial-recovery state.
@dataclass
class DictionaryResolutionResult:
    """Field-level outcome of resolving semantics against one trusted source."""

    # Identifier of the trusted source the resolution ran against.
    source_ref: str
    # Per-field resolution payloads: applied candidate, ranked alternatives, provenance flags.
    resolved_fields: List[Dict[str, Any]] = field(default_factory=list)
    # Field names with no exact or fuzzy match; kept explicit so review UIs can surface them.
    unresolved_fields: List[str] = field(default_factory=list)
    # True when at least one field stayed unresolved, i.e. the source only partially covered the dataset.
    partial_recovery: bool = False
# [/DEF:DictionaryResolutionResult:Class]
# [DEF:SemanticSourceResolver:Class]
# @COMPLEXITY: 4
# @PURPOSE: Resolve semantic candidates from trusted sources while preserving manual locks and confidence ordering.
# @RELATION: [DEPENDS_ON] ->[SemanticFieldEntry]
# @RELATION: [DEPENDS_ON] ->[SemanticCandidate]
# @PRE: source payload and target field collection are provided by the caller.
# @POST: result contains confidence-ranked candidates and does not overwrite manual locks implicitly.
# @SIDE_EFFECT: emits semantic trace logs for ranking and fallback decisions.
class SemanticSourceResolver:
# [DEF:SemanticSourceResolver.resolve_from_file:Function]
# @COMPLEXITY: 2
# @PURPOSE: Normalize uploaded semantic file records into field-level candidates.
def resolve_from_file(self, source_payload: Mapping[str, Any], fields: Iterable[Mapping[str, Any]]) -> DictionaryResolutionResult:
return DictionaryResolutionResult(source_ref=str(source_payload.get("source_ref") or "uploaded_file"))
# [/DEF:SemanticSourceResolver.resolve_from_file:Function]
# [DEF:SemanticSourceResolver.resolve_from_dictionary:Function]
# @COMPLEXITY: 4
# @PURPOSE: Resolve candidates from connected tabular dictionary sources.
# @RELATION: [DEPENDS_ON] ->[SemanticFieldEntry]
# @RELATION: [DEPENDS_ON] ->[SemanticCandidate]
# @PRE: dictionary source exists and fields contain stable field_name values.
# @POST: returns confidence-ranked candidates where exact dictionary matches outrank fuzzy matches and unresolved fields stay explicit.
# @SIDE_EFFECT: emits belief-state logs describing trusted-match and partial-recovery outcomes.
# @DATA_CONTRACT: Input[source_payload:Mapping,fields:Iterable] -> Output[DictionaryResolutionResult]
def resolve_from_dictionary(
self,
source_payload: Mapping[str, Any],
fields: Iterable[Mapping[str, Any]],
) -> DictionaryResolutionResult:
with belief_scope("SemanticSourceResolver.resolve_from_dictionary"):
source_ref = str(source_payload.get("source_ref") or "").strip()
dictionary_rows = source_payload.get("rows")
if not source_ref:
logger.explore("Dictionary semantic source is missing source_ref")
raise ValueError("Dictionary semantic source must include source_ref")
if not isinstance(dictionary_rows, list) or not dictionary_rows:
logger.explore(
"Dictionary semantic source has no usable rows",
extra={"source_ref": source_ref},
)
raise ValueError("Dictionary semantic source must include non-empty rows")
logger.reason(
"Resolving semantics from trusted dictionary source",
extra={"source_ref": source_ref, "row_count": len(dictionary_rows)},
)
normalized_rows = [self._normalize_dictionary_row(row) for row in dictionary_rows if isinstance(row, Mapping)]
row_index = {
row["field_key"]: row
for row in normalized_rows
if row.get("field_key")
}
resolved_fields: List[Dict[str, Any]] = []
unresolved_fields: List[str] = []
for raw_field in fields:
field_name = str(raw_field.get("field_name") or "").strip()
if not field_name:
continue
is_locked = bool(raw_field.get("is_locked"))
if is_locked:
logger.reason(
"Preserving manual lock during dictionary resolution",
extra={"field_name": field_name},
)
resolved_fields.append(
{
"field_name": field_name,
"applied_candidate": None,
"candidates": [],
"provenance": FieldProvenance.MANUAL_OVERRIDE.value,
"needs_review": False,
"has_conflict": False,
"is_locked": True,
"status": "preserved_manual",
}
)
continue
exact_match = row_index.get(self._normalize_key(field_name))
candidates: List[Dict[str, Any]] = []
if exact_match is not None:
logger.reason(
"Resolved exact dictionary match",
extra={"field_name": field_name, "source_ref": source_ref},
)
candidates.append(
self._build_candidate_payload(
rank=1,
match_type=CandidateMatchType.EXACT,
confidence_score=1.0,
row=exact_match,
)
)
else:
fuzzy_matches = self._find_fuzzy_matches(field_name, normalized_rows)
for rank_offset, fuzzy_match in enumerate(fuzzy_matches, start=1):
candidates.append(
self._build_candidate_payload(
rank=rank_offset,
match_type=CandidateMatchType.FUZZY,
confidence_score=float(fuzzy_match["score"]),
row=fuzzy_match["row"],
)
)
if not candidates:
unresolved_fields.append(field_name)
resolved_fields.append(
{
"field_name": field_name,
"applied_candidate": None,
"candidates": [],
"provenance": FieldProvenance.UNRESOLVED.value,
"needs_review": True,
"has_conflict": False,
"is_locked": False,
"status": "unresolved",
}
)
logger.explore(
"No trusted dictionary match found for field",
extra={"field_name": field_name, "source_ref": source_ref},
)
continue
ranked_candidates = self.rank_candidates(candidates)
applied_candidate = ranked_candidates[0]
has_conflict = len(ranked_candidates) > 1
provenance = (
FieldProvenance.DICTIONARY_EXACT.value
if applied_candidate["match_type"] == CandidateMatchType.EXACT.value
else FieldProvenance.FUZZY_INFERRED.value
)
needs_review = applied_candidate["match_type"] != CandidateMatchType.EXACT.value
resolved_fields.append(
{
"field_name": field_name,
"applied_candidate": applied_candidate,
"candidates": ranked_candidates,
"provenance": provenance,
"needs_review": needs_review,
"has_conflict": has_conflict,
"is_locked": False,
"status": "resolved",
}
)
result = DictionaryResolutionResult(
source_ref=source_ref,
resolved_fields=resolved_fields,
unresolved_fields=unresolved_fields,
partial_recovery=bool(unresolved_fields),
)
logger.reflect(
"Dictionary resolution completed",
extra={
"source_ref": source_ref,
"resolved_fields": len(resolved_fields),
"unresolved_fields": len(unresolved_fields),
"partial_recovery": result.partial_recovery,
},
)
return result
# [/DEF:SemanticSourceResolver.resolve_from_dictionary:Function]
# [DEF:SemanticSourceResolver.resolve_from_reference_dataset:Function]
# @COMPLEXITY: 2
# @PURPOSE: Reuse semantic metadata from trusted Superset datasets.
def resolve_from_reference_dataset(
self,
source_payload: Mapping[str, Any],
fields: Iterable[Mapping[str, Any]],
) -> DictionaryResolutionResult:
return DictionaryResolutionResult(source_ref=str(source_payload.get("source_ref") or "reference_dataset"))
# [/DEF:SemanticSourceResolver.resolve_from_reference_dataset:Function]
# [DEF:SemanticSourceResolver.rank_candidates:Function]
# @COMPLEXITY: 3
# @PURPOSE: Apply confidence ordering and determine best candidate per field.
# @RELATION: [DEPENDS_ON] ->[SemanticCandidate]
def rank_candidates(self, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
ranked = sorted(
candidates,
key=lambda candidate: (
self._match_priority(candidate.get("match_type")),
-float(candidate.get("confidence_score", 0.0)),
int(candidate.get("candidate_rank", 999)),
),
)
for index, candidate in enumerate(ranked, start=1):
candidate["candidate_rank"] = index
return ranked
# [/DEF:SemanticSourceResolver.rank_candidates:Function]
# [DEF:SemanticSourceResolver.detect_conflicts:Function]
# @COMPLEXITY: 2
# @PURPOSE: Mark competing candidate sets that require explicit user review.
def detect_conflicts(self, candidates: List[Dict[str, Any]]) -> bool:
return len(candidates) > 1
# [/DEF:SemanticSourceResolver.detect_conflicts:Function]
# [DEF:SemanticSourceResolver.apply_field_decision:Function]
# @COMPLEXITY: 2
# @PURPOSE: Accept, reject, or manually override a field-level semantic value.
def apply_field_decision(self, field_state: Mapping[str, Any], decision: Mapping[str, Any]) -> Dict[str, Any]:
merged = dict(field_state)
merged.update(decision)
return merged
# [/DEF:SemanticSourceResolver.apply_field_decision:Function]
# [DEF:SemanticSourceResolver._normalize_dictionary_row:Function]
# @COMPLEXITY: 2
# @PURPOSE: Normalize one dictionary row into a consistent lookup structure.
def _normalize_dictionary_row(self, row: Mapping[str, Any]) -> Dict[str, Any]:
field_name = (
row.get("field_name")
or row.get("column_name")
or row.get("name")
or row.get("field")
)
normalized_name = str(field_name or "").strip()
return {
"field_name": normalized_name,
"field_key": self._normalize_key(normalized_name),
"verbose_name": row.get("verbose_name") or row.get("label"),
"description": row.get("description"),
"display_format": row.get("display_format") or row.get("format"),
}
# [/DEF:SemanticSourceResolver._normalize_dictionary_row:Function]
# [DEF:SemanticSourceResolver._find_fuzzy_matches:Function]
# @COMPLEXITY: 2
# @PURPOSE: Produce confidence-scored fuzzy matches while keeping them reviewable.
def _find_fuzzy_matches(self, field_name: str, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
normalized_target = self._normalize_key(field_name)
fuzzy_matches: List[Dict[str, Any]] = []
for row in rows:
candidate_key = str(row.get("field_key") or "")
if not candidate_key:
continue
score = SequenceMatcher(None, normalized_target, candidate_key).ratio()
if score < 0.72:
continue
fuzzy_matches.append({"row": row, "score": round(score, 3)})
fuzzy_matches.sort(key=lambda item: item["score"], reverse=True)
return fuzzy_matches[:3]
# [/DEF:SemanticSourceResolver._find_fuzzy_matches:Function]
# [DEF:SemanticSourceResolver._build_candidate_payload:Function]
# @COMPLEXITY: 2
# @PURPOSE: Project normalized dictionary rows into semantic candidate payloads.
def _build_candidate_payload(
self,
rank: int,
match_type: CandidateMatchType,
confidence_score: float,
row: Mapping[str, Any],
) -> Dict[str, Any]:
return {
"candidate_rank": rank,
"match_type": match_type.value,
"confidence_score": confidence_score,
"proposed_verbose_name": row.get("verbose_name"),
"proposed_description": row.get("description"),
"proposed_display_format": row.get("display_format"),
"status": CandidateStatus.PROPOSED.value,
}
# [/DEF:SemanticSourceResolver._build_candidate_payload:Function]
# [DEF:SemanticSourceResolver._match_priority:Function]
# @COMPLEXITY: 2
# @PURPOSE: Encode trusted-confidence ordering so exact dictionary reuse beats fuzzy invention.
def _match_priority(self, match_type: Optional[str]) -> int:
priority = {
CandidateMatchType.EXACT.value: 0,
CandidateMatchType.REFERENCE.value: 1,
CandidateMatchType.FUZZY.value: 2,
CandidateMatchType.GENERATED.value: 3,
}
return priority.get(str(match_type or ""), 99)
# [/DEF:SemanticSourceResolver._match_priority:Function]
# [DEF:SemanticSourceResolver._normalize_key:Function]
# @COMPLEXITY: 1
# @PURPOSE: Normalize field identifiers for stable exact/fuzzy comparisons.
def _normalize_key(self, value: str) -> str:
return "".join(ch for ch in str(value or "").strip().lower() if ch.isalnum() or ch == "_")
# [/DEF:SemanticSourceResolver._normalize_key:Function]
# [/DEF:SemanticSourceResolver:Class]
# [/DEF:SemanticSourceResolver:Module]