feat(ui): add chat-driven dataset review flow

Move dataset review clarification into the assistant workspace and
rework the review page into a chat-centric layout with execution rails.

Add session-scoped assistant actions for mappings, semantic fields,
and SQL preview generation. Introduce optimistic locking for dataset
review mutations, propagate session versions through API responses,
and mask imported filter values before assistant exposure.

Refresh tests, i18n, and spec artifacts to match the new workflow.

BREAKING CHANGE: dataset review mutation endpoints now require the
X-Session-Version header, and clarification is no longer handled
through ClarificationDialog-based flows.
This commit is contained in:
2026-03-26 13:33:12 +03:00
parent d7911fb2f1
commit 7c85552132
74 changed files with 6122 additions and 2970 deletions

View File

@@ -21,6 +21,20 @@ from pydantic import BaseModel
from src.api.routes import assistant as assistant_routes
from src.schemas.auth import User
from src.models.assistant import AssistantMessageRecord
from src.models.dataset_review import (
ApprovalState,
CandidateStatus,
DatasetReviewSession,
ExecutionMapping,
ImportedFilter,
MappingMethod,
SemanticCandidate,
SemanticFieldEntry,
ReadinessState,
RecommendedAction,
SessionPhase,
SessionStatus,
)
# [DEF:_run_async:Function]
@@ -167,6 +181,12 @@ class _FakeQuery:
def __init__(self, items):
self.items = items
def outerjoin(self, *args, **kwargs):
return self
def options(self, *args, **kwargs):
return self
def filter(self, *args, **kwargs):
# @INVARIANT: filter() is predicate-blind; returns all records regardless of user_id scope
return self
@@ -203,10 +223,13 @@ class _FakeQuery:
class _FakeDb:
def __init__(self):
self.added = []
self.dataset_sessions = {}
def query(self, model):
if model == AssistantMessageRecord:
return _FakeQuery([])
if model == DatasetReviewSession:
return _FakeQuery(list(self.dataset_sessions.values()))
return _FakeQuery([])
def add(self, obj):
@@ -240,6 +263,121 @@ def _clear_assistant_state():
# [/DEF:_clear_assistant_state:Function]
# [DEF:_dataset_review_session:Function]
# @RELATION: BINDS_TO -> [AssistantApiTests]
# @COMPLEXITY: 1
# @PURPOSE: Build minimal owned dataset-review session fixture for assistant scoped routing tests.
def _dataset_review_session():
    """Return an active ``DatasetReviewSession`` (version 3) owned by ``u-admin``
    with one recovered imported filter, one pending execution mapping, and one
    conflicted semantic field carrying a single proposed candidate.
    """
    session = DatasetReviewSession(
        session_id="sess-1",
        user_id="u-admin",
        environment_id="env-1",
        source_kind="superset_link",
        source_input="http://superset.local/dashboard/10",
        dataset_ref="public.sales",
        dataset_id=42,
        version=3,
        readiness_state=ReadinessState.MAPPING_REVIEW_NEEDED,
        recommended_action=RecommendedAction.APPROVE_MAPPING,
        status=SessionStatus.ACTIVE,
        current_phase=SessionPhase.MAPPING_REVIEW,
        created_at=datetime.utcnow(),
        updated_at=datetime.utcnow(),
        last_activity_at=datetime.utcnow(),
    )
    session.findings = []
    session.previews = []
    session.imported_filters = [
        ImportedFilter(
            filter_id="filter-1",
            session_id="sess-1",
            filter_name="email",
            display_name="Email",
            # Intentionally unmasked so masking tests can verify sanitization
            # before assistant exposure.
            raw_value="john.doe@example.com",
            raw_value_masked=False,
            normalized_value="john.doe@example.com",
            source="manual",
            confidence_state="confirmed",
            requires_confirmation=False,
            recovery_status="recovered",
            notes=None,
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow(),
        )
    ]
    session.execution_mappings = [
        ExecutionMapping(
            mapping_id="map-1",
            session_id="sess-1",
            filter_id="filter-1",
            variable_id="var-1",
            mapping_method=MappingMethod.DIRECT_MATCH,
            raw_input_value="john.doe@example.com",
            effective_value="john.doe@example.com",
            transformation_note=None,
            warning_level=None,
            requires_explicit_approval=True,
            approval_state=ApprovalState.PENDING,
            approved_by_user_id=None,
            approved_at=None,
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow(),
        )
    ]
    # Fix: a redundant `session.semantic_fields = []` immediately preceding
    # this assignment was dead code and has been removed.
    session.semantic_fields = [
        SemanticFieldEntry(
            field_id="field-1",
            session_id="sess-1",
            field_name="customer_name",
            field_kind="dimension",
            verbose_name="Customer name",
            description="Current semantic label",
            display_format="text",
            provenance="unresolved",
            source_id=None,
            source_version=None,
            confidence_rank=None,
            is_locked=False,
            has_conflict=True,
            needs_review=True,
            last_changed_by="system",
        )
    ]
    session.semantic_fields[0].candidates = [
        SemanticCandidate(
            candidate_id="cand-1",
            field_id="field-1",
            source_id=None,
            candidate_rank=1,
            match_type="exact",
            confidence_score=0.99,
            proposed_verbose_name="Customer legal name",
            proposed_description="Approved semantic wording",
            proposed_display_format="text",
            status=CandidateStatus.PROPOSED,
        )
    ]
    session.template_variables = []
    session.clarification_sessions = []
    session.run_contexts = []
    return session
# [/DEF:_dataset_review_session:Function]
# [DEF:_await_none:Function]
# @RELATION: BINDS_TO -> [AssistantApiTests]
# @COMPLEXITY: 1
# @PURPOSE: Async helper returning None for planner fallback tests.
async def _await_none(*_args, **_kwargs):
    """Awaitable stub that ignores all arguments and resolves to ``None``."""
    return None
# [/DEF:_await_none:Function]
# [DEF:test_unknown_command_returns_needs_clarification:Function]
# @RELATION: BINDS_TO -> [AssistantApiTests]
# @PURPOSE: Unknown command should return clarification state and unknown intent.
@@ -288,6 +426,139 @@ def test_capabilities_question_returns_successful_help(monkeypatch):
assert "я могу сделать" in resp.text.lower()
# [/DEF:test_capabilities_question_returns_successful_help:Function]
# [DEF:test_assistant_message_request_accepts_dataset_review_session_binding:Function]
# @RELATION: BINDS_TO -> [AssistantApiTests]
# @PURPOSE: Assistant request schema should accept active dataset review session binding for scoped orchestration.
def test_assistant_message_request_accepts_dataset_review_session_binding():
    """The request schema must carry the bound dataset-review session id through."""
    payload = {
        "message": "approve mappings",
        "dataset_review_session_id": "sess-1",
    }
    request = assistant_routes.AssistantMessageRequest(**payload)
    assert request.dataset_review_session_id == "sess-1"
# [/DEF:test_assistant_message_request_accepts_dataset_review_session_binding:Function]
# [DEF:test_dataset_review_scoped_message_uses_masked_filter_context:Function]
# @RELATION: BINDS_TO -> [AssistantApiTests]
# @PURPOSE: Session-scoped assistant context should mask imported-filter raw values before assistant-visible metadata is persisted.
def test_dataset_review_scoped_message_uses_masked_filter_context(monkeypatch):
    """Raw imported-filter values must be masked in the persisted assistant payload."""
    _clear_assistant_state()
    db = _FakeDb()
    db.dataset_sessions["sess-1"] = _dataset_review_session()
    req = assistant_routes.AssistantMessageRequest(
        message="show filters",
        dataset_review_session_id="sess-1",
    )
    # Fix: patch via monkeypatch so the planner stub is restored after the
    # test; the previous bare module-attribute assignment leaked into every
    # later test in the session.
    monkeypatch.setattr(assistant_routes, "_plan_intent_with_llm", _await_none)

    async def _fake_dispatch_dataset_review_intent(
        intent, current_user, config_manager, db
    ):
        # Echo the planner-built summary so the route's persisted payload,
        # not the dispatcher, is what the assertions exercise.
        return str(intent["entities"]["summary"]), None, []

    monkeypatch.setattr(
        assistant_routes,
        "_dispatch_dataset_review_intent",
        _fake_dispatch_dataset_review_intent,
    )
    resp = _run_async(
        assistant_routes.send_message(
            req,
            current_user=_admin_user(),
            task_manager=_FakeTaskManager(),
            config_manager=_FakeConfigManager(),
            db=db,
        )
    )
    assert resp.state == "success"
    # The most recently persisted assistant-role record holds the context.
    persisted_assistant = [
        item for item in db.added if getattr(item, "role", None) == "assistant"
    ][-1]
    imported_filters = persisted_assistant.payload["dataset_review_context"][
        "imported_filters"
    ]
    assert imported_filters[0]["raw_value"] == "***@example.com"
    assert imported_filters[0]["raw_value_masked"] is True
# [/DEF:test_dataset_review_scoped_message_uses_masked_filter_context:Function]
# [DEF:test_dataset_review_scoped_command_returns_confirmation_for_mapping_approval:Function]
# @RELATION: BINDS_TO -> [AssistantApiTests]
# @PURPOSE: Session-scoped assistant commands should route dataset-review mapping approvals into confirmation workflow with bound session metadata.
def test_dataset_review_scoped_command_returns_confirmation_for_mapping_approval(
    monkeypatch,
):
    """Mapping approval must return needs_confirmation with bound session metadata."""
    _clear_assistant_state()
    db = _FakeDb()
    db.dataset_sessions["sess-1"] = _dataset_review_session()
    req = assistant_routes.AssistantMessageRequest(
        message="approve mappings",
        dataset_review_session_id="sess-1",
    )
    # Fix: use the monkeypatch fixture instead of a bare module-attribute
    # assignment so the planner stub does not leak into later tests.
    monkeypatch.setattr(assistant_routes, "_plan_intent_with_llm", _await_none)
    resp = _run_async(
        assistant_routes.send_message(
            req,
            current_user=_admin_user(),
            task_manager=_FakeTaskManager(),
            config_manager=_FakeConfigManager(),
            db=db,
        )
    )
    assert resp.state == "needs_confirmation"
    assert resp.intent["operation"] == "dataset_review_approve_mappings"
    assert resp.intent["entities"]["dataset_review_session_id"] == "sess-1"
    # session_version 3 and mapping "map-1" come from _dataset_review_session().
    assert resp.intent["entities"]["session_version"] == 3
    assert resp.intent["entities"]["mapping_ids"] == ["map-1"]
