152 lines
7.0 KiB
Python
152 lines
7.0 KiB
Python
# [DEF:backend.src.services.clean_release.policy_engine:Module]
|
|
# @TIER: CRITICAL
|
|
# @SEMANTICS: clean-release, policy, classification, source-isolation
|
|
# @PURPOSE: Evaluate artifact/source policies for enterprise clean profile with deterministic outcomes.
|
|
# @LAYER: Domain
|
|
# @RELATION: DEPENDS_ON -> backend.src.models.clean_release.CleanProfilePolicy
|
|
# @RELATION: DEPENDS_ON -> backend.src.models.clean_release.ResourceSourceRegistry
|
|
# @INVARIANT: Enterprise-clean policy always treats non-registry sources as violations.
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Dict, Iterable, List, Tuple
|
|
|
|
from ...core.logger import belief_scope, logger
|
|
from ...models.clean_release import CleanPolicySnapshot, SourceRegistrySnapshot
|
|
|
|
|
|
@dataclass
class PolicyValidationResult:
    """Outcome of a policy/registry consistency check.

    Produced by ``CleanPolicyEngine.validate_policy``, which sets ``ok`` to
    True exactly when it collected no blocking reasons.
    """

    # True when the policy passed validation.
    ok: bool
    # Human-readable reasons that block the release; empty on success.
    blocking_reasons: List[str]
|
|
|
|
|
|
@dataclass
class SourceValidationResult:
    """Outcome of validating a single resource source endpoint.

    Produced by ``CleanPolicyEngine.validate_resource_source``.
    """

    # True when the endpoint is accepted.
    ok: bool
    # Violation record describing the rejected endpoint, or None when accepted.
    violation: Dict | None
