109 lines
3.7 KiB
Python
109 lines
3.7 KiB
Python
# [DEF:backend.src.services.clean_release.manifest_builder:Module]
|
|
# @TIER: STANDARD
|
|
# @SEMANTICS: clean-release, manifest, deterministic-hash, summary
|
|
# @PURPOSE: Build deterministic distribution manifest from classified artifact input.
|
|
# @LAYER: Domain
|
|
# @RELATION: DEPENDS_ON -> backend.src.models.clean_release
|
|
# @INVARIANT: Equal semantic artifact sets produce identical deterministic hash values.
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from typing import Iterable, List, Dict, Any
|
|
|
|
from ...models.clean_release import (
|
|
ClassificationType,
|
|
DistributionManifest,
|
|
ManifestItem,
|
|
ManifestSummary,
|
|
)
|
|
|
|
|
|
def _stable_hash_payload(candidate_id: str, policy_id: str, items: List[ManifestItem]) -> str:
|
|
serialized = [
|
|
{
|
|
"path": item.path,
|
|
"category": item.category,
|
|
"classification": item.classification.value,
|
|
"reason": item.reason,
|
|
"checksum": item.checksum,
|
|
}
|
|
for item in sorted(items, key=lambda i: (i.path, i.category, i.classification.value, i.reason, i.checksum or ""))
|
|
]
|
|
payload = {
|
|
"candidate_id": candidate_id,
|
|
"policy_id": policy_id,
|
|
"items": serialized,
|
|
}
|
|
digest = hashlib.sha256(json.dumps(payload, ensure_ascii=False, sort_keys=True).encode("utf-8")).hexdigest()
|
|
return digest
|
|
|
|
|
|
# [DEF:build_distribution_manifest:Function]
|
|
# @PURPOSE: Build DistributionManifest with deterministic hash and validated counters.
|
|
# @PRE: artifacts list contains normalized classification values.
|
|
# @POST: Returns DistributionManifest with summary counts matching items cardinality.
|
|
def build_distribution_manifest(
|
|
manifest_id: str,
|
|
candidate_id: str,
|
|
policy_id: str,
|
|
generated_by: str,
|
|
artifacts: Iterable[Dict[str, Any]],
|
|
) -> DistributionManifest:
|
|
items = [
|
|
ManifestItem(
|
|
path=a["path"],
|
|
category=a["category"],
|
|
classification=ClassificationType(a["classification"]),
|
|
reason=a["reason"],
|
|
checksum=a.get("checksum"),
|
|
)
|
|
for a in artifacts
|
|
]
|
|
|
|
included_count = sum(1 for item in items if item.classification in {ClassificationType.REQUIRED_SYSTEM, ClassificationType.ALLOWED})
|
|
excluded_count = sum(1 for item in items if item.classification == ClassificationType.EXCLUDED_PROHIBITED)
|
|
prohibited_detected_count = excluded_count
|
|
|
|
summary = ManifestSummary(
|
|
included_count=included_count,
|
|
excluded_count=excluded_count,
|
|
prohibited_detected_count=prohibited_detected_count,
|
|
)
|
|
|
|
deterministic_hash = _stable_hash_payload(candidate_id, policy_id, items)
|
|
|
|
return DistributionManifest(
|
|
manifest_id=manifest_id,
|
|
candidate_id=candidate_id,
|
|
generated_at=datetime.now(timezone.utc),
|
|
generated_by=generated_by,
|
|
items=items,
|
|
summary=summary,
|
|
deterministic_hash=deterministic_hash,
|
|
)
|
|
# [/DEF:build_distribution_manifest:Function]
|
|
|
|
|
|
# [DEF:build_manifest:Function]
|
|
# @PURPOSE: Legacy compatibility wrapper for old manifest builder import paths.
|
|
# @PRE: Same as build_distribution_manifest.
|
|
# @POST: Returns DistributionManifest produced by canonical builder.
|
|
def build_manifest(
|
|
manifest_id: str,
|
|
candidate_id: str,
|
|
policy_id: str,
|
|
generated_by: str,
|
|
artifacts: Iterable[Dict[str, Any]],
|
|
) -> DistributionManifest:
|
|
return build_distribution_manifest(
|
|
manifest_id=manifest_id,
|
|
candidate_id=candidate_id,
|
|
policy_id=policy_id,
|
|
generated_by=generated_by,
|
|
artifacts=artifacts,
|
|
)
|
|
# [/DEF:build_manifest:Function]
|
|
# [/DEF:backend.src.services.clean_release.manifest_builder:Module] |