# [/DEF:test_dataset_review_scoped_command_returns_confirmation_for_mapping_approval:Function]
# [DEF:test_dataset_review_scoped_command_routes_field_semantics_update:Function]
# @RELATION: BINDS_TO -> [AssistantApiTests]
# @PURPOSE: Session-scoped assistant commands should route semantic field updates through explicit confirmation metadata.
def test_dataset_review_scoped_command_routes_field_semantics_update(monkeypatch):
    """Semantic-field update commands must surface confirmation metadata."""
    _clear_assistant_state()
    db = _FakeDb()
    db.dataset_sessions["sess-1"] = _dataset_review_session()
    req = assistant_routes.AssistantMessageRequest(
        message='set field semantics target=field:field-1 desc="Approved semantic wording" lock',
        dataset_review_session_id="sess-1",
    )
    # Fix: use the monkeypatch fixture instead of a bare module-attribute
    # assignment so the planner stub does not leak into later tests.
    monkeypatch.setattr(assistant_routes, "_plan_intent_with_llm", _await_none)
    resp = _run_async(
        assistant_routes.send_message(
            req,
            current_user=_admin_user(),
            task_manager=_FakeTaskManager(),
            config_manager=_FakeConfigManager(),
            db=db,
        )
    )
    assert resp.state == "needs_confirmation"
    assert resp.intent["operation"] == "dataset_review_set_field_semantics"
    assert resp.intent["entities"]["dataset_review_session_id"] == "sess-1"
    assert resp.intent["entities"]["field_id"] == "field-1"
    assert resp.intent["entities"]["description"] == "Approved semantic wording"
    assert resp.intent["entities"]["lock_field"] is True
# [/DEF:test_dataset_review_scoped_command_routes_field_semantics_update:Function]
# [/DEF:test_capabilities_question_returns_successful_help:Function]
# [/DEF:AssistantApiTests:Module]

View File

