# [DEF:PolicyEngine:Module]
# @COMPLEXITY: 5
# @SEMANTICS: clean-release, policy, classification, source-isolation
# @PURPOSE: Evaluate artifact/source policies for enterprise clean profile with deterministic outcomes.
# @LAYER: Domain
# @RELATION: [DEPENDS_ON] ->[CleanReleaseModels]
# @RELATION: [DEPENDS_ON] ->[LoggerModule]
# @INVARIANT: Enterprise-clean policy always treats non-registry sources as violations.
# @DATA_CONTRACT: Candidate -> PolicyDecision
# @PRE: PolicyRepository is accessible
# @POST: PolicyDecision returned with approval status
# @SIDE_EFFECT: Read-only policy evaluation; no state changes

from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple

from ...core.logger import belief_scope, logger
from ...models.clean_release import (
    CleanPolicySnapshot,
    SourceRegistrySnapshot,
    CleanProfilePolicy,
    ResourceSourceRegistry,
)


@dataclass
class PolicyValidationResult:
    """Outcome of checking a policy against its source registry."""

    # True when validation collected no blocking reasons.
    ok: bool
    # Human-readable messages, one per failed consistency check.
    blocking_reasons: List[str]


@dataclass
class SourceValidationResult:
    """Outcome of checking a single resource endpoint against the allowlist."""

    # True when the endpoint is an approved internal host.
    ok: bool
    # None on success; otherwise a dict with the keys "category",
    # "location", "remediation" and "blocked_release".
    violation: Dict | None


