fix(027): stabilize shared acceptance gates and compatibility collateral

This commit is contained in:
2026-03-17 11:07:49 +03:00
parent 023bacde39
commit 18bdde0a81
19 changed files with 749 additions and 552 deletions

View File

@@ -13,6 +13,8 @@ from datetime import datetime
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional, Dict, Any
from pydantic import ConfigDict, Field, model_validator
from pydantic.dataclasses import dataclass as pydantic_dataclass
from sqlalchemy import Column, String, DateTime, JSON, ForeignKey, Integer, Boolean
from sqlalchemy.orm import relationship
from .mapping import Base
@@ -22,12 +24,21 @@ from ..services.clean_release.enums import (
)
from ..services.clean_release.exceptions import IllegalTransitionError
# [DEF:ExecutionMode:Class]
# @PURPOSE: Backward-compatible execution mode enum for legacy TUI/orchestrator tests.
class ExecutionMode(str, Enum):
TUI = "TUI"
API = "API"
SCHEDULER = "SCHEDULER"
# [/DEF:ExecutionMode:Class]
# [DEF:CheckFinalStatus:Class]
# @PURPOSE: Backward-compatible final status enum for legacy TUI/orchestrator tests.
class CheckFinalStatus(str, Enum):
COMPLIANT = "COMPLIANT"
BLOCKED = "BLOCKED"
FAILED = "FAILED"
RUNNING = "RUNNING"
# [/DEF:CheckFinalStatus:Class]
# [DEF:CheckStageName:Class]
@@ -50,7 +61,7 @@ class CheckStageStatus(str, Enum):
# [DEF:CheckStageResult:Class]
# @PURPOSE: Backward-compatible stage result container for legacy TUI/orchestrator tests.
@dataclass
@pydantic_dataclass(config=ConfigDict(validate_assignment=True))
class CheckStageResult:
stage: CheckStageName
status: CheckStageStatus
@@ -80,6 +91,7 @@ class ReleaseCandidateStatus(str, Enum):
CHECK_RUNNING = CandidateStatus.CHECK_RUNNING.value
CHECK_PASSED = CandidateStatus.CHECK_PASSED.value
CHECK_BLOCKED = CandidateStatus.CHECK_BLOCKED.value
BLOCKED = CandidateStatus.CHECK_BLOCKED.value
CHECK_ERROR = CandidateStatus.CHECK_ERROR.value
APPROVED = CandidateStatus.APPROVED.value
PUBLISHED = CandidateStatus.PUBLISHED.value
@@ -88,7 +100,7 @@ class ReleaseCandidateStatus(str, Enum):
# [DEF:ResourceSourceEntry:Class]
# @PURPOSE: Backward-compatible source entry model for legacy TUI bootstrap logic.
@dataclass
@pydantic_dataclass(config=ConfigDict(validate_assignment=True))
class ResourceSourceEntry:
source_id: str
host: str
@@ -99,7 +111,7 @@ class ResourceSourceEntry:
# [DEF:ResourceSourceRegistry:Class]
# @PURPOSE: Backward-compatible source registry model for legacy TUI bootstrap logic.
@dataclass
@pydantic_dataclass(config=ConfigDict(validate_assignment=True))
class ResourceSourceRegistry:
registry_id: str
name: str
@@ -107,6 +119,21 @@ class ResourceSourceRegistry:
updated_at: datetime
updated_by: str
status: str = "ACTIVE"
immutable: bool = True
allowed_hosts: Optional[List[str]] = None
allowed_schemes: Optional[List[str]] = None
allowed_source_types: Optional[List[str]] = None
@model_validator(mode="after")
def populate_legacy_allowlists(self):
enabled_entries = [entry for entry in self.entries if getattr(entry, "enabled", True)]
if self.allowed_hosts is None:
self.allowed_hosts = [entry.host for entry in enabled_entries]
if self.allowed_schemes is None:
self.allowed_schemes = [entry.protocol for entry in enabled_entries]
if self.allowed_source_types is None:
self.allowed_source_types = [entry.purpose for entry in enabled_entries]
return self
@property
def id(self) -> str:
@@ -115,16 +142,35 @@ class ResourceSourceRegistry:
# [DEF:CleanProfilePolicy:Class]
# @PURPOSE: Backward-compatible policy model for legacy TUI bootstrap logic.
@dataclass
@pydantic_dataclass(config=ConfigDict(validate_assignment=True))
class CleanProfilePolicy:
policy_id: str
policy_version: str
profile: str
profile: ProfileType
active: bool
internal_source_registry_ref: str
prohibited_artifact_categories: List[str]
effective_from: datetime
required_system_categories: Optional[List[str]] = None
external_source_forbidden: bool = True
immutable: bool = True
content_json: Optional[Dict[str, Any]] = None
@model_validator(mode="after")
def validate_enterprise_policy(self):
if self.profile == ProfileType.ENTERPRISE_CLEAN:
if not self.prohibited_artifact_categories:
raise ValueError("enterprise-clean policy requires prohibited_artifact_categories")
if self.external_source_forbidden is not True:
raise ValueError("enterprise-clean policy requires external_source_forbidden=true")
if self.content_json is None:
self.content_json = {
"profile": self.profile.value,
"prohibited_artifact_categories": list(self.prohibited_artifact_categories or []),
"required_system_categories": list(self.required_system_categories or []),
"external_source_forbidden": self.external_source_forbidden,
}
return self
@property
def id(self) -> str:
@@ -137,15 +183,49 @@ class CleanProfilePolicy:
# [DEF:ComplianceCheckRun:Class]
# @PURPOSE: Backward-compatible run model for legacy TUI typing/import compatibility.
@dataclass
@pydantic_dataclass(config=ConfigDict(validate_assignment=True))
class ComplianceCheckRun:
check_run_id: str
candidate_id: str
policy_id: str
requested_by: str
execution_mode: str
checks: List[CheckStageResult]
started_at: datetime
triggered_by: str
execution_mode: ExecutionMode
final_status: CheckFinalStatus
checks: List[CheckStageResult]
finished_at: Optional[datetime] = None
@model_validator(mode="after")
def validate_final_status_alignment(self):
mandatory_stages = {
CheckStageName.DATA_PURITY,
CheckStageName.INTERNAL_SOURCES_ONLY,
CheckStageName.NO_EXTERNAL_ENDPOINTS,
CheckStageName.MANIFEST_CONSISTENCY,
}
if self.final_status == CheckFinalStatus.COMPLIANT:
observed_stages = {check.stage for check in self.checks}
if observed_stages != mandatory_stages:
raise ValueError("compliant run requires all mandatory stages")
if any(check.status != CheckStageStatus.PASS for check in self.checks):
raise ValueError("compliant run requires PASS on all mandatory stages")
return self
@property
def id(self) -> str:
return self.check_run_id
@property
def run_id(self) -> str:
return self.check_run_id
@property
def status(self) -> RunStatus:
if self.final_status == CheckFinalStatus.RUNNING:
return RunStatus.RUNNING
if self.final_status == CheckFinalStatus.BLOCKED:
return RunStatus.FAILED
return RunStatus.SUCCEEDED
# [/DEF:ComplianceCheckRun:Class]
# [DEF:ReleaseCandidate:Class]
@@ -164,6 +244,22 @@ class ReleaseCandidate(Base):
created_by = Column(String, nullable=False)
status = Column(String, default=CandidateStatus.DRAFT)
def __init__(self, **kwargs):
if "candidate_id" in kwargs:
kwargs["id"] = kwargs.pop("candidate_id")
if "profile" in kwargs:
kwargs.pop("profile")
status = kwargs.get("status")
if status is None:
kwargs["status"] = CandidateStatus.DRAFT.value
elif isinstance(status, ReleaseCandidateStatus):
kwargs["status"] = status.value
elif isinstance(status, CandidateStatus):
kwargs["status"] = status.value
if not str(kwargs.get("id", "")).strip():
raise ValueError("candidate_id must be non-empty")
super().__init__(**kwargs)
@property
def candidate_id(self) -> str:
return self.id
@@ -214,7 +310,7 @@ class CandidateArtifact(Base):
# [/DEF:CandidateArtifact:Class]
# [DEF:ManifestItem:Class]
@dataclass
@pydantic_dataclass(config=ConfigDict(validate_assignment=True))
class ManifestItem:
path: str
category: str
@@ -224,7 +320,7 @@ class ManifestItem:
# [/DEF:ManifestItem:Class]
# [DEF:ManifestSummary:Class]
@dataclass
@pydantic_dataclass(config=ConfigDict(validate_assignment=True))
class ManifestSummary:
included_count: int
excluded_count: int
@@ -250,6 +346,9 @@ class DistributionManifest(Base):
# Redesign compatibility fields (not persisted directly but used by builder/facade)
def __init__(self, **kwargs):
items = kwargs.pop("items", None)
summary = kwargs.pop("summary", None)
# Handle fields from manifest_builder.py
if "manifest_id" in kwargs:
kwargs["id"] = kwargs.pop("manifest_id")
@@ -259,6 +358,13 @@ class DistributionManifest(Base):
kwargs["created_by"] = kwargs.pop("generated_by")
if "deterministic_hash" in kwargs:
kwargs["manifest_digest"] = kwargs.pop("deterministic_hash")
if "policy_id" in kwargs:
kwargs.pop("policy_id")
if items is not None and summary is not None:
expected_count = int(summary.included_count) + int(summary.excluded_count)
if expected_count != len(items):
raise ValueError("manifest summary counts must match items size")
# Ensure required DB fields have defaults if missing
if "manifest_version" not in kwargs:
@@ -269,10 +375,9 @@ class DistributionManifest(Base):
kwargs["source_snapshot_ref"] = "pending"
# Pack items and summary into content_json if provided
if "items" in kwargs or "summary" in kwargs:
content = kwargs.get("content_json", {})
if "items" in kwargs:
items = kwargs.pop("items")
if items is not None or summary is not None:
content = dict(kwargs.get("content_json") or {})
if items is not None:
content["items"] = [
{
"path": i.path,
@@ -282,8 +387,7 @@ class DistributionManifest(Base):
"checksum": i.checksum
} for i in items
]
if "summary" in kwargs:
summary = kwargs.pop("summary")
if summary is not None:
content["summary"] = {
"included_count": summary.included_count,
"excluded_count": summary.excluded_count,
@@ -292,6 +396,23 @@ class DistributionManifest(Base):
kwargs["content_json"] = content
super().__init__(**kwargs)
@property
def manifest_id(self) -> str:
return self.id
@property
def deterministic_hash(self) -> str:
return self.manifest_digest
@property
def summary(self) -> ManifestSummary:
payload = (self.content_json or {}).get("summary", {})
return ManifestSummary(
included_count=int(payload.get("included_count", 0)),
excluded_count=int(payload.get("excluded_count", 0)),
prohibited_detected_count=int(payload.get("prohibited_detected_count", 0)),
)
# [/DEF:DistributionManifest:Class]
# [DEF:SourceRegistrySnapshot:Class]
@@ -363,6 +484,24 @@ class ComplianceStageRun(Base):
details_json = Column(JSON, default=dict)
# [/DEF:ComplianceStageRun:Class]
# [DEF:ViolationSeverity:Class]
# @PURPOSE: Backward-compatible violation severity enum for legacy clean-release tests.
class ViolationSeverity(str, Enum):
CRITICAL = "CRITICAL"
MAJOR = "MAJOR"
MINOR = "MINOR"
# [/DEF:ViolationSeverity:Class]
# [DEF:ViolationCategory:Class]
# @PURPOSE: Backward-compatible violation category enum for legacy clean-release tests.
class ViolationCategory(str, Enum):
DATA_PURITY = "DATA_PURITY"
EXTERNAL_SOURCE = "EXTERNAL_SOURCE"
SOURCE_ISOLATION = "SOURCE_ISOLATION"
MANIFEST_CONSISTENCY = "MANIFEST_CONSISTENCY"
EXTERNAL_ENDPOINT = "EXTERNAL_ENDPOINT"
# [/DEF:ViolationCategory:Class]
# [DEF:ComplianceViolation:Class]
# @PURPOSE: Violation produced by a stage.
class ComplianceViolation(Base):
@@ -377,6 +516,66 @@ class ComplianceViolation(Base):
artifact_sha256 = Column(String, nullable=True)
message = Column(String, nullable=False)
evidence_json = Column(JSON, default=dict)
def __init__(self, **kwargs):
if "violation_id" in kwargs:
kwargs["id"] = kwargs.pop("violation_id")
if "check_run_id" in kwargs:
kwargs["run_id"] = kwargs.pop("check_run_id")
if "category" in kwargs:
category = kwargs.pop("category")
kwargs["stage_name"] = category.value if isinstance(category, ViolationCategory) else str(category)
if "location" in kwargs:
kwargs["artifact_path"] = kwargs.pop("location")
if "remediation" in kwargs:
remediation = kwargs.pop("remediation")
evidence = dict(kwargs.get("evidence_json") or {})
evidence["remediation"] = remediation
kwargs["evidence_json"] = evidence
if "blocked_release" in kwargs:
blocked_release = kwargs.pop("blocked_release")
evidence = dict(kwargs.get("evidence_json") or {})
evidence["blocked_release"] = blocked_release
kwargs["evidence_json"] = evidence
if "detected_at" in kwargs:
kwargs.pop("detected_at")
if "code" not in kwargs:
kwargs["code"] = "LEGACY_VIOLATION"
if "message" not in kwargs:
kwargs["message"] = kwargs.get("stage_name", "LEGACY_VIOLATION")
super().__init__(**kwargs)
@property
def violation_id(self) -> str:
return self.id
@violation_id.setter
def violation_id(self, value: str) -> None:
self.id = value
@property
def check_run_id(self) -> str:
return self.run_id
@property
def category(self) -> ViolationCategory:
return ViolationCategory(self.stage_name)
@category.setter
def category(self, value: ViolationCategory) -> None:
self.stage_name = value.value if isinstance(value, ViolationCategory) else str(value)
@property
def location(self) -> Optional[str]:
return self.artifact_path
@property
def remediation(self) -> Optional[str]:
return (self.evidence_json or {}).get("remediation")
@property
def blocked_release(self) -> bool:
return bool((self.evidence_json or {}).get("blocked_release", False))
# [/DEF:ComplianceViolation:Class]
# [DEF:ComplianceReport:Class]
@@ -392,6 +591,65 @@ class ComplianceReport(Base):
summary_json = Column(JSON, nullable=False)
generated_at = Column(DateTime, default=datetime.utcnow)
immutable = Column(Boolean, default=True)
def __init__(self, **kwargs):
if "report_id" in kwargs:
kwargs["id"] = kwargs.pop("report_id")
if "check_run_id" in kwargs:
kwargs["run_id"] = kwargs.pop("check_run_id")
operator_summary = kwargs.pop("operator_summary", None)
structured_payload_ref = kwargs.pop("structured_payload_ref", None)
violations_count = kwargs.pop("violations_count", None)
blocking_violations_count = kwargs.pop("blocking_violations_count", None)
final_status = kwargs.get("final_status")
final_status_value = getattr(final_status, "value", final_status)
if (
final_status_value in {CheckFinalStatus.BLOCKED.value, ComplianceDecision.BLOCKED.value}
and blocking_violations_count is not None
and int(blocking_violations_count) <= 0
):
raise ValueError("blocked report requires blocking violations")
if (
operator_summary is not None
or structured_payload_ref is not None
or violations_count is not None
or blocking_violations_count is not None
):
kwargs["summary_json"] = {
"operator_summary": operator_summary or "",
"structured_payload_ref": structured_payload_ref,
"violations_count": int(violations_count or 0),
"blocking_violations_count": int(blocking_violations_count or 0),
}
super().__init__(**kwargs)
@property
def report_id(self) -> str:
return self.id
@property
def check_run_id(self) -> str:
return self.run_id
@property
def operator_summary(self) -> str:
return (self.summary_json or {}).get("operator_summary", "")
@property
def structured_payload_ref(self) -> Optional[str]:
return (self.summary_json or {}).get("structured_payload_ref")
@property
def violations_count(self) -> int:
return int((self.summary_json or {}).get("violations_count", 0))
@property
def blocking_violations_count(self) -> int:
return int((self.summary_json or {}).get("blocking_violations_count", 0))
# [/DEF:ComplianceReport:Class]
# [DEF:ApprovalDecision:Class]