@@ -65,6 +65,9 @@ from src.services.dataset_review.orchestrator import (
)
from src.services.dataset_review.semantic_resolver import SemanticSourceResolver
from src.services.dataset_review.event_logger import SessionEventLogger
from src.services.dataset_review.repositories.session_repository import (
DatasetReviewSessionVersionConflictError,
)
client = TestClient(app)
@@ -122,6 +125,7 @@ def _make_session():
dataset_ref="public.sales",
dataset_id=42,
dashboard_id=10,
version=0,
readiness_state=ReadinessState.REVIEW_READY,
recommended_action=RecommendedAction.REVIEW_DOCUMENTATION,
status=SessionStatus.ACTIVE,
@@ -853,6 +857,10 @@ def test_get_session_detail_export_and_lifecycle_endpoints(
repository = MagicMock()
repository.load_session_detail.return_value = session
repository.list_sessions_for_user.return_value = [session]
repository.require_session_version.side_effect = lambda current, expected: current
repository.bump_session_version.side_effect = lambda current: setattr(
current, "version", int(getattr(current, "version", 0) or 0) + 1
) or getattr(current, "version", 0)
repository.db = MagicMock()
repository.event_logger = MagicMock(spec=SessionEventLogger)
repository.event_logger.log_for_session.return_value = SimpleNamespace(
@@ -868,6 +876,7 @@ def test_get_session_detail_export_and_lifecycle_endpoints(
patch_response = client.patch(
"/api/dataset-orchestration/sessions/sess-1",
json={"status": "paused"},
headers={"X-Session-Version": "0"},
)
assert patch_response.status_code == 200
assert patch_response.json()["status"] == "paused"
@@ -885,13 +894,39 @@ def test_get_session_detail_export_and_lifecycle_endpoints(
assert validation_response.json()["artifact_type"] == "validation_report"
assert "Validation Report" in validation_response.json()["content"]["markdown"]
delete_response = client.delete("/api/dataset-orchestration/sessions/sess-1")
delete_response = client.delete(
"/api/dataset-orchestration/sessions/sess-1",
headers={"X-Session-Version": "1"},
)
assert delete_response.status_code == 204
# [/DEF:test_get_session_detail_export_and_lifecycle_endpoints:Function]
# [DEF:test_get_clarification_state_returns_empty_payload_when_session_has_no_record:Function]
# @PURPOSE: Clarification state endpoint should return a non-blocking empty payload when the session has no clarification aggregate yet.
def test_get_clarification_state_returns_empty_payload_when_session_has_no_record(
    dataset_review_api_dependencies,
):
    """A session without a clarification aggregate yields an empty 200 payload."""
    repository = MagicMock()
    repository.load_session_detail.return_value = _make_us3_session()
    app.dependency_overrides[_get_repository] = lambda: repository
    response = client.get(
        "/api/dataset-orchestration/sessions/sess-1/clarification"
    )
    expected = {"clarification_session": None, "current_question": None}
    assert response.status_code == 200
    assert response.json() == expected
# [/DEF:test_get_clarification_state_returns_empty_payload_when_session_has_no_record:Function]
# [DEF:test_us2_clarification_endpoints_persist_answer_and_feedback:Function]
# @RELATION: BINDS_TO -> DatasetReviewApiTests
# @PURPOSE: Clarification endpoints should expose one current question, persist the answer before advancement, and store feedback on the answer audit record.
@@ -901,6 +936,7 @@ def test_us2_clarification_endpoints_persist_answer_and_feedback(
session = _make_us2_session()
repository = MagicMock()
repository.load_session_detail.return_value = session
repository.require_session_version.side_effect = lambda current, expected: current
repository.db = MagicMock()
repository.db.commit.side_effect = lambda: None
repository.db.refresh.side_effect = lambda obj: None
@@ -933,6 +969,7 @@ def test_us2_clarification_endpoints_persist_answer_and_feedback(
"answer_kind": "selected",
"answer_value": "Revenue reporting",
},
headers={"X-Session-Version": "0"},
)
assert answer_response.status_code == 200
answer_payload = answer_response.json()
@@ -947,6 +984,7 @@ def test_us2_clarification_endpoints_persist_answer_and_feedback(
feedback_response = client.post(
"/api/dataset-orchestration/sessions/sess-1/clarification/questions/q-1/feedback",
json={"feedback": "up"},
headers={"X-Session-Version": "0"},
)
assert feedback_response.status_code == 200
assert feedback_response.json() == {"target_id": "q-1", "feedback": "up"}
@@ -965,6 +1003,10 @@ def test_us2_field_semantic_override_lock_unlock_and_feedback(
session = _make_us2_session()
repository = MagicMock()
repository.load_session_detail.return_value = session
repository.require_session_version.side_effect = lambda current, expected: current
repository.bump_session_version.side_effect = lambda current: setattr(
current, "version", int(getattr(current, "version", 0) or 0) + 1
) or getattr(current, "version", 0)
repository.db = MagicMock()
repository.db.commit.side_effect = lambda: None
repository.db.refresh.side_effect = lambda obj: None
@@ -984,6 +1026,7 @@ def test_us2_field_semantic_override_lock_unlock_and_feedback(
"description": "Manual business-approved description",
"display_format": "$,.0f",
},
headers={"X-Session-Version": "0"},
)
assert override_response.status_code == 200
override_payload = override_response.json()
@@ -991,7 +1034,8 @@ def test_us2_field_semantic_override_lock_unlock_and_feedback(
assert override_payload["is_locked"] is True
unlock_response = client.post(
"/api/dataset-orchestration/sessions/sess-1/fields/field-1/unlock"
"/api/dataset-orchestration/sessions/sess-1/fields/field-1/unlock",
headers={"X-Session-Version": "1"},
)
assert unlock_response.status_code == 200
assert unlock_response.json()["is_locked"] is False
@@ -999,6 +1043,7 @@ def test_us2_field_semantic_override_lock_unlock_and_feedback(
candidate_response = client.patch(
"/api/dataset-orchestration/sessions/sess-1/fields/field-1/semantic",
json={"candidate_id": "cand-1", "lock_field": True},
headers={"X-Session-Version": "2"},
)
assert candidate_response.status_code == 200
candidate_payload = candidate_response.json()
@@ -1013,6 +1058,7 @@ def test_us2_field_semantic_override_lock_unlock_and_feedback(
{"field_id": "field-1", "candidate_id": "cand-1", "lock_field": False}
]
},
headers={"X-Session-Version": "3"},
)
assert batch_response.status_code == 200
assert batch_response.json()[0]["field_id"] == "field-1"
@@ -1020,6 +1066,7 @@ def test_us2_field_semantic_override_lock_unlock_and_feedback(
feedback_response = client.post(
"/api/dataset-orchestration/sessions/sess-1/fields/field-1/feedback",
json={"feedback": "down"},
headers={"X-Session-Version": "4"},
)
assert feedback_response.status_code == 200
assert feedback_response.json() == {"target_id": "field-1", "feedback": "down"}
@@ -1052,6 +1099,10 @@ def test_us3_mapping_patch_approval_preview_and_launch_endpoints(
repository = MagicMock()
repository.load_session_detail.return_value = session
repository.require_session_version.side_effect = lambda current, expected: current
repository.bump_session_version.side_effect = lambda current: setattr(
current, "version", int(getattr(current, "version", 0) or 0) + 1
) or getattr(current, "version", 0)
repository.db = MagicMock()
repository.db.commit.side_effect = lambda: None
repository.db.refresh.side_effect = lambda obj: None
@@ -1100,6 +1151,21 @@ def test_us3_mapping_patch_approval_preview_and_launch_endpoints(
blocked_reasons=[],
)
def _assert_expected_preview_version(command):
assert command.expected_version == 3
return PreparePreviewResult(
session=session, preview=preview, blocked_reasons=[]
)
def _assert_expected_launch_version(command):
assert command.expected_version == 5
return LaunchDatasetResult(
session=session, run_context=run_context, blocked_reasons=[]
)
orchestrator.prepare_launch_preview.side_effect = _assert_expected_preview_version
orchestrator.launch_dataset.side_effect = _assert_expected_launch_version
app.dependency_overrides[_get_repository] = lambda: repository
app.dependency_overrides[_get_orchestrator] = lambda: orchestrator
@@ -1110,6 +1176,7 @@ def test_us3_mapping_patch_approval_preview_and_launch_endpoints(
"mapping_method": "manual_override",
"transformation_note": "Manual override for SQL Lab launch",
},
headers={"X-Session-Version": "0"},
)
assert patch_response.status_code == 200
patch_payload = patch_response.json()
@@ -1130,6 +1197,7 @@ def test_us3_mapping_patch_approval_preview_and_launch_endpoints(
approve_response = client.post(
"/api/dataset-orchestration/sessions/sess-1/mappings/map-1/approve",
json={"approval_note": "Approved after reviewing transformation"},
headers={"X-Session-Version": "1"},
)
assert approve_response.status_code == 200
approve_payload = approve_response.json()
@@ -1144,6 +1212,7 @@ def test_us3_mapping_patch_approval_preview_and_launch_endpoints(
batch_response = client.post(
"/api/dataset-orchestration/sessions/sess-1/mappings/approve-batch",
json={"mapping_ids": ["map-1"]},
headers={"X-Session-Version": "2"},
)
assert batch_response.status_code == 200
assert batch_response.json()[0]["mapping_id"] == "map-1"
@@ -1152,7 +1221,10 @@ def test_us3_mapping_patch_approval_preview_and_launch_endpoints(
assert list_response.status_code == 200
assert list_response.json()["items"][0]["mapping_id"] == "map-1"
preview_response = client.post("/api/dataset-orchestration/sessions/sess-1/preview")
preview_response = client.post(
"/api/dataset-orchestration/sessions/sess-1/preview",
headers={"X-Session-Version": "3"},
)
assert preview_response.status_code == 200
preview_payload = preview_response.json()
assert preview_payload["preview_id"] == "preview-1"
@@ -1160,33 +1232,44 @@ def test_us3_mapping_patch_approval_preview_and_launch_endpoints(
assert preview_payload["compiled_by"] == "superset"
assert "SELECT * FROM sales" in preview_payload["compiled_sql"]
orchestrator.prepare_launch_preview.return_value = PreparePreviewResult(
session=session,
preview=SimpleNamespace(
preview_id="preview-pending",
session_id="sess-1",
preview_status=PreviewStatus.PENDING,
compiled_sql=None,
preview_fingerprint="fingerprint-pending",
compiled_by="superset",
error_code=None,
error_details=None,
compiled_at=None,
created_at=datetime.now(timezone.utc),
),
blocked_reasons=[],
def _assert_expected_pending_preview_version(command):
assert command.expected_version == 4
return PreparePreviewResult(
session=session,
preview=SimpleNamespace(
preview_id="preview-pending",
session_id="sess-1",
preview_status=PreviewStatus.PENDING,
compiled_sql=None,
preview_fingerprint="fingerprint-pending",
compiled_by="superset",
error_code=None,
error_details=None,
compiled_at=None,
created_at=datetime.now(timezone.utc),
),
blocked_reasons=[],
)
orchestrator.prepare_launch_preview.side_effect = (
_assert_expected_pending_preview_version
)
preview_enqueue_response = client.post(
"/api/dataset-orchestration/sessions/sess-1/preview"
"/api/dataset-orchestration/sessions/sess-1/preview",
headers={"X-Session-Version": "4"},
)
assert preview_enqueue_response.status_code == 202
assert preview_enqueue_response.json() == {
"session_id": "sess-1",
"session_version": 3,
"preview_status": "pending",
"task_id": None,
}
launch_response = client.post("/api/dataset-orchestration/sessions/sess-1/launch")
launch_response = client.post(
"/api/dataset-orchestration/sessions/sess-1/launch",
headers={"X-Session-Version": "5"},
)
assert launch_response.status_code == 201
launch_payload = launch_response.json()
assert launch_payload["run_context"]["run_context_id"] == "run-1"
@@ -1201,6 +1284,105 @@ def test_us3_mapping_patch_approval_preview_and_launch_endpoints(
# [/DEF:test_us3_mapping_patch_approval_preview_and_launch_endpoints:Function]
# [DEF:test_us3_preview_response_propagates_refreshed_session_version_for_launch_follow_up:Function]
# @RELATION: BINDS_TO -> DatasetReviewApiTests
# @PURPOSE: Preview response should expose the refreshed session version so the normal preview-then-launch UI flow can satisfy optimistic locking without a forced full reload.
def test_us3_preview_response_propagates_refreshed_session_version_for_launch_follow_up(
    dataset_review_api_dependencies,
):
    """Preview bumps the session version (4 -> 5); the launch request must be
    able to reuse the version echoed in the preview response payload without
    reloading the session detail.
    """
    session = _make_us3_session()
    session.version = 4
    repository = MagicMock()
    repository.load_session_detail.return_value = session

    # Mirror the real repository's optimistic-lock check: raise the conflict
    # error whenever the caller-supplied version is stale.
    def _require_session_version(current, expected):
        if int(getattr(current, "version", 0) or 0) != expected:
            raise DatasetReviewSessionVersionConflictError(
                current.session_id,
                expected,
                int(getattr(current, "version", 0) or 0),
            )
        return current

    repository.require_session_version.side_effect = _require_session_version
    repository.db = MagicMock()
    repository.event_logger = MagicMock(spec=SessionEventLogger)
    # Ready preview artifact returned by the preview orchestrator stub.
    preview = SimpleNamespace(
        preview_id="preview-2",
        session_id="sess-1",
        preview_status=PreviewStatus.READY,
        compiled_sql="SELECT * FROM sales WHERE country = 'FR'",
        preview_fingerprint="fingerprint-2",
        compiled_by="superset",
        error_code=None,
        error_details=None,
        compiled_at=datetime.now(timezone.utc),
        created_at=datetime.now(timezone.utc),
    )
    # Run context returned by the launch orchestrator stub.
    run_context = SimpleNamespace(
        run_context_id="run-2",
        session_id="sess-1",
        dataset_ref="public.sales",
        environment_id="env-1",
        preview_id="preview-2",
        sql_lab_session_ref="sql-lab-88",
        effective_filters=[{"mapping_id": "map-1", "effective_value": "FR"}],
        template_params={"country": "FR"},
        approved_mapping_ids=[],
        semantic_decision_refs=[],
        open_warning_refs=[],
        launch_status=LaunchStatus.STARTED,
        launch_error=None,
        created_at=datetime.now(timezone.utc),
    )
    orchestrator = MagicMock()

    # Simulates the version bump that a successful preview performs; the
    # mutation of session.version here is what the launch path must observe.
    def _prepare_preview(command):
        assert command.expected_version == 4
        session.version = 5
        return PreparePreviewResult(
            session=session,
            preview=preview,
            blocked_reasons=[],
        )

    # Launch must see the post-preview version (5), not the original (4).
    def _launch_dataset(command):
        assert command.expected_version == 5
        return LaunchDatasetResult(
            session=session,
            run_context=run_context,
            blocked_reasons=[],
        )

    orchestrator.prepare_launch_preview.side_effect = _prepare_preview
    orchestrator.launch_dataset.side_effect = _launch_dataset
    app.dependency_overrides[_get_repository] = lambda: repository
    app.dependency_overrides[_get_orchestrator] = lambda: orchestrator
    preview_response = client.post(
        "/api/dataset-orchestration/sessions/sess-1/preview",
        headers={"X-Session-Version": "4"},
    )
    assert preview_response.status_code == 200
    preview_payload = preview_response.json()
    # The refreshed version must be echoed so the UI can chain into launch.
    assert preview_payload["session_version"] == 5
    launch_response = client.post(
        "/api/dataset-orchestration/sessions/sess-1/launch",
        headers={"X-Session-Version": str(preview_payload["session_version"])},
    )
    assert launch_response.status_code == 201
    assert launch_response.json()["run_context"]["run_context_id"] == "run-2"
# [/DEF:test_us3_preview_response_propagates_refreshed_session_version_for_launch_follow_up:Function]
# [DEF:test_us3_preview_endpoint_returns_failed_preview_without_false_dashboard_not_found_contract_drift:Function]
# @RELATION: BINDS_TO -> DatasetReviewApiTests
# @PURPOSE: Preview endpoint should preserve API contract and surface generic upstream preview failures without fabricating dashboard-not-found semantics for non-dashboard 404s.
@@ -1235,7 +1417,10 @@ def test_us3_preview_endpoint_returns_failed_preview_without_false_dashboard_not
app.dependency_overrides[_get_repository] = lambda: repository
app.dependency_overrides[_get_orchestrator] = lambda: orchestrator
response = client.post("/api/dataset-orchestration/sessions/sess-1/preview")
response = client.post(
"/api/dataset-orchestration/sessions/sess-1/preview",
headers={"X-Session-Version": "0"},
)
assert response.status_code == 200
payload = response.json()
@@ -1252,6 +1437,46 @@ def test_us3_preview_endpoint_returns_failed_preview_without_false_dashboard_not
# [/DEF:test_us3_preview_endpoint_returns_failed_preview_without_false_dashboard_not_found_contract_drift:Function]
# [DEF:test_mutation_endpoints_surface_session_version_conflict_payload:Function]
# @RELATION: BINDS_TO -> DatasetReviewApiTests
# @PURPOSE: Dataset review mutation endpoints should return deterministic 409 conflict semantics when optimistic-lock versions are stale.
def test_mutation_endpoints_surface_session_version_conflict_payload(
    dataset_review_api_dependencies,
):
    """A stale X-Session-Version must produce a deterministic 409 conflict body."""
    repository = MagicMock()
    repository.load_session_detail.return_value = _make_us3_session()
    conflict = DatasetReviewSessionVersionConflictError(
        session_id="sess-1",
        expected_version=2,
        actual_version=5,
    )
    repository.require_session_version.side_effect = conflict
    repository.db = MagicMock()
    repository.event_logger = MagicMock(spec=SessionEventLogger)
    app.dependency_overrides[_get_repository] = lambda: repository
    response = client.patch(
        "/api/dataset-orchestration/sessions/sess-1/mappings/map-1",
        headers={"X-Session-Version": "2"},
        json={
            "effective_value": "EU",
            "mapping_method": "manual_override",
        },
    )
    assert response.status_code == 409
    detail = response.json()["detail"]
    assert detail["error_code"] == "session_version_conflict"
    assert detail["expected_version"] == 2
    assert detail["actual_version"] == 5
# [/DEF:test_mutation_endpoints_surface_session_version_conflict_payload:Function]
# [DEF:test_execution_snapshot_includes_recovered_imported_filters_without_template_mapping:Function]
# @RELATION: BINDS_TO -> DatasetReviewApiTests
# @PURPOSE: Recovered imported filters with values should flow into preview filter context even when no template variable mapping exists.

View File

@@ -15,7 +15,7 @@ import json
import re
import uuid
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple, cast
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel, Field
@@ -40,6 +40,9 @@ from ...services.llm_prompt_templates import (
resolve_bound_provider_id,
)
from ...core.superset_client import SupersetClient
from ...core.utils.superset_context_extractor import (
sanitize_imported_filter_for_assistant,
)
from ...plugins.llm_analysis.service import LLMClient
from ...plugins.llm_analysis.models import LLMProviderType
from ...schemas.auth import User
@@ -48,9 +51,25 @@ from ...models.assistant import (
AssistantConfirmationRecord,
AssistantMessageRecord,
)
from ...models.dataset_review import (
ApprovalState,
DatasetReviewSession,
ReadinessState,
RecommendedAction,
)
from ...services.dataset_review.orchestrator import (
DatasetReviewOrchestrator,
PreparePreviewCommand,
)
from ...services.dataset_review.repositories.session_repository import (
DatasetReviewSessionRepository,
DatasetReviewSessionVersionConflictError,
)
from .dataset_review import FieldSemanticUpdateRequest, _update_semantic_field_state
router = APIRouter(tags=["Assistant"])
git_service = GitService()
logger = cast(Any, logger)
# [DEF:AssistantMessageRequest:Class]
@@ -65,6 +84,7 @@ git_service = GitService()
class AssistantMessageRequest(BaseModel):
conversation_id: Optional[str] = None
message: str = Field(..., min_length=1, max_length=4000)
dataset_review_session_id: Optional[str] = None
# [/DEF:AssistantMessageRequest:Class]
@@ -159,6 +179,10 @@ INTENT_PERMISSION_CHECKS: Dict[str, List[Tuple[str, str]]] = {
"run_llm_validation": [("plugin:llm_dashboard_validation", "EXECUTE")],
"run_llm_documentation": [("plugin:llm_documentation", "EXECUTE")],
"get_health_summary": [("plugin:migration", "READ")],
"dataset_review_answer_context": [("dataset:session", "READ")],
"dataset_review_approve_mappings": [("dataset:session", "MANAGE")],
"dataset_review_set_field_semantics": [("dataset:session", "MANAGE")],
"dataset_review_generate_sql_preview": [("dataset:session", "MANAGE")],
}
@@ -1139,7 +1163,10 @@ def _has_any_permission(current_user: User, checks: List[Tuple[str, str]]) -> bo
# @PRE: current_user is authenticated; config/db are available.
# @POST: Returns list of executable tools filtered by permission and runtime availability.
def _build_tool_catalog(
current_user: User, config_manager: ConfigManager, db: Session
current_user: User,
config_manager: ConfigManager,
db: Session,
dataset_review_context: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
envs = config_manager.get_environments()
default_env_id = _get_default_environment_id(config_manager)
@@ -1271,6 +1298,61 @@ def _build_tool_catalog(
if checks and not _has_any_permission(current_user, checks):
continue
available.append(tool)
if dataset_review_context is not None:
dataset_tools: List[Dict[str, Any]] = [
{
"operation": "dataset_review_answer_context",
"domain": "dataset_review",
"description": "Answer questions using the currently bound dataset review session context",
"required_entities": ["dataset_review_session_id"],
"optional_entities": [],
"risk_level": "safe",
"requires_confirmation": False,
},
{
"operation": "dataset_review_approve_mappings",
"domain": "dataset_review",
"description": "Approve warning-sensitive execution mappings in the current dataset review session",
"required_entities": ["dataset_review_session_id", "session_version"],
"optional_entities": ["mapping_ids"],
"risk_level": "guarded",
"requires_confirmation": True,
},
{
"operation": "dataset_review_set_field_semantics",
"domain": "dataset_review",
"description": "Apply explicit semantic field override or candidate selection in the current dataset review session",
"required_entities": [
"dataset_review_session_id",
"session_version",
"field_id",
],
"optional_entities": [
"candidate_id",
"verbose_name",
"description",
"display_format",
"lock_field",
],
"risk_level": "guarded",
"requires_confirmation": True,
},
{
"operation": "dataset_review_generate_sql_preview",
"domain": "dataset_review",
"description": "Generate a Superset-compiled SQL preview for the current dataset review session",
"required_entities": ["dataset_review_session_id", "session_version"],
"optional_entities": [],
"risk_level": "guarded",
"requires_confirmation": True,
},
]
for tool in dataset_tools:
checks = INTENT_PERMISSION_CHECKS.get(tool["operation"], [])
if checks and not _has_any_permission(current_user, checks):
continue
available.append(tool)
return available
@@ -1301,7 +1383,602 @@ def _coerce_intent_entities(intent: Dict[str, Any]) -> Dict[str, Any]:
# Operations that are read-only and do not require confirmation.
_SAFE_OPS = {"show_capabilities", "get_task_status", "get_health_summary"}
_SAFE_OPS = {
"show_capabilities",
"get_task_status",
"get_health_summary",
"dataset_review_answer_context",
}
_DATASET_REVIEW_OPS = {
"dataset_review_approve_mappings",
"dataset_review_set_field_semantics",
"dataset_review_generate_sql_preview",
}
# [DEF:_serialize_dataset_review_context:Function]
# @COMPLEXITY: 4
# @PURPOSE: Build assistant-safe dataset-review context snapshot with masked imported-filter payloads for session-scoped assistant routing.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
def _serialize_dataset_review_context(
    session: DatasetReviewSession,
) -> Dict[str, Any]:
    """Serialize a review session into the dict the assistant planner consumes.

    Imported-filter payloads are passed through
    ``sanitize_imported_filter_for_assistant`` so raw values are masked before
    the assistant sees them.  Enum-valued attributes are coerced via
    ``getattr(x, "value", x)`` where the stored value may already be a plain
    string.  Only the most recent preview is exposed; ``preview`` is ``None``
    for sessions without previews.
    """
    # Latest preview is taken as the last element of the relationship
    # collection -- assumes previews are ordered oldest-first; TODO confirm
    # the relationship's ordering against the model definition.
    latest_preview = None
    previews = getattr(session, "previews", []) or []
    if previews:
        latest_preview = previews[-1]
    return {
        "session_id": session.session_id,
        # Version feeds the optimistic-lock checks on follow-up mutations.
        "version": int(getattr(session, "version", 0) or 0),
        "dataset_ref": session.dataset_ref,
        "dataset_id": session.dataset_id,
        "environment_id": session.environment_id,
        "readiness_state": session.readiness_state.value,
        "recommended_action": session.recommended_action.value,
        "status": session.status.value,
        "current_phase": session.current_phase.value,
        "findings": [
            {
                "finding_id": item.finding_id,
                "code": item.code,
                "severity": item.severity.value,
                "message": item.message,
                "resolution_state": item.resolution_state.value,
            }
            for item in getattr(session, "findings", [])
        ],
        # Each imported filter is sanitized (value masking) before exposure.
        "imported_filters": [
            sanitize_imported_filter_for_assistant(
                {
                    "filter_id": item.filter_id,
                    "filter_name": item.filter_name,
                    "display_name": item.display_name,
                    "raw_value": item.raw_value,
                    "raw_value_masked": bool(getattr(item, "raw_value_masked", False)),
                    "normalized_value": item.normalized_value,
                    "source": getattr(item.source, "value", item.source),
                    "confidence_state": getattr(
                        item.confidence_state, "value", item.confidence_state
                    ),
                    "requires_confirmation": bool(item.requires_confirmation),
                    "recovery_status": getattr(
                        item.recovery_status, "value", item.recovery_status
                    ),
                    "notes": item.notes,
                }
            )
            for item in getattr(session, "imported_filters", [])
        ],
        "mappings": [
            {
                "mapping_id": item.mapping_id,
                "filter_id": item.filter_id,
                "variable_id": item.variable_id,
                "mapping_method": getattr(
                    item.mapping_method, "value", item.mapping_method
                ),
                "effective_value": item.effective_value,
                "approval_state": getattr(
                    item.approval_state, "value", item.approval_state
                ),
                "requires_explicit_approval": bool(item.requires_explicit_approval),
            }
            for item in getattr(session, "execution_mappings", [])
        ],
        "semantic_fields": [
            {
                "field_id": item.field_id,
                "field_name": item.field_name,
                "verbose_name": item.verbose_name,
                "description": item.description,
                "display_format": item.display_format,
                "provenance": getattr(item.provenance, "value", item.provenance),
                "is_locked": bool(item.is_locked),
                "needs_review": bool(item.needs_review),
                "candidates": [
                    {
                        "candidate_id": candidate.candidate_id,
                        "proposed_verbose_name": candidate.proposed_verbose_name,
                        "proposed_description": candidate.proposed_description,
                        "proposed_display_format": candidate.proposed_display_format,
                        "status": getattr(candidate.status, "value", candidate.status),
                    }
                    for candidate in getattr(item, "candidates", [])
                ],
            }
            for item in getattr(session, "semantic_fields", [])
        ],
        # Only the latest compiled preview snapshot is surfaced.
        "preview": {
            "preview_id": latest_preview.preview_id,
            "preview_status": getattr(
                latest_preview.preview_status, "value", latest_preview.preview_status
            ),
            "compiled_sql": latest_preview.compiled_sql,
        }
        if latest_preview is not None
        else None,
    }
# [/DEF:_serialize_dataset_review_context:Function]
# [DEF:_load_dataset_review_context:Function]
# @COMPLEXITY: 4
# @PURPOSE: Load owner-scoped dataset-review context for assistant planning and grounded response generation.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSessionRepository]
def _load_dataset_review_context(
    dataset_review_session_id: Optional[str],
    current_user: User,
    db: Session,
) -> Optional[Dict[str, Any]]:
    """Resolve the bound dataset-review session into an assistant context dict.

    Returns None when no session id is bound; raises a 404 when the session
    is missing or not owned by the current user.
    """
    if not dataset_review_session_id:
        # No session is bound to this assistant conversation.
        return None
    repo = DatasetReviewSessionRepository(db)
    review_session = repo.load_session_detail(
        dataset_review_session_id, current_user.id
    )
    is_owned = review_session is not None and review_session.user_id == current_user.id
    if not is_owned:
        raise HTTPException(status_code=404, detail="Dataset review session not found")
    return _serialize_dataset_review_context(review_session)
# [/DEF:_load_dataset_review_context:Function]
# [DEF:_extract_dataset_review_target:Function]
# @COMPLEXITY: 2
# @PURPOSE: Extract structured dataset-review focus target hints embedded in assistant prompts.
def _extract_dataset_review_target(message: str) -> Tuple[Optional[str], Optional[str]]:
match = re.search(
r"(?:target|focus)\s*[:=]\s*(field|mapping|finding|filter)[:=]([A-Za-z0-9._-]+)",
str(message or ""),
re.IGNORECASE,
)
if not match:
return None, None
return match.group(1).lower(), match.group(2)
# [/DEF:_extract_dataset_review_target:Function]
# [DEF:_match_dataset_review_field:Function]
# @COMPLEXITY: 2
# @PURPOSE: Resolve one semantic field from assistant-visible context by id or user-visible label.
def _match_dataset_review_field(
    dataset_context: Dict[str, Any],
    message: str,
) -> Optional[Dict[str, Any]]:
    """Resolve the semantic field a prompt refers to.

    Resolution order:
      1. An explicit ``target/focus: field:<id>`` hint wins (exact id match).
      2. Otherwise the first field whose id, field name, or verbose name
         appears (case-insensitively) inside the message.

    Returns None when no field can be matched.
    """
    target_kind, target_id = _extract_dataset_review_target(message)
    fields = dataset_context.get("semantic_fields", []) or []
    if target_kind == "field" and target_id:
        return next(
            (item for item in fields if str(item.get("field_id")) == str(target_id)),
            None,
        )
    normalized_message = str(message or "").lower()
    for field in fields:
        # Guard every label against emptiness: "" is a substring of any
        # message, so an unguarded check would always match the first field
        # whose field_id is missing or blank.
        field_id = str(field.get("field_id", "")).lower()
        if field_id and field_id in normalized_message:
            return field
        field_name = str(field.get("field_name", "")).lower()
        if field_name and field_name in normalized_message:
            return field
        verbose_name = str(field.get("verbose_name", "")).lower()
        if verbose_name and verbose_name in normalized_message:
            return field
    return None
# [/DEF:_match_dataset_review_field:Function]
# [DEF:_extract_quoted_segment:Function]
# @COMPLEXITY: 2
# @PURPOSE: Extract one quoted assistant command segment after a label token.
def _extract_quoted_segment(message: str, label: str) -> Optional[str]:
pattern = rf"{label}\s*[=:]?\s*[\"']([^\"']+)[\"']"
match = re.search(pattern, str(message or ""), re.IGNORECASE)
return match.group(1).strip() if match else None
# [/DEF:_extract_quoted_segment:Function]
# [DEF:_dataset_review_conflict_http_exception:Function]
# @COMPLEXITY: 2
# @PURPOSE: Convert dataset-review optimistic-lock conflicts into shared 409 assistant semantics.
def _dataset_review_conflict_http_exception(
    exc: DatasetReviewSessionVersionConflictError,
) -> HTTPException:
    """Translate an optimistic-lock conflict into the shared 409 payload."""
    conflict_detail = {
        "error_code": "session_version_conflict",
        "message": str(exc),
        "session_id": exc.session_id,
        "expected_version": exc.expected_version,
        "actual_version": exc.actual_version,
    }
    return HTTPException(
        status_code=status.HTTP_409_CONFLICT,
        detail=conflict_detail,
    )
# [/DEF:_dataset_review_conflict_http_exception:Function]
# [DEF:_plan_dataset_review_intent:Function]
# @COMPLEXITY: 3
# @PURPOSE: Parse session-scoped dataset-review assistant commands before falling back to generic assistant tool routing.
def _plan_dataset_review_intent(
    message: str,
    dataset_context: Dict[str, Any],
) -> Optional[Dict[str, Any]]:
    """Map a chat message onto a dataset-review intent dict, or None.

    Branch priority is significant: mapping approval is checked first, then
    SQL preview generation, then semantic-field updates, and finally a
    read-only context summary.  Returning None hands the message back to the
    generic assistant planner.

    English and Russian trigger tokens are matched as case-insensitive
    substrings of the stripped message.
    """
    lower = message.strip().lower()
    session_id = dataset_context["session_id"]
    session_version = int(dataset_context.get("version", 0) or 0)
    target_kind, target_id = _extract_dataset_review_target(message)
    # --- Mapping approval -------------------------------------------------
    if any(
        token in lower
        for token in [
            "approve mappings",
            "approve mapping",
            "подтверди мапп",
            "одобри мапп",
        ]
    ):
        # Pre-select every warning-sensitive mapping not yet approved.
        pending_mapping_ids = [
            item["mapping_id"]
            for item in dataset_context.get("mappings", [])
            if item.get("requires_explicit_approval")
            and item.get("approval_state") != ApprovalState.APPROVED.value
        ]
        return {
            "domain": "dataset_review",
            "operation": "dataset_review_approve_mappings",
            "entities": {
                "dataset_review_session_id": session_id,
                "session_version": session_version,
                "mapping_ids": pending_mapping_ids,
            },
            "confidence": 0.95,
            "risk_level": "guarded",
            "requires_confirmation": True,
        }
    # --- SQL preview generation -------------------------------------------
    if any(
        token in lower
        for token in [
            "generate sql preview",
            "generate preview",
            "сгенерируй превью",
            "собери превью",
        ]
    ):
        return {
            "domain": "dataset_review",
            "operation": "dataset_review_generate_sql_preview",
            "entities": {
                "dataset_review_session_id": session_id,
                "session_version": session_version,
            },
            "confidence": 0.94,
            "risk_level": "guarded",
            "requires_confirmation": True,
        }
    # --- Semantic field override / candidate selection ---------------------
    if any(
        token in lower
        for token in [
            "set field semantics",
            "apply field semantics",
            "semantic override",
            "update semantic field",
            "установи семантик",
            "обнови семантик",
        ]
    ):
        field = _match_dataset_review_field(dataset_context, message)
        if field is None:
            # Cannot resolve a field -> let the generic planner handle it.
            return None
        candidate_id = None
        if any(
            token in lower
            for token in ["accept candidate", "apply candidate", "прими кандидат"]
        ):
            # "Accept candidate" takes the first proposed candidate, if any.
            candidates = field.get("candidates") or []
            if candidates:
                candidate_id = candidates[0].get("candidate_id")
        verbose_name = _extract_quoted_segment(
            message, "verbose_name|verbose name|label"
        )
        description = _extract_quoted_segment(message, "description|desc")
        display_format = _extract_quoted_segment(
            message, "display_format|display format|format"
        )
        # NOTE: leading space in " lock" -- presumably to avoid matching
        # inside words like "block"; confirm intent before changing.
        lock_field = any(
            token in lower for token in [" lock", "lock it", "зафикс", "закреп"]
        )
        if not any([candidate_id, verbose_name, description, display_format]):
            # Nothing concrete to apply -> no actionable intent.
            return None
        return {
            "domain": "dataset_review",
            "operation": "dataset_review_set_field_semantics",
            "entities": {
                "dataset_review_session_id": session_id,
                "session_version": session_version,
                "field_id": field.get("field_id") or target_id,
                "candidate_id": candidate_id,
                "verbose_name": verbose_name,
                "description": description,
                "display_format": display_format,
                "lock_field": lock_field,
            },
            "confidence": 0.9,
            "risk_level": "guarded",
            "requires_confirmation": True,
        }
    # --- Read-only context summary (safe, no confirmation) -----------------
    if any(
        token in lower
        for token in [
            "filters",
            "фильтр",
            "mapping",
            "маппинг",
            "preview",
            "превью",
            "finding",
            "ошиб",
        ]
    ):
        findings_count = len(dataset_context.get("findings", []))
        mappings_count = len(dataset_context.get("mappings", []))
        filters_count = len(dataset_context.get("imported_filters", []))
        preview = dataset_context.get("preview") or {}
        preview_status = preview.get("preview_status") or "missing"
        # Filters were already masked during context serialization; the
        # summary embeds their full (sanitized) payloads.
        masked_filters = dataset_context.get("imported_filters", [])
        return {
            "domain": "dataset_review",
            "operation": "dataset_review_answer_context",
            "entities": {
                "dataset_review_session_id": session_id,
                "summary": (
                    f"Session {session_id}: readiness={dataset_context['readiness_state']}, "
                    f"recommended_action={dataset_context['recommended_action']}, "
                    f"filters={filters_count}, mappings={mappings_count}, findings={findings_count}, "
                    f"preview_status={preview_status}, imported_filters={masked_filters}"
                ),
            },
            "confidence": 0.8,
            "risk_level": "safe",
            "requires_confirmation": False,
        }
    return None
# [/DEF:_plan_dataset_review_intent:Function]
# [DEF:_dispatch_dataset_review_intent:Function]
# @COMPLEXITY: 4
# @PURPOSE: Route confirmed dataset-review assistant intents through existing backend dataset-review APIs and orchestration boundaries.
async def _dispatch_dataset_review_intent(
    intent: Dict[str, Any],
    current_user: User,
    config_manager: ConfigManager,
    db: Session,
) -> Tuple[str, Optional[str], List[AssistantAction]]:
    """Execute one confirmed dataset-review intent.

    Returns ``(reply_text, task_id, actions)``; ``task_id`` is always None
    here because every supported operation completes synchronously.

    Raises HTTPException with:
      * 422 when session id / version entities are missing,
      * 404 when the session does not exist or is not owned by the caller,
      * 409 on optimistic-lock version conflicts or empty approval sets,
      * 400 for unsupported operations or invalid semantic updates.
    """
    with belief_scope("assistant.dispatch_dataset_review_intent"):
        entities = intent.get("entities", {})
        session_id = entities.get("dataset_review_session_id")
        session_version = entities.get("session_version")
        if not session_id or session_version is None:
            raise HTTPException(
                status_code=422,
                detail="Missing dataset_review_session_id/session_version",
            )
        operation = str(intent.get("operation") or "")
        repository = DatasetReviewSessionRepository(db)
        # Context Q&A is read-only: echo the precomputed summary without
        # loading or version-checking the session.
        if operation == "dataset_review_answer_context":
            summary = str(entities.get("summary") or "")
            logger.reflect(
                "Returned assistant-safe dataset review context summary",
                extra={"session_id": session_id, "operation": operation},
            )
            return summary, None, []
        session = repository.load_session_detail(session_id, current_user.id)
        if session is None or session.user_id != current_user.id:
            logger.explore(
                "Assistant dataset-review intent rejected because session was not found",
                extra={"session_id": session_id, "user_id": current_user.id},
            )
            raise HTTPException(
                status_code=404, detail="Dataset review session not found"
            )
        # Optimistic-lock gate: mutations require the client's version to
        # match the persisted one; stale versions map onto HTTP 409.
        try:
            repository.require_session_version(session, int(session_version))
        except DatasetReviewSessionVersionConflictError as exc:
            logger.explore(
                "Assistant dataset-review intent rejected due to stale session version",
                extra={
                    "session_id": exc.session_id,
                    "expected_version": exc.expected_version,
                    "actual_version": exc.actual_version,
                    "operation": operation,
                },
            )
            raise _dataset_review_conflict_http_exception(exc) from exc
        logger.reason(
            "Dispatching confirmed assistant dataset-review intent",
            extra={
                "session_id": session_id,
                "session_version": session_version,
                "operation": operation,
            },
        )
        if operation == "dataset_review_approve_mappings":
            # Deduplicate while preserving caller-provided order.
            mapping_ids = list(dict.fromkeys(entities.get("mapping_ids") or []))
            if not mapping_ids:
                raise HTTPException(
                    status_code=409, detail="No pending mappings to approve"
                )
            updated_count = 0
            for mapping in session.execution_mappings:
                if mapping.mapping_id not in mapping_ids:
                    continue
                mapping.approval_state = ApprovalState.APPROVED
                mapping.approved_by_user_id = current_user.id
                mapping.approved_at = datetime.utcnow()
                updated_count += 1
            if updated_count == 0:
                raise HTTPException(
                    status_code=409, detail="No matching mappings available to approve"
                )
            session.last_activity_at = datetime.utcnow()
            # Once mapping review is satisfied, steer the user to preview.
            if session.readiness_state == ReadinessState.MAPPING_REVIEW_NEEDED:
                session.recommended_action = RecommendedAction.GENERATE_SQL_PREVIEW
            repository.bump_session_version(session)
            repository.db.commit()
            repository.db.refresh(session)
            # NOTE(review): event logging happens after commit -- if
            # log_for_session writes to the DB it runs outside this
            # transaction; confirm that is intended.
            repository.event_logger.log_for_session(
                session,
                actor_user_id=current_user.id,
                event_type="assistant_mapping_approval",
                event_summary="Assistant-approved warning-sensitive mappings persisted",
                event_details={
                    "mapping_ids": mapping_ids,
                    "count": updated_count,
                    "version": int(getattr(session, "version", 0) or 0),
                },
            )
            logger.reflect(
                "Assistant mapping approval persisted within optimistic-lock boundary",
                extra={
                    "session_id": session_id,
                    "updated_count": updated_count,
                    "version": int(getattr(session, "version", 0) or 0),
                },
            )
            return (
                f"Approved {updated_count} mapping(s) for dataset review session {session_id}.",
                None,
                [
                    AssistantAction(
                        type="focus_target",
                        label="Open mapping review",
                        target="mapping",
                    )
                ],
            )
        if operation == "dataset_review_set_field_semantics":
            field_id = str(entities.get("field_id") or "").strip()
            if not field_id:
                raise HTTPException(status_code=422, detail="Missing field_id")
            field = next(
                (item for item in session.semantic_fields if item.field_id == field_id),
                None,
            )
            if field is None:
                raise HTTPException(status_code=404, detail="Semantic field not found")
            update_request = FieldSemanticUpdateRequest(
                candidate_id=entities.get("candidate_id"),
                verbose_name=entities.get("verbose_name"),
                description=entities.get("description"),
                display_format=entities.get("display_format"),
                lock_field=bool(entities.get("lock_field", False)),
            )
            # Reuse the shared endpoint-level state transition; ValueError
            # from domain validation is surfaced as a 400.
            try:
                _update_semantic_field_state(
                    field, update_request, changed_by="assistant"
                )
            except HTTPException:
                raise
            except ValueError as exc:
                raise HTTPException(status_code=400, detail=str(exc)) from exc
            session.last_activity_at = datetime.utcnow()
            repository.bump_session_version(session)
            repository.db.commit()
            repository.db.refresh(session)
            repository.db.refresh(field)
            repository.event_logger.log_for_session(
                session,
                actor_user_id=current_user.id,
                event_type="assistant_field_semantics_updated",
                event_summary="Assistant semantic field update persisted",
                event_details={
                    "field_id": field.field_id,
                    "candidate_id": entities.get("candidate_id"),
                    "lock_field": bool(entities.get("lock_field", False)),
                    "version": int(getattr(session, "version", 0) or 0),
                },
            )
            logger.reflect(
                "Assistant semantic field update committed safely",
                extra={
                    "session_id": session_id,
                    "field_id": field_id,
                    "version": int(getattr(session, "version", 0) or 0),
                },
            )
            return (
                f"Updated semantic field {field.field_name} for dataset review session {session_id}.",
                None,
                [
                    AssistantAction(
                        type="focus_target",
                        label="Open semantic review",
                        target=f"field:{field.field_id}",
                    )
                ],
            )
        if operation == "dataset_review_generate_sql_preview":
            # Preview generation is delegated to the orchestrator, which
            # performs its own version check via expected_version.
            orchestrator = DatasetReviewOrchestrator(
                repository=repository,
                config_manager=config_manager,
            )
            result = orchestrator.prepare_launch_preview(
                PreparePreviewCommand(
                    user=current_user,
                    session_id=session_id,
                    expected_version=int(session_version),
                )
            )
            preview_status = getattr(
                result.preview.preview_status, "value", result.preview.preview_status
            )
            logger.reflect(
                "Assistant-triggered Superset preview generation completed",
                extra={
                    "session_id": session_id,
                    "preview_status": preview_status,
                },
            )
            return (
                f"SQL preview {preview_status} for dataset review session {session_id}.",
                None,
                [
                    AssistantAction(
                        type="focus_target",
                        label="Open SQL preview",
                        target="sql-preview",
                    )
                ],
            )
        raise HTTPException(
            status_code=400, detail="Unsupported dataset review operation"
        )
# [/DEF:_dispatch_dataset_review_intent:Function]
# [DEF:_confirmation_summary:Function]
@@ -1633,6 +2310,14 @@ async def _dispatch_intent(
operation = intent.get("operation")
entities = intent.get("entities", {})
if operation in _DATASET_REVIEW_OPS or operation == "dataset_review_answer_context":
return await _dispatch_dataset_review_intent(
intent,
current_user,
config_manager,
db,
)
if operation == "show_capabilities":
tools_catalog = _build_tool_catalog(current_user, config_manager, db)
labels = {
@@ -2041,6 +2726,11 @@ async def send_message(
):
with belief_scope("assistant.send_message"):
user_id = current_user.id
dataset_review_context = _load_dataset_review_context(
request.dataset_review_session_id,
current_user,
db,
)
conversation_id = _resolve_or_create_conversation(
user_id, request.conversation_id, db
)
@@ -2058,6 +2748,12 @@ async def send_message(
logger.warning(f"[assistant.planner][fallback] Planner error: {exc}")
if not intent:
intent = _parse_command(request.message, config_manager)
if dataset_review_context:
dataset_review_intent = _plan_dataset_review_intent(
request.message, dataset_review_context
)
if dataset_review_intent is not None:
intent = dataset_review_intent
confidence = float(intent.get("confidence", 0.0))
if intent.get("domain") == "unknown" or confidence < 0.6:
@@ -2078,6 +2774,7 @@ async def send_message(
"decision": "needs_clarification",
"message": request.message,
"intent": intent,
"dataset_review_session_id": request.dataset_review_session_id,
}
_audit(user_id, audit_payload)
_persist_audit(db, user_id, audit_payload, conversation_id)
@@ -2127,6 +2824,7 @@ async def send_message(
confirmation_id=confirmation_id,
metadata={
"intent": intent,
"dataset_review_context": dataset_review_context,
"actions": [
{
"type": "confirm",
@@ -2146,6 +2844,7 @@ async def send_message(
"message": request.message,
"intent": intent,
"confirmation_id": confirmation_id,
"dataset_review_session_id": request.dataset_review_session_id,
}
_audit(user_id, audit_payload)
_persist_audit(db, user_id, audit_payload, conversation_id)
@@ -2192,6 +2891,7 @@ async def send_message(
task_id=task_id,
metadata={
"intent": intent,
"dataset_review_context": dataset_review_context,
"actions": [a.model_dump() for a in actions],
},
)
@@ -2200,6 +2900,7 @@ async def send_message(
"message": request.message,
"intent": intent,
"task_id": task_id,
"dataset_review_session_id": request.dataset_review_session_id,
}
_audit(user_id, audit_payload)
_persist_audit(db, user_id, audit_payload, conversation_id)
@@ -2246,6 +2947,7 @@ async def send_message(
"message": request.message,
"intent": intent,
"error": text,
"dataset_review_session_id": request.dataset_review_session_id,
}
_audit(user_id, audit_payload)
_persist_audit(db, user_id, audit_payload, conversation_id)

View File

@@ -17,9 +17,9 @@ from __future__ import annotations
# [DEF:DatasetReviewApi.imports:Block]
import json
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union, cast
from fastapi import APIRouter, Depends, HTTPException, Query, Response, status
from fastapi import APIRouter, Depends, Header, HTTPException, Query, Response, status
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
@@ -76,6 +76,7 @@ from src.services.dataset_review.orchestrator import (
)
from src.services.dataset_review.repositories.session_repository import (
DatasetReviewSessionRepository,
DatasetReviewSessionVersionConflictError,
)
# [/DEF:DatasetReviewApi.imports:Block]
@@ -193,7 +194,7 @@ class ClarificationSessionSummaryResponse(BaseModel):
# @COMPLEXITY: 2
# @PURPOSE: Response DTO for current clarification state and active question payload.
class ClarificationStateResponse(BaseModel):
clarification_session: ClarificationSessionSummaryResponse
clarification_session: Optional[ClarificationSessionSummaryResponse] = None
current_question: Optional[ClarificationQuestionDto] = None
@@ -271,6 +272,7 @@ class BatchApproveMappingRequest(BaseModel):
# @PURPOSE: Contract-compliant async preview trigger response exposing only enqueue state.
class PreviewEnqueueResultResponse(BaseModel):
session_id: str
session_version: Optional[int] = None
preview_status: str
task_id: Optional[str] = None
@@ -413,7 +415,9 @@ def _get_clarification_engine(
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSession]
# @RELATION: [DEPENDS_ON] ->[SessionSummary]
def _serialize_session_summary(session: DatasetReviewSession) -> SessionSummary:
return SessionSummary.model_validate(session, from_attributes=True)
summary = SessionSummary.model_validate(session, from_attributes=True)
summary.session_version = summary.version
return summary
# [/DEF:_serialize_session_summary:Function]
@@ -424,18 +428,104 @@ def _serialize_session_summary(session: DatasetReviewSession) -> SessionSummary:
# @PURPOSE: Map SQLAlchemy session aggregate root into stable API detail DTO.
# @RELATION: [DEPENDS_ON] ->[SessionDetail]
def _serialize_session_detail(session: DatasetReviewSession) -> SessionDetail:
return SessionDetail.model_validate(session, from_attributes=True)
detail = SessionDetail.model_validate(session, from_attributes=True)
detail.session_version = detail.version
return detail
# [/DEF:_serialize_session_detail:Function]
# [DEF:_require_session_version_header:Function]
# @COMPLEXITY: 2
# @PURPOSE: Read the optimistic-lock session version header required by dataset review mutation endpoints.
def _require_session_version_header(
    session_version: int = Header(..., alias="X-Session-Version", ge=0),
) -> int:
    """FastAPI dependency requiring a non-negative ``X-Session-Version`` header.

    Requests missing the header, or sending a non-integer / negative value,
    are rejected by FastAPI validation before the endpoint body runs.
    """
    return session_version
# [/DEF:_require_session_version_header:Function]
# [DEF:_enforce_session_version:Function]
# @COMPLEXITY: 4
# @PURPOSE: Convert repository optimistic-lock conflicts into deterministic HTTP 409 responses.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSessionRepository]
def _enforce_session_version(
    repository: DatasetReviewSessionRepository,
    session: DatasetReviewSession,
    expected_version: int,
) -> DatasetReviewSession:
    """Assert the optimistic lock; map a version conflict onto HTTP 409."""
    try:
        repository.require_session_version(session, expected_version)
    except DatasetReviewSessionVersionConflictError as exc:
        conflict_detail = {
            "error_code": "session_version_conflict",
            "message": str(exc),
            "session_id": exc.session_id,
            "expected_version": exc.expected_version,
            "actual_version": exc.actual_version,
        }
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=conflict_detail,
        ) from exc
    return session
# [/DEF:_enforce_session_version:Function]
# [DEF:_prepare_owned_session_mutation:Function]
# @COMPLEXITY: 4
# @PURPOSE: Resolve owner-scoped mutation session and enforce optimistic-lock version before changing dataset review state.
def _prepare_owned_session_mutation(
    repository: DatasetReviewSessionRepository,
    session_id: str,
    current_user: User,
    expected_version: int,
) -> DatasetReviewSession:
    """Resolve, authorize, and version-check a session prior to mutation.

    404s when the session is inaccessible, enforces owner mutation scope,
    then raises a 409 via ``_enforce_session_version`` on a stale version.
    """
    owned_session = _get_owned_session_or_404(repository, session_id, current_user)
    _require_owner_mutation_scope(owned_session, current_user)
    return _enforce_session_version(repository, owned_session, expected_version)
# [/DEF:_prepare_owned_session_mutation:Function]
# [DEF:_commit_owned_session_mutation:Function]
# @COMPLEXITY: 4
# @PURPOSE: Centralize dataset-review session version bumping and commit semantics for owner-scoped mutation endpoints.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSessionRepository]
def _commit_owned_session_mutation(
    repository: DatasetReviewSessionRepository,
    session: DatasetReviewSession,
    *,
    refresh_targets: Optional[List[Any]] = None,
) -> DatasetReviewSession:
    """Bump the session version, commit, and refresh session plus extras."""
    repository.bump_session_version(session)
    repository.db.commit()
    # The session itself is refreshed first, then any caller-supplied rows.
    to_refresh = [session, *(refresh_targets or [])]
    for record in to_refresh:
        repository.db.refresh(record)
    return session
# [/DEF:_commit_owned_session_mutation:Function]
# [DEF:_serialize_semantic_field:Function]
# @COMPLEXITY: 2
# @PURPOSE: Map one semantic field aggregate into stable field-level DTO output.
# @RELATION: [DEPENDS_ON] ->[SemanticFieldEntryDto]
def _serialize_semantic_field(field: SemanticFieldEntry) -> SemanticFieldEntryDto:
return SemanticFieldEntryDto.model_validate(field, from_attributes=True)
payload = SemanticFieldEntryDto.model_validate(field, from_attributes=True)
session_ref = getattr(field, "session", None)
version_value = getattr(session_ref, "version", None)
payload.session_version = (
int(version_value or 0) if version_value is not None else None
)
return payload
# [/DEF:_serialize_semantic_field:Function]
@@ -497,6 +587,20 @@ def _serialize_clarification_state(
# [/DEF:_serialize_clarification_state:Function]
# [DEF:_serialize_empty_clarification_state:Function]
# @COMPLEXITY: 2
# @PURPOSE: Return a stable empty clarification payload for sessions that have not started clarification yet.
# @RELATION: [DEPENDS_ON] ->[ClarificationStateResponse]
def _serialize_empty_clarification_state() -> ClarificationStateResponse:
    """Build the non-blocking empty state: no session, no active question."""
    empty_state = ClarificationStateResponse(
        clarification_session=None,
        current_question=None,
    )
    return empty_state
# [/DEF:_serialize_empty_clarification_state:Function]
# [DEF:_get_owned_session_or_404:Function]
# @COMPLEXITY: 4
# @PURPOSE: Resolve one session for current user or collaborator scope, returning 404 when inaccessible.
@@ -766,18 +870,51 @@ def _update_semantic_field_state(
# @PURPOSE: Map one persisted execution mapping into stable API DTO output.
# @RELATION: [DEPENDS_ON] ->[ExecutionMappingDto]
def _serialize_execution_mapping(mapping: ExecutionMapping) -> ExecutionMappingDto:
return ExecutionMappingDto.model_validate(mapping, from_attributes=True)
payload = ExecutionMappingDto.model_validate(mapping, from_attributes=True)
session_ref = getattr(mapping, "session", None)
version_value = getattr(session_ref, "version", None)
payload.session_version = (
int(version_value or 0) if version_value is not None else None
)
return payload
# [/DEF:_serialize_execution_mapping:Function]
# [DEF:_serialize_preview:Function]
# @COMPLEXITY: 2
# @PURPOSE: Map one persisted preview snapshot into stable API DTO output and surface the refreshed session version for follow-up optimistic-lock mutations.
# @RELATION: [DEPENDS_ON] ->[CompiledPreviewDto]
def _serialize_preview(
    preview: CompiledPreview, *, session_version_fallback: Optional[int] = None
) -> CompiledPreviewDto:
    """Map a preview row to its DTO, attaching the freshest session version.

    The version is read from the preview's parent session when loaded,
    otherwise from *session_version_fallback*; when neither is available the
    DTO's ``session_version`` stays None.
    """
    dto = CompiledPreviewDto.model_validate(preview, from_attributes=True)
    parent_session = getattr(preview, "session", None)
    resolved_version = getattr(parent_session, "version", None)
    if resolved_version is None:
        resolved_version = session_version_fallback
    if resolved_version is None:
        dto.session_version = None
    else:
        dto.session_version = int(resolved_version or 0)
    return dto
# [/DEF:_serialize_preview:Function]
# [DEF:_serialize_run_context:Function]
# @COMPLEXITY: 2
# @PURPOSE: Map one persisted launch run context into stable API DTO output for SQL Lab handoff confirmation.
# @RELATION: [DEPENDS_ON] ->[DatasetRunContextDto]
def _serialize_run_context(run_context) -> DatasetRunContextDto:
return DatasetRunContextDto.model_validate(run_context, from_attributes=True)
payload = DatasetRunContextDto.model_validate(run_context, from_attributes=True)
session_ref = getattr(run_context, "session", None)
version_value = getattr(session_ref, "version", None)
payload.session_version = (
int(version_value or 0) if version_value is not None else None
)
return payload
# [/DEF:_serialize_run_context:Function]
@@ -1051,23 +1188,27 @@ async def get_session_detail(
async def update_session(
session_id: str,
request: UpdateSessionRequest,
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.update_session"):
session = _get_owned_session_or_404(repository, session_id, current_user)
_require_owner_mutation_scope(session, current_user)
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
session_record = cast(Any, session)
session.status = request.status
session_record.status = request.status
if request.status == SessionStatus.PAUSED:
session.recommended_action = RecommendedAction.RESUME_SESSION
session_record.recommended_action = RecommendedAction.RESUME_SESSION
elif request.status in {
SessionStatus.ARCHIVED,
SessionStatus.CANCELLED,
SessionStatus.COMPLETED,
}:
session.active_task_id = None
session_record.active_task_id = None
repository.bump_session_version(session)
repository.db.commit()
repository.db.refresh(session)
_record_session_event(
@@ -1076,7 +1217,10 @@ async def update_session(
current_user,
event_type="session_status_updated",
event_summary="Dataset review session lifecycle updated",
event_details={"status": session.status.value},
event_details={
"status": session_record.status.value,
"version": session_record.version,
},
)
return _serialize_session_summary(session)
@@ -1103,12 +1247,14 @@ async def update_session(
async def delete_session(
session_id: str,
hard_delete: bool = Query(False),
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.delete_session"):
session = _get_owned_session_or_404(repository, session_id, current_user)
_require_owner_mutation_scope(session, current_user)
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
if hard_delete:
_record_session_event(
@@ -1123,17 +1269,20 @@ async def delete_session(
repository.db.commit()
return Response(status_code=status.HTTP_204_NO_CONTENT)
session.status = SessionStatus.ARCHIVED
session.active_task_id = None
repository.db.commit()
repository.db.refresh(session)
session_record = cast(Any, session)
session_record.status = SessionStatus.ARCHIVED
session_record.active_task_id = None
_commit_owned_session_mutation(repository, session)
_record_session_event(
repository,
session,
current_user,
event_type="session_archived",
event_summary="Dataset review session archived",
event_details={"hard_delete": False},
event_details={
"hard_delete": False,
"version": session_record.version,
},
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
@@ -1231,10 +1380,10 @@ async def export_validation(
# [DEF:get_clarification_state:Function]
# @COMPLEXITY: 4
# @PURPOSE: Return the current clarification session summary and one active question payload.
# @PURPOSE: Return the current clarification session summary and one active question payload, or an empty state when clarification has not started.
# @RELATION: [CALLS] ->[build_question_payload:Function]
# @PRE: Session is accessible to current user and clarification feature is enabled.
# @POST: Returns at most one active clarification question with why_it_matters, current_guess, and ordered options.
# @POST: Returns at most one active clarification question with why_it_matters, current_guess, and ordered options; sessions without a clarification record return a non-blocking empty state.
# @SIDE_EFFECT: May normalize clarification pointer and readiness state in persistence.
# @DATA_CONTRACT: Input[session_id:str] -> Output[ClarificationStateResponse]
@router.get(
@@ -1254,6 +1403,9 @@ async def get_clarification_state(
):
with belief_scope("dataset_review.get_clarification_state"):
session = _get_owned_session_or_404(repository, session_id, current_user)
if not session.clarification_sessions:
return _serialize_empty_clarification_state()
clarification_session = _get_latest_clarification_session_or_404(session)
current_question = clarification_engine.build_question_payload(session)
return _serialize_clarification_state(
@@ -1288,13 +1440,15 @@ async def get_clarification_state(
)
async def resume_clarification(
session_id: str,
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
clarification_engine: ClarificationEngine = Depends(_get_clarification_engine),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.resume_clarification"):
session = _get_owned_session_or_404(repository, session_id, current_user)
_require_owner_mutation_scope(session, current_user)
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
clarification_session = _get_latest_clarification_session_or_404(session)
current_question = clarification_engine.build_question_payload(session)
return _serialize_clarification_state(
@@ -1330,13 +1484,15 @@ async def resume_clarification(
async def record_clarification_answer(
session_id: str,
request: ClarificationAnswerRequest,
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
clarification_engine: ClarificationEngine = Depends(_get_clarification_engine),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.record_clarification_answer"):
session = _get_owned_session_or_404(repository, session_id, current_user)
_require_owner_mutation_scope(session, current_user)
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
try:
result = clarification_engine.record_answer(
ClarificationAnswerCommand(
@@ -1385,16 +1541,18 @@ async def update_field_semantic(
session_id: str,
field_id: str,
request: FieldSemanticUpdateRequest,
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.update_field_semantic"):
session = _get_owned_session_or_404(repository, session_id, current_user)
_require_owner_mutation_scope(session, current_user)
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
field = _get_owned_field_or_404(session, field_id)
_update_semantic_field_state(field, request, changed_by="user")
repository.db.commit()
repository.db.refresh(field)
session_record = cast(Any, session)
_commit_owned_session_mutation(repository, session, refresh_targets=[field])
_record_session_event(
repository,
session,
@@ -1407,6 +1565,7 @@ async def update_field_semantic(
"is_locked": field.is_locked,
"source_id": field.source_id,
"source_version": field.source_version,
"version": session_record.version,
},
)
return _serialize_semantic_field(field)
@@ -1434,24 +1593,29 @@ async def update_field_semantic(
async def lock_field_semantic(
session_id: str,
field_id: str,
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.lock_field_semantic"):
session = _get_owned_session_or_404(repository, session_id, current_user)
_require_owner_mutation_scope(session, current_user)
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
field = _get_owned_field_or_404(session, field_id)
field.is_locked = True
field.last_changed_by = "user"
repository.db.commit()
repository.db.refresh(field)
session_record = cast(Any, session)
_commit_owned_session_mutation(repository, session, refresh_targets=[field])
_record_session_event(
repository,
session,
current_user,
event_type="semantic_field_locked",
event_summary="Semantic field lock persisted",
event_details={"field_id": field.field_id},
event_details={
"field_id": field.field_id,
"version": session_record.version,
},
)
return _serialize_semantic_field(field)
@@ -1478,27 +1642,32 @@ async def lock_field_semantic(
async def unlock_field_semantic(
session_id: str,
field_id: str,
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.unlock_field_semantic"):
session = _get_owned_session_or_404(repository, session_id, current_user)
_require_owner_mutation_scope(session, current_user)
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
field = _get_owned_field_or_404(session, field_id)
field.is_locked = False
field.last_changed_by = "user"
if field.provenance == FieldProvenance.MANUAL_OVERRIDE:
field.provenance = FieldProvenance.UNRESOLVED
field.needs_review = True
repository.db.commit()
repository.db.refresh(field)
session_record = cast(Any, session)
_commit_owned_session_mutation(repository, session, refresh_targets=[field])
_record_session_event(
repository,
session,
current_user,
event_type="semantic_field_unlocked",
event_summary="Semantic field unlock persisted",
event_details={"field_id": field.field_id},
event_details={
"field_id": field.field_id,
"version": session_record.version,
},
)
return _serialize_semantic_field(field)
@@ -1525,12 +1694,14 @@ async def unlock_field_semantic(
async def approve_batch_semantic_fields(
session_id: str,
request: BatchApproveSemanticRequest,
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.approve_batch_semantic_fields"):
session = _get_owned_session_or_404(repository, session_id, current_user)
_require_owner_mutation_scope(session, current_user)
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
updated_fields: List[SemanticFieldEntry] = []
for item in request.items:
@@ -1544,9 +1715,10 @@ async def approve_batch_semantic_fields(
)
updated_fields.append(updated_field)
repository.db.commit()
for field in updated_fields:
repository.db.refresh(field)
session_record = cast(Any, session)
_commit_owned_session_mutation(
repository, session, refresh_targets=list(updated_fields)
)
_record_session_event(
repository,
session,
@@ -1556,6 +1728,7 @@ async def approve_batch_semantic_fields(
event_details={
"field_ids": [field.field_id for field in updated_fields],
"count": len(updated_fields),
"version": session_record.version,
},
)
return [_serialize_semantic_field(field) for field in updated_fields]
@@ -1620,12 +1793,14 @@ async def update_execution_mapping(
session_id: str,
mapping_id: str,
request: UpdateExecutionMappingRequest,
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.update_execution_mapping"):
session = _get_owned_session_or_404(repository, session_id, current_user)
_require_owner_mutation_scope(session, current_user)
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
mapping = _get_owned_mapping_or_404(session, mapping_id)
if request.effective_value is None:
@@ -1657,8 +1832,8 @@ async def update_execution_mapping(
if preview.preview_status == PreviewStatus.READY:
preview.preview_status = PreviewStatus.STALE
repository.db.commit()
repository.db.refresh(mapping)
session_record = cast(Any, session)
_commit_owned_session_mutation(repository, session, refresh_targets=[mapping])
_record_session_event(
repository,
session,
@@ -1669,6 +1844,7 @@ async def update_execution_mapping(
"mapping_id": mapping.mapping_id,
"approval_state": mapping.approval_state.value,
"preview_state": "stale",
"version": session_record.version,
},
)
return _serialize_execution_mapping(mapping)
@@ -1698,12 +1874,14 @@ async def approve_execution_mapping(
session_id: str,
mapping_id: str,
request: ApproveMappingRequest,
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.approve_execution_mapping"):
session = _get_owned_session_or_404(repository, session_id, current_user)
_require_owner_mutation_scope(session, current_user)
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
mapping = _get_owned_mapping_or_404(session, mapping_id)
mapping.approval_state = ApprovalState.APPROVED
mapping.approved_by_user_id = current_user.id
@@ -1713,8 +1891,8 @@ async def approve_execution_mapping(
session.last_activity_at = datetime.utcnow()
if session.readiness_state == ReadinessState.MAPPING_REVIEW_NEEDED:
session.recommended_action = RecommendedAction.GENERATE_SQL_PREVIEW
repository.db.commit()
repository.db.refresh(mapping)
session_record = cast(Any, session)
_commit_owned_session_mutation(repository, session, refresh_targets=[mapping])
_record_session_event(
repository,
session,
@@ -1724,6 +1902,7 @@ async def approve_execution_mapping(
event_details={
"mapping_id": mapping.mapping_id,
"approval_state": mapping.approval_state.value,
"version": session_record.version,
},
)
return _serialize_execution_mapping(mapping)
@@ -1752,12 +1931,14 @@ async def approve_execution_mapping(
async def approve_batch_execution_mappings(
session_id: str,
request: BatchApproveMappingRequest,
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.approve_batch_execution_mappings"):
session = _get_owned_session_or_404(repository, session_id, current_user)
_require_owner_mutation_scope(session, current_user)
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
updated_mappings: List[ExecutionMapping] = []
for mapping_id in list(dict.fromkeys(request.mapping_ids)):
@@ -1773,9 +1954,10 @@ async def approve_batch_execution_mappings(
if session.readiness_state == ReadinessState.MAPPING_REVIEW_NEEDED:
session.recommended_action = RecommendedAction.GENERATE_SQL_PREVIEW
repository.db.commit()
for mapping in updated_mappings:
repository.db.refresh(mapping)
session_record = cast(Any, session)
_commit_owned_session_mutation(
repository, session, refresh_targets=list(updated_mappings)
)
_record_session_event(
repository,
session,
@@ -1785,6 +1967,7 @@ async def approve_batch_execution_mappings(
event_details={
"mapping_ids": [mapping.mapping_id for mapping in updated_mappings],
"count": len(updated_mappings),
"version": session_record.version,
},
)
return [_serialize_execution_mapping(mapping) for mapping in updated_mappings]
@@ -1814,14 +1997,20 @@ async def trigger_preview_generation(
session_id: str,
response: Response,
orchestrator: DatasetReviewOrchestrator = Depends(_get_orchestrator),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
session_version: int = Depends(_require_session_version_header),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.trigger_preview_generation"):
_prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
try:
result = orchestrator.prepare_launch_preview(
PreparePreviewCommand(
user=current_user,
session_id=session_id,
expected_version=session_version,
)
)
except ValueError as exc:
@@ -1839,12 +2028,16 @@ async def trigger_preview_generation(
response.status_code = status.HTTP_202_ACCEPTED
return PreviewEnqueueResultResponse(
session_id=result.session.session_id,
session_version=int(getattr(result.session, "version", 0) or 0),
preview_status=result.preview.preview_status.value,
task_id=None,
)
response.status_code = status.HTTP_200_OK
return CompiledPreviewDto.model_validate(result.preview, from_attributes=True)
return _serialize_preview(
result.preview,
session_version_fallback=int(getattr(result.session, "version", 0) or 0),
)
# [/DEF:trigger_preview_generation:Function]
@@ -1871,15 +2064,21 @@ async def trigger_preview_generation(
async def launch_dataset(
session_id: str,
orchestrator: DatasetReviewOrchestrator = Depends(_get_orchestrator),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
session_version: int = Depends(_require_session_version_header),
config_manager=Depends(get_config_manager),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.launch_dataset"):
_prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
try:
result = orchestrator.launch_dataset(
LaunchDatasetCommand(
user=current_user,
session_id=session_id,
expected_version=session_version,
)
)
except ValueError as exc:
@@ -1929,22 +2128,29 @@ async def record_field_feedback(
session_id: str,
field_id: str,
request: FeedbackRequest,
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.record_field_feedback"):
session = _get_owned_session_or_404(repository, session_id, current_user)
_require_owner_mutation_scope(session, current_user)
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
field = _get_owned_field_or_404(session, field_id)
field.user_feedback = request.feedback
repository.db.commit()
session_record = cast(Any, session)
_commit_owned_session_mutation(repository, session)
_record_session_event(
repository,
session,
current_user,
event_type="semantic_field_feedback_recorded",
event_summary="Semantic field feedback persisted",
event_details={"field_id": field.field_id, "feedback": request.feedback},
event_details={
"field_id": field.field_id,
"feedback": request.feedback,
"version": session_record.version,
},
)
return FeedbackResponse(target_id=field.field_id, feedback=request.feedback)
@@ -1973,12 +2179,14 @@ async def record_clarification_feedback(
session_id: str,
question_id: str,
request: FeedbackRequest,
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.record_clarification_feedback"):
session = _get_owned_session_or_404(repository, session_id, current_user)
_require_owner_mutation_scope(session, current_user)
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
clarification_session = _get_latest_clarification_session_or_404(session)
question = next(
(
@@ -1999,7 +2207,8 @@ async def record_clarification_feedback(
detail="Clarification answer not found",
)
question.answer.user_feedback = request.feedback
repository.db.commit()
session_record = cast(Any, session)
_commit_owned_session_mutation(repository, session)
_record_session_event(
repository,
session,
@@ -2009,6 +2218,7 @@ async def record_clarification_feedback(
event_details={
"question_id": question.question_id,
"feedback": request.feedback,
"version": session_record.version,
},
)
return FeedbackResponse(

View File

@@ -81,16 +81,49 @@ async def get_llm_status(
providers = service.get_all_providers()
active_provider = next((p for p in providers if p.is_active), None)
if not providers:
return {
"configured": False,
"reason": "no_providers_configured",
"provider_count": 0,
"active_provider_count": 0,
}
if not active_provider:
return {"configured": False, "reason": "no_active_provider"}
return {
"configured": False,
"reason": "no_active_provider",
"provider_count": len(providers),
"active_provider_count": 0,
"providers": [
{
"id": provider.id,
"name": provider.name,
"provider_type": provider.provider_type,
"is_active": bool(provider.is_active),
}
for provider in providers
],
}
api_key = service.get_decrypted_api_key(active_provider.id)
if not _is_valid_runtime_api_key(api_key):
return {"configured": False, "reason": "invalid_api_key"}
return {
"configured": False,
"reason": "invalid_api_key",
"provider_count": len(providers),
"active_provider_count": len([provider for provider in providers if provider.is_active]),
"provider_id": active_provider.id,
"provider_name": active_provider.name,
"provider_type": active_provider.provider_type,
"default_model": active_provider.default_model,
}
return {
"configured": True,
"reason": "ok",
"provider_count": len(providers),
"active_provider_count": len([provider for provider in providers if provider.is_active]),
"provider_id": active_provider.id,
"provider_name": active_provider.name,
"provider_type": active_provider.provider_type,