semantics update

This commit is contained in:
2026-04-01 15:30:13 +03:00
parent 05f5cf5393
commit 3bc4c8f885
22 changed files with 438 additions and 299 deletions

View File

@@ -1,5 +1,5 @@
# [DEF:DatasetReviewApi:Module]
# @COMPLEXITY: 4
# @COMPLEXITY: 3
# @SEMANTICS: dataset_review, api, session_lifecycle, exports, rbac, feature_flags
# @PURPOSE: Expose dataset review session lifecycle and export endpoints for backend US1.
# @LAYER: API
@@ -448,34 +448,92 @@ def _require_session_version_header(
# [/DEF:_require_session_version_header:Function]
from src.logger import belief_scope, logger
# [DEF:_enforce_session_version:Function]
# @COMPLEXITY: 4
# @PURPOSE: Convert repository optimistic-lock conflicts into deterministic HTTP 409 responses.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSessionRepository]
def _enforce_session_version(repository: DatasetReviewSessionRepository, session: DatasetReviewSession, expected_version: int) -> DatasetReviewSession:
with belief_scope('_enforce_session_version'):
logger.reason('Belief protocol reasoning checkpoint for _enforce_session_version')
# @PRE: Session belongs to the active owner-scoped mutation flow and expected_version comes from the caller's optimistic-lock header.
# @POST: Returns the same session when versions match or raises HTTP 409 with deterministic conflict payload.
# @SIDE_EFFECT: none.
# @DATA_CONTRACT: Input[DatasetReviewSessionRepository,DatasetReviewSession,int] -> Output[DatasetReviewSession|HTTPException]
def _enforce_session_version(
repository: DatasetReviewSessionRepository,
session: DatasetReviewSession,
expected_version: int,
) -> DatasetReviewSession:
with belief_scope("_enforce_session_version"):
logger.reason(
"Checking dataset review optimistic-lock version",
extra={
"session_id": session.session_id,
"expected_version": expected_version,
},
)
try:
repository.require_session_version(session, expected_version)
logger.reflect('Belief protocol postcondition checkpoint for _enforce_session_version')
return session
except DatasetReviewSessionVersionConflictError as exc:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail={'error_code': 'session_version_conflict', 'message': str(exc), 'session_id': exc.session_id, 'expected_version': exc.expected_version, 'actual_version': exc.actual_version}) from exc
logger.explore(
"Dataset review optimistic-lock conflict detected",
extra={
"session_id": exc.session_id,
"expected_version": exc.expected_version,
"actual_version": exc.actual_version,
},
)
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={
"error_code": "session_version_conflict",
"message": str(exc),
"session_id": exc.session_id,
"expected_version": exc.expected_version,
"actual_version": exc.actual_version,
},
) from exc
logger.reflect(
"Dataset review optimistic-lock version accepted",
extra={
"session_id": session.session_id,
"version": getattr(session, "version", None),
},
)
return session
# [/DEF:_enforce_session_version:Function]
# [DEF:_prepare_owned_session_mutation:Function]
# @COMPLEXITY: 4
# @PURPOSE: Resolve owner-scoped mutation session and enforce optimistic-lock version before changing dataset review state.
def _prepare_owned_session_mutation(repository: DatasetReviewSessionRepository, session_id: str, current_user: User, expected_version: int) -> DatasetReviewSession:
with belief_scope('_prepare_owned_session_mutation'):
logger.reason('Belief protocol reasoning checkpoint for _prepare_owned_session_mutation')
# @RELATION: [CALLS] ->[_get_owned_session_or_404]
# @RELATION: [CALLS] ->[_require_owner_mutation_scope]
# @RELATION: [CALLS] ->[_enforce_session_version]
# @PRE: session_id targets an existing session visible to current_user and expected_version comes from the client mutation header.
# @POST: Returns the owned session only when access and optimistic-lock checks both pass.
# @SIDE_EFFECT: none.
# @DATA_CONTRACT: Input[DatasetReviewSessionRepository,str,User,int] -> Output[DatasetReviewSession|HTTPException]
def _prepare_owned_session_mutation(
repository: DatasetReviewSessionRepository,
session_id: str,
current_user: User,
expected_version: int,
) -> DatasetReviewSession:
with belief_scope("_prepare_owned_session_mutation"):
logger.reason(
"Preparing owner-scoped dataset review mutation",
extra={"session_id": session_id, "user_id": current_user.id},
)
session = _get_owned_session_or_404(repository, session_id, current_user)
_require_owner_mutation_scope(session, current_user)
logger.reflect('Belief protocol postcondition checkpoint for _prepare_owned_session_mutation')
return _enforce_session_version(repository, session, expected_version)
guarded_session = _enforce_session_version(repository, session, expected_version)
logger.reflect(
"Dataset review mutation session passed ownership and version guards",
extra={
"session_id": guarded_session.session_id,
"user_id": current_user.id,
"version": getattr(guarded_session, "version", None),
},
)
return guarded_session
# [/DEF:_prepare_owned_session_mutation:Function]
@@ -483,15 +541,34 @@ def _prepare_owned_session_mutation(repository: DatasetReviewSessionRepository,
# @COMPLEXITY: 4
# @PURPOSE: Centralize dataset-review session version bumping and commit semantics for owner-scoped mutation endpoints.
# @RELATION: [DEPENDS_ON] ->[DatasetReviewSessionRepository]
def _commit_owned_session_mutation(repository: DatasetReviewSessionRepository, session: DatasetReviewSession, *, refresh_targets: Optional[List[Any]]=None) -> DatasetReviewSession:
with belief_scope('_commit_owned_session_mutation'):
logger.reason('Belief protocol reasoning checkpoint for _commit_owned_session_mutation')
# @PRE: Session mutation has already passed ownership and optimistic-lock guards.
# @POST: Session version is bumped, changes are committed, and requested targets are refreshed before returning the same session.
# @SIDE_EFFECT: Persists the current transaction and refreshes ORM targets from the database.
# @DATA_CONTRACT: Input[DatasetReviewSessionRepository,DatasetReviewSession,List[Any]|None] -> Output[DatasetReviewSession]
def _commit_owned_session_mutation(
repository: DatasetReviewSessionRepository,
session: DatasetReviewSession,
*,
refresh_targets: Optional[List[Any]] = None,
) -> DatasetReviewSession:
with belief_scope("_commit_owned_session_mutation"):
logger.reason(
"Committing dataset review mutation",
extra={"session_id": session.session_id},
)
repository.bump_session_version(session)
repository.db.commit()
repository.db.refresh(session)
for target in refresh_targets or []:
repository.db.refresh(target)
logger.reflect('Belief protocol postcondition checkpoint for _commit_owned_session_mutation')
logger.reflect(
"Dataset review mutation committed and refreshed",
extra={
"session_id": session.session_id,
"version": getattr(session, "version", None),
"refresh_count": len(refresh_targets or []),
},
)
return session
# [/DEF:_commit_owned_session_mutation:Function]
@@ -596,8 +673,12 @@ def _get_owned_session_or_404(
session_id: str,
current_user: User,
) -> DatasetReviewSession:
with belief_scope("dataset_review.get_owned_session_or_404"):
session = repository.load_session_detail(session_id, current_user.id)
with belief_scope("_get_owned_session_or_404"):
logger.reason(
"Resolving dataset review session in current ownership scope",
extra={"session_id": session_id, "user_id": current_user.id},
)
session = repository.load_detail(session_id, current_user.id)
if session is None:
logger.explore(
"Dataset review session not found in current ownership scope",
@@ -606,6 +687,10 @@ def _get_owned_session_or_404(
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Session not found"
)
logger.reflect(
"Dataset review session resolved for current ownership scope",
extra={"session_id": session.session_id, "user_id": current_user.id},
)
return session
@@ -624,12 +709,29 @@ def _require_owner_mutation_scope(
session: DatasetReviewSession,
current_user: User,
) -> DatasetReviewSession:
if session.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only the owner can mutate dataset review state",
with belief_scope("_require_owner_mutation_scope"):
logger.reason(
"Checking owner-only mutation scope for dataset review session",
extra={"session_id": session.session_id, "user_id": current_user.id},
)
return session
if session.user_id != current_user.id:
logger.explore(
"Dataset review mutation blocked for non-owner",
extra={
"session_id": session.session_id,
"session_owner_id": session.user_id,
"user_id": current_user.id,
},
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only the owner can mutate dataset review state",
)
logger.reflect(
"Dataset review mutation confirmed for session owner",
extra={"session_id": session.session_id, "user_id": current_user.id},
)
return session
# [/DEF:_require_owner_mutation_scope:Function]
@@ -672,12 +774,25 @@ def _get_owned_mapping_or_404(
session: DatasetReviewSession,
mapping_id: str,
) -> ExecutionMapping:
for mapping in session.execution_mappings:
if mapping.mapping_id == mapping_id:
return mapping
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Execution mapping not found"
)
with belief_scope("_get_owned_mapping_or_404"):
logger.reason(
"Resolving execution mapping inside owned dataset review session",
extra={"session_id": session.session_id, "mapping_id": mapping_id},
)
for mapping in session.execution_mappings:
if mapping.mapping_id == mapping_id:
logger.reflect(
"Execution mapping resolved inside owned session",
extra={"session_id": session.session_id, "mapping_id": mapping_id},
)
return mapping
logger.explore(
"Execution mapping missing from owned dataset review session",
extra={"session_id": session.session_id, "mapping_id": mapping_id},
)
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Execution mapping not found"
)
# [/DEF:_get_owned_mapping_or_404:Function]
@@ -695,12 +810,25 @@ def _get_owned_field_or_404(
session: DatasetReviewSession,
field_id: str,
) -> SemanticFieldEntry:
for field in session.semantic_fields:
if field.field_id == field_id:
return field
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Semantic field not found"
)
with belief_scope("_get_owned_field_or_404"):
logger.reason(
"Resolving semantic field inside owned dataset review session",
extra={"session_id": session.session_id, "field_id": field_id},
)
for field in session.semantic_fields:
if field.field_id == field_id:
logger.reflect(
"Semantic field resolved inside owned session",
extra={"session_id": session.session_id, "field_id": field_id},
)
return field
logger.explore(
"Semantic field missing from owned dataset review session",
extra={"session_id": session.session_id, "field_id": field_id},
)
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Semantic field not found"
)
# [/DEF:_get_owned_field_or_404:Function]
@@ -1059,17 +1187,30 @@ async def list_sessions(
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.list_sessions"):
sessions = repository.list_sessions_for_user(current_user.id)
logger.reason(
"Listing dataset review sessions for current user",
extra={"user_id": current_user.id, "page": page, "page_size": page_size},
)
sessions = repository.list_user_sess(current_user.id)
start = (page - 1) * page_size
end = start + page_size
items = [_serialize_session_summary(session) for session in sessions[start:end]]
return SessionCollectionResponse(
response_payload = SessionCollectionResponse(
items=items,
total=len(sessions),
page=page,
page_size=page_size,
has_next=end < len(sessions),
)
logger.reflect(
"Dataset review session page assembled",
extra={
"user_id": current_user.id,
"returned_items": len(items),
"total": len(sessions),
},
)
return response_payload
# [/DEF:list_sessions:Function]
@@ -1083,46 +1224,32 @@ async def list_sessions(
# @POST: returns persisted session summary scoped to the authenticated user.
# @SIDE_EFFECT: persists session/profile/findings and may enqueue recovery task.
# @DATA_CONTRACT: Input[StartSessionRequest] -> Output[SessionSummary]
@router.post(
"/sessions",
response_model=SessionSummary,
status_code=status.HTTP_201_CREATED,
dependencies=[
Depends(_require_auto_review_flag),
Depends(has_permission("dataset:session", "MANAGE")),
],
)
async def start_session(
request: StartSessionRequest,
orchestrator: DatasetReviewOrchestrator = Depends(_get_orchestrator),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.start_session"):
@router.post('/sessions', response_model=SessionSummary, status_code=status.HTTP_201_CREATED, dependencies=[Depends(_require_auto_review_flag), Depends(has_permission('dataset:session', 'MANAGE'))])
async def start_session(request: StartSessionRequest, orchestrator: DatasetReviewOrchestrator=Depends(_get_orchestrator), current_user: User=Depends(get_current_user)):
with belief_scope('start_session'):
logger.reason(
"Starting dataset review session",
extra={
"user_id": current_user.id,
"environment_id": request.environment_id,
"source_kind": request.source_kind,
},
)
try:
result = orchestrator.start_session(
StartSessionCommand(
user=current_user,
environment_id=request.environment_id,
source_kind=request.source_kind,
source_input=request.source_input,
)
)
result = orchestrator.start_session(StartSessionCommand(user=current_user, environment_id=request.environment_id, source_kind=request.source_kind, source_input=request.source_input))
except ValueError as exc:
logger.explore(
"Dataset review session start rejected",
extra={"user_id": current_user.id, "error": str(exc)},
)
logger.explore('Dataset review session start rejected', extra={'user_id': current_user.id, 'error': str(exc)})
detail = str(exc)
status_code = (
status.HTTP_404_NOT_FOUND
if detail == "Environment not found"
else status.HTTP_400_BAD_REQUEST
)
status_code = status.HTTP_404_NOT_FOUND if detail == 'Environment not found' else status.HTTP_400_BAD_REQUEST
raise HTTPException(status_code=status_code, detail=detail) from exc
logger.reflect(
"Dataset review session started and serialized",
extra={
"session_id": result.session.session_id,
"user_id": current_user.id,
},
)
return _serialize_session_summary(result.session)
# [/DEF:start_session:Function]
@@ -1144,8 +1271,17 @@ async def get_session_detail(
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.get_session_detail"):
logger.reason(
"Loading dataset review session detail",
extra={"session_id": session_id, "user_id": current_user.id},
)
session = _get_owned_session_or_404(repository, session_id, current_user)
return _serialize_session_detail(session)
detail = _serialize_session_detail(session)
logger.reflect(
"Dataset review session detail serialized",
extra={"session_id": session.session_id, "user_id": current_user.id},
)
return detail
# [/DEF:get_session_detail:Function]
@@ -1159,57 +1295,43 @@ async def get_session_detail(
# @POST: returns updated summary without changing ownership or unrelated aggregates.
# @SIDE_EFFECT: mutates session lifecycle fields in persistence.
# @DATA_CONTRACT: Input[UpdateSessionRequest] -> Output[SessionSummary]
@router.patch(
"/sessions/{session_id}",
response_model=SessionSummary,
dependencies=[
Depends(_require_auto_review_flag),
Depends(has_permission("dataset:session", "MANAGE")),
],
)
async def update_session(
session_id: str,
request: UpdateSessionRequest,
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.update_session"):
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
@router.patch('/sessions/{session_id}', response_model=SessionSummary, dependencies=[Depends(_require_auto_review_flag), Depends(has_permission('dataset:session', 'MANAGE'))])
async def update_session(session_id: str, request: UpdateSessionRequest, session_version: int=Depends(_require_session_version_header), repository: DatasetReviewSessionRepository=Depends(_get_repository), current_user: User=Depends(get_current_user)):
with belief_scope('update_session'):
logger.reason(
"Updating dataset review session lifecycle state",
extra={
"session_id": session_id,
"user_id": current_user.id,
"requested_status": request.status.value,
},
)
session = _prepare_owned_session_mutation(repository, session_id, current_user, session_version)
session_record = cast(Any, session)
session_record.status = request.status
if request.status == SessionStatus.PAUSED:
session_record.recommended_action = RecommendedAction.RESUME_SESSION
elif request.status in {
SessionStatus.ARCHIVED,
SessionStatus.CANCELLED,
SessionStatus.COMPLETED,
}:
elif request.status in {SessionStatus.ARCHIVED, SessionStatus.CANCELLED, SessionStatus.COMPLETED}:
session_record.active_task_id = None
repository.bump_session_version(session)
repository.db.commit()
repository.db.refresh(session)
_record_session_event(
repository,
session,
current_user,
event_type="session_status_updated",
event_summary="Dataset review session lifecycle updated",
event_details={
_record_session_event(repository, session, current_user, event_type='session_status_updated', event_summary='Dataset review session lifecycle updated', event_details={'status': session_record.status.value, 'version': session_record.version})
logger.reflect(
"Dataset review session lifecycle updated",
extra={
"session_id": session.session_id,
"user_id": current_user.id,
"status": session_record.status.value,
"version": session_record.version,
},
)
return _serialize_session_summary(session)
# [/DEF:update_session:Function]
from src.logger import belief_scope, logger
# [DEF:delete_session:Function]
# @COMPLEXITY: 4
# @PURPOSE: Archive or hard-delete a session owned by the current user.
@@ -1218,57 +1340,41 @@ async def update_session(
# @POST: session is archived or deleted and no foreign-session existence is disclosed.
# @SIDE_EFFECT: mutates or deletes persisted session aggregate.
# @DATA_CONTRACT: Input[session_id:str,hard_delete:bool] -> Output[HTTP 204]
@router.delete(
"/sessions/{session_id}",
status_code=status.HTTP_204_NO_CONTENT,
dependencies=[
Depends(_require_auto_review_flag),
Depends(has_permission("dataset:session", "MANAGE")),
],
)
async def delete_session(
session_id: str,
hard_delete: bool = Query(False),
session_version: int = Depends(_require_session_version_header),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.delete_session"):
session = _prepare_owned_session_mutation(
repository, session_id, current_user, session_version
@router.delete('/sessions/{session_id}', status_code=status.HTTP_204_NO_CONTENT, dependencies=[Depends(_require_auto_review_flag), Depends(has_permission('dataset:session', 'MANAGE'))])
async def delete_session(session_id: str, hard_delete: bool=Query(False), session_version: int=Depends(_require_session_version_header), repository: DatasetReviewSessionRepository=Depends(_get_repository), current_user: User=Depends(get_current_user)):
with belief_scope('delete_session'):
logger.reason(
"Deleting or archiving dataset review session",
extra={
"session_id": session_id,
"user_id": current_user.id,
"hard_delete": hard_delete,
},
)
session = _prepare_owned_session_mutation(repository, session_id, current_user, session_version)
if hard_delete:
_record_session_event(
repository,
session,
current_user,
event_type="session_deleted",
event_summary="Dataset review session hard-deleted",
event_details={"hard_delete": True},
)
_record_session_event(repository, session, current_user, event_type='session_deleted', event_summary='Dataset review session hard-deleted', event_details={'hard_delete': True})
repository.db.delete(session)
repository.db.commit()
logger.reflect(
"Dataset review session hard-delete committed",
extra={"session_id": session_id, "user_id": current_user.id},
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
session_record = cast(Any, session)
session_record.status = SessionStatus.ARCHIVED
session_record.active_task_id = None
_commit_owned_session_mutation(repository, session)
_record_session_event(
repository,
session,
current_user,
event_type="session_archived",
event_summary="Dataset review session archived",
event_details={
"hard_delete": False,
_record_session_event(repository, session, current_user, event_type='session_archived', event_summary='Dataset review session archived', event_details={'hard_delete': False, 'version': session_record.version})
logger.reflect(
"Dataset review session archive committed",
extra={
"session_id": session.session_id,
"user_id": current_user.id,
"version": session_record.version,
},
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
# [/DEF:delete_session:Function]
@@ -1280,39 +1386,30 @@ async def delete_session(
# @POST: returns ownership-scoped export payload without fabricating unrelated artifacts.
# @SIDE_EFFECT: none beyond response construction.
# @DATA_CONTRACT: Input[session_id:str,format:ArtifactFormat] -> Output[ExportArtifactResponse]
@router.get(
"/sessions/{session_id}/exports/documentation",
response_model=ExportArtifactResponse,
dependencies=[
Depends(_require_auto_review_flag),
Depends(has_permission("dataset:session", "READ")),
],
)
async def export_documentation(
session_id: str,
format: ArtifactFormat = Query(ArtifactFormat.JSON),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.export_documentation"):
@router.get('/sessions/{session_id}/exports/documentation', response_model=ExportArtifactResponse, dependencies=[Depends(_require_auto_review_flag), Depends(has_permission('dataset:session', 'READ'))])
async def export_documentation(session_id: str, format: ArtifactFormat=Query(ArtifactFormat.JSON), repository: DatasetReviewSessionRepository=Depends(_get_repository), current_user: User=Depends(get_current_user)):
with belief_scope('export_documentation'):
if format not in {ArtifactFormat.JSON, ArtifactFormat.MARKDOWN}:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Only json and markdown exports are supported",
)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Only json and markdown exports are supported')
logger.reason(
"Building dataset review documentation export",
extra={
"session_id": session_id,
"user_id": current_user.id,
"format": format.value,
},
)
session = _get_owned_session_or_404(repository, session_id, current_user)
export_payload = _build_documentation_export(session, format)
return ExportArtifactResponse(
artifact_id=f"documentation-{session.session_id}-{format.value}",
session_id=session.session_id,
artifact_type="documentation",
format=format.value,
storage_ref=export_payload["storage_ref"],
created_by_user_id=current_user.id,
content=export_payload["content"],
logger.reflect(
"Dataset review documentation export assembled",
extra={
"session_id": session.session_id,
"user_id": current_user.id,
"format": format.value,
},
)
return ExportArtifactResponse(artifact_id=f'documentation-{session.session_id}-{format.value}', session_id=session.session_id, artifact_type='documentation', format=format.value, storage_ref=export_payload['storage_ref'], created_by_user_id=current_user.id, content=export_payload['content'])
# [/DEF:export_documentation:Function]
@@ -1324,39 +1421,30 @@ async def export_documentation(
# @POST: returns explicit validation export payload scoped to current user session access.
# @SIDE_EFFECT: none beyond response construction.
# @DATA_CONTRACT: Input[session_id:str,format:ArtifactFormat] -> Output[ExportArtifactResponse]
@router.get(
"/sessions/{session_id}/exports/validation",
response_model=ExportArtifactResponse,
dependencies=[
Depends(_require_auto_review_flag),
Depends(has_permission("dataset:session", "READ")),
],
)
async def export_validation(
session_id: str,
format: ArtifactFormat = Query(ArtifactFormat.JSON),
repository: DatasetReviewSessionRepository = Depends(_get_repository),
current_user: User = Depends(get_current_user),
):
with belief_scope("dataset_review.export_validation"):
@router.get('/sessions/{session_id}/exports/validation', response_model=ExportArtifactResponse, dependencies=[Depends(_require_auto_review_flag), Depends(has_permission('dataset:session', 'READ'))])
async def export_validation(session_id: str, format: ArtifactFormat=Query(ArtifactFormat.JSON), repository: DatasetReviewSessionRepository=Depends(_get_repository), current_user: User=Depends(get_current_user)):
with belief_scope('export_validation'):
if format not in {ArtifactFormat.JSON, ArtifactFormat.MARKDOWN}:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Only json and markdown exports are supported",
)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Only json and markdown exports are supported')
logger.reason(
"Building dataset review validation export",
extra={
"session_id": session_id,
"user_id": current_user.id,
"format": format.value,
},
)
session = _get_owned_session_or_404(repository, session_id, current_user)
export_payload = _build_validation_export(session, format)
return ExportArtifactResponse(
artifact_id=f"validation-{session.session_id}-{format.value}",
session_id=session.session_id,
artifact_type="validation_report",
format=format.value,
storage_ref=export_payload["storage_ref"],
created_by_user_id=current_user.id,
content=export_payload["content"],
logger.reflect(
"Dataset review validation export assembled",
extra={
"session_id": session.session_id,
"user_id": current_user.id,
"format": format.value,
},
)
return ExportArtifactResponse(artifact_id=f'validation-{session.session_id}-{format.value}', session_id=session.session_id, artifact_type='validation_report', format=format.value, storage_ref=export_payload['storage_ref'], created_by_user_id=current_user.id, content=export_payload['content'])
# [/DEF:export_validation:Function]

View File

@@ -1,12 +1,12 @@
# [DEF:ProfileApiModule:Module]
#
# @COMPLEXITY: 5
# @COMPLEXITY: 3
# @SEMANTICS: api, profile, preferences, self-service, account-lookup
# @PURPOSE: Exposes self-scoped profile preference endpoints and environment-based Superset account lookup.
# @LAYER: API
# @RELATION: DEPENDS_ON -> backend.src.services.profile_service
# @RELATION: DEPENDS_ON -> backend.src.dependencies.get_current_user
# @RELATION: DEPENDS_ON -> backend.src.core.database.get_db
# @PURPOSE: Exposes self-scoped profile preference endpoints and environment-based Superset account lookup.
# @LAYER: API
# @RELATION: [DEPENDS_ON] ->[ProfileService]
# @RELATION: [DEPENDS_ON] ->[get_current_user]
# @RELATION: [DEPENDS_ON] ->[get_db]
#
# @INVARIANT: Endpoints are self-scoped and never mutate another user preference.
# @UX_STATE: ProfileLoad -> Returns stable ProfilePreferenceResponse for authenticated user.

View File

@@ -1,12 +1,12 @@
# [DEF:SettingsRouter:Module]
#
# @COMPLEXITY: 4
# @COMPLEXITY: 3
# @SEMANTICS: settings, api, router, fastapi
# @PURPOSE: Provides API endpoints for managing application settings and Superset environments.
# @LAYER: UI (API)
# @PURPOSE: Provides API endpoints for managing application settings and Superset environments.
# @LAYER: API
# @RELATION: [DEPENDS_ON] ->[ConfigManager]
# @RELATION: [DEPENDS_ON] ->[get_config_manager:Function]
# @RELATION: [DEPENDS_ON] ->[has_permission:Function]
# @RELATION: [DEPENDS_ON] ->[get_config_manager]
# @RELATION: [DEPENDS_ON] ->[has_permission]
#
# @INVARIANT: All settings changes must be persisted via ConfigManager.
# @PUBLIC_API: router
@@ -410,25 +410,24 @@ class ConsolidatedSettingsResponse(BaseModel):
# [DEF:get_consolidated_settings:Function]
# @COMPLEXITY: 4
# @PURPOSE: Retrieves all settings categories in a single call
# @PRE: Config manager is available.
# @POST: Returns all consolidated settings.
# @RETURN: ConsolidatedSettingsResponse - All settings categories.
# @PURPOSE: Retrieves all settings categories in a single call.
# @PRE: Config manager is available and the caller holds admin settings read permission.
# @POST: Returns consolidated settings, provider metadata, and persisted notification payload in one stable response.
# @SIDE_EFFECT: Opens one database session to read LLM providers and config-backed notification payload, then closes it.
# @DATA_CONTRACT: Input[ConfigManager] -> Output[ConsolidatedSettingsResponse]
# @RELATION: [DEPENDS_ON] ->[ConfigManager]
# @RELATION: [DEPENDS_ON] ->[LLMProviderService]
# @RELATION: [DEPENDS_ON] ->[AppConfigRecord]
# @RELATION: [DEPENDS_ON] ->[SessionLocal]
# @RELATION: [DEPENDS_ON] ->[has_permission:Function]
# @RELATION: [DEPENDS_ON] ->[normalize_llm_settings:Function]
# @RELATION: [DEPENDS_ON] ->[has_permission]
# @RELATION: [DEPENDS_ON] ->[normalize_llm_settings]
@router.get("/consolidated", response_model=ConsolidatedSettingsResponse)
async def get_consolidated_settings(
config_manager: ConfigManager = Depends(get_config_manager),
_=Depends(has_permission("admin:settings", "READ")),
):
with belief_scope("get_consolidated_settings"):
logger.info(
"[get_consolidated_settings][Entry] Fetching all consolidated settings"
)
logger.reason("Fetching consolidated settings payload")
config = config_manager.get_config()
@@ -465,7 +464,7 @@ async def get_consolidated_settings(
normalized_llm = normalize_llm_settings(config.settings.llm)
return ConsolidatedSettingsResponse(
response_payload = ConsolidatedSettingsResponse(
environments=[env.dict() for env in config.environments],
connections=config.settings.connections,
llm=normalized_llm,
@@ -474,6 +473,14 @@ async def get_consolidated_settings(
storage=config.settings.storage.dict(),
notifications=notifications_payload,
)
logger.reflect(
"Consolidated settings payload assembled",
extra={
"environment_count": len(response_payload.environments),
"provider_count": len(response_payload.llm_providers),
},
)
return response_payload
# [/DEF:get_consolidated_settings:Function]

View File

@@ -2,10 +2,12 @@
#
# @COMPLEXITY: 3
# @SEMANTICS: superset, api, client, rest, http, dashboard, dataset, import, export
# @PURPOSE: Предоставляет высокоуровневый клиент для взаимодействия с Superset REST API, инкапсулируя логику запросов, обработку ошибок и пагинацию.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> [ConfigModels]
# @RELATION: DEPENDS_ON -> [ConnectionContracts]
# @PURPOSE: Предоставляет высокоуровневый клиент для взаимодействия с Superset REST API, инкапсулируя логику запросов, обработку ошибок и пагинацию.
# @LAYER: Core
# @RELATION: [DEPENDS_ON] ->[ConfigModels]
# @RELATION: [DEPENDS_ON] ->[APIClient]
# @RELATION: [DEPENDS_ON] ->[SupersetAPIError]
# @RELATION: [DEPENDS_ON] ->[get_filename_from_headers]
#
# @INVARIANT: All network operations must use the internal APIClient instance.
# @PUBLIC_API: SupersetClient
@@ -31,8 +33,9 @@ app_logger = cast(Any, app_logger)
# [DEF:SupersetClient:Class]
# @COMPLEXITY: 3
# @PURPOSE: Класс-обёртка над Superset REST API, предоставляющий методы для работы с дашбордами и датасетами.
# @RELATION: DEPENDS_ON -> [ConfigModels]
# @RELATION: DEPENDS_ON -> [ConnectionContracts]
# @RELATION: [DEPENDS_ON] ->[ConfigModels]
# @RELATION: [DEPENDS_ON] ->[APIClient]
# @RELATION: [DEPENDS_ON] ->[SupersetAPIError]
class SupersetClient:
# [DEF:SupersetClientInit:Function]
# @COMPLEXITY: 3
@@ -40,13 +43,13 @@ class SupersetClient:
# @PRE: `env` должен быть валидным объектом Environment.
# @POST: Атрибуты `env` и `network` созданы и готовы к работе.
# @DATA_CONTRACT: Input[Environment] -> self.network[APIClient]
# @RELATION: DEPENDS_ON -> [Environment]
# @RELATION: DEPENDS_ON -> [ConnectionContracts]
# @RELATION: [DEPENDS_ON] ->[Environment]
# @RELATION: [DEPENDS_ON] ->[APIClient]
def __init__(self, env: Environment):
with belief_scope("__init__"):
app_logger.info(
"[SupersetClient.__init__][Enter] Initializing SupersetClient for env %s.",
env.name,
with belief_scope("SupersetClientInit"):
app_logger.reason(
"Initializing Superset client for environment",
extra={"environment": getattr(env, "id", None), "name": env.name},
)
self.env = env
# Construct auth payload expected by Superset API
@@ -62,7 +65,10 @@ class SupersetClient:
timeout=env.timeout,
)
self.delete_before_reimport: bool = False
app_logger.info("[SupersetClient.__init__][Exit] SupersetClient initialized.")
app_logger.reflect(
"Superset client initialized",
extra={"environment": getattr(self.env, "id", None)},
)
# [/DEF:SupersetClientInit:Function]
@@ -72,11 +78,22 @@ class SupersetClient:
# @PRE: self.network must be initialized with valid auth configuration.
# @POST: Client is authenticated and tokens are stored.
# @DATA_CONTRACT: None -> Output[Dict[str, str]]
# @RELATION: CALLS -> [ConnectionContracts]
# @RELATION: [CALLS] ->[APIClient]
def authenticate(self) -> Dict[str, str]:
with belief_scope("SupersetClient.authenticate"):
return self.network.authenticate()
with belief_scope("SupersetClientAuthenticate"):
app_logger.reason(
"Authenticating Superset client",
extra={"environment": getattr(self.env, "id", None)},
)
tokens = self.network.authenticate()
app_logger.reflect(
"Superset client authentication completed",
extra={
"environment": getattr(self.env, "id", None),
"token_keys": sorted(tokens.keys()),
},
)
return tokens
# [/DEF:SupersetClientAuthenticate:Function]
@property
@@ -1545,7 +1562,7 @@ class SupersetClient:
# @PRE: dataset_record should come from Superset dataset detail when possible.
# @POST: Returns one serialized-ready form_data structure preserving native filter clauses in legacy transport fields.
# @DATA_CONTRACT: Input[dataset_id:int,dataset_record:Dict,template_params:Dict,effective_filters:List[Dict]] -> Output[Dict[str, Any]]
# @RELATION: CALLS -> [SupersetClientBuildDatasetPreviewQueryContext]
# @RELATION: [CALLS] ->[SupersetClientBuildDatasetPreviewQueryContext]
# @SIDE_EFFECT: Emits reasoning diagnostics describing the inferred legacy payload shape.
def build_dataset_preview_legacy_form_data(
self,
@@ -1630,9 +1647,11 @@ class SupersetClient:
template_params: Dict[str, Any],
effective_filters: List[Dict[str, Any]],
) -> Dict[str, Any]:
with belief_scope(
"SupersetClient.build_dataset_preview_query_context", f"id={dataset_id}"
):
with belief_scope("SupersetClientBuildDatasetPreviewQueryContext"):
app_logger.reason(
"Building Superset dataset preview query context",
extra={"dataset_id": dataset_id, "filter_count": len(effective_filters or [])},
)
normalized_template_params = deepcopy(template_params or {})
normalized_filter_payload = (
self._normalize_effective_filters_for_query_context(
@@ -1749,7 +1768,7 @@ class SupersetClient:
# @PURPOSE: Convert execution mappings into Superset chart-data filter objects.
# @PRE: effective_filters may contain mapping metadata and arbitrary scalar/list values.
# @POST: Returns only valid filter dictionaries suitable for the chart-data query payload.
# @RELATION: DEPENDS_ON -> [CoreContracts]
# @RELATION: [DEPENDS_ON] ->[SupersetClient]
def _normalize_effective_filters_for_query_context(
self,
effective_filters: List[Dict[str, Any]],

View File

@@ -1,11 +1,12 @@
# [DEF:backend.src.core.superset_profile_lookup:Module]
# [DEF:SupersetProfileLookup:Module]
#
# @COMPLEXITY: 3
# @SEMANTICS: superset, users, lookup, profile, pagination, normalization
# @PURPOSE: Provides environment-scoped Superset account lookup adapter with stable normalized output.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> backend.src.core.utils.network.APIClient
# @RELATION: DEPENDS_ON -> backend.src.core.logger
# @PURPOSE: Provides environment-scoped Superset account lookup adapter with stable normalized output.
# @LAYER: Core
# @RELATION: [DEPENDS_ON] ->[APIClient]
# @RELATION: [DEPENDS_ON] ->[SupersetAPIError]
# @RELATION: [DEPENDS_ON] ->[SupersetAccountLookupAdapter]
#
# @INVARIANT: Adapter never leaks raw upstream payload shape to API consumers.
@@ -21,6 +22,8 @@ from .utils.network import APIClient, AuthenticationError, SupersetAPIError
# [DEF:SupersetAccountLookupAdapter:Class]
# @COMPLEXITY: 3
# @PURPOSE: Lookup Superset users and normalize candidates for profile binding.
# @RELATION: [DEPENDS_ON] ->[APIClient]
# @RELATION: [DEPENDS_ON] ->[SupersetProfileLookup]
class SupersetAccountLookupAdapter:
# [DEF:__init__:Function]
# @PURPOSE: Initializes lookup adapter with authenticated API client and environment context.
@@ -235,4 +238,4 @@ class SupersetAccountLookupAdapter:
# [/DEF:normalize_user_payload:Function]
# [/DEF:SupersetAccountLookupAdapter:Class]
# [/DEF:backend.src.core.superset_profile_lookup:Module]
# [/DEF:SupersetProfileLookup:Module]

View File

@@ -87,11 +87,11 @@ class ProfileAuthorizationError(Exception):
# [DEF:ProfileService:Class]
# @RELATION: [DEPENDS_ON] ->[sqlalchemy.orm.Session]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.auth.repository.AuthRepository]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.superset_client.SupersetClient]
# @RELATION: [DEPENDS_ON] ->[backend.src.core.superset_profile_lookup.SupersetAccountLookupAdapter]
# @RELATION: [DEPENDS_ON] ->[backend.src.models.profile.UserDashboardPreference]
# @RELATION: [CALLS] ->[backend.src.services.rbac_permission_catalog.discover_declared_permissions]
# @RELATION: [DEPENDS_ON] ->[AuthRepository]
# @RELATION: [DEPENDS_ON] ->[SupersetClient]
# @RELATION: [DEPENDS_ON] ->[SupersetAccountLookupAdapter]
# @RELATION: [DEPENDS_ON] ->[UserDashboardPreference]
# @RELATION: [CALLS] ->[discover_declared_permissions]
# @COMPLEXITY: 5
# @PURPOSE: Implements profile preference read/update flow and Superset account lookup degradation strategy.
# @PRE: Caller provides authenticated User context for external service methods.