Files
ss-tools/backend/src/services/clean_release/compliance_execution_service.py
2026-03-27 21:27:31 +03:00

228 lines
9.3 KiB
Python

# [DEF:ComplianceExecutionService:Module]
# @COMPLEXITY: 5
# @SEMANTICS: clean-release, compliance, execution, stages, immutable-evidence
# @PURPOSE: Create and execute compliance runs with trusted snapshots, deterministic stages, violations and immutable report persistence.
# @LAYER: Domain
# @RELATION: [DEPENDS_ON] ->[RepositoryRelations]
# @RELATION: [DEPENDS_ON] ->[PolicyResolutionService]
# @RELATION: [DEPENDS_ON] ->[ComplianceStages]
# @RELATION: [DEPENDS_ON] ->[ReportBuilder]
# @INVARIANT: A run binds to exactly one candidate/manifest/policy/registry snapshot set.
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Iterable, List, Optional, cast
from uuid import uuid4
from ...core.logger import belief_scope, logger
from ...models.clean_release import (
ComplianceReport,
ComplianceRun,
ComplianceStageRun,
ComplianceViolation,
DistributionManifest,
)
from .audit_service import audit_check_run, audit_report, audit_violation
from .enums import ComplianceDecision, RunStatus
from .exceptions import ComplianceRunError, PolicyResolutionError
from .policy_resolution_service import resolve_trusted_policy_snapshots
from .report_builder import ComplianceReportBuilder
from .repository import CleanReleaseRepository
from .stages import build_default_stages, derive_final_status
from .stages.base import ComplianceStage, ComplianceStageContext, build_stage_run_record
# Same object as `logger` at runtime (typing.cast returns its argument unchanged).
# Typing it as Any means static checkers will not validate the
# reason()/reflect()/explore() calls made on it in this module.
# NOTE(review): presumably those methods are attached to the project logger
# dynamically — confirm in core.logger.
belief_logger = cast(Any, logger)
# [DEF:ComplianceExecutionResult:Class]
# @PURPOSE: Result envelope of one compliance execution: the run, its optional report and the stage artifacts gathered along the way.
@dataclass
class ComplianceExecutionResult:
    # The compliance run record the execution worked on.
    run: ComplianceRun
    # The report attached to the run, or None when no report was produced.
    report: Optional[ComplianceReport]
    # Stage run records collected during the execution, in collection order.
    stage_runs: List[ComplianceStageRun]
    # Violations collected from the executed stages.
    violations: List[ComplianceViolation]


