Files
ss-tools/backend/src/services/clean_release/compliance_execution_service.py

197 lines
8.9 KiB
Python

# [DEF:backend.src.services.clean_release.compliance_execution_service:Module]
# @TIER: CRITICAL
# @SEMANTICS: clean-release, compliance, execution, stages, immutable-evidence
# @PURPOSE: Create and execute compliance runs with trusted snapshots, deterministic stages, violations and immutable report persistence.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> backend.src.services.clean_release.repository
# @RELATION: DEPENDS_ON -> backend.src.services.clean_release.policy_resolution_service
# @RELATION: DEPENDS_ON -> backend.src.services.clean_release.stages
# @RELATION: DEPENDS_ON -> backend.src.services.clean_release.report_builder
# @INVARIANT: A run binds to exactly one candidate/manifest/policy/registry snapshot set.
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Iterable, List, Optional
from uuid import uuid4
from ...core.logger import belief_scope, logger
from ...models.clean_release import ComplianceReport, ComplianceRun, ComplianceStageRun, ComplianceViolation, DistributionManifest
from .audit_service import audit_check_run, audit_report, audit_violation
from .enums import ComplianceDecision, RunStatus
from .exceptions import ComplianceRunError, PolicyResolutionError
from .policy_resolution_service import resolve_trusted_policy_snapshots
from .report_builder import ComplianceReportBuilder
from .repository import CleanReleaseRepository
from .stages import build_default_stages, derive_final_status
from .stages.base import ComplianceStage, ComplianceStageContext, build_stage_run_record
# [DEF:ComplianceExecutionResult:Class]
# @PURPOSE: Return envelope for compliance execution with run/report and persisted stage artifacts.
@dataclass
class ComplianceExecutionResult:
    """Bundle of everything a single compliance execution produced."""

    # The run record, carrying the status fields set by the executing service.
    run: ComplianceRun
    # Report attached to the run; None when no report was produced.
    report: Optional[ComplianceReport]
    # One record per stage that finished executing, in execution order.
    stage_runs: List[ComplianceStageRun]
    # Violations collected across all executed stages.
    violations: List[ComplianceViolation]