|
|
|
|
|
|
# [DEF:CleanPolicyEngine:Class]
# @PRE: Active policy exists and is internally consistent.
# @POST: Deterministic classification and source validation are available.
# @TEST_CONTRACT: CandidateEvaluationInput -> PolicyValidationResult|SourceValidationResult
# @TEST_SCENARIO: policy_valid -> Enterprise clean policy with matching registry returns ok=True
# @TEST_FIXTURE: policy_enterprise_clean -> file:backend/tests/fixtures/clean_release/fixtures_clean_release.json
# @TEST_EDGE: missing_registry_ref -> policy has empty registry_snapshot_id
# @TEST_EDGE: conflicting_registry -> policy registry ref does not match registry id
# @TEST_EDGE: external_endpoint -> endpoint not present in enabled internal registry entries
# @TEST_INVARIANT: deterministic_classification -> VERIFIED_BY: [policy_valid]
class CleanPolicyEngine:
    """Evaluate artifacts and resource sources against a policy/registry pair.

    The engine holds one policy snapshot and one source-registry snapshot and
    offers three checks: policy consistency, artifact classification by
    category, and endpoint validation against the registry's allowed hosts.
    """

    def __init__(self, policy: CleanPolicySnapshot, registry: SourceRegistrySnapshot):
        """Store the policy and registry snapshots used by every check."""
        self.policy = policy
        self.registry = registry

    @staticmethod
    def _external_source_violation(location: str) -> Dict:
        """Build the blocking violation record for a rejected endpoint."""
        return {
            "category": "external-source",
            "location": location,
            "remediation": "Replace with approved internal server",
            "blocked_release": True,
        }

    def validate_policy(self) -> PolicyValidationResult:
        """Check that the policy and registry snapshots are mutually consistent.

        Returns:
            A ``PolicyValidationResult`` whose ``ok`` is True only when no
            blocking reason was collected; every failed check contributes one
            entry to ``blocking_reasons``.
        """
        with belief_scope("clean_policy_engine.validate_policy"):
            logger.reason("Validating enterprise-clean policy and internal registry consistency")
            reasons: List[str] = []

            # Snapshots are immutable and assumed active if resolved by facade
            # A None reference is reported like an empty one instead of
            # crashing on ``.strip()``.
            if not (self.policy.registry_snapshot_id or "").strip():
                reasons.append("Policy missing registry_snapshot_id")

            content = self.policy.content_json or {}
            profile = content.get("profile", "standard")

            if profile == "enterprise-clean":
                if not content.get("prohibited_artifact_categories"):
                    reasons.append("Enterprise policy requires prohibited artifact categories")
                if not content.get("external_source_forbidden"):
                    reasons.append("Enterprise policy requires external_source_forbidden=true")

            if self.registry.id != self.policy.registry_snapshot_id:
                reasons.append("Policy registry ref does not match provided registry")

            if not self.registry.allowed_hosts:
                reasons.append("Registry must contain allowed hosts")

            logger.reflect(f"Policy validation completed. blocking_reasons={len(reasons)}")
            return PolicyValidationResult(ok=not reasons, blocking_reasons=reasons)

    def classify_artifact(self, artifact: Dict) -> str:
        """Classify an artifact by its ``category`` field.

        Args:
            artifact: Mapping describing the artifact; only ``category`` is read.

        Returns:
            ``"required-system"`` if the category is listed under the policy's
            ``required_system_categories``, ``"excluded-prohibited"`` if it is
            listed under ``prohibited_artifact_categories``, otherwise
            ``"allowed"``. The required list is checked first.
        """
        category = (artifact.get("category") or "").strip()
        content = self.policy.content_json or {}

        # ``or []`` also covers keys that are present but null, which would
        # otherwise make the ``in`` checks below raise TypeError.
        required = content.get("required_system_categories") or []
        prohibited = content.get("prohibited_artifact_categories") or []

        if category in required:
            logger.reason(f"Artifact category '{category}' classified as required-system")
            return "required-system"
        if category in prohibited:
            logger.reason(f"Artifact category '{category}' classified as excluded-prohibited")
            return "excluded-prohibited"
        logger.reflect(f"Artifact category '{category}' classified as allowed")
        return "allowed"

    def validate_resource_source(self, endpoint: str) -> SourceValidationResult:
        """Check a resource endpoint against the registry's allowed hosts.

        The comparison ignores letter case and surrounding whitespace on both
        the endpoint and the allowlist entries.

        Args:
            endpoint: Host/endpoint string to validate.

        Returns:
            ``SourceValidationResult(ok=True, violation=None)`` when the
            endpoint is allowlisted; otherwise ``ok=False`` with an
            ``external-source`` violation record. An empty or whitespace-only
            endpoint is reported with location ``"<empty-endpoint>"``.
        """
        with belief_scope("clean_policy_engine.validate_resource_source"):
            normalized = (endpoint or "").strip().lower()

            if not normalized:
                logger.explore("Empty endpoint detected; treating as blocking external-source violation")
                return SourceValidationResult(
                    ok=False,
                    violation=self._external_source_violation("<empty-endpoint>"),
                )

            # Normalize the allowlist the same way as the endpoint; comparing a
            # lowercased endpoint against raw entries would make any entry
            # containing uppercase letters or padding unmatchable.
            allowed_hosts = {
                host.strip().lower()
                for host in (self.registry.allowed_hosts or [])
                if isinstance(host, str)
            }

            if normalized in allowed_hosts:
                logger.reason(f"Endpoint '{normalized}' is present in internal allowlist")
                return SourceValidationResult(ok=True, violation=None)

            logger.explore(f"Endpoint '{endpoint}' is outside internal allowlist")
            return SourceValidationResult(
                ok=False,
                violation=self._external_source_violation(endpoint),
            )

    def evaluate_candidate(self, artifacts: Iterable[Dict], sources: Iterable[str]) -> Tuple[List[Dict], List[Dict]]:
        """Classify every artifact and validate every source of a candidate.

        Args:
            artifacts: Artifact mappings; each is copied and annotated, the
                inputs are not mutated.
            sources: Resource endpoints to validate.

        Returns:
            A ``(classified, violations)`` tuple. ``classified`` holds a copy
            of each artifact with an added ``classification`` key.
            ``violations`` holds one ``data-purity`` record per prohibited
            artifact followed by one ``external-source`` record per rejected
            source.
        """
        with belief_scope("clean_policy_engine.evaluate_candidate"):
            logger.reason("Evaluating candidate artifacts and resource sources against enterprise policy")
            classified: List[Dict] = []
            violations: List[Dict] = []

            for artifact in artifacts:
                classification = self.classify_artifact(artifact)
                enriched = dict(artifact)
                enriched["classification"] = classification
                if classification == "excluded-prohibited":
                    violations.append(
                        {
                            "category": "data-purity",
                            "location": artifact.get("path", "<unknown-path>"),
                            "remediation": "Remove prohibited content",
                            "blocked_release": True,
                        }
                    )
                classified.append(enriched)

            for source in sources:
                source_result = self.validate_resource_source(source)
                if not source_result.ok and source_result.violation:
                    violations.append(source_result.violation)

            logger.reflect(
                f"Candidate evaluation finished. artifacts={len(classified)} violations={len(violations)}"
            )
            return classified, violations
# [/DEF:CleanPolicyEngine:Class]
|
|
# [/DEF:backend.src.services.clean_release.policy_engine:Module] |