# [/DEF:ComplianceExecutionResult:Class]
# [DEF:ComplianceExecutionService:Class]
# @PURPOSE: Execute clean-release compliance lifecycle over trusted snapshots and immutable evidence.
# @PRE: repository and config_manager are initialized.
# @POST: run state, stage records, violations and optional report are persisted consistently.
class ComplianceExecutionService:
    """Create a compliance run for a candidate, execute its stages and persist the evidence.

    The service resolves the manifest and the policy/registry snapshots,
    stores a run record, executes every configured stage in order and
    persists stage records, violations and (on success) a report through
    the injected repository.
    """

    TASK_PLUGIN_ID = "clean-release-compliance"

    def __init__(
        self,
        *,
        repository: CleanReleaseRepository,
        config_manager,
        stages: Optional[Iterable[ComplianceStage]] = None,
    ):
        """Wire the service to its collaborators.

        Args:
            repository: Store used for candidates, manifests, runs, stage
                records, violations and reports.
            config_manager: Passed through to policy snapshot resolution.
            stages: Stages to execute, in order. ``None`` selects
                ``build_default_stages()``; an explicitly empty iterable
                is kept as "no stages".
        """
        self.repository = repository
        self.config_manager = config_manager
        # Materialise once so a one-shot iterable can be reused across runs.
        self.stages = list(stages) if stages is not None else build_default_stages()
        self.report_builder = ComplianceReportBuilder(repository)

    # [DEF:_latest_manifest:Function]
    # @PURPOSE: Pick the manifest with the highest manifest_version.
    # @PRE: manifests is non-empty.
    # @POST: Returns the first manifest carrying the maximal manifest_version.
    @staticmethod
    def _latest_manifest(
        manifests: Iterable[DistributionManifest],
    ) -> DistributionManifest:
        """Return the manifest with the greatest ``manifest_version``.

        ``max`` returns the first maximal item it encounters, which is the
        same element ``sorted(..., reverse=True)[0]`` would yield, without
        sorting the whole collection.

        Raises:
            ValueError: If ``manifests`` is empty (callers check beforehand).
        """
        return max(manifests, key=lambda item: item.manifest_version)

    # [/DEF:_latest_manifest:Function]

    # [DEF:_resolve_manifest:Function]
    # @PURPOSE: Resolve explicit manifest or fallback to latest candidate manifest.
    # @PRE: candidate exists.
    # @POST: Returns manifest snapshot or raises ComplianceRunError.
    def _resolve_manifest(
        self, candidate_id: str, manifest_id: Optional[str]
    ) -> DistributionManifest:
        """Return the manifest a run for ``candidate_id`` should bind to.

        Args:
            candidate_id: Candidate the manifest must belong to.
            manifest_id: Explicit manifest to use. ``None`` or ``""`` means
                "use the candidate's manifest with the highest version".

        Returns:
            The explicitly requested manifest, or the latest one of the candidate.

        Raises:
            ComplianceRunError: If the requested manifest does not exist,
                belongs to another candidate, or the candidate has no
                manifest at all.
        """
        with belief_scope("ComplianceExecutionService._resolve_manifest"):
            if manifest_id:
                manifest = self.repository.get_manifest(manifest_id)
                if manifest is None:
                    raise ComplianceRunError(f"manifest '{manifest_id}' not found")
                # NOTE(review): getattr with a constant name is kept as in the
                # rest of this module — presumably to sidestep static typing of
                # model attributes; confirm before switching to plain access.
                if str(getattr(manifest, "candidate_id")) != candidate_id:
                    raise ComplianceRunError("manifest does not belong to candidate")
                return manifest

            manifests = self.repository.get_manifests_by_candidate(candidate_id)
            if not manifests:
                raise ComplianceRunError(f"candidate '{candidate_id}' has no manifest")
            return self._latest_manifest(manifests)

    # [/DEF:_resolve_manifest:Function]

    # [DEF:_persist_stage_run:Function]
    # @PURPOSE: Persist a stage run record through the repository.
    # @POST: repository.save_stage_run has been called with the stage run; no capability check or fallback is performed.
    def _persist_stage_run(self, stage_run: ComplianceStageRun) -> None:
        """Store ``stage_run`` via ``repository.save_stage_run``."""
        self.repository.save_stage_run(stage_run)

    # [/DEF:_persist_stage_run:Function]

    # [DEF:_persist_violations:Function]
    # @PURPOSE: Persist stage violations via repository adapters.
    # @POST: repository.save_violation has been called once per violation, in input order.
    def _persist_violations(self, violations: List[ComplianceViolation]) -> None:
        """Store every item of ``violations`` via ``repository.save_violation``."""
        for violation in violations:
            self.repository.save_violation(violation)

    # [/DEF:_persist_violations:Function]

    # [DEF:execute_run:Function]
    # @PURPOSE: Execute compliance run stages and finalize immutable report on terminal success.
    # @PRE: candidate exists and trusted policy/registry snapshots are resolvable.
    # @POST: Run and evidence are persisted; report exists for SUCCEEDED runs.
    def execute_run(
        self,
        *,
        candidate_id: str,
        requested_by: str,
        manifest_id: Optional[str] = None,
    ) -> ComplianceExecutionResult:
        """Run all configured stages for a candidate and persist the outcome.

        Args:
            candidate_id: Candidate to check.
            requested_by: Stored on the run record as ``requested_by``.
            manifest_id: Optional explicit manifest; when omitted or empty
                the candidate's highest-version manifest is used.

        Returns:
            The run, the persisted report (``None`` if none was produced),
            and the stage records and violations collected so far.

        Raises:
            ComplianceRunError: Before a run record exists, when the
                candidate is missing, the manifest cannot be resolved or
                policy resolution fails. Exceptions raised while executing
                stages or finalising are not re-raised; they are recorded
                on the run (status FAILED, final_status ERROR,
                failure_reason) and the result is returned.
        """
        with belief_scope("ComplianceExecutionService.execute_run"):
            belief_logger.reason(
                f"Starting compliance execution candidate_id={candidate_id}"
            )

            candidate = self.repository.get_candidate(candidate_id)
            if candidate is None:
                raise ComplianceRunError(f"candidate '{candidate_id}' not found")

            manifest = self._resolve_manifest(candidate_id, manifest_id)

            try:
                policy_snapshot, registry_snapshot = resolve_trusted_policy_snapshots(
                    config_manager=self.config_manager,
                    repository=self.repository,
                )
            except PolicyResolutionError as exc:
                # Callers only need to handle the service's own error type.
                raise ComplianceRunError(str(exc)) from exc

            run = ComplianceRun(
                id=f"run-{uuid4()}",
                candidate_id=candidate_id,
                manifest_id=str(getattr(manifest, "id")),
                manifest_digest=str(getattr(manifest, "manifest_digest")),
                policy_snapshot_id=str(getattr(policy_snapshot, "id")),
                registry_snapshot_id=str(getattr(registry_snapshot, "id")),
                requested_by=requested_by,
                requested_at=datetime.now(timezone.utc),
                started_at=datetime.now(timezone.utc),
                status=RunStatus.RUNNING.value,
            )
            # Persist the RUNNING record before any stage executes.
            self.repository.save_check_run(run)

            stage_runs: List[ComplianceStageRun] = []
            violations: List[ComplianceViolation] = []
            report: Optional[ComplianceReport] = None

            context = ComplianceStageContext(
                run=run,
                candidate=candidate,
                manifest=manifest,
                policy=policy_snapshot,
                registry=registry_snapshot,
            )

            try:
                for stage in self.stages:
                    started = datetime.now(timezone.utc)
                    result = stage.execute(context)
                    finished = datetime.now(timezone.utc)

                    stage_run = build_stage_run_record(
                        run_id=str(getattr(run, "id")),
                        stage_name=stage.stage_name,
                        result=result,
                        started_at=started,
                        finished_at=finished,
                    )
                    self._persist_stage_run(stage_run)
                    stage_runs.append(stage_run)

                    if result.violations:
                        self._persist_violations(result.violations)
                        violations.extend(result.violations)

                # All stages ran: record the terminal state first, then
                # build and persist the report and link it to the run.
                setattr(run, "final_status", derive_final_status(stage_runs).value)
                setattr(run, "status", RunStatus.SUCCEEDED.value)
                setattr(run, "finished_at", datetime.now(timezone.utc))
                self.repository.save_check_run(run)

                report = self.report_builder.build_report_payload(run, violations)
                report = self.report_builder.persist_report(report)
                setattr(run, "report_id", str(getattr(report, "id")))
                self.repository.save_check_run(run)

                belief_logger.reflect(
                    f"[REFLECT] Compliance run completed run_id={getattr(run, 'id')} final_status={getattr(run, 'final_status', None)}"
                )
            except Exception as exc:  # noqa: BLE001
                # Run boundary: any failure is recorded on the run instead of
                # propagating, so the caller still receives the partial evidence.
                setattr(run, "status", RunStatus.FAILED.value)
                setattr(run, "final_status", ComplianceDecision.ERROR.value)
                setattr(run, "failure_reason", str(exc))
                setattr(run, "finished_at", datetime.now(timezone.utc))
                self.repository.save_check_run(run)
                belief_logger.explore(
                    f"[EXPLORE] Compliance run failed run_id={getattr(run, 'id')}: {exc}"
                )

            return ComplianceExecutionResult(
                run=run,
                report=report,
                stage_runs=stage_runs,
                violations=violations,
            )

    # [/DEF:execute_run:Function]


# [/DEF:ComplianceExecutionService:Class]
# [/DEF:ComplianceExecutionService:Module]