# [/DEF:ComplianceExecutionResult:Class]
# [DEF:ComplianceExecutionService:Class]
# @PURPOSE: Execute clean-release compliance lifecycle over trusted snapshots and immutable evidence.
# @PRE: repository and config_manager are initialized.
# @POST: run state, stage records, violations and optional report are persisted consistently.
class ComplianceExecutionService:
    """Create, execute and persist clean-release compliance runs.

    The service resolves the candidate, its manifest and the trusted
    policy/registry snapshots, runs every configured stage against them, and
    persists the run, stage records, violations and (on success) the report
    through the injected repository.
    """

    TASK_PLUGIN_ID = "clean-release-compliance"

    def __init__(
        self,
        *,
        repository: CleanReleaseRepository,
        config_manager,
        stages: Optional[Iterable[ComplianceStage]] = None,
    ):
        """Wire the service to its collaborators.

        Args:
            repository: Persistence adapter used for every read and write.
            config_manager: Passed through to trusted policy resolution.
            stages: Stages to execute in order; when None the default stage
                set from ``build_default_stages()`` is used.
        """
        self.repository = repository
        self.config_manager = config_manager
        # Materialise caller-supplied stages so a one-shot iterable can still
        # be iterated on every execute_run() call.
        self.stages = list(stages) if stages is not None else build_default_stages()
        self.report_builder = ComplianceReportBuilder(repository)

    # [DEF:_select_latest_manifest:Function]
    # @PURPOSE: Pick the manifest with the highest manifest_version.
    # @PRE: manifests is non-empty.
    # @POST: Returns the first manifest carrying the maximal manifest_version.
    @staticmethod
    def _select_latest_manifest(manifests: Iterable[DistributionManifest]) -> DistributionManifest:
        # max() is a single pass and, like sorted(..., reverse=True)[0],
        # returns the first maximal item, so tie-breaking is unchanged.
        return max(manifests, key=lambda item: item.manifest_version)
    # [/DEF:_select_latest_manifest:Function]

    # [DEF:_describe_failure:Function]
    # @PURPOSE: Produce a non-empty failure reason for a caught exception.
    # @POST: Returns str(exc), or the exception class name when the message is empty.
    @staticmethod
    def _describe_failure(exc: BaseException) -> str:
        # Exceptions raised without arguments stringify to "", which would
        # leave a FAILED run without any recorded reason.
        return str(exc) or type(exc).__name__
    # [/DEF:_describe_failure:Function]

    # [DEF:_resolve_manifest:Function]
    # @PURPOSE: Resolve explicit manifest or fallback to latest candidate manifest.
    # @PRE: candidate exists.
    # @POST: Returns manifest snapshot or raises ComplianceRunError.
    def _resolve_manifest(self, candidate_id: str, manifest_id: Optional[str]) -> DistributionManifest:
        """Return the manifest a run should bind to.

        Raises:
            ComplianceRunError: The explicit manifest is missing, belongs to a
                different candidate, or the candidate has no manifest at all.
        """
        with belief_scope("ComplianceExecutionService._resolve_manifest"):
            if manifest_id:
                manifest = self.repository.get_manifest(manifest_id)
                if manifest is None:
                    raise ComplianceRunError(f"manifest '{manifest_id}' not found")
                # Refuse to bind a run to another candidate's manifest.
                if manifest.candidate_id != candidate_id:
                    raise ComplianceRunError("manifest does not belong to candidate")
                return manifest

            manifests = self.repository.get_manifests_by_candidate(candidate_id)
            if not manifests:
                raise ComplianceRunError(f"candidate '{candidate_id}' has no manifest")
            return self._select_latest_manifest(manifests)
    # [/DEF:_resolve_manifest:Function]

    # [DEF:_persist_stage_run:Function]
    # @PURPOSE: Persist stage run if repository supports stage records.
    # @POST: Stage run is persisted when adapter is available, otherwise no-op.
    def _persist_stage_run(self, stage_run: ComplianceStageRun) -> None:
        # Stage persistence is optional: repositories without save_stage_run
        # are tolerated and the record is only kept in the returned result.
        if hasattr(self.repository, "save_stage_run"):
            self.repository.save_stage_run(stage_run)
    # [/DEF:_persist_stage_run:Function]

    # [DEF:_persist_violations:Function]
    # @PURPOSE: Persist stage violations via repository adapters.
    # @POST: Violations are appended to repository evidence store.
    def _persist_violations(self, violations: List[ComplianceViolation]) -> None:
        for violation in violations:
            self.repository.save_violation(violation)
    # [/DEF:_persist_violations:Function]

    # [DEF:execute_run:Function]
    # @PURPOSE: Execute compliance run stages and finalize immutable report on terminal success.
    # @PRE: candidate exists and trusted policy/registry snapshots are resolvable.
    # @POST: Run and evidence are persisted; report exists for SUCCEEDED runs.
    def execute_run(
        self,
        *,
        candidate_id: str,
        requested_by: str,
        manifest_id: Optional[str] = None,
    ) -> ComplianceExecutionResult:
        """Run every configured stage for a candidate and persist the outcome.

        Args:
            candidate_id: Identifier of the release candidate to check.
            requested_by: Recorded on the run as its requester.
            manifest_id: Explicit manifest to bind to; when omitted the
                candidate's highest-version manifest is used.

        Returns:
            The run together with the report (None if the run failed before a
            report was persisted), the stage records and the violations
            gathered up to the point execution stopped.

        Raises:
            ComplianceRunError: The candidate or manifest cannot be resolved,
                or trusted policy resolution fails. Errors raised after the
                run has been created are not propagated; they are recorded on
                the run, which is returned with FAILED status.
        """
        with belief_scope("ComplianceExecutionService.execute_run"):
            logger.reason(f"Starting compliance execution candidate_id={candidate_id}")

            candidate = self.repository.get_candidate(candidate_id)
            if candidate is None:
                raise ComplianceRunError(f"candidate '{candidate_id}' not found")

            manifest = self._resolve_manifest(candidate_id, manifest_id)

            try:
                policy_snapshot, registry_snapshot = resolve_trusted_policy_snapshots(
                    config_manager=self.config_manager,
                    repository=self.repository,
                )
            except PolicyResolutionError as exc:
                # Surface resolution problems under the service's own error type.
                raise ComplianceRunError(str(exc)) from exc

            run = ComplianceRun(
                id=f"run-{uuid4()}",
                candidate_id=candidate_id,
                manifest_id=manifest.id,
                manifest_digest=manifest.manifest_digest,
                policy_snapshot_id=policy_snapshot.id,
                registry_snapshot_id=registry_snapshot.id,
                requested_by=requested_by,
                requested_at=datetime.now(timezone.utc),
                started_at=datetime.now(timezone.utc),
                status=RunStatus.RUNNING.value,
            )
            # Persist the RUNNING record first so a failure below still has a
            # stored run to attach its failure reason to.
            self.repository.save_check_run(run)

            stage_runs: List[ComplianceStageRun] = []
            violations: List[ComplianceViolation] = []
            report: Optional[ComplianceReport] = None

            context = ComplianceStageContext(
                run=run,
                candidate=candidate,
                manifest=manifest,
                policy=policy_snapshot,
                registry=registry_snapshot,
            )

            try:
                for stage in self.stages:
                    started = datetime.now(timezone.utc)
                    result = stage.execute(context)
                    finished = datetime.now(timezone.utc)

                    stage_run = build_stage_run_record(
                        run_id=run.id,
                        stage_name=stage.stage_name,
                        result=result,
                        started_at=started,
                        finished_at=finished,
                    )
                    self._persist_stage_run(stage_run)
                    stage_runs.append(stage_run)

                    if result.violations:
                        self._persist_violations(result.violations)
                        violations.extend(result.violations)

                run.final_status = derive_final_status(stage_runs).value
                run.status = RunStatus.SUCCEEDED.value
                run.finished_at = datetime.now(timezone.utc)
                # NOTE(review): the run is stored as SUCCEEDED before its report
                # exists; if report building fails the handler below rewrites it
                # to FAILED. Confirm no reader relies on the intermediate state.
                self.repository.save_check_run(run)

                report = self.report_builder.build_report_payload(run, violations)
                report = self.report_builder.persist_report(report)
                run.report_id = report.id
                self.repository.save_check_run(run)
                logger.reflect(f"[REFLECT] Compliance run completed run_id={run.id} final_status={run.final_status}")
            except Exception as exc:  # noqa: BLE001
                # Deliberate catch-all: any stage or persistence error is
                # recorded on the run instead of escaping to the caller.
                run.status = RunStatus.FAILED.value
                run.final_status = ComplianceDecision.ERROR.value
                run.failure_reason = self._describe_failure(exc)
                run.finished_at = datetime.now(timezone.utc)
                self.repository.save_check_run(run)
                logger.explore(f"[EXPLORE] Compliance run failed run_id={run.id}: {exc}")

            return ComplianceExecutionResult(
                run=run,
                report=report,
                stage_runs=stage_runs,
                violations=violations,
            )
    # [/DEF:execute_run:Function]
# [/DEF:ComplianceExecutionService:Class]
# [/DEF:backend.src.services.clean_release.compliance_execution_service:Module]