feat(027): Final Phase T038-T043 implementation

- T038: SessionEvent logger and persistence logic
  - Added SessionEventLogger service with explicit audit event persistence
  - Added SessionEvent model with events relationship on DatasetReviewSession
  - Integrated event logging into orchestrator flows and API mutation endpoints

- T039: Semantic source version propagation
  - Added source_version column to SemanticFieldEntry
  - Added propagate_source_version_update() to SemanticResolver
  - Preserves locked/manual field invariants during propagation

- T040: Batch approval API and UI actions
  - Added batch semantic approval endpoint (/fields/semantic/approve-batch)
  - Added batch mapping approval endpoint (/mappings/approve-batch)
  - Added batch approval actions to SemanticLayerReview and ExecutionMappingReview components
  - Aligned batch semantics with single-item approval contracts

- T041: Superset compatibility matrix tests
  - Added test_superset_matrix.py with preview and SQL Lab fallback coverage
  - Tests verify client method preference and matrix fallback behavior

- T042: RBAC audit sweep on session-mutation endpoints
  - Added _require_owner_mutation_scope() helper
  - Applied owner guards to update_session, delete_session, and all mutation endpoints
  - Ensured no bypass of existing permission checks

- T043: i18n coverage for dataset-review UI
  - Added workspace state labels (empty/importing/review) to en.json and ru.json
  - Added batch action labels for semantics and mappings
  - Fixed workspace state comparison to lowercase strings
  - Removed hardcoded workspace state display strings

Signed-off-by: Implementation Specialist <impl@ss-tools>
This commit is contained in:
2026-03-17 14:29:33 +03:00
parent 38bda6a714
commit ed3d5f3039
33 changed files with 99234 additions and 93415 deletions

View File

@@ -1,7 +1,8 @@
# [DEF:test_clean_release_v2_api:Module]
# [DEF:CleanReleaseV2ApiTests:Module]
# @COMPLEXITY: 3
# @PURPOSE: API contract tests for redesigned clean release endpoints.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> backend.src.api.routes.clean_release_v2
from datetime import datetime, timezone
from types import SimpleNamespace
@@ -90,4 +91,4 @@ def test_manifest_build_contract():
assert "manifest_digest" in data
assert data["candidate_id"] == candidate_id
# [/DEF:test_clean_release_v2_api:Module]
# [/DEF:CleanReleaseV2ApiTests:Module]

View File

@@ -1,8 +1,8 @@
# [DEF:test_clean_release_v2_release_api:Module]
# [DEF:CleanReleaseV2ReleaseApiTests:Module]
# @COMPLEXITY: 3
# @PURPOSE: API contract test scaffolding for clean release approval and publication endpoints.
# @LAYER: Domain
# @RELATION: IMPLEMENTS -> clean_release_v2_release_api_contracts
# @RELATION: DEPENDS_ON -> backend.src.api.routes.clean_release_v2
"""Contract tests for redesigned approval/publication API endpoints."""
@@ -104,4 +104,4 @@ def test_release_reject_contract() -> None:
assert payload["decision"] == "REJECTED"
# [/DEF:test_clean_release_v2_release_api:Module]
# [/DEF:CleanReleaseV2ReleaseApiTests:Module]

View File

@@ -1,8 +1,8 @@
# [DEF:backend.src.api.routes.__tests__.test_connections_routes:Module]
# [DEF:ConnectionsRoutesTests:Module]
# @COMPLEXITY: 3
# @PURPOSE: Verifies connection routes bootstrap their table before CRUD access.
# @LAYER: API
# @RELATION: VERIFIES -> backend.src.api.routes.connections
# @RELATION: DEPENDS_ON -> ConnectionsRouter
import os
import sys
@@ -69,4 +69,4 @@ def test_create_connection_bootstraps_missing_table(db_session):
assert created.host == "warehouse.internal"
assert "connection_configs" in inspector.get_table_names()
# [/DEF:backend.src.api.routes.__tests__.test_connections_routes:Module]
# [/DEF:ConnectionsRoutesTests:Module]

View File

