skills + agents

2026-04-19 20:05:45 +03:00
parent c6147385e5
commit afb95ace3a
40 changed files with 1155 additions and 1868 deletions

View File

@@ -1477,6 +1477,44 @@ def test_mutation_endpoints_surface_session_version_conflict_payload(
# [/DEF:test_mutation_endpoints_surface_session_version_conflict_payload:Function]
# [DEF:test_update_session_surfaces_commit_time_session_version_conflict_payload:Function]
# @RELATION: BINDS_TO -> DatasetReviewApiTests
# @PURPOSE: Session lifecycle mutation should return deterministic 409 conflict semantics when commit-time optimistic locking rejects a stale write.
def test_update_session_surfaces_commit_time_session_version_conflict_payload(
dataset_review_api_dependencies,
):
session = _make_session()
repository = MagicMock()
repository.load_session_detail.return_value = session
repository.require_session_version.return_value = session
repository.commit_session_mutation.side_effect = (
DatasetReviewSessionVersionConflictError(
session_id="sess-1",
expected_version=0,
actual_version=1,
)
)
repository.db = MagicMock()
repository.event_logger = MagicMock(spec=SessionEventLogger)
app.dependency_overrides[_get_repository] = lambda: repository
response = client.patch(
"/api/dataset-orchestration/sessions/sess-1",
json={"status": "paused"},
headers={"X-Session-Version": "0"},
)
assert response.status_code == 409
payload = response.json()["detail"]
assert payload["error_code"] == "session_version_conflict"
assert payload["expected_version"] == 0
assert payload["actual_version"] == 1
# [/DEF:test_update_session_surfaces_commit_time_session_version_conflict_payload:Function]
# [DEF:test_execution_snapshot_includes_recovered_imported_filters_without_template_mapping:Function]
# @RELATION: BINDS_TO -> DatasetReviewApiTests
# @PURPOSE: Recovered imported filters with values should flow into preview filter context even when no template variable mapping exists.
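For reference, the 409 contract pinned down by test_update_session_surfaces_commit_time_session_version_conflict_payload above deserializes to a body of this shape; a sketch as a Python literal, where the field values come from the assertions in the test and only the message text is illustrative (its exact wording is whatever str(exc) produces):

conflict_body = {
    "detail": {
        "error_code": "session_version_conflict",
        "message": "<str(exc); wording defined by the error class>",  # illustrative
        "session_id": "sess-1",
        "expected_version": 0,
        "actual_version": 1,
    }
}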

View File

@@ -69,8 +69,8 @@ class RevokeRequest(dict):
# @PRE: Payload contains required fields (id, version, source_snapshot_ref, created_by).
# @POST: Candidate is saved in repository.
# @RETURN: CandidateDTO
# @RELATION: CALLS -> [CleanReleaseRepository.save_candidate]
# @RELATION: DEPENDS_ON -> [CandidateDTO]
# @RELATION: DEPENDS_ON -> [CleanReleaseRepository]
# @RELATION: DEPENDS_ON -> [clean_release_dto]
@router.post(
"/candidates", response_model=CandidateDTO, status_code=status.HTTP_201_CREATED
)
@@ -105,7 +105,7 @@ async def register_candidate(
# @PURPOSE: Associate artifacts with a release candidate.
# @PRE: Candidate exists.
# @POST: Artifacts are processed (placeholder).
# @RELATION: CALLS -> [CleanReleaseRepository.get_candidate]
# @RELATION: DEPENDS_ON -> [CleanReleaseRepository]
@router.post("/candidates/{candidate_id}/artifacts")
async def import_artifacts(
candidate_id: str,
@@ -140,8 +140,7 @@ async def import_artifacts(
# @PRE: Candidate exists.
# @POST: Manifest is created and saved.
# @RETURN: ManifestDTO
# @RELATION: CALLS -> [CleanReleaseRepository.save_manifest]
# @RELATION: CALLS -> [CleanReleaseRepository.get_candidate]
# @RELATION: DEPENDS_ON -> [CleanReleaseRepository]
@router.post(
"/candidates/{candidate_id}/manifests",
response_model=ManifestDTO,

View File

@@ -480,16 +480,7 @@ def _enforce_session_version(
"actual_version": exc.actual_version,
},
)
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={
"error_code": "session_version_conflict",
"message": str(exc),
"session_id": exc.session_id,
"expected_version": exc.expected_version,
"actual_version": exc.actual_version,
},
) from exc
raise _build_session_version_conflict_http_exception(exc) from exc
logger.reflect(
"Dataset review optimistic-lock version accepted",
extra={
@@ -498,9 +489,33 @@ def _enforce_session_version(
},
)
return session
# [/DEF:_enforce_session_version:Function]
# [DEF:_build_session_version_conflict_http_exception:Function]
# @COMPLEXITY: 2
# @PURPOSE: Normalize optimistic-lock conflict errors into deterministic dataset-review HTTP 409 responses.
# @RELATION: DEPENDS_ON -> [DatasetReviewSessionVersionConflictError]
def _build_session_version_conflict_http_exception(
exc: DatasetReviewSessionVersionConflictError,
) -> HTTPException:
return HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={
"error_code": "session_version_conflict",
"message": str(exc),
"session_id": exc.session_id,
"expected_version": exc.expected_version,
"actual_version": exc.actual_version,
},
)
# [/DEF:_build_session_version_conflict_http_exception:Function]
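A minimal client-side sketch of the loop this helper standardizes, assuming an httpx client; the PATCH path and X-Session-Version header are taken from this diff, while the GET-detail call and its version field are assumptions about the read API:

import httpx

def pause_session(client: httpx.Client, session_id: str) -> dict:
    # Assumed read endpoint: fetch the aggregate to learn its current version.
    detail = client.get(f"/api/dataset-orchestration/sessions/{session_id}").json()
    response = client.patch(
        f"/api/dataset-orchestration/sessions/{session_id}",
        json={"status": "paused"},
        headers={"X-Session-Version": str(detail["version"])},
    )
    if response.status_code == 409:
        conflict = response.json()["detail"]
        # A concurrent writer won the race; re-read before retrying.
        raise RuntimeError(
            f"stale write: expected v{conflict['expected_version']}, "
            f"found v{conflict['actual_version']}"
        )
    response.raise_for_status()
    return response.json()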
# [DEF:_prepare_owned_session_mutation:Function]
# @COMPLEXITY: 4
# @PURPOSE: Resolve owner-scoped mutation session and enforce optimistic-lock version before changing dataset review state.
@@ -524,7 +539,9 @@ def _prepare_owned_session_mutation(
)
session = _get_owned_session_or_404(repository, session_id, current_user)
_require_owner_mutation_scope(session, current_user)
guarded_session = _enforce_session_version(repository, session, expected_version)
guarded_session = _enforce_session_version(
repository, session, expected_version
)
logger.reflect(
"Dataset review mutation session passed ownership and version guards",
extra={
@@ -534,6 +551,8 @@ def _prepare_owned_session_mutation(
},
)
return guarded_session
# [/DEF:_prepare_owned_session_mutation:Function]
@@ -556,11 +575,21 @@ def _commit_owned_session_mutation(
"Committing dataset review mutation",
extra={"session_id": session.session_id},
)
repository.bump_session_version(session)
repository.db.commit()
repository.db.refresh(session)
for target in refresh_targets or []:
repository.db.refresh(target)
try:
repository.commit_session_mutation(
session,
refresh_targets=refresh_targets,
)
except DatasetReviewSessionVersionConflictError as exc:
logger.explore(
"Dataset review mutation commit detected stale version",
extra={
"session_id": exc.session_id,
"expected_version": exc.expected_version,
"actual_version": exc.actual_version,
},
)
raise _build_session_version_conflict_http_exception(exc) from exc
logger.reflect(
"Dataset review mutation committed and refreshed",
extra={
@@ -570,6 +599,8 @@ def _commit_owned_session_mutation(
},
)
return session
# [/DEF:_commit_owned_session_mutation:Function]
@@ -678,7 +709,7 @@ def _get_owned_session_or_404(
"Resolving dataset review session in current ownership scope",
extra={"session_id": session_id, "user_id": current_user.id},
)
session = repository.load_detail(session_id, current_user.id)
session = repository.load_session_detail(session_id, current_user.id)
if session is None:
logger.explore(
"Dataset review session not found in current ownership scope",
@@ -1224,9 +1255,21 @@ async def list_sessions(
# @POST: returns persisted session summary scoped to the authenticated user.
# @SIDE_EFFECT: persists session/profile/findings and may enqueue recovery task.
# @DATA_CONTRACT: Input[StartSessionRequest] -> Output[SessionSummary]
@router.post('/sessions', response_model=SessionSummary, status_code=status.HTTP_201_CREATED, dependencies=[Depends(_require_auto_review_flag), Depends(has_permission('dataset:session', 'MANAGE'))])
async def start_session(request: StartSessionRequest, orchestrator: DatasetReviewOrchestrator=Depends(_get_orchestrator), current_user: User=Depends(get_current_user)):
with belief_scope('start_session'):
@router.post(
"/sessions",
response_model=SessionSummary,
status_code=status.HTTP_201_CREATED,
dependencies=[
Depends(_require_auto_review_flag),
Depends(has_permission("dataset:session", "MANAGE")),
],
)
async def start_session(
request: StartSessionRequest,
orchestrator: DatasetReviewOrchestrator = Depends(_get_orchestrator),
current_user: User = Depends(get_current_user),
):
with belief_scope("start_session"):
logger.reason(
"Starting dataset review session",
extra={
@@ -1236,11 +1279,25 @@ async def start_session(request: StartSessionRequest, orchestrator: DatasetRevie
},
)
try:
result = orchestrator.start_session(StartSessionCommand(user=current_user, environment_id=request.environment_id, source_kind=request.source_kind, source_input=request.source_input))
result = orchestrator.start_session(
StartSessionCommand(
user=current_user,
environment_id=request.environment_id,
source_kind=request.source_kind,
source_input=request.source_input,
)
)
except ValueError as exc:
logger.explore('Dataset review session start rejected', extra={'user_id': current_user.id, 'error': str(exc)})
logger.explore(
"Dataset review session start rejected",
extra={"user_id": current_user.id, "error": str(exc)},
)
detail = str(exc)
status_code = status.HTTP_404_NOT_FOUND if detail == 'Environment not found' else status.HTTP_400_BAD_REQUEST
status_code = (
status.HTTP_404_NOT_FOUND
if detail == "Environment not found"
else status.HTTP_400_BAD_REQUEST
)
raise HTTPException(status_code=status_code, detail=detail) from exc
logger.reflect(
"Dataset review session started and serialized",
@@ -1250,6 +1307,8 @@ async def start_session(request: StartSessionRequest, orchestrator: DatasetRevie
},
)
return _serialize_session_summary(result.session)
# [/DEF:start_session:Function]
@@ -1295,9 +1354,22 @@ async def get_session_detail(
# @POST: returns updated summary without changing ownership or unrelated aggregates.
# @SIDE_EFFECT: mutates session lifecycle fields in persistence.
# @DATA_CONTRACT: Input[UpdateSessionRequest] -> Output[SessionSummary]
@router.patch('/sessions/{session_id}', response_model=SessionSummary, dependencies=[Depends(_require_auto_review_flag), Depends(has_permission('dataset:session', 'MANAGE'))])
async def update_session(session_id: str, request: UpdateSessionRequest, session_version: int=Depends(_require_session_version_header), repository: DatasetReviewSessionRepository=Depends(_get_repository), current_user: User=Depends(get_current_user)):
with belief_scope('update_session'):
@router.patch(
"/sessions/{session_id}",
response_model=SessionSummary,
dependencies=[
Depends(_require_auto_review_flag),
Depends(has_permission("dataset:session", "MANAGE")),
],
)
async def update_session(
session_id: str,
request: UpdateSessionRequest,
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("update_session"):
logger.reason(
"Updating dataset review session lifecycle state",
extra={
@@ -1306,17 +1378,31 @@ async def update_session(session_id: str, request: UpdateSessionRequest, session
"requested_status": request.status.value,
},
)
session = _prepare_owned_session_mutation(repository, session_id, current_user, session_version)
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
session_record = cast(Any, session)
session_record.status = request.status
if request.status == SessionStatus.PAUSED:
session_record.recommended_action = RecommendedAction.RESUME_SESSION
elif request.status in {SessionStatus.ARCHIVED, SessionStatus.CANCELLED, SessionStatus.COMPLETED}:
elif request.status in {
SessionStatus.ARCHIVED,
SessionStatus.CANCELLED,
SessionStatus.COMPLETED,
}:
session_record.active_task_id = None
repository.bump_session_version(session)
repository.db.commit()
repository.db.refresh(session)
_record_session_event(repository, session, current_user, event_type='session_status_updated', event_summary='Dataset review session lifecycle updated', event_details={'status': session_record.status.value, 'version': session_record.version})
_commit_owned_session_mutation(repository, session)
_record_session_event(
repository,
session,
current_user,
event_type="session_status_updated",
event_summary="Dataset review session lifecycle updated",
event_details={
"status": session_record.status.value,
"version": session_record.version,
},
)
logger.reflect(
"Dataset review session lifecycle updated",
extra={
@@ -1327,11 +1413,11 @@ async def update_session(session_id: str, request: UpdateSessionRequest, session
},
)
return _serialize_session_summary(session)
# [/DEF:update_session:Function]
from src.logger import belief_scope, logger
# [DEF:delete_session:Function]
# @COMPLEXITY: 4
# @PURPOSE: Archive or hard-delete a session owned by the current user.
@@ -1340,9 +1426,22 @@ from src.logger import belief_scope, logger
# @POST: session is archived or deleted and no foreign-session existence is disclosed.
# @SIDE_EFFECT: mutates or deletes persisted session aggregate.
# @DATA_CONTRACT: Input[session_id:str,hard_delete:bool] -> Output[HTTP 204]
@router.delete('/sessions/{session_id}', status_code=status.HTTP_204_NO_CONTENT, dependencies=[Depends(_require_auto_review_flag), Depends(has_permission('dataset:session', 'MANAGE'))])
async def delete_session(session_id: str, hard_delete: bool=Query(False), session_version: int=Depends(_require_session_version_header), repository: DatasetReviewSessionRepository=Depends(_get_repository), current_user: User=Depends(get_current_user)):
with belief_scope('delete_session'):
@router.delete(
"/sessions/{session_id}",
status_code=status.HTTP_204_NO_CONTENT,
dependencies=[
Depends(_require_auto_review_flag),
Depends(has_permission("dataset:session", "MANAGE")),
],
)
async def delete_session(
session_id: str,
hard_delete: bool = Query(False),
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("delete_session"):
logger.reason(
"Deleting or archiving dataset review session",
extra={
@@ -1351,9 +1450,18 @@ async def delete_session(session_id: str, hard_delete: bool=Query(False), sessio
"hard_delete": hard_delete,
},
)
session = _prepare_owned_session_mutation(repository, session_id, current_user, session_version)
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
if hard_delete:
_record_session_event(repository, session, current_user, event_type='session_deleted', event_summary='Dataset review session hard-deleted', event_details={'hard_delete': True})
_record_session_event(
repository,
session,
current_user,
event_type="session_deleted",
event_summary="Dataset review session hard-deleted",
event_details={"hard_delete": True},
)
repository.db.delete(session)
repository.db.commit()
logger.reflect(
@@ -1365,7 +1473,14 @@ async def delete_session(session_id: str, hard_delete: bool=Query(False), sessio
session_record.status = SessionStatus.ARCHIVED
session_record.active_task_id = None
_commit_owned_session_mutation(repository, session)
_record_session_event(repository, session, current_user, event_type='session_archived', event_summary='Dataset review session archived', event_details={'hard_delete': False, 'version': session_record.version})
_record_session_event(
repository,
session,
current_user,
event_type="session_archived",
event_summary="Dataset review session archived",
event_details={"hard_delete": False, "version": session_record.version},
)
logger.reflect(
"Dataset review session archive committed",
extra={
@@ -1375,6 +1490,8 @@ async def delete_session(session_id: str, hard_delete: bool=Query(False), sessio
},
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
# [/DEF:delete_session:Function]
@@ -1386,11 +1503,26 @@ async def delete_session(session_id: str, hard_delete: bool=Query(False), sessio
# @POST: returns ownership-scoped export payload without fabricating unrelated artifacts.
# @SIDE_EFFECT: none beyond response construction.
# @DATA_CONTRACT: Input[session_id:str,format:ArtifactFormat] -> Output[ExportArtifactResponse]
@router.get('/sessions/{session_id}/exports/documentation', response_model=ExportArtifactResponse, dependencies=[Depends(_require_auto_review_flag), Depends(has_permission('dataset:session', 'READ'))])
async def export_documentation(session_id: str, format: ArtifactFormat=Query(ArtifactFormat.JSON), repository: DatasetReviewSessionRepository=Depends(_get_repository), current_user: User=Depends(get_current_user)):
with belief_scope('export_documentation'):
@router.get(
"/sessions/{session_id}/exports/documentation",
response_model=ExportArtifactResponse,
dependencies=[
Depends(_require_auto_review_flag),
Depends(has_permission("dataset:session", "READ")),
],
)
async def export_documentation(
session_id: str,
format: ArtifactFormat = Query(ArtifactFormat.JSON),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("export_documentation"):
if format not in {ArtifactFormat.JSON, ArtifactFormat.MARKDOWN}:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Only json and markdown exports are supported')
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Only json and markdown exports are supported",
)
logger.reason(
"Building dataset review documentation export",
extra={
@@ -1409,7 +1541,17 @@ async def export_documentation(session_id: str, format: ArtifactFormat=Query(Art
"format": format.value,
},
)
return ExportArtifactResponse(artifact_id=f'documentation-{session.session_id}-{format.value}', session_id=session.session_id, artifact_type='documentation', format=format.value, storage_ref=export_payload['storage_ref'], created_by_user_id=current_user.id, content=export_payload['content'])
return ExportArtifactResponse(
artifact_id=f"documentation-{session.session_id}-{format.value}",
session_id=session.session_id,
artifact_type="documentation",
format=format.value,
storage_ref=export_payload["storage_ref"],
created_by_user_id=current_user.id,
content=export_payload["content"],
)
# [/DEF:export_documentation:Function]
@@ -1421,11 +1563,26 @@ async def export_documentation(session_id: str, format: ArtifactFormat=Query(Art
# @POST: returns explicit validation export payload scoped to current user session access.
# @SIDE_EFFECT: none beyond response construction.
# @DATA_CONTRACT: Input[session_id:str,format:ArtifactFormat] -> Output[ExportArtifactResponse]
@router.get('/sessions/{session_id}/exports/validation', response_model=ExportArtifactResponse, dependencies=[Depends(_require_auto_review_flag), Depends(has_permission('dataset:session', 'READ'))])
async def export_validation(session_id: str, format: ArtifactFormat=Query(ArtifactFormat.JSON), repository: DatasetReviewSessionRepository=Depends(_get_repository), current_user: User=Depends(get_current_user)):
with belief_scope('export_validation'):
@router.get(
"/sessions/{session_id}/exports/validation",
response_model=ExportArtifactResponse,
dependencies=[
Depends(_require_auto_review_flag),
Depends(has_permission("dataset:session", "READ")),
],
)
async def export_validation(
session_id: str,
format: ArtifactFormat = Query(ArtifactFormat.JSON),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("export_validation"):
if format not in {ArtifactFormat.JSON, ArtifactFormat.MARKDOWN}:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Only json and markdown exports are supported')
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Only json and markdown exports are supported",
)
logger.reason(
"Building dataset review validation export",
extra={
@@ -1444,7 +1601,17 @@ async def export_validation(session_id: str, format: ArtifactFormat=Query(Artifa
"format": format.value,
},
)
return ExportArtifactResponse(artifact_id=f'validation-{session.session_id}-{format.value}', session_id=session.session_id, artifact_type='validation_report', format=format.value, storage_ref=export_payload['storage_ref'], created_by_user_id=current_user.id, content=export_payload['content'])
return ExportArtifactResponse(
artifact_id=f"validation-{session.session_id}-{format.value}",
session_id=session.session_id,
artifact_type="validation_report",
format=format.value,
storage_ref=export_payload["storage_ref"],
created_by_user_id=current_user.id,
content=export_payload["content"],
)
# [/DEF:export_validation:Function]
@@ -1456,18 +1623,46 @@ async def export_validation(session_id: str, format: ArtifactFormat=Query(Artifa
# @POST: Returns at most one active clarification question with why_it_matters, current_guess, and ordered options; sessions without a clarification record return a non-blocking empty state.
# @SIDE_EFFECT: May normalize clarification pointer and readiness state in persistence.
# @DATA_CONTRACT: Input[session_id:str] -> Output[ClarificationStateResponse]
@router.get('/sessions/{session_id}/clarification', response_model=ClarificationStateResponse, dependencies=[Depends(_require_auto_review_flag), Depends(_require_clarification_flag), Depends(has_permission('dataset:session', 'READ'))])
async def get_clarification_state(session_id: str, repository: DatasetReviewSessionRepository=Depends(_get_repository), clarification_engine: ClarificationEngine=Depends(_get_clarification_engine), current_user: User=Depends(get_current_user)):
with belief_scope('get_clarification_state'):
logger.reason('Belief protocol reasoning checkpoint for get_clarification_state')
@router.get(
"/sessions/{session_id}/clarification",
response_model=ClarificationStateResponse,
dependencies=[
Depends(_require_auto_review_flag),
Depends(_require_clarification_flag),
Depends(has_permission("dataset:session", "READ")),
],
)
async def get_clarification_state(
session_id: str,
repository: DatasetReviewSessionRepository = Depends(_get_repository),
clarification_engine: ClarificationEngine = Depends(_get_clarification_engine),
current_user: User = Depends(get_current_user),
):
with belief_scope("get_clarification_state"):
logger.reason(
"Belief protocol reasoning checkpoint for get_clarification_state"
)
session = _get_owned_session_or_404(repository, session_id, current_user)
if not session.clarification_sessions:
logger.reflect('Belief protocol postcondition checkpoint for get_clarification_state')
logger.reflect(
"Belief protocol postcondition checkpoint for get_clarification_state"
)
return _serialize_empty_clarification_state()
clarification_session = _get_latest_clarification_session_or_404(session)
current_question = clarification_engine.build_question_payload(session)
logger.reflect('Belief protocol postcondition checkpoint for get_clarification_state')
return _serialize_clarification_state(ClarificationStateResult(clarification_session=clarification_session, current_question=current_question, session=session, changed_findings=[]))
logger.reflect(
"Belief protocol postcondition checkpoint for get_clarification_state"
)
return _serialize_clarification_state(
ClarificationStateResult(
clarification_session=clarification_session,
current_question=current_question,
session=session,
changed_findings=[],
)
)
# [/DEF:get_clarification_state:Function]
@@ -1479,15 +1674,42 @@ async def get_clarification_state(session_id: str, repository: DatasetReviewSess
# @POST: Clarification session enters active state with one current question or completes deterministically when no unresolved items remain.
# @SIDE_EFFECT: Mutates clarification pointer, readiness, and recommended action.
# @DATA_CONTRACT: Input[session_id:str] -> Output[ClarificationStateResponse]
@router.post('/sessions/{session_id}/clarification/resume', response_model=ClarificationStateResponse, dependencies=[Depends(_require_auto_review_flag), Depends(_require_clarification_flag), Depends(has_permission('dataset:session', 'MANAGE'))])
async def resume_clarification(session_id: str, session_version: int=Depends(_require_session_version_header), repository: DatasetReviewSessionRepository=Depends(_get_repository), clarification_engine: ClarificationEngine=Depends(_get_clarification_engine), current_user: User=Depends(get_current_user)):
with belief_scope('resume_clarification'):
logger.reason('Belief protocol reasoning checkpoint for resume_clarification')
session = _prepare_owned_session_mutation(repository, session_id, current_user, session_version)
@router.post(
"/sessions/{session_id}/clarification/resume",
response_model=ClarificationStateResponse,
dependencies=[
Depends(_require_auto_review_flag),
Depends(_require_clarification_flag),
Depends(has_permission("dataset:session", "MANAGE")),
],
)
async def resume_clarification(
session_id: str,
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
clarification_engine: ClarificationEngine = Depends(_get_clarification_engine),
current_user: User = Depends(get_current_user),
):
with belief_scope("resume_clarification"):
logger.reason("Belief protocol reasoning checkpoint for resume_clarification")
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
)
clarification_session = _get_latest_clarification_session_or_404(session)
current_question = clarification_engine.build_question_payload(session)
logger.reflect('Belief protocol postcondition checkpoint for resume_clarification')
return _serialize_clarification_state(ClarificationStateResult(clarification_session=clarification_session, current_question=current_question, session=session, changed_findings=[]))
logger.reflect(
"Belief protocol postcondition checkpoint for resume_clarification"
)
return _serialize_clarification_state(
ClarificationStateResult(
clarification_session=clarification_session,
current_question=current_question,
session=session,
changed_findings=[],
)
)
# [/DEF:resume_clarification:Function]
@@ -2040,6 +2262,8 @@ async def trigger_preview_generation(
expected_version=session_version,
)
)
except DatasetReviewSessionVersionConflictError as exc:
raise _build_session_version_conflict_http_exception(exc) from exc
except ValueError as exc:
detail = str(exc)
status_code = (
@@ -2108,6 +2332,8 @@ async def launch_dataset(
expected_version=session_version,
)
)
except DatasetReviewSessionVersionConflictError as exc:
raise _build_session_version_conflict_http_exception(exc) from exc
except ValueError as exc:
detail = str(exc)
status_code = (

View File

@@ -19,7 +19,7 @@ from datetime import datetime, time, timedelta, date
# @COMPLEXITY: 3
# @SEMANTICS: scheduler, service, apscheduler
# @PURPOSE: Provides a service to manage scheduled backup tasks.
# @RELATION: DEPENDS_ON -> ThrottledSchedulerConfigurator; CALLS -> asyncio
# @RELATION: DEPENDS_ON -> [ThrottledSchedulerConfigurator]
class SchedulerService:
# [DEF:__init__:Function]
# @PURPOSE: Initializes the scheduler service with task and config managers.

View File

@@ -154,7 +154,7 @@ class SupersetClient:
# @PRE: Client is authenticated.
# @POST: Returns total count and one page of dashboards.
# @DATA_CONTRACT: Input[query: Optional[Dict]] -> Output[Tuple[int, List[Dict]]]
# @RELATION: CALLS -> [ConnectionContracts]
# @RELATION: CALLS -> [APIClient]
def get_dashboards_page(
self, query: Optional[Dict] = None
) -> Tuple[int, List[Dict]]:
@@ -438,7 +438,7 @@ class SupersetClient:
# @PRE: Client is authenticated and dashboard_ref exists.
# @POST: Returns dashboard payload from Superset API.
# @DATA_CONTRACT: Input[dashboard_ref: Union[int, str]] -> Output[Dict]
# @RELATION: CALLS -> [ConnectionContracts]
# @RELATION: CALLS -> [APIClient]
def get_dashboard(self, dashboard_ref: Union[int, str]) -> Dict:
with belief_scope("SupersetClient.get_dashboard", f"ref={dashboard_ref}"):
response = self.network.request(
@@ -454,7 +454,7 @@ class SupersetClient:
# @PRE: Client is authenticated and permalink key exists.
# @POST: Returns dashboard permalink state payload from Superset API.
# @DATA_CONTRACT: Input[permalink_key: str] -> Output[Dict]
# @RELATION: CALLS -> [ConnectionContracts]
# @RELATION: CALLS -> [APIClient]
def get_dashboard_permalink_state(self, permalink_key: str) -> Dict:
with belief_scope(
"SupersetClient.get_dashboard_permalink_state", f"key={permalink_key}"
@@ -472,7 +472,7 @@ class SupersetClient:
# @PRE: Client is authenticated and filter_state_key exists.
# @POST: Returns native filter state payload from Superset API.
# @DATA_CONTRACT: Input[dashboard_id: Union[int, str], filter_state_key: str] -> Output[Dict]
# @RELATION: CALLS -> [ConnectionContracts]
# @RELATION: CALLS -> [APIClient]
def get_native_filter_state(
self, dashboard_id: Union[int, str], filter_state_key: str
) -> Dict:
@@ -713,7 +713,7 @@ class SupersetClient:
# @PRE: Client is authenticated and chart_id exists.
# @POST: Returns chart payload from Superset API.
# @DATA_CONTRACT: Input[chart_id: int] -> Output[Dict]
# @RELATION: CALLS -> [ConnectionContracts]
# @RELATION: CALLS -> [APIClient]
def get_chart(self, chart_id: int) -> Dict:
with belief_scope("SupersetClient.get_chart", f"id={chart_id}"):
response = self.network.request(method="GET", endpoint=f"/chart/{chart_id}")
@@ -1076,7 +1076,7 @@ class SupersetClient:
# @POST: Returns ZIP content and filename.
# @DATA_CONTRACT: Input[dashboard_id: int] -> Output[Tuple[bytes, str]]
# @SIDE_EFFECT: Performs network I/O to download archive.
# @RELATION: CALLS -> [ConnectionContracts]
# @RELATION: CALLS -> [APIClient]
def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]:
with belief_scope("export_dashboard"):
app_logger.info(
@@ -1109,7 +1109,7 @@ class SupersetClient:
# @DATA_CONTRACT: Input[file_name: Union[str, Path]] -> Output[Dict]
# @SIDE_EFFECT: Performs network I/O to upload archive.
# @RELATION: CALLS -> [SupersetClientDoImport]
# @RELATION: CALLS -> [ConnectionContracts]
# @RELATION: CALLS -> [APIClient]
def import_dashboard(
self,
file_name: Union[str, Path],
@@ -1154,7 +1154,7 @@ class SupersetClient:
# @PRE: dashboard_id must exist.
# @POST: Dashboard is removed from Superset.
# @SIDE_EFFECT: Deletes resource from upstream Superset environment.
# @RELATION: CALLS -> [ConnectionContracts]
# @RELATION: CALLS -> [APIClient]
def delete_dashboard(self, dashboard_id: Union[int, str]) -> None:
with belief_scope("delete_dashboard"):
app_logger.info(
@@ -1239,7 +1239,7 @@ class SupersetClient:
# @PARAM: dataset_id (int) - The dataset ID to fetch details for.
# @RETURN: Dict - Dataset details with columns and linked_dashboards.
# @RELATION: CALLS -> [SupersetClientGetDataset]
# @RELATION: CALLS -> [ConnectionContracts]
# @RELATION: CALLS -> [APIClient]
def get_dataset_detail(self, dataset_id: int) -> Dict:
with belief_scope("SupersetClient.get_dataset_detail", f"id={dataset_id}"):
@@ -1366,7 +1366,7 @@ class SupersetClient:
# @PRE: dataset_id must exist.
# @POST: Returns dataset details.
# @DATA_CONTRACT: Input[dataset_id: int] -> Output[Dict]
# @RELATION: CALLS -> [ConnectionContracts]
# @RELATION: CALLS -> [APIClient]
def get_dataset(self, dataset_id: int) -> Dict:
with belief_scope("SupersetClient.get_dataset", f"id={dataset_id}"):
app_logger.info("[get_dataset][Enter] Fetching dataset %s.", dataset_id)
@@ -1390,7 +1390,7 @@ from src.logger import belief_scope, logger
# @RELATION: CALLS -> [SupersetClientGetDataset]
# @RELATION: CALLS -> [SupersetClientBuildDatasetPreviewQueryContext]
# @RELATION: CALLS -> [SupersetClientBuildDatasetPreviewLegacyFormData]
# @RELATION: CALLS -> [ConnectionContracts]
# @RELATION: CALLS -> [APIClient]
# @RELATION: CALLS -> [SupersetClientExtractCompiledSqlFromPreviewResponse]
# @SIDE_EFFECT: Performs upstream dataset lookup and preview network I/O against Superset.
def compile_dataset_preview(self, dataset_id: int, template_params: Optional[Dict[str, Any]]=None, effective_filters: Optional[List[Dict[str, Any]]]=None) -> Dict[str, Any]:
@@ -1726,7 +1726,7 @@ from src.logger import belief_scope, logger
# @PURPOSE: Normalize compiled SQL from either chart-data or legacy form_data preview responses.
# @PRE: response must be the decoded preview response body from a supported Superset endpoint.
# @POST: Returns compiled SQL and raw response or raises SupersetAPIError when the endpoint does not expose query text.
# @RELATION: DEPENDS_ON -> [ConnectionContracts]
# @RELATION: DEPENDS_ON -> [APIClient]
def _extract_compiled_sql_from_preview_response(
self, response: Any
) -> Dict[str, Any]:
@@ -1808,7 +1808,7 @@ from src.logger import belief_scope, logger
# @POST: Dataset is updated in Superset.
# @DATA_CONTRACT: Input[dataset_id: int, data: Dict] -> Output[Dict]
# @SIDE_EFFECT: Modifies resource in upstream Superset environment.
# @RELATION: CALLS -> [ConnectionContracts]
# @RELATION: CALLS -> [APIClient]
def update_dataset(self, dataset_id: int, data: Dict) -> Dict:
with belief_scope("SupersetClient.update_dataset", f"id={dataset_id}"):
app_logger.info("[update_dataset][Enter] Updating dataset %s.", dataset_id)
@@ -1857,7 +1857,7 @@ from src.logger import belief_scope, logger
# @PRE: database_id must exist.
# @POST: Returns database details.
# @DATA_CONTRACT: Input[database_id: int] -> Output[Dict]
# @RELATION: CALLS -> [ConnectionContracts]
# @RELATION: CALLS -> [APIClient]
def get_database(self, database_id: int) -> Dict:
with belief_scope("get_database"):
app_logger.info("[get_database][Enter] Fetching database %s.", database_id)
@@ -1950,7 +1950,7 @@ from src.logger import belief_scope, logger
# @PURPOSE: Performs the actual multipart upload for import.
# @PRE: file_name must be a path to an existing ZIP file.
# @POST: Returns the API response from the upload.
# @RELATION: CALLS -> [ConnectionContracts]
# @RELATION: CALLS -> [APIClient]
def _do_import(self, file_name: Union[str, Path]) -> Dict:
with belief_scope("_do_import"):
app_logger.debug(f"[_do_import][State] Uploading file: {file_name}")
@@ -2031,7 +2031,7 @@ from src.logger import belief_scope, logger
# @PURPOSE: Fetches the total number of items for a given endpoint.
# @PRE: endpoint must be a valid Superset API path.
# @POST: Returns the total count as an integer.
# @RELATION: CALLS -> [ConnectionContracts]
# @RELATION: CALLS -> [APIClient]
def _fetch_total_object_count(self, endpoint: str) -> int:
with belief_scope("_fetch_total_object_count"):
return self.network.fetch_paginated_count(
@@ -2047,7 +2047,7 @@ from src.logger import belief_scope, logger
# @PURPOSE: Iterates through all pages to collect all data items.
# @PRE: pagination_options must contain base_query, total_count, and results_field.
# @POST: Returns a combined list of all items.
# @RELATION: CALLS -> [ConnectionContracts]
# @RELATION: CALLS -> [APIClient]
def _fetch_all_pages(self, endpoint: str, pagination_options: Dict) -> List[Dict]:
with belief_scope("_fetch_all_pages"):
return self.network.fetch_paginated_data(

View File

@@ -147,6 +147,7 @@ class DatasetReviewSession(Base):
default=RecommendedAction.IMPORT_FROM_SUPERSET,
)
version = Column(Integer, nullable=False, default=0)
__mapper_args__ = {"version_id_col": version, "version_id_generator": False}
status = Column(
SQLEnum(SessionStatus), nullable=False, default=SessionStatus.ACTIVE
)
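
The __mapper_args__ line above is what makes commit-time conflicts detectable: with version_id_col set and version_id_generator=False, the application bumps the counter itself, and SQLAlchemy appends the previously loaded version to every UPDATE's WHERE clause, raising StaleDataError when no row matches. A self-contained sketch of that mechanism under the same settings (model and table names are hypothetical):

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.orm.exc import StaleDataError

Base = declarative_base()

class Doc(Base):  # hypothetical stand-in for DatasetReviewSession
    __tablename__ = "docs"
    id = Column(Integer, primary_key=True)
    body = Column(String, nullable=False)
    version = Column(Integer, nullable=False, default=0)
    # Application-managed optimistic lock: every UPDATE gains
    # "WHERE version = <value loaded into this Session>".
    __mapper_args__ = {"version_id_col": version, "version_id_generator": False}

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
SessionFactory = sessionmaker(bind=engine)

seed = SessionFactory()
seed.add(Doc(id=1, body="initial", version=0))
seed.commit()
seed.close()

a, b = SessionFactory(), SessionFactory()
doc_a, doc_b = a.get(Doc, 1), b.get(Doc, 1)  # both load version 0
doc_a.body, doc_a.version = "writer A", doc_a.version + 1
a.commit()  # UPDATE ... WHERE version = 0 matches: row is now at version 1
doc_b.body, doc_b.version = "writer B", doc_b.version + 1
try:
    b.commit()  # UPDATE ... WHERE version = 0 matches zero rows
except StaleDataError:
    b.rollback()  # the loser rolls back, as commit_session_mutation does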

View File

@@ -3,7 +3,7 @@
# @SEMANTICS: git, llm, commit
# @PURPOSE: LLM-based extensions for the Git plugin, specifically for commit message generation.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> backend.src.plugins.llm_analysis.service.LLMClient
# @RELATION: DEPENDS_ON -> [LLMClient]
from typing import List
from tenacity import retry, stop_after_attempt, wait_exponential

View File

@@ -3,10 +3,10 @@
# @SEMANTICS: plugin, llm, analysis, documentation
# @PURPOSE: Implements DashboardValidationPlugin and DocumentationPlugin.
# @LAYER: Domain
# @RELATION: INHERITS -> backend.src.core.plugin_base.PluginBase
# @RELATION: CALLS -> backend.src.plugins.llm_analysis.service.ScreenshotService
# @RELATION: CALLS -> backend.src.plugins.llm_analysis.service.LLMClient
# @RELATION: CALLS -> backend.src.services.llm_provider.LLMProviderService
# @RELATION: INHERITS -> [PluginBase]
# @RELATION: CALLS -> [ScreenshotService]
# @RELATION: CALLS -> [LLMClient]
# @RELATION: CALLS -> [LLMProviderService]
# @RELATION: USES -> TaskContext
# @INVARIANT: All LLM interactions must be executed as asynchronous tasks.

View File

@@ -3,7 +3,7 @@
# @SEMANTICS: scheduler, task, automation
# @PURPOSE: Provides helper functions to schedule LLM-based validation tasks.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> backend.src.core.scheduler
# @RELATION: DEPENDS_ON -> [SchedulerService]
from typing import Dict, Any
from ...dependencies import get_task_manager, get_scheduler_service

View File

@@ -3,7 +3,7 @@
# @SEMANTICS: tests, llm, prompts, templates, settings
# @PURPOSE: Validate normalization and rendering behavior for configurable LLM prompt templates.
# @LAYER: Domain Tests
# @RELATION: DEPENDS_ON -> [backend.src.services.llm_prompt_templates:Function]
# @RELATION: DEPENDS_ON -> [llm_prompt_templates]
# @INVARIANT: All required prompt keys remain available after normalization.
from src.services.llm_prompt_templates import (

View File

@@ -451,6 +451,7 @@ class DatasetReviewOrchestrator:
session.session_id,
command.user.id,
preview,
expected_version=command.expected_version,
)
session.current_phase = SessionPhase.PREVIEW
@@ -598,6 +599,7 @@ class DatasetReviewOrchestrator:
session.session_id,
command.user.id,
run_context,
expected_version=command.expected_version,
)
session.current_phase = SessionPhase.LAUNCH
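
These call sites imply the preview and launch commands now carry the caller's observed version down to the repository save paths; a sketch of the shape such a command might take, where only the expected_version field is confirmed by this diff and the remaining fields are assumed from the surrounding calls:

from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class TriggerPreviewCommand:  # hypothetical field set
    user: "User"
    session_id: str
    expected_version: Optional[int] = None  # forwarded to save_preview(...)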

View File

@@ -1,6 +1,7 @@
import pytest
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.orm import sessionmaker
from pathlib import Path
from src.core.database import _ensure_dataset_review_session_columns
from src.models.mapping import Base, Environment
from src.models.auth import User
@@ -352,6 +353,7 @@ def test_save_profile_and_findings(db_session):
assert updated_session.profile.dataset_name == "Test DS"
assert len(updated_session.findings) == 1
assert updated_session.findings[0].code == "ERR1"
assert updated_session.version == 1
# Verify removal of old findings
new_finding = ValidationFinding(
@@ -369,11 +371,112 @@ def test_save_profile_and_findings(db_session):
final_session = repo.load_session_detail(session.session_id, "user1")
assert len(final_session.findings) == 1
assert final_session.findings[0].code == "WARN1"
assert final_session.version == 2
# [/DEF:test_save_profile_and_findings:Function]
# [DEF:test_save_profile_and_findings_rejects_stale_concurrent_write:Function]
# @RELATION: BINDS_TO -> SessionRepositoryTests
# @PURPOSE: Verify repository save path translates concurrent stale session writes into deterministic optimistic-lock conflicts.
def test_save_profile_and_findings_rejects_stale_concurrent_write(tmp_path: Path):
db_path = tmp_path / "dataset_review_session_repository.sqlite"
engine = create_engine(f"sqlite:///{db_path}")
Base.metadata.create_all(engine)
SessionFactory = sessionmaker(bind=engine)
seed_session = SessionFactory()
seed_session.add_all(
[
User(
id="user1",
username="testuser",
email="test@example.com",
password_hash="pw",
),
Environment(
id="env1",
name="Prod",
url="http://superset",
credentials_id="cred1",
),
]
)
seed_session.commit()
created_session = DatasetReviewSession(
user_id="user1",
environment_id="env1",
source_kind="superset_link",
source_input="http://link",
dataset_ref="dataset1",
)
seed_session.add(created_session)
seed_session.commit()
session_id = created_session.session_id
seed_session.close()
writer_a = SessionFactory()
writer_b = SessionFactory()
repo_a = DatasetReviewSessionRepository(writer_a)
repo_b = DatasetReviewSessionRepository(writer_b)
repo_a.save_profile_and_findings(
session_id,
"user1",
DatasetProfile(
session_id=session_id,
dataset_name="Writer A",
business_summary="Summary A",
business_summary_source=BusinessSummarySource.INFERRED,
confidence_state=ConfidenceState.UNRESOLVED,
),
[
ValidationFinding(
session_id=session_id,
area=FindingArea.SOURCE_INTAKE,
severity=FindingSeverity.WARNING,
code="WARN_A",
title="Warning A",
message="Writer A saved first",
)
],
)
with pytest.raises(DatasetReviewSessionVersionConflictError) as exc_info:
repo_b.save_profile_and_findings(
session_id,
"user1",
DatasetProfile(
session_id=session_id,
dataset_name="Writer B",
business_summary="Summary B",
business_summary_source=BusinessSummarySource.INFERRED,
confidence_state=ConfidenceState.UNRESOLVED,
),
[
ValidationFinding(
session_id=session_id,
area=FindingArea.DATASET_PROFILE,
severity=FindingSeverity.BLOCKING,
code="ERR_B",
title="Error B",
message="Writer B lost optimistic lock",
)
],
expected_version=0,
)
assert exc_info.value.expected_version == 0
assert exc_info.value.actual_version == 1
writer_a.close()
writer_b.close()
# [/DEF:test_save_profile_and_findings_rejects_stale_concurrent_write:Function]
# [DEF:test_save_run_context:Function]
# @RELATION: BINDS_TO -> SessionRepositoryTests
def test_save_run_context(db_session):

View File

@@ -16,6 +16,7 @@ from datetime import datetime
from typing import Any, Optional, List, cast
from sqlalchemy import or_
from sqlalchemy.orm import Session, joinedload
from sqlalchemy.orm.exc import StaleDataError
from src.models.dataset_review import (
ClarificationQuestion,
ClarificationSession,
@@ -207,6 +208,78 @@ class DatasetReviewSessionRepository:
# [/DEF:bump_session_version:Function]
# [DEF:commit_session_mutation:Function]
# @COMPLEXITY: 4
# @PURPOSE: Commit one prepared dataset review session mutation and translate stale writes into deterministic optimistic-lock conflicts.
# @RELATION: DEPENDS_ON -> [DatasetReviewSession]
# @PRE: session mutation has already been assembled in the current SQLAlchemy transaction.
# @POST: session mutation is committed with one version increment or a deterministic conflict error is raised.
# @SIDE_EFFECT: increments session version, commits the transaction, refreshes ORM rows, or rolls back failed stale writes.
# @DATA_CONTRACT: Input[DatasetReviewSession,List[Any]|None,int|None] -> Output[DatasetReviewSession|DatasetReviewSessionVersionConflictError]
def commit_session_mutation(
self,
session: DatasetReviewSession,
*,
refresh_targets: Optional[List[Any]] = None,
expected_version: Optional[int] = None,
) -> DatasetReviewSession:
with belief_scope("DatasetReviewSessionRepository.commit_session_mutation"):
session_record = cast(Any, session)
observed_version = int(
expected_version
if expected_version is not None
else getattr(session_record, "version", 0) or 0
)
logger.reason(
"Committing dataset review session mutation with optimistic lock",
extra={
"session_id": session.session_id,
"observed_version": observed_version,
"refresh_count": len(refresh_targets or []),
},
)
self.bump_session_version(session)
try:
self.db.commit()
except StaleDataError as exc:
self.db.rollback()
actual_version_row = (
self.db.query(DatasetReviewSession.version)
.filter(DatasetReviewSession.session_id == session.session_id)
.first()
)
actual_version = (
int(actual_version_row[0] or 0) if actual_version_row else 0
)
logger.explore(
"Dataset review session commit rejected by optimistic lock",
extra={
"session_id": session.session_id,
"expected_version": observed_version,
"actual_version": actual_version,
},
)
raise DatasetReviewSessionVersionConflictError(
session.session_id,
observed_version,
actual_version,
) from exc
self.db.refresh(session)
for target in refresh_targets or []:
self.db.refresh(target)
logger.reflect(
"Dataset review session mutation committed",
extra={
"session_id": session.session_id,
"version": getattr(session, "version", None),
"refresh_count": len(refresh_targets or []),
},
)
return session
# [/DEF:commit_session_mutation:Function]
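Condensed usage, matching how the router and repository save paths in this diff call the helper (the version number is illustrative):

try:
    repository.commit_session_mutation(
        session,
        refresh_targets=[preview],
        expected_version=3,  # the version the caller observed, e.g. from X-Session-Version
    )
except DatasetReviewSessionVersionConflictError as exc:
    # exc.session_id / exc.expected_version / exc.actual_version feed the 409 payload
    raise _build_session_version_conflict_http_exception(exc) from exc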
# [DEF:load_detail:Function]
# @COMPLEXITY: 4
# @PURPOSE: Return the full session aggregate for API and frontend resume flows.
@@ -287,10 +360,13 @@ class DatasetReviewSessionRepository:
user_id: str,
profile: DatasetProfile,
findings: List[ValidationFinding],
expected_version: Optional[int] = None,
) -> DatasetReviewSession:
with belief_scope("DatasetReviewSessionRepository.save_profile_and_findings"):
session = self._get_owned_session(session_id, user_id)
session_record = cast(Any, session)
if expected_version is not None:
self.require_session_version(session, expected_version)
logger.reason(
"Persisting dataset profile and replacing validation findings",
extra={
@@ -298,6 +374,7 @@ class DatasetReviewSessionRepository:
"user_id": user_id,
"has_profile": bool(profile),
"findings_count": len(findings),
"expected_version": expected_version,
},
)
@@ -320,8 +397,7 @@ class DatasetReviewSessionRepository:
finding_record.session_id = session_id
self.db.add(finding)
self.bump_session_version(session)
self.db.commit()
self.commit_session_mutation(session, expected_version=expected_version)
logger.reflect(
"Dataset profile and validation findings committed",
extra={
@@ -351,10 +427,13 @@ class DatasetReviewSessionRepository:
imported_filters: List[ImportedFilter],
template_variables: List[TemplateVariable],
execution_mappings: List[ExecutionMapping],
expected_version: Optional[int] = None,
) -> DatasetReviewSession:
with belief_scope("DatasetReviewSessionRepository.save_recovery_state"):
session = self._get_owned_session(session_id, user_id)
session_record = cast(Any, session)
if expected_version is not None:
self.require_session_version(session, expected_version)
logger.reason(
"Persisting dataset review recovery bootstrap state",
extra={
@@ -363,6 +442,7 @@ class DatasetReviewSessionRepository:
"imported_filters_count": len(imported_filters),
"template_variables_count": len(template_variables),
"execution_mappings_count": len(execution_mappings),
"expected_version": expected_version,
},
)
@@ -393,8 +473,7 @@ class DatasetReviewSessionRepository:
execution_mapping_record.session_id = session_id
self.db.add(execution_mapping)
self.bump_session_version(session)
self.db.commit()
self.commit_session_mutation(session, expected_version=expected_version)
logger.reflect(
"Dataset review recovery bootstrap state committed",
extra={
@@ -420,14 +499,24 @@ class DatasetReviewSessionRepository:
# @SIDE_EFFECT: updates prior preview statuses, inserts a preview row, mutates the parent session, and commits.
# @DATA_CONTRACT: Input[PreviewMutation] -> Output[CompiledPreview]
def save_preview(
self, session_id: str, user_id: str, preview: CompiledPreview
self,
session_id: str,
user_id: str,
preview: CompiledPreview,
expected_version: Optional[int] = None,
) -> CompiledPreview:
with belief_scope("DatasetReviewSessionRepository.save_preview"):
session = self._get_owned_session(session_id, user_id)
session_record = cast(Any, session)
if expected_version is not None:
self.require_session_version(session, expected_version)
logger.reason(
"Persisting compiled preview and staling previous preview snapshots",
extra={"session_id": session_id, "user_id": user_id},
extra={
"session_id": session_id,
"user_id": user_id,
"expected_version": expected_version,
},
)
self.db.query(CompiledPreview).filter(
@@ -438,9 +527,11 @@ class DatasetReviewSessionRepository:
self.db.flush()
session_record.last_preview_id = preview.preview_id
self.bump_session_version(session)
self.db.commit()
self.db.refresh(preview)
self.commit_session_mutation(
session,
refresh_targets=[preview],
expected_version=expected_version,
)
logger.reflect(
"Compiled preview committed as latest session preview",
extra={
@@ -464,23 +555,35 @@ class DatasetReviewSessionRepository:
# @SIDE_EFFECT: inserts a run-context row, mutates the parent session pointer, and commits.
# @DATA_CONTRACT: Input[RunContextMutation] -> Output[DatasetRunContext]
def save_run_context(
self, session_id: str, user_id: str, run_context: DatasetRunContext
self,
session_id: str,
user_id: str,
run_context: DatasetRunContext,
expected_version: Optional[int] = None,
) -> DatasetRunContext:
with belief_scope("DatasetReviewSessionRepository.save_run_context"):
session = self._get_owned_session(session_id, user_id)
session_record = cast(Any, session)
if expected_version is not None:
self.require_session_version(session, expected_version)
logger.reason(
"Persisting dataset run context audit snapshot",
extra={"session_id": session_id, "user_id": user_id},
extra={
"session_id": session_id,
"user_id": user_id,
"expected_version": expected_version,
},
)
self.db.add(run_context)
self.db.flush()
session_record.last_run_context_id = run_context.run_context_id
self.bump_session_version(session)
self.db.commit()
self.db.refresh(run_context)
self.commit_session_mutation(
session,
refresh_targets=[run_context],
expected_version=expected_version,
)
logger.reflect(
"Dataset run context committed as latest launch snapshot",
extra={