# [DEF:CleanPolicyEngine:Class]
# @PRE: Active policy exists and is internally consistent.
# @POST: Deterministic classification and source validation are available.
# @TEST_CONTRACT: CandidateEvaluationInput -> PolicyValidationResult|SourceValidationResult
# @TEST_SCENARIO: policy_valid -> Enterprise clean policy with matching registry returns ok=True
# @TEST_FIXTURE: policy_enterprise_clean -> file:backend/tests/fixtures/clean_release/fixtures_clean_release.json
# @TEST_EDGE: missing_registry_ref -> policy has empty registry_snapshot_id
# @TEST_EDGE: conflicting_registry -> policy registry ref does not match registry id
# @TEST_EDGE: external_endpoint -> endpoint not present in enabled internal registry entries
# @TEST_INVARIANT: deterministic_classification -> VERIFIED_BY: [policy_valid]
class CleanPolicyEngine:
    """Evaluate artifacts and resource sources against a clean-release policy.

    The engine accepts either the snapshot or the legacy flavour of the policy
    and registry objects; every attribute is read through ``getattr`` so both
    shapes are tolerated.

    NOTE(review): the statement nesting in this class was reconstructed from a
    whitespace-collapsed source; confirm against version control.
    """

    def __init__(
        self,
        policy: CleanPolicySnapshot | CleanProfilePolicy,
        registry: SourceRegistrySnapshot | ResourceSourceRegistry,
    ):
        """Store the policy and registry that later checks are run against."""
        self.policy = policy
        self.registry = registry

    def _policy_content(self) -> Dict:
        """Return the policy settings as a plain dict.

        A non-empty ``content_json`` on the policy wins (a shallow copy is
        returned). Otherwise the dict is assembled from the individual policy
        attributes, with ``profile`` unwrapped via ``.value`` when it has one.
        """
        content = dict(getattr(self.policy, "content_json", None) or {})
        if content:
            return content

        profile = getattr(self.policy, "profile", "standard")
        return {
            # Enum-like profiles expose ``.value``; anything else is used as is.
            "profile": getattr(
                getattr(self.policy, "profile", None), "value", profile
            ),
            "prohibited_artifact_categories": list(
                getattr(self.policy, "prohibited_artifact_categories", []) or []
            ),
            "required_system_categories": list(
                getattr(self.policy, "required_system_categories", []) or []
            ),
            "external_source_forbidden": getattr(
                self.policy, "external_source_forbidden", False
            ),
        }

    def _registry_hosts(self) -> List:
        """Return the registry's allowed hosts exactly as stored.

        Uses ``registry.allowed_hosts`` when that attribute is not ``None``;
        otherwise collects ``entry.host`` from every registry entry whose
        ``enabled`` attribute is truthy (entries without it count as enabled).
        """
        hosts = getattr(self.registry, "allowed_hosts", None)
        if hosts is None:
            entries = getattr(self.registry, "entries", []) or []
            hosts = [
                entry.host for entry in entries if getattr(entry, "enabled", True)
            ]
        return list(hosts or [])

    @staticmethod
    def _external_source_violation(location: str) -> Dict:
        """Build a fresh blocking external-source violation for *location*."""
        return {
            "category": "external-source",
            "location": location,
            "remediation": "Replace with approved internal server",
            "blocked_release": True,
        }

    def validate_policy(self) -> PolicyValidationResult:
        """Check that the policy and the registry are mutually consistent.

        Returns:
            A ``PolicyValidationResult`` whose ``ok`` is True only when no
            blocking reason was collected.
        """
        with belief_scope("clean_policy_engine.validate_policy"):
            logger.reason(
                "Validating enterprise-clean policy and internal registry consistency"
            )
            reasons: List[str] = []

            # Snapshot policies carry ``registry_snapshot_id``; the other
            # policy shape uses ``internal_source_registry_ref``.
            registry_ref = (
                getattr(self.policy, "registry_snapshot_id", None)
                or getattr(self.policy, "internal_source_registry_ref", "")
                or ""
            )
            if not str(registry_ref).strip():
                reasons.append("Policy missing internal_source_registry_ref")

            content = self._policy_content()
            if content.get("profile", "standard") == "enterprise-clean":
                if not content.get("prohibited_artifact_categories"):
                    reasons.append(
                        "Enterprise policy requires prohibited artifact categories"
                    )
                if not content.get("external_source_forbidden"):
                    reasons.append(
                        "Enterprise policy requires external_source_forbidden=true"
                    )

            registry_id = getattr(self.registry, "id", None) or getattr(
                self.registry, "registry_id", None
            )
            if registry_id != registry_ref:
                reasons.append("Policy registry ref does not match provided registry")

            if not self._registry_hosts():
                reasons.append("Registry must contain allowed hosts")

            logger.reflect(
                f"Policy validation completed. blocking_reasons={len(reasons)}"
            )
            return PolicyValidationResult(ok=not reasons, blocking_reasons=reasons)

    def classify_artifact(self, artifact: Dict) -> str:
        """Classify one artifact by its ``category`` key.

        Returns ``"required-system"``, ``"excluded-prohibited"`` or
        ``"allowed"``. The required list is consulted first, so a category
        present in both lists is classified as required-system.
        """
        category = (artifact.get("category") or "").strip()
        content = self._policy_content()
        required = content.get("required_system_categories", [])
        prohibited = content.get("prohibited_artifact_categories", [])

        if category in required:
            logger.reason(
                f"Artifact category '{category}' classified as required-system"
            )
            return "required-system"

        if category in prohibited:
            logger.reason(
                f"Artifact category '{category}' classified as excluded-prohibited"
            )
            return "excluded-prohibited"

        logger.reflect(f"Artifact category '{category}' classified as allowed")
        return "allowed"

    def validate_resource_source(self, endpoint: str) -> SourceValidationResult:
        """Check whether *endpoint* is one of the registry's allowed hosts.

        The comparison ignores case and surrounding whitespace on BOTH the
        endpoint and the registry hosts. An empty endpoint, or one that is not
        in the allowlist, yields a blocking ``external-source`` violation.
        """
        with belief_scope("clean_policy_engine.validate_resource_source"):
            if not endpoint:
                logger.explore(
                    "Empty endpoint detected; treating as blocking external-source violation"
                )
                return SourceValidationResult(
                    ok=False, violation=self._external_source_violation("")
                )

            # Normalise the allowlist the same way as the endpoint; otherwise
            # a registry host containing upper-case letters could never match.
            # Blank or non-string hosts are dropped so that a whitespace-only
            # endpoint cannot match an empty entry.
            allowed_hosts = {
                host.strip().lower()
                for host in self._registry_hosts()
                if isinstance(host, str) and host.strip()
            }
            normalized = endpoint.strip().lower()

            if normalized in allowed_hosts:
                logger.reason(
                    f"Endpoint '{normalized}' is present in internal allowlist"
                )
                return SourceValidationResult(ok=True, violation=None)

            logger.explore(f"Endpoint '{endpoint}' is outside internal allowlist")
            return SourceValidationResult(
                ok=False, violation=self._external_source_violation(endpoint)
            )

    def evaluate_candidate(
        self, artifacts: Iterable[Dict], sources: Iterable[str]
    ) -> Tuple[List[Dict], List[Dict]]:
        """Classify every artifact and validate every source of a candidate.

        Returns:
            ``(classified, violations)``. ``classified`` holds a shallow copy
            of each artifact with an added ``"classification"`` key.
            ``violations`` holds one ``data-purity`` entry per prohibited
            artifact followed by the violation of each rejected source.
        """
        with belief_scope("clean_policy_engine.evaluate_candidate"):
            logger.reason(
                "Evaluating candidate artifacts and resource sources against enterprise policy"
            )
            classified: List[Dict] = []
            violations: List[Dict] = []

            for artifact in artifacts:
                classification = self.classify_artifact(artifact)
                # Copy so the caller's artifact dict is not mutated.
                enriched = dict(artifact)
                enriched["classification"] = classification
                if classification == "excluded-prohibited":
                    violations.append(
                        {
                            "category": "data-purity",
                            "location": artifact.get("path", ""),
                            "remediation": "Remove prohibited content",
                            "blocked_release": True,
                        }
                    )
                classified.append(enriched)

            for source in sources:
                source_result = self.validate_resource_source(source)
                if not source_result.ok and source_result.violation:
                    violations.append(source_result.violation)

            logger.reflect(
                f"Candidate evaluation finished. artifacts={len(classified)} violations={len(violations)}"
            )
            return classified, violations


# [/DEF:CleanPolicyEngine:Class]
# [/DEF:PolicyEngine:Module]