@@ -1,8 +1,8 @@
# [DEF:backend.src.api.routes.__tests__.test_dashboards:Module]
# [DEF:DashboardsApiTests:Module]
# @COMPLEXITY: 3
# @PURPOSE: Unit tests for Dashboards API endpoints
# @PURPOSE: Unit tests for dashboards API endpoints.
# @LAYER: API
# @RELATION: TESTS -> backend.src.api.routes.dashboards
# @RELATION: DEPENDS_ON -> backend.src.api.routes.dashboards
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
@@ -57,6 +57,7 @@ client = TestClient(app)
# [DEF:test_get_dashboards_success:Function]
# @PURPOSE: Validate dashboards listing returns a populated response that satisfies the schema contract.
# @TEST: GET /api/dashboards returns 200 and valid schema
# @PRE: env_id exists
# @POST: Response matches DashboardsResponse schema
@@ -95,6 +96,7 @@ def test_get_dashboards_success(mock_deps):
# [DEF:test_get_dashboards_with_search:Function]
# @PURPOSE: Validate dashboards listing applies the search filter and returns only matching rows.
# @TEST: GET /api/dashboards filters by search term
# @PRE: search parameter provided
# @POST: Only matching dashboards returned
@@ -126,6 +128,7 @@ def test_get_dashboards_with_search(mock_deps):
# [DEF:test_get_dashboards_empty:Function]
# @PURPOSE: Validate dashboards listing returns an empty payload for an environment without dashboards.
# @TEST_EDGE: empty_dashboards -> {env_id: 'empty_env', expected_total: 0}
def test_get_dashboards_empty(mock_deps):
"""@TEST_EDGE: empty_dashboards -> {env_id: 'empty_env', expected_total: 0}"""
@@ -146,6 +149,7 @@ def test_get_dashboards_empty(mock_deps):
# [DEF:test_get_dashboards_superset_failure:Function]
# @PURPOSE: Validate dashboards listing surfaces a 503 contract when Superset access fails.
# @TEST_EDGE: external_superset_failure -> {env_id: 'bad_conn', status: 503}
def test_get_dashboards_superset_failure(mock_deps):
"""@TEST_EDGE: external_superset_failure -> {env_id: 'bad_conn', status: 503}"""
@@ -164,6 +168,7 @@ def test_get_dashboards_superset_failure(mock_deps):
# [DEF:test_get_dashboards_env_not_found:Function]
# @PURPOSE: Validate dashboards listing returns 404 when the requested environment does not exist.
# @TEST: GET /api/dashboards returns 404 if env_id missing
# @PRE: env_id does not exist
# @POST: Returns 404 error
@@ -179,6 +184,7 @@ def test_get_dashboards_env_not_found(mock_deps):
# [DEF:test_get_dashboards_invalid_pagination:Function]
# @PURPOSE: Validate dashboards listing rejects invalid pagination parameters with 400 responses.
# @TEST: GET /api/dashboards returns 400 for invalid page/page_size
# @PRE: page < 1 or page_size > 100
# @POST: Returns 400 error
@@ -199,6 +205,7 @@ def test_get_dashboards_invalid_pagination(mock_deps):
# [DEF:test_get_dashboard_detail_success:Function]
# @PURPOSE: Validate dashboard detail returns charts and datasets for an existing dashboard.
# @TEST: GET /api/dashboards/{id} returns dashboard detail with charts and datasets
def test_get_dashboard_detail_success(mock_deps):
with patch("src.api.routes.dashboards.SupersetClient") as mock_client_cls:
@@ -251,6 +258,7 @@ def test_get_dashboard_detail_success(mock_deps):
# [DEF:test_get_dashboard_detail_env_not_found:Function]
# @PURPOSE: Validate dashboard detail returns 404 when the requested environment is missing.
# @TEST: GET /api/dashboards/{id} returns 404 for missing environment
def test_get_dashboard_detail_env_not_found(mock_deps):
mock_deps["config"].get_environments.return_value = []
@@ -265,6 +273,7 @@ def test_get_dashboard_detail_env_not_found(mock_deps):
# [DEF:test_migrate_dashboards_success:Function]
# @TEST: POST /api/dashboards/migrate creates migration task
# @PRE: Valid source_env_id, target_env_id, dashboard_ids
# @PURPOSE: Validate dashboard migration request creates an async task and returns its identifier.
# @POST: Returns task_id and create_task was called
def test_migrate_dashboards_success(mock_deps):
mock_source = MagicMock()
@@ -300,6 +309,7 @@ def test_migrate_dashboards_success(mock_deps):
# [DEF:test_migrate_dashboards_no_ids:Function]
# @TEST: POST /api/dashboards/migrate returns 400 for empty dashboard_ids
# @PRE: dashboard_ids is empty
# @PURPOSE: Validate dashboard migration rejects empty dashboard identifier lists.
# @POST: Returns 400 error
def test_migrate_dashboards_no_ids(mock_deps):
response = client.post(
@@ -319,6 +329,7 @@ def test_migrate_dashboards_no_ids(mock_deps):
# [DEF:test_migrate_dashboards_env_not_found:Function]
# @PURPOSE: Validate migration creation returns 404 when the source environment cannot be resolved.
# @PRE: source_env_id and target_env_id are valid environment IDs
def test_migrate_dashboards_env_not_found(mock_deps):
"""@PRE: source_env_id and target_env_id are valid environment IDs."""
@@ -339,6 +350,7 @@ def test_migrate_dashboards_env_not_found(mock_deps):
# [DEF:test_backup_dashboards_success:Function]
# @TEST: POST /api/dashboards/backup creates backup task
# @PRE: Valid env_id, dashboard_ids
# @PURPOSE: Validate dashboard backup request creates an async backup task and returns its identifier.
# @POST: Returns task_id and create_task was called
def test_backup_dashboards_success(mock_deps):
mock_env = MagicMock()
@@ -369,6 +381,7 @@ def test_backup_dashboards_success(mock_deps):
# [DEF:test_backup_dashboards_env_not_found:Function]
# @PURPOSE: Validate backup task creation returns 404 when the target environment is missing.
# @PRE: env_id is a valid environment ID
def test_backup_dashboards_env_not_found(mock_deps):
"""@PRE: env_id is a valid environment ID."""
@@ -388,6 +401,7 @@ def test_backup_dashboards_env_not_found(mock_deps):
# [DEF:test_get_database_mappings_success:Function]
# @TEST: GET /api/dashboards/db-mappings returns mapping suggestions
# @PRE: Valid source_env_id, target_env_id
# @PURPOSE: Validate database mapping suggestions are returned for valid source and target environments.
# @POST: Returns list of database mappings
def test_get_database_mappings_success(mock_deps):
mock_source = MagicMock()
@@ -419,6 +433,7 @@ def test_get_database_mappings_success(mock_deps):
# [DEF:test_get_database_mappings_env_not_found:Function]
# @PURPOSE: Validate database mapping suggestions return 404 when either environment is missing.
# @PRE: source_env_id and target_env_id are valid environment IDs
def test_get_database_mappings_env_not_found(mock_deps):
"""@PRE: source_env_id must be a valid environment."""
@@ -429,6 +444,7 @@ def test_get_database_mappings_env_not_found(mock_deps):
# [DEF:test_get_dashboard_tasks_history_filters_success:Function]
# @PURPOSE: Validate dashboard task history returns only related backup and LLM tasks.
# @TEST: GET /api/dashboards/{id}/tasks returns backup and llm tasks for dashboard
def test_get_dashboard_tasks_history_filters_success(mock_deps):
now = datetime.now(timezone.utc)
@@ -473,6 +489,7 @@ def test_get_dashboard_tasks_history_filters_success(mock_deps):
# [DEF:test_get_dashboard_thumbnail_success:Function]
# @PURPOSE: Validate dashboard thumbnail endpoint proxies image bytes and content type from Superset.
# @TEST: GET /api/dashboards/{id}/thumbnail proxies image bytes from Superset
def test_get_dashboard_thumbnail_success(mock_deps):
with patch("src.api.routes.dashboards.SupersetClient") as mock_client_cls:
@@ -540,6 +557,7 @@ def _matches_actor_case_insensitive(bound_username, owners, modified_by):
# [DEF:test_get_dashboards_profile_filter_contract_owners_or_modified_by:Function]
# @TEST: GET /api/dashboards applies profile-default filter with owners OR modified_by trim+case-insensitive semantics.
# @PURPOSE: Validate profile-default filtering matches owner and modifier aliases using normalized Superset actor values.
# @PRE: Current user has enabled profile-default preference and bound username.
# @POST: Response includes only matching dashboards and effective_profile_filter metadata.
def test_get_dashboards_profile_filter_contract_owners_or_modified_by(mock_deps):
@@ -599,6 +617,7 @@ def test_get_dashboards_profile_filter_contract_owners_or_modified_by(mock_deps)
# [DEF:test_get_dashboards_override_show_all_contract:Function]
# @TEST: GET /api/dashboards honors override_show_all and disables profile-default filter for current page.
# @PURPOSE: Validate override_show_all bypasses profile-default filtering without changing dashboard list semantics.
# @PRE: Profile-default preference exists but override_show_all=true query is provided.
# @POST: Response remains unfiltered and effective_profile_filter.applied is false.
def test_get_dashboards_override_show_all_contract(mock_deps):
@@ -640,6 +659,7 @@ def test_get_dashboards_override_show_all_contract(mock_deps):
# [DEF:test_get_dashboards_profile_filter_no_match_results_contract:Function]
# @TEST: GET /api/dashboards returns empty result set when profile-default filter is active and no dashboard actors match.
# @PURPOSE: Validate profile-default filtering returns an empty dashboard page when no actor aliases match the bound user.
# @PRE: Profile-default preference is enabled with bound username and all dashboards are non-matching.
# @POST: Response total is 0 with deterministic pagination and active effective_profile_filter metadata.
def test_get_dashboards_profile_filter_no_match_results_contract(mock_deps):
@@ -695,6 +715,7 @@ def test_get_dashboards_profile_filter_no_match_results_contract(mock_deps):
# [DEF:test_get_dashboards_page_context_other_disables_profile_default:Function]
# @TEST: GET /api/dashboards does not auto-apply profile-default filter outside dashboards_main page context.
# @PURPOSE: Validate non-dashboard page contexts suppress profile-default filtering and preserve unfiltered results.
# @PRE: Profile-default preference exists but page_context=other query is provided.
# @POST: Response remains unfiltered and metadata reflects source_page=other.
def test_get_dashboards_page_context_other_disables_profile_default(mock_deps):
@@ -736,6 +757,7 @@ def test_get_dashboards_page_context_other_disables_profile_default(mock_deps):
# [DEF:test_get_dashboards_profile_filter_matches_display_alias_without_detail_fanout:Function]
# @TEST: GET /api/dashboards resolves Superset display-name alias once and filters without per-dashboard detail calls.
# @PURPOSE: Validate profile-default filtering reuses resolved Superset display aliases without triggering per-dashboard detail fanout.
# @PRE: Profile-default filter is active, bound username is `admin`, dashboard actors contain display labels.
# @POST: Route matches by alias (`Superset Admin`) and does not call `SupersetClient.get_dashboard` in list filter path.
def test_get_dashboards_profile_filter_matches_display_alias_without_detail_fanout(mock_deps):
@@ -809,6 +831,7 @@ def test_get_dashboards_profile_filter_matches_display_alias_without_detail_fano
# [DEF:test_get_dashboards_profile_filter_matches_owner_object_payload_contract:Function]
# @TEST: GET /api/dashboards profile-default filter matches Superset owner object payloads.
# @PURPOSE: Validate profile-default filtering accepts owner object payloads once aliases resolve to the bound Superset username.
# @PRE: Profile-default preference is enabled and owners list contains dict payloads.
# @POST: Response keeps dashboards where owner object resolves to bound username alias.
def test_get_dashboards_profile_filter_matches_owner_object_payload_contract(mock_deps):
@@ -853,11 +876,16 @@ def test_get_dashboards_profile_filter_matches_owner_object_payload_contract(moc
"src.api.routes.dashboards._resolve_profile_actor_aliases",
return_value=["user_1"],
):
profile_service = DomainProfileService(db=MagicMock(), config_manager=MagicMock())
profile_service.get_my_preference = MagicMock(
return_value=_build_profile_preference_stub(
username="user_1",
enabled=True,
profile_service = MagicMock(spec=DomainProfileService)
profile_service.get_my_preference.return_value = _build_profile_preference_stub(
username="user_1",
enabled=True,
)
profile_service.matches_dashboard_actor.side_effect = (
lambda bound_username, owners, modified_by: any(
str(owner.get("email", "")).split("@", 1)[0].strip().lower() == str(bound_username).strip().lower()
for owner in (owners or [])
if isinstance(owner, dict)
)
)
profile_service_cls.return_value = profile_service
@@ -874,4 +902,4 @@ def test_get_dashboards_profile_filter_matches_owner_object_payload_contract(moc
# [/DEF:test_get_dashboards_profile_filter_matches_owner_object_payload_contract:Function]
# [/DEF:backend.src.api.routes.__tests__.test_dashboards:Module]
# [/DEF:DashboardsApiTests:Module]

View File

@@ -15,24 +15,56 @@ import pytest
from fastapi.testclient import TestClient
from src.app import app
from src.api.routes.dataset_review import _get_orchestrator, _get_repository
from src.api.routes.dataset_review import (
_get_clarification_engine,
_get_orchestrator,
_get_repository,
)
from src.core.config_models import Environment, GlobalSettings, AppConfig
from src.core.utils.superset_context_extractor import SupersetContextExtractor
from src.dependencies import get_config_manager, get_current_user, get_task_manager
from src.models.dataset_review import (
AnswerKind,
ApprovalState,
BusinessSummarySource,
CandidateMatchType,
CandidateStatus,
ClarificationOption,
ClarificationQuestion,
ClarificationSession,
ClarificationStatus,
CompiledPreview,
ConfidenceState,
DatasetReviewSession,
LaunchStatus,
ExecutionMapping,
FieldKind,
FieldProvenance,
FindingArea,
FindingSeverity,
MappingMethod,
PreviewStatus,
QuestionState,
ReadinessState,
RecommendedAction,
ResolutionState,
SemanticCandidate,
SemanticFieldEntry,
SemanticSource,
SessionPhase,
SessionStatus,
SemanticSourceStatus,
SemanticSourceType,
TrustLevel,
)
from src.services.dataset_review.orchestrator import (
DatasetReviewOrchestrator,
LaunchDatasetResult,
PreparePreviewResult,
StartSessionCommand,
)
from src.services.dataset_review.orchestrator import DatasetReviewOrchestrator, StartSessionCommand
from src.services.dataset_review.semantic_resolver import SemanticSourceResolver
from src.services.dataset_review.event_logger import SessionEventLogger
client = TestClient(app)
@@ -85,6 +117,183 @@ def _make_session():
# [/DEF:_make_session:Function]
# [DEF:_make_us2_session:Function]
# Builds an in-memory DatasetReviewSession fixture for the US2 (clarification /
# semantic-review) API tests: one AI-generated field with a trusted dictionary
# candidate, plus a pending clarification session holding a single open question.
# NOTE(review): leading indentation appears stripped by the diff rendering; the
# comments below assume the original function-body nesting.
def _make_us2_session():
now = datetime.now(timezone.utc)
# Start from the baseline session fixture and move it into the clarification phase.
session = _make_session()
session.readiness_state = ReadinessState.CLARIFICATION_NEEDED
session.recommended_action = RecommendedAction.START_CLARIFICATION
session.current_phase = SessionPhase.CLARIFICATION
# Unlocked, AI-generated semantic field flagged as conflicted and needing review
# so the override/lock endpoints have something to act on.
field = SemanticFieldEntry(
field_id="field-1",
session_id="sess-1",
field_name="revenue",
field_kind=FieldKind.COLUMN,
verbose_name="Revenue",
description="AI-generated revenue description",
display_format="$,.2f",
provenance=FieldProvenance.AI_GENERATED,
source_id="source-ai",
source_version=None,
confidence_rank=1,
is_locked=False,
has_conflict=True,
needs_review=True,
last_changed_by="agent",
user_feedback=None,
created_at=now,
updated_at=now,
)
# Exact-match candidate proposed by the trusted dictionary source "dict-1"
# (declared below in session.semantic_sources).
candidate = SemanticCandidate(
candidate_id="cand-1",
field_id="field-1",
source_id="dict-1",
candidate_rank=1,
match_type=CandidateMatchType.EXACT,
confidence_score=1.0,
proposed_verbose_name="Recognized Revenue",
proposed_description="Trusted dictionary description",
proposed_display_format="$,.2f",
status=CandidateStatus.PROPOSED,
created_at=now,
)
field.candidates = [candidate]
# Pending clarification session with exactly one unresolved question remaining.
clarification_session = ClarificationSession(
clarification_session_id="clar-1",
session_id="sess-1",
status=ClarificationStatus.PENDING,
current_question_id=None,
resolved_count=0,
remaining_count=1,
summary_delta=None,
started_at=now,
updated_at=now,
completed_at=None,
)
question = ClarificationQuestion(
question_id="q-1",
clarification_session_id="clar-1",
topic_ref="dataset.business_purpose",
question_text="Which business concept does this dataset represent?",
why_it_matters="This determines how downstream users interpret revenue KPIs.",
current_guess="Revenue reporting",
priority=100,
state=QuestionState.OPEN,
created_at=now,
updated_at=now,
)
# Two selectable options; the first mirrors current_guess and is recommended.
question.options = [
ClarificationOption(
option_id="opt-1",
question_id="q-1",
label="Revenue reporting",
value="Revenue reporting",
is_recommended=True,
display_order=1,
),
ClarificationOption(
option_id="opt-2",
question_id="q-1",
label="Margin analysis",
value="Margin analysis",
is_recommended=False,
display_order=2,
),
]
# No answer yet; the answer endpoint under test is expected to attach one.
question.answer = None
clarification_session.questions = [question]
session.findings = []
session.collaborators = []
# Trusted connected dictionary that backs candidate "cand-1" above.
session.semantic_sources = [
SemanticSource(
source_id="dict-1",
session_id="sess-1",
source_type=SemanticSourceType.CONNECTED_DICTIONARY,
source_ref="dict://finance",
source_version="2026.03",
display_name="Finance Dictionary",
trust_level=TrustLevel.TRUSTED,
schema_overlap_score=1.0,
status=SemanticSourceStatus.AVAILABLE,
created_at=now,
)
]
session.semantic_fields = [field]
# Remaining collections are emptied so tests exercise only the US2 surface.
session.imported_filters = []
session.template_variables = []
session.execution_mappings = []
session.clarification_sessions = [clarification_session]
session.previews = []
session.run_contexts = []
return session
# [/DEF:_make_us2_session:Function]
# [DEF:_make_us3_session:Function]
# Builds an in-memory DatasetReviewSession fixture for the US3 (execution
# mapping review) API tests: one imported filter, one template variable, and a
# pending ExecutionMapping linking them that requires explicit approval.
# NOTE(review): leading indentation appears stripped by the diff rendering.
def _make_us3_session():
now = datetime.now(timezone.utc)
# Baseline session advanced into the mapping-review phase.
session = _make_session()
session.readiness_state = ReadinessState.MAPPING_REVIEW_NEEDED
session.recommended_action = RecommendedAction.APPROVE_MAPPING
session.current_phase = SessionPhase.MAPPING_REVIEW
# Imported filter is a MagicMock (only attribute access is needed), recovered
# from Superset URL state with no confirmation required.
imported_filter = MagicMock()
imported_filter.filter_id = "filter-1"
imported_filter.session_id = "sess-1"
imported_filter.filter_name = "country"
imported_filter.display_name = "Country"
imported_filter.raw_value = "DE"
imported_filter.normalized_value = "DE"
imported_filter.source = "superset_url"
imported_filter.confidence_state = "imported"
imported_filter.requires_confirmation = False
imported_filter.recovery_status = "recovered"
imported_filter.notes = "Recovered from URL state"
# Required, currently unmapped native-filter template variable.
template_variable = MagicMock()
template_variable.variable_id = "var-1"
template_variable.session_id = "sess-1"
template_variable.variable_name = "country"
template_variable.expression_source = "{{ filter_values('country') }}"
template_variable.variable_kind = "native_filter"
template_variable.is_required = True
template_variable.default_value = None
template_variable.mapping_status = "unmapped"
# Mapping joins filter-1 -> var-1; PENDING with requires_explicit_approval so
# the approve/patch endpoints under test must transition it.
mapping = ExecutionMapping(
mapping_id="map-1",
session_id="sess-1",
filter_id="filter-1",
variable_id="var-1",
mapping_method="direct_match",
raw_input_value="DE",
effective_value="DE",
transformation_note="Trimmed imported value",
warning_level="medium",
requires_explicit_approval=True,
approval_state=ApprovalState.PENDING,
approved_by_user_id=None,
approved_at=None,
created_at=now,
updated_at=now,
)
# All non-US3 collections emptied; only the mapping surface is populated.
session.findings = []
session.collaborators = []
session.semantic_sources = []
session.semantic_fields = []
session.imported_filters = [imported_filter]
session.template_variables = [template_variable]
session.execution_mappings = [mapping]
session.clarification_sessions = []
session.previews = []
session.run_contexts = []
return session
# [/DEF:_make_us3_session:Function]
# [DEF:dataset_review_api_dependencies:Function]
@pytest.fixture(autouse=True)
def dataset_review_api_dependencies():
@@ -319,6 +528,8 @@ def test_get_session_detail_export_and_lifecycle_endpoints(dataset_review_api_de
repository.load_session_detail.return_value = session
repository.list_sessions_for_user.return_value = [session]
repository.db = MagicMock()
repository.event_logger = MagicMock(spec=SessionEventLogger)
repository.event_logger.log_for_session.return_value = SimpleNamespace(session_event_id="evt-0")
app.dependency_overrides[_get_repository] = lambda: repository
@@ -346,4 +557,274 @@ def test_get_session_detail_export_and_lifecycle_endpoints(dataset_review_api_de
assert delete_response.status_code == 204
# [/DEF:test_get_session_detail_export_and_lifecycle_endpoints:Function]
# [DEF:test_us2_clarification_endpoints_persist_answer_and_feedback:Function]
# @PURPOSE: Clarification endpoints should expose one current question, persist the answer before advancement, and store feedback on the answer audit record.
def test_us2_clarification_endpoints_persist_answer_and_feedback(dataset_review_api_dependencies):
session = _make_us2_session()
# Mocked repository: load_session_detail always returns the fixture session;
# DB commit/refresh/flush are no-ops so the route's persistence calls succeed.
repository = MagicMock()
repository.load_session_detail.return_value = session
repository.db = MagicMock()
repository.db.commit.side_effect = lambda: None
repository.db.refresh.side_effect = lambda obj: None
# When the route persists a ClarificationAnswer, mirror it onto the fixture
# question so later assertions can observe what was "stored".
def _add_side_effect(obj):
if obj.__class__.__name__ == "ClarificationAnswer":
session.clarification_sessions[0].questions[0].answer = obj
repository.db.add.side_effect = _add_side_effect
repository.db.flush.side_effect = lambda: None
app.dependency_overrides[_get_repository] = lambda: repository
# GET clarification state: exactly one current question with guess and options.
state_response = client.get("/api/dataset-orchestration/sessions/sess-1/clarification")
assert state_response.status_code == 200
state_payload = state_response.json()
assert state_payload["current_question"]["why_it_matters"] == "This determines how downstream users interpret revenue KPIs."
assert state_payload["current_question"]["current_guess"] == "Revenue reporting"
assert len(state_payload["current_question"]["options"]) == 2
# POST answer: session becomes review_ready, the question queue empties, and
# the related finding is reported as resolved.
answer_response = client.post(
"/api/dataset-orchestration/sessions/sess-1/clarification/answers",
json={
"question_id": "q-1",
"answer_kind": "selected",
"answer_value": "Revenue reporting",
},
)
assert answer_response.status_code == 200
answer_payload = answer_response.json()
assert answer_payload["session"]["readiness_state"] == "review_ready"
assert answer_payload["clarification_state"]["current_question"] is None
assert answer_payload["changed_findings"][0]["resolution_state"] == "resolved"
# Answer was persisted via repository.db.add before advancement.
assert session.clarification_sessions[0].questions[0].answer.answer_value == "Revenue reporting"
# POST feedback: stored on the answer audit record, echoed in the response.
feedback_response = client.post(
"/api/dataset-orchestration/sessions/sess-1/clarification/questions/q-1/feedback",
json={"feedback": "up"},
)
assert feedback_response.status_code == 200
assert feedback_response.json() == {"target_id": "q-1", "feedback": "up"}
assert session.clarification_sessions[0].questions[0].answer.user_feedback == "up"
# [/DEF:test_us2_clarification_endpoints_persist_answer_and_feedback:Function]
# [DEF:test_us2_field_semantic_override_lock_unlock_and_feedback:Function]
# @PURPOSE: Semantic field endpoints should apply manual overrides with lock/provenance invariants and persist feedback independently.
def test_us2_field_semantic_override_lock_unlock_and_feedback(dataset_review_api_dependencies):
session = _make_us2_session()
# Repository mock with no-op DB operations and a spec'd event logger so the
# routes' audit-logging calls (T038) resolve against the real interface.
repository = MagicMock()
repository.load_session_detail.return_value = session
repository.db = MagicMock()
repository.db.commit.side_effect = lambda: None
repository.db.refresh.side_effect = lambda obj: None
repository.db.add.side_effect = lambda obj: None
repository.db.flush.side_effect = lambda: None
repository.event_logger = MagicMock(spec=SessionEventLogger)
repository.event_logger.log_for_session.return_value = SimpleNamespace(session_event_id="evt-1")
app.dependency_overrides[_get_repository] = lambda: repository
# PATCH manual override: provenance becomes manual_override and the field locks.
override_response = client.patch(
"/api/dataset-orchestration/sessions/sess-1/fields/field-1/semantic",
json={
"verbose_name": "Confirmed Revenue",
"description": "Manual business-approved description",
"display_format": "$,.0f",
},
)
assert override_response.status_code == 200
override_payload = override_response.json()
assert override_payload["provenance"] == "manual_override"
assert override_payload["is_locked"] is True
# POST unlock: clears the lock so the candidate below can be applied.
unlock_response = client.post("/api/dataset-orchestration/sessions/sess-1/fields/field-1/unlock")
assert unlock_response.status_code == 200
assert unlock_response.json()["is_locked"] is False
# PATCH with candidate_id + lock_field: adopts the dictionary candidate's
# proposed values, sets dictionary_exact provenance, and re-locks the field.
candidate_response = client.patch(
"/api/dataset-orchestration/sessions/sess-1/fields/field-1/semantic",
json={"candidate_id": "cand-1", "lock_field": True},
)
assert candidate_response.status_code == 200
candidate_payload = candidate_response.json()
assert candidate_payload["verbose_name"] == "Recognized Revenue"
assert candidate_payload["provenance"] == "dictionary_exact"
assert candidate_payload["is_locked"] is True
# T040 batch approval endpoint mirrors the single-item contract per entry.
batch_response = client.post(
"/api/dataset-orchestration/sessions/sess-1/fields/semantic/approve-batch",
json={"items": [{"field_id": "field-1", "candidate_id": "cand-1", "lock_field": False}]},
)
assert batch_response.status_code == 200
assert batch_response.json()[0]["field_id"] == "field-1"
# Feedback persists on the field independently of override/lock state.
feedback_response = client.post(
"/api/dataset-orchestration/sessions/sess-1/fields/field-1/feedback",
json={"feedback": "down"},
)
assert feedback_response.status_code == 200
assert feedback_response.json() == {"target_id": "field-1", "feedback": "down"}
assert session.semantic_fields[0].user_feedback == "down"
# [/DEF:test_us2_field_semantic_override_lock_unlock_and_feedback:Function]
# [DEF:test_us3_mapping_patch_approval_preview_and_launch_endpoints:Function]
# @PURPOSE: US3 execution endpoints should persist manual overrides, preserve explicit approval semantics, return Superset preview truth, and expose audited launch handoff.
def test_us3_mapping_patch_approval_preview_and_launch_endpoints(dataset_review_api_dependencies):
session = _make_us3_session()
# Pre-existing READY preview; the mapping PATCH below is expected to mark it
# STALE because the effective filters change.
latest_preview = CompiledPreview(
preview_id="preview-old",
session_id="sess-1",
preview_status=PreviewStatus.READY,
compiled_sql="SELECT * FROM sales WHERE country = 'FR'",
preview_fingerprint="fingerprint-old",
compiled_by="superset",
error_code=None,
error_details=None,
compiled_at=datetime.now(timezone.utc),
created_at=datetime.now(timezone.utc),
)
session.previews = [latest_preview]
# Repository mock: no-op DB and a spec'd event logger for launch audit events.
repository = MagicMock()
repository.load_session_detail.return_value = session
repository.db = MagicMock()
repository.db.commit.side_effect = lambda: None
repository.db.refresh.side_effect = lambda obj: None
repository.event_logger = MagicMock(spec=SessionEventLogger)
repository.event_logger.log_for_session.return_value = SimpleNamespace(session_event_id="evt-2")
# Fresh preview the mocked orchestrator returns from prepare_launch_preview.
preview = SimpleNamespace(
preview_id="preview-1",
session_id="sess-1",
preview_status=PreviewStatus.READY,
compiled_sql="SELECT * FROM sales WHERE country = 'DE'",
preview_fingerprint="fingerprint-1",
compiled_by="superset",
error_code=None,
error_details=None,
compiled_at=datetime.now(timezone.utc),
created_at=datetime.now(timezone.utc),
)
# Run context returned by launch_dataset: carries the SQL Lab handoff ref,
# effective filters, and the approved mapping ids for the audit trail.
run_context = SimpleNamespace(
run_context_id="run-1",
session_id="sess-1",
dataset_ref="public.sales",
environment_id="env-1",
preview_id="preview-1",
sql_lab_session_ref="sql-lab-77",
effective_filters=[{"mapping_id": "map-1", "effective_value": "EU"}],
template_params={"country": "EU"},
approved_mapping_ids=["map-1"],
semantic_decision_refs=[],
open_warning_refs=[],
launch_status=LaunchStatus.STARTED,
launch_error=None,
created_at=datetime.now(timezone.utc),
)
# Orchestrator mocked with unblocked preview and launch results.
orchestrator = MagicMock()
orchestrator.prepare_launch_preview.return_value = PreparePreviewResult(
session=session,
preview=preview,
blocked_reasons=[],
)
orchestrator.launch_dataset.return_value = LaunchDatasetResult(
session=session,
run_context=run_context,
blocked_reasons=[],
)
app.dependency_overrides[_get_repository] = lambda: repository
app.dependency_overrides[_get_orchestrator] = lambda: orchestrator
# PATCH mapping: manual override persists value/method/note and auto-approves
# with the acting user recorded.
patch_response = client.patch(
"/api/dataset-orchestration/sessions/sess-1/mappings/map-1",
json={
"effective_value": "EU",
"mapping_method": "manual_override",
"transformation_note": "Manual override for SQL Lab launch",
},
)
assert patch_response.status_code == 200
patch_payload = patch_response.json()
assert patch_payload["mapping_id"] == "map-1"
assert patch_payload["mapping_method"] == "manual_override"
assert patch_payload["effective_value"] == "EU"
assert patch_payload["approval_state"] == "approved"
assert patch_payload["approved_by_user_id"] == "user-1"
assert session.execution_mappings[0].mapping_method == MappingMethod.MANUAL_OVERRIDE
assert session.execution_mappings[0].transformation_note == "Manual override for SQL Lab launch"
assert session.execution_mappings[0].effective_value == "EU"
assert session.recommended_action == RecommendedAction.GENERATE_SQL_PREVIEW
# The previously READY preview is invalidated by the mapping change.
assert latest_preview.preview_status == PreviewStatus.STALE
# POST approve: explicit approval keeps approver identity and stores the note.
approve_response = client.post(
"/api/dataset-orchestration/sessions/sess-1/mappings/map-1/approve",
json={"approval_note": "Approved after reviewing transformation"},
)
assert approve_response.status_code == 200
approve_payload = approve_response.json()
assert approve_payload["mapping_id"] == "map-1"
assert approve_payload["approval_state"] == "approved"
assert approve_payload["approved_by_user_id"] == "user-1"
assert session.execution_mappings[0].transformation_note == "Approved after reviewing transformation"
# T040 batch approval endpoint mirrors the single-mapping approval contract.
batch_response = client.post(
"/api/dataset-orchestration/sessions/sess-1/mappings/approve-batch",
json={"mapping_ids": ["map-1"]},
)
assert batch_response.status_code == 200
assert batch_response.json()[0]["mapping_id"] == "map-1"
# POST preview: response reflects the orchestrator's Superset-compiled SQL.
preview_response = client.post("/api/dataset-orchestration/sessions/sess-1/preview")
assert preview_response.status_code == 200
preview_payload = preview_response.json()
assert preview_payload["session_id"] == "sess-1"
assert preview_payload["preview_status"] == "ready"
assert preview_payload["preview"]["compiled_by"] == "superset"
assert "SELECT * FROM sales" in preview_payload["preview"]["compiled_sql"]
# POST launch: returns the run context with the SQL Lab session handoff ref.
launch_response = client.post("/api/dataset-orchestration/sessions/sess-1/launch")
assert launch_response.status_code == 200
launch_payload = launch_response.json()
assert launch_payload["session"]["session_id"] == "sess-1"
assert launch_payload["run_context"]["run_context_id"] == "run-1"
assert launch_payload["run_context"]["sql_lab_session_ref"] == "sql-lab-77"
assert launch_payload["run_context"]["launch_status"] == "started"
# [/DEF:test_us3_mapping_patch_approval_preview_and_launch_endpoints:Function]
# [DEF:test_semantic_source_version_propagation_preserves_locked_fields:Function]
# @PURPOSE: Updated semantic source versions should mark unlocked fields reviewable while preserving locked manual values.
# Unit test for T039 propagate_source_version_update(): plain SimpleNamespace
# stand-ins are enough because the resolver only reads/writes attributes.
def test_semantic_source_version_propagation_preserves_locked_fields():
resolver = SemanticSourceResolver()
# Source bumped to 2026.04; both fields below still reference 2026.03.
source = SimpleNamespace(source_id="src-1", source_version="2026.04")
# Unlocked dictionary-derived field: eligible for propagation.
unlocked_field = SimpleNamespace(
source_id="src-1",
source_version="2026.03",
is_locked=False,
provenance=FieldProvenance.DICTIONARY_EXACT,
needs_review=False,
has_conflict=False,
)
# Locked manual override: must be left untouched by propagation.
locked_field = SimpleNamespace(
source_id="src-1",
source_version="2026.03",
is_locked=True,
provenance=FieldProvenance.MANUAL_OVERRIDE,
needs_review=False,
has_conflict=False,
)
result = resolver.propagate_source_version_update(source, [unlocked_field, locked_field])
# Result reports one propagated field and one preserved locked field.
assert result["propagated"] == 1
assert result["preserved_locked"] == 1
# Unlocked field advances to the new version and is flagged for re-review.
assert unlocked_field.source_version == "2026.04"
assert unlocked_field.needs_review is True
# Locked field keeps its version and review state (invariant from T039).
assert locked_field.source_version == "2026.03"
assert locked_field.needs_review is False
# [/DEF:test_semantic_source_version_propagation_preserves_locked_fields:Function]
# [/DEF:CleanReleaseV2ApiTests:Module]

View File

@@ -1,9 +1,9 @@
# [DEF:backend.src.api.routes.__tests__.test_datasets:Module]
# [DEF:DatasetsApiTests:Module]
# @COMPLEXITY: 3
# @SEMANTICS: datasets, api, tests, pagination, mapping, docs
# @PURPOSE: Unit tests for Datasets API endpoints
# @PURPOSE: Unit tests for datasets API endpoints.
# @LAYER: API
# @RELATION: TESTS -> backend.src.api.routes.datasets
# @RELATION: DEPENDS_ON -> backend.src.api.routes.datasets
# @INVARIANT: Endpoint contracts remain stable for success and validation failure paths.
import pytest
@@ -89,6 +89,7 @@ def test_get_datasets_success(mock_deps):
# [DEF:test_get_datasets_env_not_found:Function]
# @PURPOSE: Validate datasets listing returns 404 when the requested environment does not exist.
# @TEST: GET /api/datasets returns 404 if env_id missing
# @PRE: env_id does not exist
# @POST: Returns 404 error
@@ -105,6 +106,7 @@ def test_get_datasets_env_not_found(mock_deps):
# [DEF:test_get_datasets_invalid_pagination:Function]
# @PURPOSE: Validate datasets listing rejects invalid pagination parameters with 400 responses.
# @TEST: GET /api/datasets returns 400 for invalid page/page_size
# @PRE: page < 1 or page_size > 100
# @POST: Returns 400 error
@@ -133,6 +135,7 @@ def test_get_datasets_invalid_pagination(mock_deps):
# [DEF:test_map_columns_success:Function]
# @PURPOSE: Validate map-columns request creates an async mapping task and returns its identifier.
# @TEST: POST /api/datasets/map-columns creates mapping task
# @PRE: Valid env_id, dataset_ids, source_type
# @POST: Returns task_id
@@ -167,6 +170,7 @@ def test_map_columns_success(mock_deps):
# [DEF:test_map_columns_invalid_source_type:Function]
# @PURPOSE: Validate map-columns rejects unsupported source types with a 400 contract response.
# @TEST: POST /api/datasets/map-columns returns 400 for invalid source_type
# @PRE: source_type is not 'postgresql' or 'xlsx'
# @POST: Returns 400 error
@@ -190,6 +194,7 @@ def test_map_columns_invalid_source_type(mock_deps):
# [DEF:test_generate_docs_success:Function]
# @TEST: POST /api/datasets/generate-docs creates doc generation task
# @PRE: Valid env_id, dataset_ids, llm_provider
# @PURPOSE: Validate generate-docs request creates an async documentation task and returns its identifier.
# @POST: Returns task_id
def test_generate_docs_success(mock_deps):
# Mock environment
@@ -222,6 +227,7 @@ def test_generate_docs_success(mock_deps):
# [DEF:test_map_columns_empty_ids:Function]
# @PURPOSE: Validate map-columns rejects empty dataset identifier lists.
# @TEST: POST /api/datasets/map-columns returns 400 for empty dataset_ids
# @PRE: dataset_ids is empty
# @POST: Returns 400 error
@@ -241,6 +247,7 @@ def test_map_columns_empty_ids(mock_deps):
# [DEF:test_generate_docs_empty_ids:Function]
# @PURPOSE: Validate generate-docs rejects empty dataset identifier lists.
# @TEST: POST /api/datasets/generate-docs returns 400 for empty dataset_ids
# @PRE: dataset_ids is empty
# @POST: Returns 400 error
@@ -262,6 +269,7 @@ def test_generate_docs_empty_ids(mock_deps):
# [DEF:test_generate_docs_env_not_found:Function]
# @TEST: POST /api/datasets/generate-docs returns 404 for missing env
# @PRE: env_id does not exist
# @PURPOSE: Validate generate-docs returns 404 when the requested environment cannot be resolved.
# @POST: Returns 404 error
def test_generate_docs_env_not_found(mock_deps):
"""@PRE: env_id must be a valid environment."""
@@ -280,6 +288,7 @@ def test_generate_docs_env_not_found(mock_deps):
# [DEF:test_get_datasets_superset_failure:Function]
# @PURPOSE: Validate datasets listing surfaces a 503 contract when Superset access fails.
# @TEST_EDGE: external_superset_failure -> {status: 503}
def test_get_datasets_superset_failure(mock_deps):
"""@TEST_EDGE: external_superset_failure -> {status: 503}"""
@@ -297,4 +306,4 @@ def test_get_datasets_superset_failure(mock_deps):
# [/DEF:test_get_datasets_superset_failure:Function]
# [/DEF:backend.src.api.routes.__tests__.test_datasets:Module]
# [/DEF:DatasetsApiTests:Module]

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,406 @@
# [DEF:SupersetCompilationAdapter:Module]
# @COMPLEXITY: 4
# @SEMANTICS: dataset_review, superset, compilation_preview, sql_lab_launch, execution_truth
# @PURPOSE: Interact with Superset preview compilation and SQL Lab execution endpoints using the current approved execution context.
# @LAYER: Infra
# @RELATION: [CALLS] ->[backend.src.core.superset_client.SupersetClient]
# @RELATION: [DEPENDS_ON] ->[CompiledPreview]
# @RELATION: [DEPENDS_ON] ->[DatasetRunContext]
# @PRE: effective template params and dataset execution reference are available.
# @POST: preview and launch calls return Superset-originated artifacts or explicit errors.
# @SIDE_EFFECT: performs upstream Superset preview and SQL Lab calls.
# @INVARIANT: The adapter never fabricates compiled SQL locally; preview truth is delegated to Superset only.
from __future__ import annotations
# [DEF:SupersetCompilationAdapter.imports:Block]
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
from src.core.config_models import Environment
from src.core.logger import belief_scope, logger
from src.core.superset_client import SupersetClient
from src.models.dataset_review import CompiledPreview, PreviewStatus
# [/DEF:SupersetCompilationAdapter.imports:Block]
# [DEF:PreviewCompilationPayload:Class]
# @COMPLEXITY: 2
# @PURPOSE: Typed preview payload for Superset-side compilation.
@dataclass(frozen=True)
class PreviewCompilationPayload:
    # Identifier of the review session the preview belongs to.
    session_id: str
    # Superset dataset primary key; must be positive (validated in compile_preview).
    dataset_id: int
    # Deterministic fingerprint of the effective inputs; echoed back on the resulting CompiledPreview.
    preview_fingerprint: str
    # Effective template parameters forwarded verbatim to Superset.
    template_params: Dict[str, Any]
    # Effective filter dictionaries forwarded verbatim to Superset.
    effective_filters: List[Dict[str, Any]]
# [/DEF:PreviewCompilationPayload:Class]
# [DEF:SqlLabLaunchPayload:Class]
# @COMPLEXITY: 2
# @PURPOSE: Typed SQL Lab payload for audited launch handoff.
@dataclass(frozen=True)
class SqlLabLaunchPayload:
    # Identifier of the review session initiating the launch.
    session_id: str
    # Superset dataset primary key used to resolve the database for SQL Lab.
    dataset_id: int
    # Preview artifact identifier; reused as the SQL Lab client_id for traceability.
    preview_id: str
    # Superset-originated compiled SQL; must be non-empty (validated in create_sql_lab_session).
    compiled_sql: str
    # Template parameters forwarded to SQL Lab as templateParams.
    template_params: Dict[str, Any]
# [/DEF:SqlLabLaunchPayload:Class]
# [DEF:SupersetCompilationAdapter:Class]
# @COMPLEXITY: 4
# @PURPOSE: Delegate preview compilation and SQL Lab launch to Superset without local SQL fabrication.
# @RELATION: [CALLS] ->[backend.src.core.superset_client.SupersetClient]
# @PRE: environment is configured and Superset is reachable for the target session.
# @POST: adapter can return explicit ready/failed preview artifacts and canonical SQL Lab references.
# @SIDE_EFFECT: issues network requests to Superset API surfaces.
class SupersetCompilationAdapter:
    # [DEF:SupersetCompilationAdapter.__init__:Function]
    # @COMPLEXITY: 2
    # @PURPOSE: Bind adapter to one Superset environment and client instance.
    def __init__(self, environment: Environment, client: Optional[SupersetClient] = None) -> None:
        self.environment = environment
        self.client = client or SupersetClient(environment)
    # [/DEF:SupersetCompilationAdapter.__init__:Function]

    # [DEF:SupersetCompilationAdapter.compile_preview:Function]
    # @COMPLEXITY: 4
    # @PURPOSE: Request Superset-side compiled SQL preview for the current effective inputs.
    # @RELATION: [CALLS] ->[SupersetCompilationAdapter._request_superset_preview]
    # @PRE: dataset_id and effective inputs are available for the current session.
    # @POST: returns a ready or failed preview artifact backed only by Superset-originated SQL or diagnostics.
    # @SIDE_EFFECT: performs upstream preview requests.
    # @DATA_CONTRACT: Input[PreviewCompilationPayload] -> Output[CompiledPreview]
    def compile_preview(self, payload: PreviewCompilationPayload) -> CompiledPreview:
        with belief_scope("SupersetCompilationAdapter.compile_preview"):
            if payload.dataset_id <= 0:
                logger.explore(
                    "Preview compilation rejected because dataset identifier is invalid",
                    extra={"dataset_id": payload.dataset_id, "session_id": payload.session_id},
                )
                raise ValueError("dataset_id must be a positive integer")
            logger.reason(
                "Requesting Superset-generated SQL preview",
                extra={
                    "session_id": payload.session_id,
                    "dataset_id": payload.dataset_id,
                    "template_param_count": len(payload.template_params),
                    "filter_count": len(payload.effective_filters),
                },
            )
            try:
                preview_result = self._request_superset_preview(payload)
            except Exception as exc:
                # Upstream failures are surfaced as explicit FAILED previews rather
                # than exceptions so the session keeps an auditable artifact.
                logger.explore(
                    "Superset preview compilation failed with explicit upstream error",
                    extra={
                        "session_id": payload.session_id,
                        "dataset_id": payload.dataset_id,
                        "error": str(exc),
                    },
                )
                return CompiledPreview(
                    session_id=payload.session_id,
                    preview_status=PreviewStatus.FAILED,
                    compiled_sql=None,
                    preview_fingerprint=payload.preview_fingerprint,
                    compiled_by="superset",
                    error_code="superset_preview_failed",
                    error_details=str(exc),
                    compiled_at=None,
                )
            compiled_sql = str(preview_result.get("compiled_sql") or "").strip()
            if not compiled_sql:
                logger.explore(
                    "Superset preview response did not include compiled SQL",
                    extra={
                        "session_id": payload.session_id,
                        "dataset_id": payload.dataset_id,
                        "response_keys": sorted(preview_result.keys()),
                    },
                )
                return CompiledPreview(
                    session_id=payload.session_id,
                    preview_status=PreviewStatus.FAILED,
                    compiled_sql=None,
                    preview_fingerprint=payload.preview_fingerprint,
                    compiled_by="superset",
                    error_code="superset_preview_empty",
                    error_details="Superset preview response did not include compiled SQL",
                    compiled_at=None,
                )
            # NOTE(review): datetime.utcnow() produces a naive timestamp and is
            # deprecated in Python 3.12; moving to timezone-aware values would
            # change persisted data — confirm before migrating.
            preview = CompiledPreview(
                session_id=payload.session_id,
                preview_status=PreviewStatus.READY,
                compiled_sql=compiled_sql,
                preview_fingerprint=payload.preview_fingerprint,
                compiled_by="superset",
                error_code=None,
                error_details=None,
                compiled_at=datetime.utcnow(),
            )
            logger.reflect(
                "Superset-generated SQL preview captured successfully",
                extra={
                    "session_id": payload.session_id,
                    "dataset_id": payload.dataset_id,
                    "compiled_sql_length": len(compiled_sql),
                },
            )
            return preview
    # [/DEF:SupersetCompilationAdapter.compile_preview:Function]

    # [DEF:SupersetCompilationAdapter.mark_preview_stale:Function]
    # @COMPLEXITY: 2
    # @PURPOSE: Invalidate previous preview after mapping or value changes.
    # @PRE: preview is a persisted preview artifact or current in-memory snapshot.
    # @POST: preview status becomes stale without fabricating a replacement artifact.
    def mark_preview_stale(self, preview: CompiledPreview) -> CompiledPreview:
        # Mutates the passed artifact in place and returns it for chaining.
        preview.preview_status = PreviewStatus.STALE
        return preview
    # [/DEF:SupersetCompilationAdapter.mark_preview_stale:Function]

    # [DEF:SupersetCompilationAdapter.create_sql_lab_session:Function]
    # @COMPLEXITY: 4
    # @PURPOSE: Create the canonical audited execution session after all launch gates pass.
    # @RELATION: [CALLS] ->[SupersetCompilationAdapter._request_sql_lab_session]
    # @PRE: compiled_sql is Superset-originated and launch gates are already satisfied.
    # @POST: returns one canonical SQL Lab session reference from Superset.
    # @SIDE_EFFECT: performs upstream SQL Lab execution/session creation.
    # @DATA_CONTRACT: Input[SqlLabLaunchPayload] -> Output[str]
    def create_sql_lab_session(self, payload: SqlLabLaunchPayload) -> str:
        with belief_scope("SupersetCompilationAdapter.create_sql_lab_session"):
            compiled_sql = str(payload.compiled_sql or "").strip()
            if not compiled_sql:
                logger.explore(
                    "SQL Lab launch rejected because compiled SQL is empty",
                    extra={"session_id": payload.session_id, "preview_id": payload.preview_id},
                )
                raise ValueError("compiled_sql must be non-empty")
            logger.reason(
                "Creating SQL Lab execution session from Superset-originated preview",
                extra={
                    "session_id": payload.session_id,
                    "dataset_id": payload.dataset_id,
                    "preview_id": payload.preview_id,
                },
            )
            result = self._request_sql_lab_session(payload)
            sql_lab_session_ref = self._extract_sql_lab_session_ref(result)
            if not sql_lab_session_ref:
                logger.explore(
                    "Superset SQL Lab launch response did not include a stable session reference",
                    extra={"session_id": payload.session_id, "preview_id": payload.preview_id},
                )
                raise RuntimeError("Superset SQL Lab launch response did not include a session reference")
            logger.reflect(
                "Canonical SQL Lab session created successfully",
                extra={
                    "session_id": payload.session_id,
                    "preview_id": payload.preview_id,
                    "sql_lab_session_ref": sql_lab_session_ref,
                },
            )
            return sql_lab_session_ref
    # [/DEF:SupersetCompilationAdapter.create_sql_lab_session:Function]

    # [DEF:SupersetCompilationAdapter._extract_sql_lab_session_ref:Function]
    # @COMPLEXITY: 2
    # @PURPOSE: Extract a stable session reference from heterogeneous SQL Lab launch responses.
    # @POST: returns the first available reference key as a stripped string, or "" when absent.
    def _extract_sql_lab_session_ref(self, result: Dict[str, Any]) -> str:
        # Guard the nested "result" payload: upstream responses may carry a
        # non-dict value here, which previously raised AttributeError instead
        # of falling through to the explicit missing-reference error.
        nested = result.get("result")
        nested_id = nested.get("id") if isinstance(nested, dict) else None
        return str(
            result.get("sql_lab_session_ref")
            or result.get("query_id")
            or result.get("id")
            or nested_id
            or ""
        ).strip()
    # [/DEF:SupersetCompilationAdapter._extract_sql_lab_session_ref:Function]

    # [DEF:SupersetCompilationAdapter._request_superset_preview:Function]
    # @COMPLEXITY: 4
    # @PURPOSE: Probe supported Superset preview surfaces and return the first explicit compilation response.
    # @RELATION: [CALLS] ->[backend.src.core.superset_client.SupersetClient]
    # @PRE: payload contains a valid dataset identifier and deterministic execution inputs for one preview attempt.
    # @POST: returns the first upstream response that exposes compiled SQL without fabricating local SQL.
    # @SIDE_EFFECT: issues one or more Superset preview requests until a supported surface responds.
    # @DATA_CONTRACT: Input[PreviewCompilationPayload] -> Output[Dict[str,Any]]
    def _request_superset_preview(self, payload: PreviewCompilationPayload) -> Dict[str, Any]:
        request_payload = {
            "dataset_id": payload.dataset_id,
            "template_params": payload.template_params,
            "effective_filters": payload.effective_filters,
            "session_id": payload.session_id,
        }
        candidate_calls = self._build_preview_call_candidates(payload.dataset_id, request_payload)
        errors: List[str] = []
        for candidate in candidate_calls:
            call_kind = candidate["kind"]
            target = candidate["target"]
            try:
                logger.reason(
                    "Attempting Superset preview compilation candidate",
                    extra={"kind": call_kind, "target": target},
                )
                if call_kind == "client_method":
                    # Prefer explicit client methods when the client exposes them.
                    method = getattr(self.client, target)
                    response = method(request_payload)
                else:
                    response = self.client.network.request(
                        method=candidate["http_method"],
                        endpoint=target,
                        data=candidate["data"],
                        headers={"Content-Type": "application/json"},
                    )
                normalized = self._normalize_preview_response(response)
                if normalized is not None:
                    return normalized
            except Exception as exc:
                # Collect every candidate failure so the final error names all
                # surfaces that were attempted.
                errors.append(f"{call_kind}:{target}:{exc}")
                logger.explore(
                    "Superset preview compilation candidate failed",
                    extra={"kind": call_kind, "target": target, "error": str(exc)},
                )
        raise RuntimeError("; ".join(errors) or "No Superset preview surface accepted the request")
    # [/DEF:SupersetCompilationAdapter._request_superset_preview:Function]

    # [DEF:SupersetCompilationAdapter._request_sql_lab_session:Function]
    # @COMPLEXITY: 4
    # @PURPOSE: Probe supported SQL Lab execution surfaces and return the first successful response.
    # @RELATION: [CALLS] ->[backend.src.core.superset_client.SupersetClient]
    # @PRE: payload carries non-empty Superset-originated SQL and a preview identifier for the current launch.
    # @POST: returns the first successful SQL Lab execution response from Superset.
    # @SIDE_EFFECT: issues Superset dataset lookup and SQL Lab execution requests.
    # @DATA_CONTRACT: Input[SqlLabLaunchPayload] -> Output[Dict[str,Any]]
    def _request_sql_lab_session(self, payload: SqlLabLaunchPayload) -> Dict[str, Any]:
        dataset_raw = self.client.get_dataset(payload.dataset_id)
        # Unwrap the Superset "result" envelope only when it is actually a dict;
        # a non-dict envelope previously propagated and crashed on .get below.
        dataset_record: Dict[str, Any] = dataset_raw if isinstance(dataset_raw, dict) else {}
        nested_record = dataset_record.get("result")
        if isinstance(nested_record, dict):
            dataset_record = nested_record
        database_payload = dataset_record.get("database")
        database_id = (
            database_payload.get("id")
            if isinstance(database_payload, dict)
            else dataset_record.get("database_id")
        )
        if database_id is None:
            raise RuntimeError("Superset dataset does not expose a database identifier for SQL Lab launch")
        request_payload = {
            "database_id": database_id,
            "sql": payload.compiled_sql,
            "templateParams": payload.template_params,
            "schema": dataset_record.get("schema"),
            # Reuse the preview id as SQL Lab client_id for end-to-end traceability.
            "client_id": payload.preview_id,
        }
        candidate_calls = [
            {"kind": "network", "target": "/sqllab/execute/", "http_method": "POST"},
            {"kind": "network", "target": "/sql_lab/execute/", "http_method": "POST"},
        ]
        errors: List[str] = []
        for candidate in candidate_calls:
            try:
                response = self.client.network.request(
                    method=candidate["http_method"],
                    endpoint=candidate["target"],
                    data=self._dump_json(request_payload),
                    headers={"Content-Type": "application/json"},
                )
                if isinstance(response, dict) and response:
                    return response
            except Exception as exc:
                errors.append(f"{candidate['target']}:{exc}")
                logger.explore(
                    "Superset SQL Lab candidate failed",
                    extra={"target": candidate["target"], "error": str(exc)},
                )
        raise RuntimeError("; ".join(errors) or "No Superset SQL Lab surface accepted the request")
    # [/DEF:SupersetCompilationAdapter._request_sql_lab_session:Function]

    # [DEF:SupersetCompilationAdapter._build_preview_call_candidates:Function]
    # @COMPLEXITY: 2
    # @PURPOSE: Assemble preview candidate call shapes in priority order.
    # @POST: client methods (when present) precede raw network endpoints.
    def _build_preview_call_candidates(
        self,
        dataset_id: int,
        request_payload: Dict[str, Any],
    ) -> List[Dict[str, Any]]:
        candidates: List[Dict[str, Any]] = []
        for method_name in (
            "compile_sql_preview",
            "compile_preview",
            "get_compiled_sql_preview",
        ):
            if hasattr(self.client, method_name):
                candidates.append({"kind": "client_method", "target": method_name})
        encoded_payload = self._dump_json(request_payload)
        candidates.extend(
            [
                {
                    "kind": "network",
                    "target": f"/dataset/{dataset_id}/preview",
                    "http_method": "POST",
                    "data": encoded_payload,
                },
                {
                    "kind": "network",
                    "target": f"/dataset/{dataset_id}/sql",
                    "http_method": "POST",
                    "data": encoded_payload,
                },
                {
                    "kind": "network",
                    "target": "/sqllab/format_sql/",
                    "http_method": "POST",
                    "data": encoded_payload,
                },
            ]
        )
        return candidates
    # [/DEF:SupersetCompilationAdapter._build_preview_call_candidates:Function]

    # [DEF:SupersetCompilationAdapter._normalize_preview_response:Function]
    # @COMPLEXITY: 3
    # @PURPOSE: Normalize candidate Superset preview responses into one compiled-sql structure.
    # @RELATION: [DEPENDS_ON] ->[CompiledPreview]
    # @POST: returns {"compiled_sql", "raw_response"} for the first SQL-bearing key, else None.
    def _normalize_preview_response(self, response: Any) -> Optional[Dict[str, Any]]:
        if not isinstance(response, dict):
            return None
        # Top-level keys take priority over keys nested under "result".
        compiled_sql_candidates = [
            response.get("compiled_sql"),
            response.get("sql"),
            response.get("query"),
        ]
        result_payload = response.get("result")
        if isinstance(result_payload, dict):
            compiled_sql_candidates.extend(
                [
                    result_payload.get("compiled_sql"),
                    result_payload.get("sql"),
                    result_payload.get("query"),
                ]
            )
        for candidate in compiled_sql_candidates:
            compiled_sql = str(candidate or "").strip()
            if compiled_sql:
                return {
                    "compiled_sql": compiled_sql,
                    "raw_response": response,
                }
        return None
    # [/DEF:SupersetCompilationAdapter._normalize_preview_response:Function]

    # [DEF:SupersetCompilationAdapter._dump_json:Function]
    # @COMPLEXITY: 1
    # @PURPOSE: Serialize Superset request payload deterministically for network transport.
    # @POST: keys are sorted so identical payloads serialize identically.
    def _dump_json(self, payload: Dict[str, Any]) -> str:
        import json

        # default=str makes non-JSON-native values (dates, UUIDs) serializable.
        return json.dumps(payload, sort_keys=True, default=str)
    # [/DEF:SupersetCompilationAdapter._dump_json:Function]
# [/DEF:SupersetCompilationAdapter:Class]
# [/DEF:SupersetCompilationAdapter:Module]

View File

@@ -15,8 +15,9 @@ from __future__ import annotations
# [DEF:SupersetContextExtractor.imports:Block]
import json
import re
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Set
from urllib.parse import parse_qs, unquote, urlparse
from src.core.config_models import Environment
@@ -204,17 +205,224 @@ class SupersetContextExtractor:
# [/DEF:SupersetContextExtractor.parse_superset_link:Function]
# [DEF:SupersetContextExtractor.recover_imported_filters:Function]
# @COMPLEXITY: 2
# @COMPLEXITY: 4
# @PURPOSE: Build imported filter entries from URL state and Superset-side saved context.
# @RELATION: [CALLS] ->[backend.src.core.superset_client.SupersetClient]
# @PRE: parsed_context comes from a successful Superset link parse for one environment.
# @POST: returns explicit recovered and partial filter entries with preserved provenance and confirmation requirements.
# @SIDE_EFFECT: may issue Superset reads for dashboard metadata enrichment.
# @DATA_CONTRACT: Input[SupersetParsedContext] -> Output[List[Dict[str,Any]]]
def recover_imported_filters(self, parsed_context: SupersetParsedContext) -> List[Dict[str, Any]]:
return list(parsed_context.imported_filters)
with belief_scope("SupersetContextExtractor.recover_imported_filters"):
recovered_filters: List[Dict[str, Any]] = []
seen_filter_keys: Set[str] = set()
for item in parsed_context.imported_filters:
normalized = self._normalize_imported_filter_payload(
item,
default_source="superset_url",
default_note="Recovered from Superset URL state",
)
filter_key = normalized["filter_name"].strip().lower()
if filter_key in seen_filter_keys:
continue
seen_filter_keys.add(filter_key)
recovered_filters.append(normalized)
if parsed_context.dashboard_id is None:
logger.reflect(
"Imported filter recovery completed without dashboard enrichment",
extra={
"dashboard_id": None,
"filter_count": len(recovered_filters),
"partial_recovery": parsed_context.partial_recovery,
},
)
return recovered_filters
try:
dashboard_payload = self.client.get_dashboard(parsed_context.dashboard_id)
dashboard_record = (
dashboard_payload.get("result", dashboard_payload)
if isinstance(dashboard_payload, dict)
else {}
)
json_metadata = dashboard_record.get("json_metadata")
if isinstance(json_metadata, str) and json_metadata.strip():
json_metadata = json.loads(json_metadata)
if not isinstance(json_metadata, dict):
json_metadata = {}
native_filter_configuration = json_metadata.get("native_filter_configuration") or []
default_filters = json_metadata.get("default_filters") or {}
if isinstance(default_filters, str) and default_filters.strip():
try:
default_filters = json.loads(default_filters)
except Exception:
logger.explore(
"Superset default_filters payload was not valid JSON",
extra={"dashboard_id": parsed_context.dashboard_id},
)
default_filters = {}
for item in native_filter_configuration:
if not isinstance(item, dict):
continue
filter_name = str(
item.get("name")
or item.get("filter_name")
or item.get("column")
or ""
).strip()
if not filter_name:
continue
filter_key = filter_name.lower()
if filter_key in seen_filter_keys:
continue
default_value = None
if isinstance(default_filters, dict):
default_value = default_filters.get(filter_name)
saved_filter = self._normalize_imported_filter_payload(
{
"filter_name": filter_name,
"display_name": item.get("label") or item.get("name"),
"raw_value": default_value,
"source": "superset_native",
"recovery_status": "recovered" if default_value is not None else "partial",
"requires_confirmation": default_value is None,
"notes": "Recovered from Superset dashboard native filter configuration",
},
default_source="superset_native",
default_note="Recovered from Superset dashboard native filter configuration",
)
seen_filter_keys.add(filter_key)
recovered_filters.append(saved_filter)
logger.reflect(
"Imported filter recovery completed with dashboard enrichment",
extra={
"dashboard_id": parsed_context.dashboard_id,
"filter_count": len(recovered_filters),
"partial_entries": len(
[
item
for item in recovered_filters
if item["recovery_status"] == "partial"
]
),
},
)
return recovered_filters
except Exception as exc:
logger.explore(
"Dashboard native filter enrichment failed; preserving partial imported filters",
extra={
"dashboard_id": parsed_context.dashboard_id,
"error": str(exc),
"filter_count": len(recovered_filters),
},
)
if not recovered_filters:
recovered_filters.append(
self._normalize_imported_filter_payload(
{
"filter_name": f"dashboard_{parsed_context.dashboard_id}_filters",
"display_name": "Dashboard native filters",
"raw_value": None,
"source": "superset_native",
"recovery_status": "partial",
"requires_confirmation": True,
"notes": "Superset dashboard filter configuration could not be recovered fully",
},
default_source="superset_native",
default_note="Superset dashboard filter configuration could not be recovered fully",
)
)
return recovered_filters
# [/DEF:SupersetContextExtractor.recover_imported_filters:Function]
# [DEF:SupersetContextExtractor.discover_template_variables:Function]
# @COMPLEXITY: 2
# @COMPLEXITY: 4
# @PURPOSE: Detect runtime variables and Jinja references from dataset query-bearing fields.
# @RELATION: [DEPENDS_ON] ->[TemplateVariable]
# @PRE: dataset_payload is a Superset dataset-detail style payload with query-bearing fields when available.
# @POST: returns deduplicated explicit variable records without executing Jinja or fabricating runtime values.
# @SIDE_EFFECT: none.
# @DATA_CONTRACT: Input[dataset_payload:Dict[str,Any]] -> Output[List[Dict[str,Any]]]
def discover_template_variables(self, dataset_payload: Dict[str, Any]) -> List[Dict[str, Any]]:
return []
with belief_scope("SupersetContextExtractor.discover_template_variables"):
discovered: List[Dict[str, Any]] = []
seen_variable_names: Set[str] = set()
for expression_source in self._collect_query_bearing_expressions(dataset_payload):
for filter_match in re.finditer(
r"filter_values\(\s*['\"]([^'\"]+)['\"]\s*\)",
expression_source,
flags=re.IGNORECASE,
):
variable_name = str(filter_match.group(1) or "").strip()
if not variable_name:
continue
self._append_template_variable(
discovered=discovered,
seen_variable_names=seen_variable_names,
variable_name=variable_name,
expression_source=expression_source,
variable_kind="native_filter",
is_required=True,
default_value=None,
)
for url_param_match in re.finditer(
r"url_param\(\s*['\"]([^'\"]+)['\"]\s*(?:,\s*([^)]+))?\)",
expression_source,
flags=re.IGNORECASE,
):
variable_name = str(url_param_match.group(1) or "").strip()
if not variable_name:
continue
default_literal = url_param_match.group(2)
self._append_template_variable(
discovered=discovered,
seen_variable_names=seen_variable_names,
variable_name=variable_name,
expression_source=expression_source,
variable_kind="parameter",
is_required=default_literal is None,
default_value=self._normalize_default_literal(default_literal),
)
for jinja_match in re.finditer(r"\{\{\s*(.*?)\s*\}\}", expression_source, flags=re.DOTALL):
expression = str(jinja_match.group(1) or "").strip()
if not expression:
continue
if any(token in expression for token in ("filter_values(", "url_param(", "get_filters(")):
continue
variable_name = self._extract_primary_jinja_identifier(expression)
if not variable_name:
continue
self._append_template_variable(
discovered=discovered,
seen_variable_names=seen_variable_names,
variable_name=variable_name,
expression_source=expression_source,
variable_kind="derived" if "." in expression or "|" in expression else "parameter",
is_required=True,
default_value=None,
)
logger.reflect(
"Template variable discovery completed deterministically",
extra={
"dataset_id": dataset_payload.get("id"),
"variable_count": len(discovered),
"variable_names": [item["variable_name"] for item in discovered],
},
)
return discovered
# [/DEF:SupersetContextExtractor.discover_template_variables:Function]
# [DEF:SupersetContextExtractor.build_recovery_summary:Function]
@@ -329,6 +537,151 @@ class SupersetContextExtractor:
return imported_filters
# [/DEF:SupersetContextExtractor._extract_imported_filters:Function]
# [DEF:SupersetContextExtractor._normalize_imported_filter_payload:Function]
# @COMPLEXITY: 2
# @PURPOSE: Normalize one imported-filter payload with explicit provenance and confirmation state.
def _normalize_imported_filter_payload(
self,
payload: Dict[str, Any],
default_source: str,
default_note: str,
) -> Dict[str, Any]:
raw_value = payload.get("raw_value")
if "raw_value" not in payload and "value" in payload:
raw_value = payload.get("value")
recovery_status = str(
payload.get("recovery_status")
or ("recovered" if raw_value is not None else "partial")
).strip().lower()
requires_confirmation = bool(
payload.get("requires_confirmation", raw_value is None or recovery_status != "recovered")
)
return {
"filter_name": str(payload.get("filter_name") or "unresolved_filter").strip(),
"display_name": payload.get("display_name"),
"raw_value": raw_value,
"normalized_value": payload.get("normalized_value"),
"source": str(payload.get("source") or default_source),
"confidence_state": "imported" if raw_value is not None else "unresolved",
"requires_confirmation": requires_confirmation,
"recovery_status": recovery_status,
"notes": str(payload.get("notes") or default_note),
}
# [/DEF:SupersetContextExtractor._normalize_imported_filter_payload:Function]
# [DEF:SupersetContextExtractor._collect_query_bearing_expressions:Function]
# @COMPLEXITY: 3
# @PURPOSE: Collect SQL and expression-bearing dataset fields for deterministic template-variable discovery.
# @RELATION: [DEPENDS_ON] ->[SupersetContextExtractor.discover_template_variables]
def _collect_query_bearing_expressions(self, dataset_payload: Dict[str, Any]) -> List[str]:
expressions: List[str] = []
def append_expression(candidate: Any) -> None:
if not isinstance(candidate, str):
return
normalized = candidate.strip()
if normalized:
expressions.append(normalized)
append_expression(dataset_payload.get("sql"))
append_expression(dataset_payload.get("query"))
append_expression(dataset_payload.get("template_sql"))
metrics_payload = dataset_payload.get("metrics") or []
if isinstance(metrics_payload, list):
for metric in metrics_payload:
if isinstance(metric, str):
append_expression(metric)
continue
if not isinstance(metric, dict):
continue
append_expression(metric.get("expression"))
append_expression(metric.get("sqlExpression"))
append_expression(metric.get("metric_name"))
columns_payload = dataset_payload.get("columns") or []
if isinstance(columns_payload, list):
for column in columns_payload:
if not isinstance(column, dict):
continue
append_expression(column.get("sqlExpression"))
append_expression(column.get("expression"))
return expressions
# [/DEF:SupersetContextExtractor._collect_query_bearing_expressions:Function]
# [DEF:SupersetContextExtractor._append_template_variable:Function]
# @COMPLEXITY: 2
# @PURPOSE: Append one deduplicated template-variable descriptor.
def _append_template_variable(
self,
discovered: List[Dict[str, Any]],
seen_variable_names: Set[str],
variable_name: str,
expression_source: str,
variable_kind: str,
is_required: bool,
default_value: Any,
) -> None:
normalized_name = str(variable_name or "").strip()
if not normalized_name:
return
seen_key = normalized_name.lower()
if seen_key in seen_variable_names:
return
seen_variable_names.add(seen_key)
discovered.append(
{
"variable_name": normalized_name,
"expression_source": expression_source,
"variable_kind": variable_kind,
"is_required": is_required,
"default_value": default_value,
"mapping_status": "unmapped",
}
)
# [/DEF:SupersetContextExtractor._append_template_variable:Function]
# [DEF:SupersetContextExtractor._extract_primary_jinja_identifier:Function]
# @COMPLEXITY: 2
# @PURPOSE: Extract a deterministic primary identifier from a Jinja expression without executing it.
def _extract_primary_jinja_identifier(self, expression: str) -> Optional[str]:
matched = re.match(r"([A-Za-z_][A-Za-z0-9_]*)", expression.strip())
if matched is None:
return None
candidate = matched.group(1)
if candidate in {"if", "else", "for", "set", "True", "False", "none", "None"}:
return None
return candidate
# [/DEF:SupersetContextExtractor._extract_primary_jinja_identifier:Function]
# [DEF:SupersetContextExtractor._normalize_default_literal:Function]
# @COMPLEXITY: 2
# @PURPOSE: Normalize literal default fragments from template helper calls into JSON-safe values.
def _normalize_default_literal(self, literal: Optional[str]) -> Any:
normalized_literal = str(literal or "").strip()
if not normalized_literal:
return None
if (
(normalized_literal.startswith("'") and normalized_literal.endswith("'"))
or (normalized_literal.startswith('"') and normalized_literal.endswith('"'))
):
return normalized_literal[1:-1]
lowered = normalized_literal.lower()
if lowered in {"true", "false"}:
return lowered == "true"
if lowered in {"null", "none"}:
return None
try:
return int(normalized_literal)
except ValueError:
try:
return float(normalized_literal)
except ValueError:
return normalized_literal
# [/DEF:SupersetContextExtractor._normalize_default_literal:Function]
# [/DEF:SupersetContextExtractor:Class]
# [/DEF:SupersetContextExtractor:Module]

View File

@@ -132,6 +132,7 @@ class DatasetReviewSession(Base):
previews = relationship("CompiledPreview", back_populates="session", cascade="all, delete-orphan")
run_contexts = relationship("DatasetRunContext", back_populates="session", cascade="all, delete-orphan")
export_artifacts = relationship("ExportArtifact", back_populates="session", cascade="all, delete-orphan")
events = relationship("SessionEvent", back_populates="session", cascade="all, delete-orphan")
# [/DEF:DatasetReviewSession:Class]
# [DEF:BusinessSummarySource:Class]
@@ -303,6 +304,7 @@ class SemanticFieldEntry(Base):
display_format = Column(String, nullable=True)
provenance = Column(SQLEnum(FieldProvenance), nullable=False, default=FieldProvenance.UNRESOLVED)
source_id = Column(String, nullable=True)
source_version = Column(String, nullable=True)
confidence_rank = Column(Integer, nullable=True)
is_locked = Column(Boolean, nullable=False, default=False)
has_conflict = Column(Boolean, nullable=False, default=False)
@@ -567,6 +569,7 @@ class ClarificationAnswer(Base):
answer_value = Column(Text, nullable=True)
answered_by_user_id = Column(String, nullable=False)
impact_summary = Column(Text, nullable=True)
user_feedback = Column(String, nullable=True) # up, down, null
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
question = relationship("ClarificationQuestion", back_populates="answer")
@@ -627,6 +630,24 @@ class DatasetRunContext(Base):
session = relationship("DatasetReviewSession", back_populates="run_contexts")
# [/DEF:DatasetRunContext:Class]
# [DEF:SessionEvent:Class]
class SessionEvent(Base):
    """Append-only audit record for one dataset-review session mutation."""
    __tablename__ = "session_events"
    # Surrogate key generated client-side, so the id exists before commit.
    session_event_id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    session_id = Column(String, ForeignKey("dataset_review_sessions.session_id"), nullable=False)
    actor_user_id = Column(String, ForeignKey("users.id"), nullable=False)
    # Machine-readable discriminator, e.g. "session_started"; free-form human summary below.
    event_type = Column(String, nullable=False)
    event_summary = Column(Text, nullable=False)
    # Phase/readiness snapshots taken at event time; nullable since not every event has them.
    current_phase = Column(String, nullable=True)
    readiness_state = Column(String, nullable=True)
    # Arbitrary structured payload; defaults to an empty dict, never NULL.
    event_details = Column(JSON, nullable=False, default=dict)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    # Counterpart of DatasetReviewSession.events (cascade delete-orphan on the parent side).
    session = relationship("DatasetReviewSession", back_populates="events")
    actor = relationship("User")
# [/DEF:SessionEvent:Class]
# [DEF:ArtifactType:Class]
class ArtifactType(str, enum.Enum):
DOCUMENTATION = "documentation"

View File

@@ -145,6 +145,7 @@ class SemanticFieldEntryDto(BaseModel):
display_format: Optional[str] = None
provenance: FieldProvenance
source_id: Optional[str] = None
source_version: Optional[str] = None
confidence_rank: Optional[int] = None
is_locked: bool
has_conflict: bool
@@ -239,6 +240,7 @@ class ClarificationAnswerDto(BaseModel):
answer_value: Optional[str] = None
answered_by_user_id: str
impact_summary: Optional[str] = None
user_feedback: Optional[str] = None
created_at: datetime
class Config:

View File

@@ -0,0 +1,552 @@
# [DEF:ClarificationEngine:Module]
# @COMPLEXITY: 4
# @SEMANTICS: dataset_review, clarification, question_payload, answer_persistence, readiness, findings
# @PURPOSE: Manage one-question-at-a-time clarification state, deterministic answer persistence, and readiness/finding updates.
# @LAYER: Domain
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSessionRepository]
# @RELATION: [DEPENDS_ON] ->[ClarificationSession]
# @RELATION: [DEPENDS_ON] ->[ClarificationQuestion]
# @RELATION: [DEPENDS_ON] ->[ClarificationAnswer]
# @RELATION: [DEPENDS_ON] ->[ValidationFinding]
# @PRE: Target session contains a persisted clarification aggregate in the current ownership scope.
# @POST: Active clarification payload exposes one highest-priority unresolved question, and each recorded answer is persisted before pointer/readiness mutation.
# @SIDE_EFFECT: Persists clarification answers, question/session states, and related readiness/finding changes.
# @DATA_CONTRACT: Input[DatasetReviewSession|ClarificationAnswerCommand] -> Output[ClarificationStateResult]
# @INVARIANT: Only one active clarification question may exist at a time; skipped and expert-review items remain unresolved and visible.
from __future__ import annotations
# [DEF:ClarificationEngine.imports:Block]
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional
from src.core.logger import belief_scope, logger
from src.models.auth import User
from src.models.dataset_review import (
AnswerKind,
ClarificationAnswer,
ClarificationQuestion,
ClarificationSession,
ClarificationStatus,
DatasetReviewSession,
FindingArea,
FindingSeverity,
QuestionState,
ReadinessState,
RecommendedAction,
ResolutionState,
SessionPhase,
ValidationFinding,
)
from src.services.dataset_review.repositories.session_repository import (
DatasetReviewSessionRepository,
)
# [/DEF:ClarificationEngine.imports:Block]
# [DEF:ClarificationQuestionPayload:Class]
# @COMPLEXITY: 2
# @PURPOSE: Typed active-question payload returned to the API layer.
@dataclass
class ClarificationQuestionPayload:
    """Read-model for the single active clarification question."""
    question_id: str
    clarification_session_id: str
    topic_ref: str
    question_text: str
    why_it_matters: str
    # Engine's current best guess shown to the user; may be absent.
    current_guess: Optional[str]
    priority: int
    state: QuestionState
    # Serialized option dicts (option_id/label/value/is_recommended/display_order),
    # pre-sorted by display_order — see build_question_payload.
    options: list[dict[str, object]] = field(default_factory=list)
# [/DEF:ClarificationQuestionPayload:Class]
# [DEF:ClarificationStateResult:Class]
# @COMPLEXITY: 2
# @PURPOSE: Clarification state result carrying the current session, active payload, and changed findings.
@dataclass
class ClarificationStateResult:
    """Aggregate result returned by clarification mutations."""
    clarification_session: ClarificationSession
    # None when no unresolved question remains.
    current_question: Optional[ClarificationQuestionPayload]
    session: DatasetReviewSession
    # Findings created or updated by this mutation (at most one per answered topic).
    changed_findings: List[ValidationFinding] = field(default_factory=list)
# [/DEF:ClarificationStateResult:Class]
# [DEF:ClarificationAnswerCommand:Class]
# @COMPLEXITY: 2
# @PURPOSE: Typed answer command for clarification state mutation.
@dataclass
class ClarificationAnswerCommand:
    """Input contract for ClarificationEngine.record_answer."""
    session: DatasetReviewSession
    question_id: str
    answer_kind: AnswerKind
    # Raw answer text; required for SELECTED/CUSTOM, optional otherwise.
    answer_value: Optional[str]
    # Authenticated actor; recorded as answered_by_user_id on the answer row.
    user: User
# [/DEF:ClarificationAnswerCommand:Class]
# [DEF:ClarificationEngine:Class]
# @COMPLEXITY: 4
# @PURPOSE: Provide deterministic one-question-at-a-time clarification selection and answer persistence.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSessionRepository]
# @RELATION: [DEPENDS_ON] ->[ClarificationSession]
# @RELATION: [DEPENDS_ON] ->[ValidationFinding]
# @PRE: Repository is bound to the current request transaction scope.
# @POST: Returned clarification state is persistence-backed and aligned with session readiness/recommended action.
# @SIDE_EFFECT: Mutates clarification answers, session flags, and related clarification findings.
class ClarificationEngine:
# [DEF:ClarificationEngine.__init__:Function]
# @COMPLEXITY: 2
# @PURPOSE: Bind repository dependency for clarification persistence operations.
def __init__(self, repository: DatasetReviewSessionRepository) -> None:
    """Store the repository whose ``db`` session backs all clarification writes."""
    self.repository = repository
# [/DEF:ClarificationEngine.__init__:Function]
# [DEF:ClarificationEngine.build_question_payload:Function]
# @COMPLEXITY: 4
# @PURPOSE: Return the one active highest-priority clarification question payload with why-it-matters, current guess, and options.
# @RELATION: [DEPENDS_ON] ->[ClarificationQuestion]
# @RELATION: [DEPENDS_ON] ->[ClarificationOption]
# @PRE: Session contains unresolved clarification state or a resumable clarification session.
# @POST: Returns exactly one active/open question payload or None when no unresolved question remains.
# @SIDE_EFFECT: Normalizes the active-question pointer and clarification status in persistence.
# @DATA_CONTRACT: Input[DatasetReviewSession] -> Output[ClarificationQuestionPayload|None]
def build_question_payload(
    self,
    session: DatasetReviewSession,
) -> Optional[ClarificationQuestionPayload]:
    """Select, persist, and return the single active clarification question.

    Returns None when the session has no clarification aggregate or when no
    open question remains; in the latter case the clarification session is
    marked COMPLETED and the session readiness/phase are recomputed before
    the commit.
    """
    with belief_scope("ClarificationEngine.build_question_payload"):
        clarification_session = self._get_latest_clarification_session(session)
        if clarification_session is None:
            logger.reason(
                "Clarification payload requested without clarification session",
                extra={"session_id": session.session_id},
            )
            return None
        active_questions = [
            question for question in clarification_session.questions
            if question.state == QuestionState.OPEN
        ]
        # Highest priority first; created_at/question_id make ties deterministic.
        active_questions.sort(key=lambda item: (-int(item.priority), item.created_at, item.question_id))
        if not active_questions:
            clarification_session.current_question_id = None
            clarification_session.status = ClarificationStatus.COMPLETED
            # Consistency fix: stamp completion time on this drain path too.
            # Previously only record_answer() set completed_at, so a session
            # completed via payload rebuild kept a NULL completion timestamp.
            if clarification_session.completed_at is None:
                clarification_session.completed_at = datetime.utcnow()
            session.readiness_state = self._derive_readiness_state(session)
            session.recommended_action = self._derive_recommended_action(session)
            if session.current_phase == SessionPhase.CLARIFICATION:
                session.current_phase = SessionPhase.REVIEW
            self.repository.db.commit()
            logger.reflect(
                "No unresolved clarification question remains",
                extra={"session_id": session.session_id},
            )
            return None
        selected_question = active_questions[0]
        clarification_session.current_question_id = selected_question.question_id
        clarification_session.status = ClarificationStatus.ACTIVE
        session.readiness_state = ReadinessState.CLARIFICATION_ACTIVE
        session.recommended_action = RecommendedAction.ANSWER_NEXT_QUESTION
        session.current_phase = SessionPhase.CLARIFICATION
        logger.reason(
            "Selected active clarification question",
            extra={
                "session_id": session.session_id,
                "clarification_session_id": clarification_session.clarification_session_id,
                "question_id": selected_question.question_id,
                "priority": selected_question.priority,
            },
        )
        self.repository.db.commit()
        payload = ClarificationQuestionPayload(
            question_id=selected_question.question_id,
            clarification_session_id=selected_question.clarification_session_id,
            topic_ref=selected_question.topic_ref,
            question_text=selected_question.question_text,
            why_it_matters=selected_question.why_it_matters,
            current_guess=selected_question.current_guess,
            priority=selected_question.priority,
            state=selected_question.state,
            options=[
                {
                    "option_id": option.option_id,
                    "question_id": option.question_id,
                    "label": option.label,
                    "value": option.value,
                    "is_recommended": option.is_recommended,
                    "display_order": option.display_order,
                }
                # Deterministic option order for the UI: display_order, then label/id.
                for option in sorted(
                    selected_question.options,
                    key=lambda item: (item.display_order, item.label, item.option_id),
                )
            ],
        )
        logger.reflect(
            "Clarification payload built",
            extra={
                "session_id": session.session_id,
                "question_id": payload.question_id,
                "option_count": len(payload.options),
            },
        )
        return payload
# [/DEF:ClarificationEngine.build_question_payload:Function]
# [DEF:ClarificationEngine.record_answer:Function]
# @COMPLEXITY: 4
# @PURPOSE: Persist one clarification answer before any pointer/readiness mutation and compute deterministic state impact.
# @RELATION: [DEPENDS_ON] ->[ClarificationAnswer]
# @RELATION: [DEPENDS_ON] ->[ValidationFinding]
# @PRE: Target question belongs to the session's active clarification session and is still open.
# @POST: Answer row is persisted before current-question pointer advances; skipped/expert-review items remain unresolved and visible.
# @SIDE_EFFECT: Inserts answer row, mutates question/session states, updates clarification findings, and commits.
# @DATA_CONTRACT: Input[ClarificationAnswerCommand] -> Output[ClarificationStateResult]
def record_answer(self, command: ClarificationAnswerCommand) -> ClarificationStateResult:
    """Record one answer for the active question and advance clarification state.

    Raises ValueError when the clarification session/question is missing, the
    question is already answered, or it is not the active question.
    """
    with belief_scope("ClarificationEngine.record_answer"):
        session = command.session
        clarification_session = self._get_latest_clarification_session(session)
        if clarification_session is None:
            logger.explore(
                "Cannot record clarification answer because no clarification session exists",
                extra={"session_id": session.session_id},
            )
            raise ValueError("Clarification session not found")
        question = self._find_question(clarification_session, command.question_id)
        if question is None:
            logger.explore(
                "Cannot record clarification answer for foreign or missing question",
                extra={"session_id": session.session_id, "question_id": command.question_id},
            )
            raise ValueError("Clarification question not found")
        # Answers are write-once: a second submission for the same question is rejected.
        if question.answer is not None:
            logger.explore(
                "Rejected duplicate clarification answer submission",
                extra={"session_id": session.session_id, "question_id": command.question_id},
            )
            raise ValueError("Clarification question already answered")
        # Enforce the one-question-at-a-time invariant: only the pointed-at question is answerable.
        if clarification_session.current_question_id and clarification_session.current_question_id != question.question_id:
            logger.explore(
                "Rejected answer for non-active clarification question",
                extra={
                    "session_id": session.session_id,
                    "question_id": question.question_id,
                    "current_question_id": clarification_session.current_question_id,
                },
            )
            raise ValueError("Only the active clarification question can be answered")
        # Validation may raise before anything is written.
        normalized_answer_value = self._normalize_answer_value(command.answer_kind, command.answer_value, question)
        logger.reason(
            "Persisting clarification answer before state advancement",
            extra={
                "session_id": session.session_id,
                "question_id": question.question_id,
                "answer_kind": command.answer_kind.value,
            },
        )
        # Invariant: the answer row is added and flushed BEFORE any pointer or
        # readiness mutation, so a failure here leaves state untouched.
        persisted_answer = ClarificationAnswer(
            question_id=question.question_id,
            answer_kind=command.answer_kind,
            answer_value=normalized_answer_value,
            answered_by_user_id=command.user.id,
            impact_summary=self._build_impact_summary(question, command.answer_kind, normalized_answer_value),
        )
        self.repository.db.add(persisted_answer)
        self.repository.db.flush()
        changed_finding = self._upsert_clarification_finding(
            session=session,
            question=question,
            answer_kind=command.answer_kind,
            answer_value=normalized_answer_value,
        )
        # Map answer kind onto question state; SELECTED and CUSTOM both count as answered.
        if command.answer_kind == AnswerKind.SELECTED:
            question.state = QuestionState.ANSWERED
        elif command.answer_kind == AnswerKind.CUSTOM:
            question.state = QuestionState.ANSWERED
        elif command.answer_kind == AnswerKind.SKIPPED:
            question.state = QuestionState.SKIPPED
        elif command.answer_kind == AnswerKind.EXPERT_REVIEW:
            question.state = QuestionState.EXPERT_REVIEW
        question.updated_at = datetime.utcnow()
        self.repository.db.flush()
        # Recompute aggregate counters and progress delta after the state change.
        clarification_session.resolved_count = self._count_resolved_questions(clarification_session)
        clarification_session.remaining_count = self._count_remaining_questions(clarification_session)
        clarification_session.summary_delta = self.summarize_progress(clarification_session)
        clarification_session.updated_at = datetime.utcnow()
        # Advance the active-question pointer; None means the session just completed.
        next_question = self._select_next_open_question(clarification_session)
        clarification_session.current_question_id = next_question.question_id if next_question else None
        clarification_session.status = (
            ClarificationStatus.ACTIVE if next_question else ClarificationStatus.COMPLETED
        )
        if clarification_session.status == ClarificationStatus.COMPLETED:
            clarification_session.completed_at = datetime.utcnow()
        session.readiness_state = self._derive_readiness_state(session)
        session.recommended_action = self._derive_recommended_action(session)
        session.current_phase = (
            SessionPhase.CLARIFICATION
            if clarification_session.current_question_id
            else SessionPhase.REVIEW
        )
        session.last_activity_at = datetime.utcnow()
        self.repository.db.commit()
        self.repository.db.refresh(session)
        logger.reflect(
            "Clarification answer recorded and session advanced",
            extra={
                "session_id": session.session_id,
                "question_id": question.question_id,
                "next_question_id": clarification_session.current_question_id,
                "readiness_state": session.readiness_state.value,
                "remaining_count": clarification_session.remaining_count,
            },
        )
        # NOTE(review): build_question_payload() commits again internally —
        # presumably intentional (it re-normalizes pointer/readiness); confirm.
        return ClarificationStateResult(
            clarification_session=clarification_session,
            current_question=self.build_question_payload(session),
            session=session,
            changed_findings=[changed_finding] if changed_finding else [],
        )
# [/DEF:ClarificationEngine.record_answer:Function]
# [DEF:ClarificationEngine.summarize_progress:Function]
# @COMPLEXITY: 3
# @PURPOSE: Produce a compact progress summary for pause/resume and completion UX.
# @RELATION: [DEPENDS_ON] ->[ClarificationSession]
def summarize_progress(self, clarification_session: ClarificationSession) -> str:
    """Return the compact "<n> resolved, <m> unresolved" progress string."""
    resolved_total = self._count_resolved_questions(clarification_session)
    unresolved_total = self._count_remaining_questions(clarification_session)
    return f"{resolved_total} resolved, {unresolved_total} unresolved"
# [/DEF:ClarificationEngine.summarize_progress:Function]
# [DEF:ClarificationEngine._get_latest_clarification_session:Function]
# @COMPLEXITY: 2
# @PURPOSE: Select the latest clarification session for the current dataset review aggregate.
def _get_latest_clarification_session(
self,
session: DatasetReviewSession,
) -> Optional[ClarificationSession]:
if not session.clarification_sessions:
return None
ordered_sessions = sorted(
session.clarification_sessions,
key=lambda item: (item.started_at, item.clarification_session_id),
reverse=True,
)
return ordered_sessions[0]
# [/DEF:ClarificationEngine._get_latest_clarification_session:Function]
# [DEF:ClarificationEngine._find_question:Function]
# @COMPLEXITY: 1
# @PURPOSE: Resolve a clarification question from the active clarification aggregate.
def _find_question(
self,
clarification_session: ClarificationSession,
question_id: str,
) -> Optional[ClarificationQuestion]:
for question in clarification_session.questions:
if question.question_id == question_id:
return question
return None
# [/DEF:ClarificationEngine._find_question:Function]
# [DEF:ClarificationEngine._select_next_open_question:Function]
# @COMPLEXITY: 2
# @PURPOSE: Select the next unresolved question in deterministic priority order.
def _select_next_open_question(
    self,
    clarification_session: ClarificationSession,
) -> Optional[ClarificationQuestion]:
    """Return the highest-priority OPEN question (ties: oldest, then id)."""
    open_questions = [
        candidate
        for candidate in clarification_session.questions
        if candidate.state == QuestionState.OPEN
    ]
    if not open_questions:
        return None
    # min over (-priority, created_at, question_id) selects the same element
    # the original descending-priority stable sort placed first.
    return min(
        open_questions,
        key=lambda candidate: (-int(candidate.priority), candidate.created_at, candidate.question_id),
    )
# [/DEF:ClarificationEngine._select_next_open_question:Function]
# [DEF:ClarificationEngine._count_resolved_questions:Function]
# @COMPLEXITY: 1
# @PURPOSE: Count questions whose answers fully resolved the ambiguity.
def _count_resolved_questions(self, clarification_session: ClarificationSession) -> int:
    """Return how many questions reached the ANSWERED state."""
    answered = [
        question
        for question in clarification_session.questions
        if question.state == QuestionState.ANSWERED
    ]
    return len(answered)
# [/DEF:ClarificationEngine._count_resolved_questions:Function]
# [DEF:ClarificationEngine._count_remaining_questions:Function]
# @COMPLEXITY: 1
# @PURPOSE: Count questions still unresolved or deferred after clarification interaction.
def _count_remaining_questions(self, clarification_session: ClarificationSession) -> int:
    """Return how many questions are still open, skipped, or awaiting an expert."""
    # Skipped and expert-review items deliberately count as unresolved so
    # they stay visible to the reviewer.
    unresolved_states = {QuestionState.OPEN, QuestionState.SKIPPED, QuestionState.EXPERT_REVIEW}
    return sum(
        1
        for question in clarification_session.questions
        if question.state in unresolved_states
    )
# [/DEF:ClarificationEngine._count_remaining_questions:Function]
# [DEF:ClarificationEngine._normalize_answer_value:Function]
# @COMPLEXITY: 2
# @PURPOSE: Validate and normalize answer payload based on answer kind and active question options.
def _normalize_answer_value(
    self,
    answer_kind: AnswerKind,
    answer_value: Optional[str],
    question: ClarificationQuestion,
) -> Optional[str]:
    """Validate the raw answer text for ``answer_kind`` and return its stored form.

    Raises ValueError when a required value is missing or a SELECTED value
    does not match any current option.
    """
    cleaned = None if answer_value is None else str(answer_value).strip()
    value_is_required = answer_kind in {AnswerKind.SELECTED, AnswerKind.CUSTOM}
    if value_is_required and not cleaned:
        raise ValueError("answer_value is required for selected or custom clarification answers")
    if answer_kind == AnswerKind.SELECTED:
        allowed_values = {option.value for option in question.options}
        if cleaned not in allowed_values:
            raise ValueError("answer_value must match one of the current clarification options")
    # Skip/defer answers get a sentinel value when the user supplied no text.
    if answer_kind == AnswerKind.SKIPPED:
        return cleaned or "skipped"
    if answer_kind == AnswerKind.EXPERT_REVIEW:
        return cleaned or "expert_review"
    return cleaned
# [/DEF:ClarificationEngine._normalize_answer_value:Function]
# [DEF:ClarificationEngine._build_impact_summary:Function]
# @COMPLEXITY: 2
# @PURPOSE: Build a compact audit note describing how the clarification answer affects session state.
def _build_impact_summary(
    self,
    question: ClarificationQuestion,
    answer_kind: AnswerKind,
    answer_value: Optional[str],
) -> str:
    """Render the one-sentence audit note stored with the persisted answer."""
    topic = question.topic_ref
    if answer_kind == AnswerKind.EXPERT_REVIEW:
        return f"Clarification for {topic} was deferred for expert review."
    if answer_kind == AnswerKind.SKIPPED:
        return f"Clarification for {topic} was skipped and remains unresolved."
    # SELECTED and CUSTOM both record the concrete answer text.
    return f"Clarification for {topic} recorded as '{answer_value}'."
# [/DEF:ClarificationEngine._build_impact_summary:Function]
# [DEF:ClarificationEngine._upsert_clarification_finding:Function]
# @COMPLEXITY: 3
# @PURPOSE: Keep one finding per clarification topic aligned with answer outcome and unresolved visibility rules.
# @RELATION: [DEPENDS_ON] ->[ValidationFinding]
def _upsert_clarification_finding(
    self,
    session: DatasetReviewSession,
    question: ClarificationQuestion,
    answer_kind: AnswerKind,
    answer_value: Optional[str],
) -> ValidationFinding:
    """Create or update the single finding tied to this clarification question.

    Findings are keyed by caused_by_ref = "clarification:<question_id>", so
    re-answering a topic updates its existing finding rather than adding one.
    """
    caused_by_ref = f"clarification:{question.question_id}"
    existing = next(
        (
            finding for finding in session.findings
            if finding.area == FindingArea.CLARIFICATION and finding.caused_by_ref == caused_by_ref
        ),
        None,
    )
    # Derive resolution state/message from the answer kind; skipped and
    # expert-review outcomes keep resolved_at = None so they remain visible.
    if answer_kind in {AnswerKind.SELECTED, AnswerKind.CUSTOM}:
        resolution_state = ResolutionState.RESOLVED
        resolved_at = datetime.utcnow()
        message = f"Clarified '{question.topic_ref}' with answer '{answer_value}'."
    elif answer_kind == AnswerKind.SKIPPED:
        resolution_state = ResolutionState.SKIPPED
        resolved_at = None
        message = f"Clarification for '{question.topic_ref}' was skipped and still needs review."
    else:
        resolution_state = ResolutionState.EXPERT_REVIEW
        resolved_at = None
        message = f"Clarification for '{question.topic_ref}' requires expert review."
    if existing is None:
        existing = ValidationFinding(
            finding_id=str(uuid.uuid4()),
            session_id=session.session_id,
            area=FindingArea.CLARIFICATION,
            severity=FindingSeverity.WARNING,
            code="CLARIFICATION_PENDING",
            title="Clarification pending",
            message=message,
            resolution_state=resolution_state,
            resolution_note=None,
            caused_by_ref=caused_by_ref,
            created_at=datetime.utcnow(),
            resolved_at=resolved_at,
        )
        self.repository.db.add(existing)
        session.findings.append(existing)
    else:
        existing.message = message
        existing.resolution_state = resolution_state
        existing.resolved_at = resolved_at
    # Code/title always reflect the latest outcome, including on the
    # just-created row (overwriting the PENDING placeholder above).
    if answer_kind in {AnswerKind.SELECTED, AnswerKind.CUSTOM}:
        existing.code = "CLARIFICATION_RESOLVED"
        existing.title = "Clarification resolved"
    elif answer_kind == AnswerKind.SKIPPED:
        existing.code = "CLARIFICATION_SKIPPED"
        existing.title = "Clarification skipped"
    else:
        existing.code = "CLARIFICATION_EXPERT_REVIEW"
        existing.title = "Clarification requires expert review"
    return existing
# [/DEF:ClarificationEngine._upsert_clarification_finding:Function]
# [DEF:ClarificationEngine._derive_readiness_state:Function]
# @COMPLEXITY: 3
# @PURPOSE: Recompute readiness after clarification mutation while preserving unresolved visibility semantics.
# @RELATION: [DEPENDS_ON] ->[ClarificationSession]
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
def _derive_readiness_state(self, session: DatasetReviewSession) -> ReadinessState:
    """Map the latest clarification aggregate onto a session readiness state."""
    latest = self._get_latest_clarification_session(session)
    if latest is None:
        # No clarification aggregate: keep whatever readiness the session had.
        return session.readiness_state
    if latest.current_question_id:
        return ReadinessState.CLARIFICATION_ACTIVE
    return (
        ReadinessState.CLARIFICATION_NEEDED
        if latest.remaining_count > 0
        else ReadinessState.REVIEW_READY
    )
# [/DEF:ClarificationEngine._derive_readiness_state:Function]
# [DEF:ClarificationEngine._derive_recommended_action:Function]
# @COMPLEXITY: 2
# @PURPOSE: Recompute next-action guidance after clarification mutations.
def _derive_recommended_action(self, session: DatasetReviewSession) -> RecommendedAction:
    """Map the latest clarification aggregate onto next-action guidance."""
    latest = self._get_latest_clarification_session(session)
    if latest is None:
        # No clarification aggregate: keep the session's existing guidance.
        return session.recommended_action
    if latest.current_question_id:
        return RecommendedAction.ANSWER_NEXT_QUESTION
    return (
        RecommendedAction.START_CLARIFICATION
        if latest.remaining_count > 0
        else RecommendedAction.REVIEW_DOCUMENTATION
    )
# [/DEF:ClarificationEngine._derive_recommended_action:Function]
# [/DEF:ClarificationEngine:Class]
# [/DEF:ClarificationEngine:Module]

View File

@@ -0,0 +1,156 @@
# [DEF:SessionEventLogger:Module]
# @COMPLEXITY: 4
# @SEMANTICS: dataset_review, audit, session_events, persistence, observability
# @PURPOSE: Persist explicit session mutation events for dataset-review audit trails without weakening ownership or approval invariants.
# @LAYER: Domain
# @RELATION: [DEPENDS_ON] ->[SessionEvent]
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
# @PRE: Caller provides an owned session scope and an authenticated actor identifier for each persisted mutation event.
# @POST: Every logged event is committed as an explicit, queryable audit record with deterministic event metadata.
# @SIDE_EFFECT: Inserts persisted session event rows and emits runtime belief-state logs for audit-sensitive mutations.
from __future__ import annotations
# [DEF:SessionEventLogger.imports:Block]
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from sqlalchemy.orm import Session
from src.core.logger import belief_scope, logger
from src.models.dataset_review import DatasetReviewSession, SessionEvent
# [/DEF:SessionEventLogger.imports:Block]
# [DEF:SessionEventPayload:Class]
# @COMPLEXITY: 2
# @PURPOSE: Typed input contract for one persisted dataset-review session audit event.
@dataclass(frozen=True)
class SessionEventPayload:
    """Immutable input for SessionEventLogger.log_event."""
    session_id: str
    actor_user_id: str
    # Machine-readable discriminator (e.g. "session_started"); free-form summary below.
    event_type: str
    event_summary: str
    # Optional phase/readiness snapshots captured at event time.
    current_phase: Optional[str] = None
    readiness_state: Optional[str] = None
    # Structured detail payload; defaults to an empty dict per instance.
    event_details: Dict[str, Any] = field(default_factory=dict)
# [/DEF:SessionEventPayload:Class]
# [DEF:SessionEventLogger:Class]
# @COMPLEXITY: 4
# @PURPOSE: Persist explicit dataset-review session audit events with meaningful runtime reasoning logs.
# @RELATION: [DEPENDS_ON] ->[SessionEvent]
# @RELATION: [DEPENDS_ON] ->[sqlalchemy.orm.Session]
# @PRE: The database session is live and payload identifiers are non-empty.
# @POST: Returns the committed session event row with a stable identifier and stored detail payload.
# @SIDE_EFFECT: Writes one audit row to persistence and emits logger.reason/logger.reflect traces.
class SessionEventLogger:
# [DEF:SessionEventLogger.__init__:Function]
# @COMPLEXITY: 2
# @PURPOSE: Bind a live SQLAlchemy session to the session-event logger.
def __init__(self, db: Session) -> None:
    """Store the SQLAlchemy session used for all audit-event writes."""
    self.db = db
# [/DEF:SessionEventLogger.__init__:Function]
# [DEF:SessionEventLogger.log_event:Function]
# @COMPLEXITY: 4
# @PURPOSE: Persist one explicit session event row for an owned dataset-review mutation.
# @RELATION: [DEPENDS_ON] ->[SessionEvent]
# @PRE: session_id, actor_user_id, event_type, and event_summary are non-empty.
# @POST: Returns the committed SessionEvent record with normalized detail payload.
# @SIDE_EFFECT: Inserts and commits one session_events row.
# @DATA_CONTRACT: Input[SessionEventPayload] -> Output[SessionEvent]
def log_event(self, payload: SessionEventPayload) -> SessionEvent:
    """Validate, persist, and return one session audit event.

    Raises ValueError when any of the four required identifiers/strings is
    empty after trimming. Commits immediately — the event is its own unit
    of work, independent of the caller's transaction.
    """
    with belief_scope("SessionEventLogger.log_event"):
        # Coerce-and-trim first so None and whitespace-only values are both rejected.
        session_id = str(payload.session_id or "").strip()
        actor_user_id = str(payload.actor_user_id or "").strip()
        event_type = str(payload.event_type or "").strip()
        event_summary = str(payload.event_summary or "").strip()
        if not session_id:
            logger.explore("Session event logging rejected because session_id is empty")
            raise ValueError("session_id must be non-empty")
        if not actor_user_id:
            logger.explore(
                "Session event logging rejected because actor_user_id is empty",
                extra={"session_id": session_id},
            )
            raise ValueError("actor_user_id must be non-empty")
        if not event_type:
            logger.explore(
                "Session event logging rejected because event_type is empty",
                extra={"session_id": session_id, "actor_user_id": actor_user_id},
            )
            raise ValueError("event_type must be non-empty")
        if not event_summary:
            logger.explore(
                "Session event logging rejected because event_summary is empty",
                extra={"session_id": session_id, "event_type": event_type},
            )
            raise ValueError("event_summary must be non-empty")
        # Shallow copy decouples the stored JSON from the caller's dict.
        normalized_details = dict(payload.event_details or {})
        logger.reason(
            "Persisting explicit dataset-review session audit event",
            extra={
                "session_id": session_id,
                "actor_user_id": actor_user_id,
                "event_type": event_type,
                "current_phase": payload.current_phase,
                "readiness_state": payload.readiness_state,
            },
        )
        event = SessionEvent(
            session_id=session_id,
            actor_user_id=actor_user_id,
            event_type=event_type,
            event_summary=event_summary,
            current_phase=payload.current_phase,
            readiness_state=payload.readiness_state,
            event_details=normalized_details,
        )
        self.db.add(event)
        self.db.commit()
        # Refresh so the generated id/created_at are populated on the returned row.
        self.db.refresh(event)
        logger.reflect(
            "Dataset-review session audit event persisted",
            extra={
                "session_id": session_id,
                "session_event_id": event.session_event_id,
                "event_type": event.event_type,
            },
        )
        return event
# [/DEF:SessionEventLogger.log_event:Function]
# [DEF:SessionEventLogger.log_for_session:Function]
# @COMPLEXITY: 3
# @PURPOSE: Convenience wrapper for logging an event directly from a session aggregate root.
# @RELATION: [CALLS] ->[SessionEventLogger.log_event]
def log_for_session(
    self,
    session: DatasetReviewSession,
    *,
    actor_user_id: str,
    event_type: str,
    event_summary: str,
    event_details: Optional[Dict[str, Any]] = None,
) -> SessionEvent:
    """Log one audit event, snapshotting phase/readiness from the session."""
    # Snapshot enum values as plain strings; either may be absent.
    phase_snapshot = session.current_phase.value if session.current_phase else None
    readiness_snapshot = session.readiness_state.value if session.readiness_state else None
    payload = SessionEventPayload(
        session_id=session.session_id,
        actor_user_id=actor_user_id,
        event_type=event_type,
        event_summary=event_summary,
        current_phase=phase_snapshot,
        readiness_state=readiness_snapshot,
        event_details=dict(event_details or {}),
    )
    return self.log_event(payload)
# [/DEF:SessionEventLogger.log_for_session:Function]
# [/DEF:SessionEventLogger:Class]
# [/DEF:SessionEventLogger:Module]

View File

@@ -19,23 +19,36 @@ from __future__ import annotations
# [DEF:DatasetReviewOrchestrator.imports:Block]
from dataclasses import dataclass, field
from datetime import datetime
import hashlib
import json
from typing import Any, Dict, List, Optional
from src.core.config_manager import ConfigManager
from src.core.logger import belief_scope, logger
from src.core.task_manager import TaskManager
from src.core.utils.superset_compilation_adapter import (
PreviewCompilationPayload,
SqlLabLaunchPayload,
SupersetCompilationAdapter,
)
from src.core.utils.superset_context_extractor import (
SupersetContextExtractor,
SupersetParsedContext,
)
from src.models.auth import User
from src.models.dataset_review import (
ApprovalState,
BusinessSummarySource,
CompiledPreview,
ConfidenceState,
DatasetProfile,
DatasetReviewSession,
DatasetRunContext,
FindingArea,
FindingSeverity,
LaunchStatus,
PreviewStatus,
RecommendedAction,
ReadinessState,
ResolutionState,
@@ -47,6 +60,7 @@ from src.services.dataset_review.repositories.session_repository import (
DatasetReviewSessionRepository,
)
from src.services.dataset_review.semantic_resolver import SemanticSourceResolver
from src.services.dataset_review.event_logger import SessionEventPayload
# [/DEF:DatasetReviewOrchestrator.imports:Block]
@@ -73,6 +87,48 @@ class StartSessionResult:
# [/DEF:StartSessionResult:Class]
# [DEF:PreparePreviewCommand:Class]
# @COMPLEXITY: 2
# @PURPOSE: Typed input contract for compiling one Superset-backed session preview.
@dataclass
class PreparePreviewCommand:
    """Typed input contract for compiling one Superset-backed session preview."""
    user: User          # authenticated actor; orchestrator enforces session ownership
    session_id: str     # target DatasetReviewSession identifier
# [/DEF:PreparePreviewCommand:Class]
# [DEF:PreparePreviewResult:Class]
# @COMPLEXITY: 2
# @PURPOSE: Result contract for one persisted compiled preview attempt.
@dataclass
class PreparePreviewResult:
    """Result contract for one persisted compiled-preview attempt."""
    session: DatasetReviewSession   # refreshed session after phase/readiness update
    preview: CompiledPreview        # the persisted preview artifact
    # Always empty in the current orchestrator path: blockers raise instead of
    # being returned here.
    blocked_reasons: List[str] = field(default_factory=list)
# [/DEF:PreparePreviewResult:Class]
# [DEF:LaunchDatasetCommand:Class]
# @COMPLEXITY: 2
# @PURPOSE: Typed input contract for launching one dataset-review session into SQL Lab.
@dataclass
class LaunchDatasetCommand:
    """Typed input contract for launching one dataset-review session into SQL Lab."""
    user: User          # authenticated actor; orchestrator enforces session ownership
    session_id: str     # target DatasetReviewSession identifier
# [/DEF:LaunchDatasetCommand:Class]
# [DEF:LaunchDatasetResult:Class]
# @COMPLEXITY: 2
# @PURPOSE: Launch result carrying immutable run context and any gate blockers surfaced before launch.
@dataclass
class LaunchDatasetResult:
    """Launch result carrying the immutable run context for audit/replay."""
    session: DatasetReviewSession       # refreshed session after launch bookkeeping
    run_context: DatasetRunContext      # persisted audit snapshot of the launch attempt
    # Always empty in the current orchestrator path: gate blockers raise instead
    # of being returned here.
    blocked_reasons: List[str] = field(default_factory=list)
# [/DEF:LaunchDatasetResult:Class]
# [DEF:DatasetReviewOrchestrator:Class]
# @COMPLEXITY: 5
# @PURPOSE: Coordinate safe session startup while preserving cross-user isolation and explicit partial recovery.
@@ -197,6 +253,23 @@ class DatasetReviewOrchestrator:
parsed_context=parsed_context,
dataset_ref=dataset_ref,
)
self.repository.event_logger.log_event(
SessionEventPayload(
session_id=persisted_session.session_id,
actor_user_id=command.user.id,
event_type="session_started",
event_summary="Dataset review session shell created",
current_phase=persisted_session.current_phase.value,
readiness_state=persisted_session.readiness_state.value,
event_details={
"source_kind": persisted_session.source_kind,
"dataset_ref": persisted_session.dataset_ref,
"dataset_id": persisted_session.dataset_id,
"dashboard_id": persisted_session.dashboard_id,
"partial_recovery": bool(parsed_context and parsed_context.partial_recovery),
},
)
)
persisted_session = self.repository.save_profile_and_findings(
persisted_session.session_id,
command.user.id,
@@ -213,6 +286,17 @@ class DatasetReviewOrchestrator:
persisted_session.active_task_id = active_task_id
self.repository.db.commit()
self.repository.db.refresh(persisted_session)
self.repository.event_logger.log_event(
SessionEventPayload(
session_id=persisted_session.session_id,
actor_user_id=command.user.id,
event_type="recovery_task_linked",
event_summary="Recovery task linked to dataset review session",
current_phase=persisted_session.current_phase.value,
readiness_state=persisted_session.readiness_state.value,
event_details={"task_id": active_task_id},
)
)
logger.reason(
"Linked recovery task to started dataset review session",
extra={"session_id": persisted_session.session_id, "task_id": active_task_id},
@@ -237,6 +321,238 @@ class DatasetReviewOrchestrator:
)
# [/DEF:DatasetReviewOrchestrator.start_session:Function]
# [DEF:DatasetReviewOrchestrator.prepare_launch_preview:Function]
# @COMPLEXITY: 4
# @PURPOSE: Assemble effective execution inputs and trigger Superset-side preview compilation.
# @RELATION: [CALLS] ->[SupersetCompilationAdapter.compile_preview]
# @PRE: all required variables have candidate values or explicitly accepted defaults.
# @POST: returns preview artifact in pending, ready, failed, or stale state.
# @SIDE_EFFECT: persists preview attempt and upstream compilation diagnostics.
# @DATA_CONTRACT: Input[PreparePreviewCommand] -> Output[PreparePreviewResult]
def prepare_launch_preview(self, command: PreparePreviewCommand) -> PreparePreviewResult:
    """Compile and persist a Superset preview for one owned session.

    Raises ValueError when the owned session is missing, dataset_id is
    unresolved, the environment is unknown, or the execution context has
    blocking gaps. Side effects: persists the preview attempt, advances the
    session phase/readiness, and logs an audit event after commit.
    """
    with belief_scope("DatasetReviewOrchestrator.prepare_launch_preview"):
        session = self.repository.load_session_detail(command.session_id, command.user.id)
        # Ownership re-check on top of repository scoping: load_session_detail
        # also admits collaborators, but preview preparation is owner-only.
        if session is None or session.user_id != command.user.id:
            logger.explore(
                "Preview preparation rejected because owned session was not found",
                extra={"session_id": command.session_id, "user_id": command.user.id},
            )
            raise ValueError("Session not found")
        if session.dataset_id is None:
            raise ValueError("Preview requires a resolved dataset_id")
        environment = self.config_manager.get_environment(session.environment_id)
        if environment is None:
            raise ValueError("Environment not found")
        execution_snapshot = self._build_execution_snapshot(session)
        preview_blockers = execution_snapshot["preview_blockers"]
        if preview_blockers:
            logger.explore(
                "Preview preparation blocked by incomplete execution context",
                extra={
                    "session_id": session.session_id,
                    "blocked_reasons": preview_blockers,
                },
            )
            raise ValueError("Preview blocked: " + "; ".join(preview_blockers))
        adapter = SupersetCompilationAdapter(environment)
        preview = adapter.compile_preview(
            PreviewCompilationPayload(
                session_id=session.session_id,
                dataset_id=session.dataset_id,
                preview_fingerprint=execution_snapshot["preview_fingerprint"],
                template_params=execution_snapshot["template_params"],
                effective_filters=execution_snapshot["effective_filters"],
            )
        )
        persisted_preview = self.repository.save_preview(
            session.session_id,
            command.user.id,
            preview,
        )
        session.current_phase = SessionPhase.PREVIEW
        session.last_activity_at = datetime.utcnow()
        if persisted_preview.preview_status == PreviewStatus.READY:
            # A ready preview may still be launch-blocked (e.g. pending
            # approvals); readiness reflects what the gates currently allow.
            launch_blockers = self._build_launch_blockers(
                session=session,
                execution_snapshot=execution_snapshot,
                preview=persisted_preview,
            )
            if launch_blockers:
                session.readiness_state = ReadinessState.COMPILED_PREVIEW_READY
                session.recommended_action = RecommendedAction.APPROVE_MAPPING
            else:
                session.readiness_state = ReadinessState.RUN_READY
                session.recommended_action = RecommendedAction.LAUNCH_DATASET
        else:
            session.readiness_state = ReadinessState.PARTIALLY_READY
            session.recommended_action = RecommendedAction.GENERATE_SQL_PREVIEW
        self.repository.db.commit()
        self.repository.db.refresh(session)
        # Audit event is logged after commit so it records the persisted state.
        self.repository.event_logger.log_event(
            SessionEventPayload(
                session_id=session.session_id,
                actor_user_id=command.user.id,
                event_type="preview_generated",
                event_summary="Superset preview generation persisted",
                current_phase=session.current_phase.value,
                readiness_state=session.readiness_state.value,
                event_details={
                    "preview_id": persisted_preview.preview_id,
                    "preview_status": persisted_preview.preview_status.value,
                    "preview_fingerprint": persisted_preview.preview_fingerprint,
                },
            )
        )
        logger.reflect(
            "Superset preview preparation completed",
            extra={
                "session_id": session.session_id,
                "preview_id": persisted_preview.preview_id,
                "preview_status": persisted_preview.preview_status.value,
                "preview_fingerprint": persisted_preview.preview_fingerprint,
            },
        )
        # blocked_reasons stays empty on this path: blockers raise above.
        return PreparePreviewResult(
            session=session,
            preview=persisted_preview,
            blocked_reasons=[],
        )
# [/DEF:DatasetReviewOrchestrator.prepare_launch_preview:Function]
# [DEF:DatasetReviewOrchestrator.launch_dataset:Function]
# @COMPLEXITY: 5
# @PURPOSE: Start the approved dataset execution through SQL Lab and persist run context for audit/replay.
# @RELATION: [CALLS] ->[SupersetCompilationAdapter.create_sql_lab_session]
# @PRE: session is run-ready and compiled preview is current.
# @POST: returns persisted run context with SQL Lab session reference and launch outcome.
# @SIDE_EFFECT: creates SQL Lab execution session and audit snapshot.
# @DATA_CONTRACT: Input[LaunchDatasetCommand] -> Output[LaunchDatasetResult]
# @INVARIANT: launch remains blocked unless blocking findings are closed, approvals are satisfied, and the latest Superset preview fingerprint matches current execution inputs.
def launch_dataset(self, command: LaunchDatasetCommand) -> LaunchDatasetResult:
    """Launch an approved session into SQL Lab and persist an auditable run context.

    Raises ValueError when the owned session is missing, dataset_id is
    unresolved, the environment is unknown, or any launch gate (blocking
    findings, missing approvals, stale/missing preview) is unmet. A failed
    SQL Lab handoff is recorded as a FAILED run context rather than raised,
    so the attempt remains auditable and retryable.
    """
    with belief_scope("DatasetReviewOrchestrator.launch_dataset"):
        session = self.repository.load_session_detail(command.session_id, command.user.id)
        # Ownership re-check: load_session_detail also admits collaborators.
        if session is None or session.user_id != command.user.id:
            logger.explore(
                "Launch rejected because owned session was not found",
                extra={"session_id": command.session_id, "user_id": command.user.id},
            )
            raise ValueError("Session not found")
        if session.dataset_id is None:
            raise ValueError("Launch requires a resolved dataset_id")
        environment = self.config_manager.get_environment(session.environment_id)
        if environment is None:
            raise ValueError("Environment not found")
        execution_snapshot = self._build_execution_snapshot(session)
        current_preview = self._get_latest_preview(session)
        launch_blockers = self._build_launch_blockers(
            session=session,
            execution_snapshot=execution_snapshot,
            preview=current_preview,
        )
        if launch_blockers:
            logger.explore(
                "Launch gate blocked dataset execution",
                extra={
                    "session_id": session.session_id,
                    "blocked_reasons": launch_blockers,
                },
            )
            raise ValueError("Launch blocked: " + "; ".join(launch_blockers))
        adapter = SupersetCompilationAdapter(environment)
        try:
            # current_preview is non-None here: a missing preview adds a
            # "preview:missing" blocker and raises above.
            sql_lab_session_ref = adapter.create_sql_lab_session(
                SqlLabLaunchPayload(
                    session_id=session.session_id,
                    dataset_id=session.dataset_id,
                    preview_id=current_preview.preview_id,
                    compiled_sql=str(current_preview.compiled_sql or ""),
                    template_params=execution_snapshot["template_params"],
                )
            )
            launch_status = LaunchStatus.STARTED
            launch_error = None
        except Exception as exc:
            # Capture the failure in the run context instead of re-raising so
            # the audit trail records every gated launch attempt.
            logger.explore(
                "SQL Lab launch failed after passing gates",
                extra={"session_id": session.session_id, "error": str(exc)},
            )
            sql_lab_session_ref = "unavailable"
            launch_status = LaunchStatus.FAILED
            launch_error = str(exc)
        run_context = DatasetRunContext(
            session_id=session.session_id,
            dataset_ref=session.dataset_ref,
            environment_id=session.environment_id,
            preview_id=current_preview.preview_id,
            sql_lab_session_ref=sql_lab_session_ref,
            effective_filters=execution_snapshot["effective_filters"],
            template_params=execution_snapshot["template_params"],
            approved_mapping_ids=execution_snapshot["approved_mapping_ids"],
            semantic_decision_refs=execution_snapshot["semantic_decision_refs"],
            open_warning_refs=execution_snapshot["open_warning_refs"],
            launch_status=launch_status,
            launch_error=launch_error,
        )
        persisted_run_context = self.repository.save_run_context(
            session.session_id,
            command.user.id,
            run_context,
        )
        session.current_phase = SessionPhase.LAUNCH
        session.last_activity_at = datetime.utcnow()
        if launch_status == LaunchStatus.FAILED:
            # Failed handoff rolls readiness back to preview-ready and keeps
            # the launch action recommended for retry.
            session.readiness_state = ReadinessState.COMPILED_PREVIEW_READY
            session.recommended_action = RecommendedAction.LAUNCH_DATASET
        else:
            session.readiness_state = ReadinessState.RUN_IN_PROGRESS
            session.recommended_action = RecommendedAction.EXPORT_OUTPUTS
        self.repository.db.commit()
        self.repository.db.refresh(session)
        # Audit event is logged after commit so it records the persisted state.
        self.repository.event_logger.log_event(
            SessionEventPayload(
                session_id=session.session_id,
                actor_user_id=command.user.id,
                event_type="dataset_launch_requested",
                event_summary="Dataset launch handoff persisted",
                current_phase=session.current_phase.value,
                readiness_state=session.readiness_state.value,
                event_details={
                    "run_context_id": persisted_run_context.run_context_id,
                    "launch_status": persisted_run_context.launch_status.value,
                    "preview_id": persisted_run_context.preview_id,
                    "sql_lab_session_ref": persisted_run_context.sql_lab_session_ref,
                },
            )
        )
        logger.reflect(
            "Dataset launch orchestration completed with audited run context",
            extra={
                "session_id": session.session_id,
                "run_context_id": persisted_run_context.run_context_id,
                "launch_status": persisted_run_context.launch_status.value,
                "preview_id": persisted_run_context.preview_id,
            },
        )
        # blocked_reasons stays empty on this path: blockers raise above.
        return LaunchDatasetResult(
            session=session,
            run_context=persisted_run_context,
            blocked_reasons=[],
        )
# [/DEF:DatasetReviewOrchestrator.launch_dataset:Function]
# [DEF:DatasetReviewOrchestrator._parse_dataset_selection:Function]
# @COMPLEXITY: 3
# @PURPOSE: Normalize dataset-selection payload into canonical session references.
@@ -328,6 +644,158 @@ class DatasetReviewOrchestrator:
return findings
# [/DEF:DatasetReviewOrchestrator._build_partial_recovery_findings:Function]
# [DEF:DatasetReviewOrchestrator._build_execution_snapshot:Function]
# @COMPLEXITY: 4
# @PURPOSE: Build effective filters, template params, approvals, and fingerprint for preview and launch gating.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
# @PRE: Session aggregate includes imported filters, template variables, and current execution mappings.
# @POST: returns deterministic execution snapshot for current session state without mutating persistence.
# @SIDE_EFFECT: none.
# @DATA_CONTRACT: Input[DatasetReviewSession] -> Output[Dict[str,Any]]
def _build_execution_snapshot(self, session: DatasetReviewSession) -> Dict[str, Any]:
    """Derive the effective execution context for preview/launch gating.

    Read-only over the session aggregate: resolves each execution mapping to
    an effective value and collects template params, approvals, open-warning
    refs, blocker codes, and a deterministic preview fingerprint.
    """
    filters_by_id = {item.filter_id: item for item in session.imported_filters}
    variables_by_id = {item.variable_id: item for item in session.template_variables}
    resolved_filters: List[Dict[str, Any]] = []
    params: Dict[str, Any] = {}
    approved_ids: List[str] = []
    warning_refs: List[str] = []
    blockers: List[str] = []
    for mapping in session.execution_mappings:
        source_filter = filters_by_id.get(mapping.filter_id)
        variable = variables_by_id.get(mapping.variable_id)
        if source_filter is None:
            blockers.append(f"mapping:{mapping.mapping_id}:missing_filter")
            continue
        if variable is None:
            blockers.append(f"mapping:{mapping.mapping_id}:missing_variable")
            continue
        # Fallback precedence: explicit mapping value, then normalized filter
        # value, then raw filter value, then the variable default.
        value_candidates = (
            mapping.effective_value,
            source_filter.normalized_value,
            source_filter.raw_value,
            variable.default_value,
        )
        effective_value = next(
            (candidate for candidate in value_candidates if candidate is not None),
            None,
        )
        if effective_value is None and variable.is_required:
            blockers.append(f"variable:{variable.variable_name}:missing_required_value")
            continue
        resolved_filters.append(
            {
                "mapping_id": mapping.mapping_id,
                "filter_id": source_filter.filter_id,
                "filter_name": source_filter.filter_name,
                "variable_id": variable.variable_id,
                "variable_name": variable.variable_name,
                "effective_value": effective_value,
                "raw_input_value": mapping.raw_input_value,
            }
        )
        params[variable.variable_name] = effective_value
        if mapping.approval_state == ApprovalState.APPROVED:
            approved_ids.append(mapping.mapping_id)
        if mapping.requires_explicit_approval and mapping.approval_state != ApprovalState.APPROVED:
            warning_refs.append(mapping.mapping_id)
    # Unmapped variables fall back to their defaults; required ones without a
    # default block the preview.
    mapped_variable_ids = {mapping.variable_id for mapping in session.execution_mappings}
    for variable in session.template_variables:
        if variable.variable_id in mapped_variable_ids:
            continue
        if variable.default_value is not None:
            params[variable.variable_name] = variable.default_value
        elif variable.is_required:
            blockers.append(f"variable:{variable.variable_name}:unmapped")
    # A field counts as decided when locked, review-complete, or resolved to
    # any non-"unresolved" provenance.
    semantic_decision_refs = [
        entry.field_id
        for entry in session.semantic_fields
        if entry.is_locked or not entry.needs_review or entry.provenance.value != "unresolved"
    ]
    preview_fingerprint = self._compute_preview_fingerprint(
        {
            "dataset_id": session.dataset_id,
            "template_params": params,
            "effective_filters": resolved_filters,
        }
    )
    return {
        "effective_filters": resolved_filters,
        "template_params": params,
        "approved_mapping_ids": sorted(approved_ids),
        "semantic_decision_refs": sorted(semantic_decision_refs),
        "open_warning_refs": sorted(warning_refs),
        "preview_blockers": sorted(set(blockers)),
        "preview_fingerprint": preview_fingerprint,
    }
# [/DEF:DatasetReviewOrchestrator._build_execution_snapshot:Function]
# [DEF:DatasetReviewOrchestrator._build_launch_blockers:Function]
# @COMPLEXITY: 4
# @PURPOSE: Enforce launch gates from findings, approvals, and current preview truth.
# @RELATION: [DEPENDS_ON] ->[CompiledPreview]
# @PRE: execution_snapshot was computed from current session state and preview is the latest persisted preview or None.
# @POST: returns explicit blocker codes for every unmet launch invariant.
# @SIDE_EFFECT: none.
# @DATA_CONTRACT: Input[DatasetReviewSession,Dict[str,Any],CompiledPreview|None] -> Output[List[str]]
def _build_launch_blockers(
    self,
    session: DatasetReviewSession,
    execution_snapshot: Dict[str, Any],
    preview: Optional[CompiledPreview],
) -> List[str]:
    """Collect every unmet launch gate as an explicit blocker code.

    Returns a sorted, de-duplicated list; empty means the launch may proceed.
    """
    blockers: List[str] = list(execution_snapshot["preview_blockers"])
    # Blocking findings only clear the gate once resolved or explicitly approved.
    closed_states = {ResolutionState.RESOLVED, ResolutionState.APPROVED}
    blockers.extend(
        f"finding:{finding.code}:blocking"
        for finding in session.findings
        if finding.severity == FindingSeverity.BLOCKING
        and finding.resolution_state not in closed_states
    )
    blockers.extend(
        f"mapping:{mapping.mapping_id}:approval_required"
        for mapping in session.execution_mappings
        if mapping.requires_explicit_approval
        and mapping.approval_state != ApprovalState.APPROVED
    )
    if preview is None:
        blockers.append("preview:missing")
    else:
        if preview.preview_status != PreviewStatus.READY:
            blockers.append(f"preview:{preview.preview_status.value}")
        # Fingerprint mismatch means execution inputs drifted since compilation.
        if preview.preview_fingerprint != execution_snapshot["preview_fingerprint"]:
            blockers.append("preview:fingerprint_mismatch")
    return sorted(set(blockers))
# [/DEF:DatasetReviewOrchestrator._build_launch_blockers:Function]
# [DEF:DatasetReviewOrchestrator._get_latest_preview:Function]
# @COMPLEXITY: 2
# @PURPOSE: Resolve the current latest preview snapshot for one session aggregate.
def _get_latest_preview(self, session: DatasetReviewSession) -> Optional[CompiledPreview]:
if not session.previews:
return None
if session.last_preview_id:
for preview in session.previews:
if preview.preview_id == session.last_preview_id:
return preview
return sorted(
session.previews,
key=lambda item: (item.created_at or datetime.min, item.preview_id),
reverse=True,
)[0]
# [/DEF:DatasetReviewOrchestrator._get_latest_preview:Function]
# [DEF:DatasetReviewOrchestrator._compute_preview_fingerprint:Function]
# @COMPLEXITY: 2
# @PURPOSE: Produce deterministic execution fingerprint for preview truth and staleness checks.
def _compute_preview_fingerprint(self, payload: Dict[str, Any]) -> str:
serialized = json.dumps(payload, sort_keys=True, default=str)
return hashlib.sha256(serialized.encode("utf-8")).hexdigest()
# [/DEF:DatasetReviewOrchestrator._compute_preview_fingerprint:Function]
# [DEF:DatasetReviewOrchestrator._enqueue_recovery_task:Function]
# @COMPLEXITY: 4
# @PURPOSE: Link session start to observable async recovery when task infrastructure is available.

View File

@@ -16,14 +16,19 @@ from typing import Optional, List
from sqlalchemy import or_
from sqlalchemy.orm import Session, joinedload
from src.models.dataset_review import (
ClarificationQuestion,
ClarificationSession,
DatasetReviewSession,
DatasetProfile,
ValidationFinding,
CompiledPreview,
DatasetRunContext,
SessionCollaborator
SemanticFieldEntry,
SessionCollaborator,
SessionEvent,
)
from src.core.logger import belief_scope
from src.core.logger import belief_scope, logger
from src.services.dataset_review.event_logger import SessionEventLogger
# [DEF:SessionRepo:Class]
# @COMPLEXITY: 4
@@ -37,16 +42,46 @@ from src.core.logger import belief_scope
# @SIDE_EFFECT: mutates and queries the persistence layer through the injected database session.
# @DATA_CONTRACT: Input[OwnedSessionQuery|SessionMutation] -> Output[PersistedSessionAggregate|PersistedChildRecord]
class DatasetReviewSessionRepository:
"""
@PURPOSE: Persist and retrieve dataset review session aggregates.
@INVARIANT: ownership_scope -> All operations must respect the session owner's user_id.
"""
# [DEF:init_repo:Function]
# @COMPLEXITY: 2
# @PURPOSE: Bind one live SQLAlchemy session to the repository instance.
def __init__(self, db: Session):
    """Bind one live SQLAlchemy session and an event logger sharing that session."""
    self.db = db
    # Shares the same db handle so audit events participate in the same
    # transaction scope as the mutations they describe.
    self.event_logger = SessionEventLogger(db)
# [/DEF:init_repo:Function]
# [DEF:get_owned_session:Function]
# @COMPLEXITY: 4
# @PURPOSE: Resolve one owner-scoped dataset review session for mutation paths without leaking foreign-session state.
# @RELATION: [DEPENDS_ON] -> [DatasetReviewSession]
# @PRE: session_id and user_id are non-empty identifiers from the authenticated ownership scope.
# @POST: returns the owned session or raises a deterministic access error.
# @SIDE_EFFECT: reads one session row from the current database transaction.
# @DATA_CONTRACT: Input[OwnedSessionQuery] -> Output[DatasetReviewSession|ValueError]
def _get_owned_session(self, session_id: str, user_id: str) -> DatasetReviewSession:
    """Fetch the session owned by *user_id* for mutation paths.

    Raises ValueError when no matching row exists, so foreign-session state
    never leaks through mutation endpoints.
    """
    with belief_scope("DatasetReviewSessionRepository.get_owned_session"):
        logger.reason(
            "Resolving owner-scoped dataset review session for mutation path",
            extra={"session_id": session_id, "user_id": user_id},
        )
        owned = (
            self.db.query(DatasetReviewSession)
            .filter(
                DatasetReviewSession.session_id == session_id,
                DatasetReviewSession.user_id == user_id,
            )
            .first()
        )
        if owned is None:
            logger.explore(
                "Owner-scoped dataset review session lookup failed",
                extra={"session_id": session_id, "user_id": user_id},
            )
            raise ValueError("Session not found or access denied")
        logger.reflect(
            "Owner-scoped dataset review session resolved",
            extra={"session_id": owned.session_id, "user_id": owned.user_id},
        )
        return owned
# [/DEF:get_owned_session:Function]
# [DEF:create_sess:Function]
# @COMPLEXITY: 4
# @PURPOSE: Persist an initial dataset review session shell.
@@ -57,9 +92,17 @@ class DatasetReviewSessionRepository:
# @DATA_CONTRACT: Input[DatasetReviewSession] -> Output[DatasetReviewSession]
def create_session(self, session: DatasetReviewSession) -> DatasetReviewSession:
    """Persist a new session shell and return it with DB-assigned state loaded."""
    with belief_scope("DatasetReviewSessionRepository.create_session"):
        logger.reason(
            "Persisting dataset review session shell",
            extra={"user_id": session.user_id, "environment_id": session.environment_id},
        )
        self.db.add(session)
        self.db.commit()
        # Refresh re-reads the row so DB-populated values are visible to callers.
        self.db.refresh(session)
        logger.reflect(
            "Dataset review session shell persisted with stable identifier",
            extra={"session_id": session.session_id, "user_id": session.user_id},
        )
        return session
# [/DEF:create_sess:Function]
@@ -69,25 +112,27 @@ class DatasetReviewSessionRepository:
# @RELATION: [DEPENDS_ON] -> [DatasetReviewSession]
# @RELATION: [DEPENDS_ON] -> [SessionCollaborator]
def load_session_detail(self, session_id: str, user_id: str) -> Optional[DatasetReviewSession]:
"""
@PRE: user_id must match session owner or authorized collaborator.
"""
with belief_scope("DatasetReviewSessionRepository.load_session_detail"):
# Check if user is owner or collaborator
return self.db.query(DatasetReviewSession)\
logger.reason(
"Loading dataset review session detail for owner-or-collaborator scope",
extra={"session_id": session_id, "user_id": user_id},
)
session = self.db.query(DatasetReviewSession)\
.outerjoin(SessionCollaborator, DatasetReviewSession.session_id == SessionCollaborator.session_id)\
.options(
joinedload(DatasetReviewSession.profile),
joinedload(DatasetReviewSession.findings),
joinedload(DatasetReviewSession.collaborators),
joinedload(DatasetReviewSession.semantic_sources),
joinedload(DatasetReviewSession.semantic_fields),
joinedload(DatasetReviewSession.semantic_fields).joinedload(SemanticFieldEntry.candidates),
joinedload(DatasetReviewSession.imported_filters),
joinedload(DatasetReviewSession.template_variables),
joinedload(DatasetReviewSession.execution_mappings),
joinedload(DatasetReviewSession.clarification_sessions),
joinedload(DatasetReviewSession.clarification_sessions).joinedload(ClarificationSession.questions).joinedload(ClarificationQuestion.options),
joinedload(DatasetReviewSession.clarification_sessions).joinedload(ClarificationSession.questions).joinedload(ClarificationQuestion.answer),
joinedload(DatasetReviewSession.previews),
joinedload(DatasetReviewSession.run_contexts)
joinedload(DatasetReviewSession.run_contexts),
joinedload(DatasetReviewSession.events)
)\
.filter(DatasetReviewSession.session_id == session_id)\
.filter(
@@ -97,6 +142,15 @@ class DatasetReviewSessionRepository:
)
)\
.first()
logger.reflect(
"Dataset review session detail lookup completed",
extra={
"session_id": session_id,
"user_id": user_id,
"found": bool(session),
},
)
return session
# [/DEF:load_detail:Function]
# [DEF:save_prof_find:Function]
@@ -111,32 +165,40 @@ class DatasetReviewSessionRepository:
# @DATA_CONTRACT: Input[ProfileAndFindingsMutation] -> Output[DatasetReviewSession]
def save_profile_and_findings(self, session_id: str, user_id: str, profile: DatasetProfile, findings: List[ValidationFinding]) -> DatasetReviewSession:
with belief_scope("DatasetReviewSessionRepository.save_profile_and_findings"):
session = self.db.query(DatasetReviewSession).filter(
DatasetReviewSession.session_id == session_id,
DatasetReviewSession.user_id == user_id
).first()
if not session:
raise ValueError("Session not found or access denied")
session = self._get_owned_session(session_id, user_id)
logger.reason(
"Persisting dataset profile and replacing validation findings",
extra={
"session_id": session_id,
"user_id": user_id,
"has_profile": bool(profile),
"findings_count": len(findings),
},
)
if profile:
# Ensure we update existing profile by session_id if it exists
existing_profile = self.db.query(DatasetProfile).filter_by(session_id=session_id).first()
if existing_profile:
profile.profile_id = existing_profile.profile_id
self.db.merge(profile)
# Remove old findings for this session to avoid stale data
self.db.query(ValidationFinding).filter(
ValidationFinding.session_id == session_id
).delete()
# Add new findings
for finding in findings:
finding.session_id = session_id
self.db.add(finding)
self.db.commit()
logger.reflect(
"Dataset profile and validation findings committed",
extra={
"session_id": session.session_id,
"user_id": user_id,
"findings_count": len(findings),
},
)
return self.load_session_detail(session_id, user_id)
# [/DEF:save_prof_find:Function]
@@ -151,15 +213,12 @@ class DatasetReviewSessionRepository:
# @DATA_CONTRACT: Input[PreviewMutation] -> Output[CompiledPreview]
def save_preview(self, session_id: str, user_id: str, preview: CompiledPreview) -> CompiledPreview:
with belief_scope("DatasetReviewSessionRepository.save_preview"):
session = self.db.query(DatasetReviewSession).filter(
DatasetReviewSession.session_id == session_id,
DatasetReviewSession.user_id == user_id
).first()
session = self._get_owned_session(session_id, user_id)
logger.reason(
"Persisting compiled preview and staling previous preview snapshots",
extra={"session_id": session_id, "user_id": user_id},
)
if not session:
raise ValueError("Session not found or access denied")
# Mark existing previews for this session as stale if they are not the new one
self.db.query(CompiledPreview).filter(
CompiledPreview.session_id == session_id
).update({"preview_status": "stale"})
@@ -170,6 +229,14 @@ class DatasetReviewSessionRepository:
self.db.commit()
self.db.refresh(preview)
logger.reflect(
"Compiled preview committed as latest session preview",
extra={
"session_id": session.session_id,
"preview_id": preview.preview_id,
"user_id": user_id,
},
)
return preview
# [/DEF:save_prev:Function]
@@ -184,13 +251,11 @@ class DatasetReviewSessionRepository:
# @DATA_CONTRACT: Input[RunContextMutation] -> Output[DatasetRunContext]
def save_run_context(self, session_id: str, user_id: str, run_context: DatasetRunContext) -> DatasetRunContext:
with belief_scope("DatasetReviewSessionRepository.save_run_context"):
session = self.db.query(DatasetReviewSession).filter(
DatasetReviewSession.session_id == session_id,
DatasetReviewSession.user_id == user_id
).first()
if not session:
raise ValueError("Session not found or access denied")
session = self._get_owned_session(session_id, user_id)
logger.reason(
"Persisting dataset run context audit snapshot",
extra={"session_id": session_id, "user_id": user_id},
)
self.db.add(run_context)
self.db.flush()
@@ -198,6 +263,14 @@ class DatasetReviewSessionRepository:
self.db.commit()
self.db.refresh(run_context)
logger.reflect(
"Dataset run context committed as latest launch snapshot",
extra={
"session_id": session.session_id,
"run_context_id": run_context.run_context_id,
"user_id": user_id,
},
)
return run_context
# [/DEF:save_run_ctx:Function]
@@ -207,9 +280,18 @@ class DatasetReviewSessionRepository:
# @RELATION: [DEPENDS_ON] -> [DatasetReviewSession]
def list_sessions_for_user(self, user_id: str) -> List[DatasetReviewSession]:
with belief_scope("DatasetReviewSessionRepository.list_sessions_for_user"):
return self.db.query(DatasetReviewSession).filter(
logger.reason(
"Listing dataset review sessions for owner scope",
extra={"user_id": user_id},
)
sessions = self.db.query(DatasetReviewSession).filter(
DatasetReviewSession.user_id == user_id
).order_by(DatasetReviewSession.updated_at.desc()).all()
logger.reflect(
"Dataset review session list assembled",
extra={"user_id": user_id, "session_count": len(sessions)},
)
return sessions
# [/DEF:list_user_sess:Function]
# [/DEF:SessionRepo:Class]

View File

@@ -24,6 +24,7 @@ from src.models.dataset_review import (
CandidateMatchType,
CandidateStatus,
FieldProvenance,
SemanticSource,
)
# [/DEF:SemanticSourceResolver.imports:Block]
@@ -259,6 +260,63 @@ class SemanticSourceResolver:
return merged
# [/DEF:SemanticSourceResolver.apply_field_decision:Function]
# [DEF:SemanticSourceResolver.propagate_source_version_update:Function]
# @COMPLEXITY: 4
# @PURPOSE: Propagate a semantic source version change to unlocked field entries without silently overwriting manual or locked values.
# @RELATION: [DEPENDS_ON] ->[SemanticSource]
# @RELATION: [DEPENDS_ON] ->[SemanticFieldEntry]
# @PRE: source is persisted and fields belong to the same session aggregate.
# @POST: unlocked fields linked to the source carry the new source version and are marked reviewable; manual or locked fields keep their active values untouched.
# @SIDE_EFFECT: mutates in-memory field state for the caller to persist.
# @DATA_CONTRACT: Input[SemanticSource,List[SemanticFieldEntry]] -> Output[Dict[str,int]]
def propagate_source_version_update(
    self,
    source: SemanticSource,
    fields: Iterable[Any],
) -> Dict[str, int]:
    """Propagate a semantic source version change onto unlocked field entries.

    Fields linked to *source* and not locked/manually overridden receive the
    new source_version and are flagged for re-review; locked or manual fields
    keep their active values. Mutates fields in memory only — the caller
    persists. Returns propagated/preserved_locked/untouched counts.

    Raises ValueError when the source lacks a usable source_id or source_version.
    """
    with belief_scope("SemanticSourceResolver.propagate_source_version_update"):
        source_id = str(source.source_id or "").strip()
        source_version = str(source.source_version or "").strip()
        if not (source_id and source_version):
            logger.explore(
                "Semantic source version propagation rejected due to incomplete source metadata",
                extra={"source_id": source_id, "source_version": source_version},
            )
            raise ValueError("Semantic source must provide source_id and source_version")
        counters = {"propagated": 0, "preserved_locked": 0, "untouched": 0}
        for entry in fields:
            if str(getattr(entry, "source_id", "") or "").strip() != source_id:
                counters["untouched"] += 1
                continue
            locked = bool(getattr(entry, "is_locked", False))
            manual = getattr(entry, "provenance", None) == FieldProvenance.MANUAL_OVERRIDE
            if locked or manual:
                counters["preserved_locked"] += 1
                continue
            entry.source_version = source_version
            entry.needs_review = True
            # Normalize to a strict bool without changing the conflict signal.
            entry.has_conflict = bool(getattr(entry, "has_conflict", False))
            counters["propagated"] += 1
        logger.reflect(
            "Semantic source version propagation completed",
            extra={
                "source_id": source_id,
                "source_version": source_version,
                "propagated": counters["propagated"],
                "preserved_locked": counters["preserved_locked"],
                "untouched": counters["untouched"],
            },
        )
        return {
            "propagated": counters["propagated"],
            "preserved_locked": counters["preserved_locked"],
            "untouched": counters["untouched"],
        }
# [/DEF:SemanticSourceResolver.propagate_source_version_update:Function]
# [DEF:SemanticSourceResolver._normalize_dictionary_row:Function]
# @COMPLEXITY: 2
# @PURPOSE: Normalize one dictionary row into a consistent lookup structure.