feat(clean-release): complete compliance redesign phases and polish tasks T047-T052

This commit is contained in:
2026-03-10 09:11:26 +03:00
parent 6ee54d95a8
commit 87b81a365a
79 changed files with 7430 additions and 945 deletions

View File

@@ -0,0 +1,26 @@
{
"candidates": [
{
"id": "cand_v2_001",
"name": "Candidate V2 001",
"status": "DRAFT",
"created_at": "2026-03-09T12:00:00Z"
}
],
"manifests": [
{
"id": "man_v2_001",
"candidate_id": "cand_v2_001",
"version": 1,
"digest": "sha256:abc123def456",
"created_at": "2026-03-09T12:05:00Z"
}
],
"policies": [
{
"id": "pol_v2_001",
"name": "Standard Compliance Policy",
"rules": ["data_purity", "internal_sources_only"]
}
]
}

View File

@@ -0,0 +1,305 @@
# [DEF:test_clean_release_cli:Module]
# @TIER: STANDARD
# @PURPOSE: Smoke tests for the redesigned clean release CLI.
# @LAYER: Domain
"""Smoke tests for the redesigned clean release CLI commands."""
from types import SimpleNamespace
import json
from backend.src.dependencies import get_clean_release_repository, get_config_manager
from datetime import datetime, timezone
from uuid import uuid4
from backend.src.models.clean_release import CleanPolicySnapshot, ComplianceReport, ReleaseCandidate, SourceRegistrySnapshot
from backend.src.services.clean_release.enums import CandidateStatus, ComplianceDecision
from backend.src.scripts.clean_release_cli import main as cli_main
def test_cli_candidate_register_scaffold() -> None:
"""Candidate register CLI command smoke test."""
exit_code = cli_main(
[
"candidate-register",
"--candidate-id",
"cli-candidate-1",
"--version",
"1.0.0",
"--source-snapshot-ref",
"git:sha123",
"--created-by",
"cli-test",
]
)
assert exit_code == 0
def test_cli_manifest_build_scaffold() -> None:
"""Manifest build CLI command smoke test."""
register_exit = cli_main(
[
"candidate-register",
"--candidate-id",
"cli-candidate-2",
"--version",
"1.0.0",
"--source-snapshot-ref",
"git:sha234",
"--created-by",
"cli-test",
]
)
assert register_exit == 0
import_exit = cli_main(
[
"artifact-import",
"--candidate-id",
"cli-candidate-2",
"--artifact-id",
"artifact-2",
"--path",
"bin/app",
"--sha256",
"feedbeef",
"--size",
"24",
]
)
assert import_exit == 0
manifest_exit = cli_main(
[
"manifest-build",
"--candidate-id",
"cli-candidate-2",
"--created-by",
"cli-test",
]
)
assert manifest_exit == 0
def test_cli_compliance_run_scaffold() -> None:
"""Compliance CLI command smoke test for run/status/report/violations."""
repository = get_clean_release_repository()
config_manager = get_config_manager()
registry = SourceRegistrySnapshot(
id="cli-registry",
registry_id="trusted-registry",
registry_version="1.0.0",
allowed_hosts=["repo.internal.local"],
allowed_schemes=["https"],
allowed_source_types=["repo"],
immutable=True,
)
policy = CleanPolicySnapshot(
id="cli-policy",
policy_id="trusted-policy",
policy_version="1.0.0",
content_json={"rules": []},
registry_snapshot_id=registry.id,
immutable=True,
)
repository.save_registry(registry)
repository.save_policy(policy)
config = config_manager.get_config()
if getattr(config, "settings", None) is None:
config.settings = SimpleNamespace()
config.settings.clean_release = SimpleNamespace(
active_policy_id=policy.id,
active_registry_id=registry.id,
)
register_exit = cli_main(
[
"candidate-register",
"--candidate-id",
"cli-candidate-3",
"--version",
"1.0.0",
"--source-snapshot-ref",
"git:sha345",
"--created-by",
"cli-test",
]
)
assert register_exit == 0
import_exit = cli_main(
[
"artifact-import",
"--candidate-id",
"cli-candidate-3",
"--artifact-id",
"artifact-1",
"--path",
"bin/app",
"--sha256",
"deadbeef",
"--size",
"42",
]
)
assert import_exit == 0
manifest_exit = cli_main(
[
"manifest-build",
"--candidate-id",
"cli-candidate-3",
"--created-by",
"cli-test",
]
)
assert manifest_exit == 0
run_exit = cli_main(
[
"compliance-run",
"--candidate-id",
"cli-candidate-3",
"--actor",
"cli-test",
"--json",
]
)
assert run_exit == 0
run_id = next(run.id for run in repository.check_runs.values() if run.candidate_id == "cli-candidate-3")
status_exit = cli_main(["compliance-status", "--run-id", run_id, "--json"])
assert status_exit == 0
violations_exit = cli_main(["compliance-violations", "--run-id", run_id, "--json"])
assert violations_exit == 0
report_exit = cli_main(["compliance-report", "--run-id", run_id, "--json"])
assert report_exit == 0
def test_cli_release_gate_commands_scaffold() -> None:
"""Release gate CLI smoke test for approve/reject/publish/revoke commands."""
repository = get_clean_release_repository()
approved_candidate_id = f"cli-release-approved-{uuid4()}"
rejected_candidate_id = f"cli-release-rejected-{uuid4()}"
approved_report_id = f"CCR-cli-release-approved-{uuid4()}"
rejected_report_id = f"CCR-cli-release-rejected-{uuid4()}"
repository.save_candidate(
ReleaseCandidate(
id=approved_candidate_id,
version="1.0.0",
source_snapshot_ref="git:sha-approved",
created_by="cli-test",
created_at=datetime.now(timezone.utc),
status=CandidateStatus.CHECK_PASSED.value,
)
)
repository.save_candidate(
ReleaseCandidate(
id=rejected_candidate_id,
version="1.0.0",
source_snapshot_ref="git:sha-rejected",
created_by="cli-test",
created_at=datetime.now(timezone.utc),
status=CandidateStatus.CHECK_PASSED.value,
)
)
repository.save_report(
ComplianceReport(
id=approved_report_id,
run_id=f"run-{uuid4()}",
candidate_id=approved_candidate_id,
final_status=ComplianceDecision.PASSED.value,
summary_json={"operator_summary": "ok", "violations_count": 0, "blocking_violations_count": 0},
generated_at=datetime.now(timezone.utc),
immutable=True,
)
)
repository.save_report(
ComplianceReport(
id=rejected_report_id,
run_id=f"run-{uuid4()}",
candidate_id=rejected_candidate_id,
final_status=ComplianceDecision.PASSED.value,
summary_json={"operator_summary": "ok", "violations_count": 0, "blocking_violations_count": 0},
generated_at=datetime.now(timezone.utc),
immutable=True,
)
)
approve_exit = cli_main(
[
"approve",
"--candidate-id",
approved_candidate_id,
"--report-id",
approved_report_id,
"--actor",
"cli-test",
"--comment",
"approve candidate",
"--json",
]
)
assert approve_exit == 0
reject_exit = cli_main(
[
"reject",
"--candidate-id",
rejected_candidate_id,
"--report-id",
rejected_report_id,
"--actor",
"cli-test",
"--comment",
"reject candidate",
"--json",
]
)
assert reject_exit == 0
publish_exit = cli_main(
[
"publish",
"--candidate-id",
approved_candidate_id,
"--report-id",
approved_report_id,
"--actor",
"cli-test",
"--target-channel",
"stable",
"--publication-ref",
"rel-cli-001",
"--json",
]
)
assert publish_exit == 0
publication_records = getattr(repository, "publication_records", [])
assert publication_records
publication_id = publication_records[-1].id
revoke_exit = cli_main(
[
"revoke",
"--publication-id",
publication_id,
"--actor",
"cli-test",
"--comment",
"rollback",
"--json",
]
)
assert revoke_exit == 0
# [/DEF:test_clean_release_cli:Module]

View File

@@ -29,25 +29,18 @@ def mock_stdscr() -> MagicMock:
def test_headless_fallback(capsys):
"""
@TEST_EDGE: stdout_unavailable
Tests that if the stream is not a TTY or PYTEST_CURRENT_TEST is set,
the script falls back to a simple stdout print instead of trapping in curses.wrapper.
Tests that non-TTY startup is explicitly refused and wrapper is not invoked.
"""
# Environment should trigger headless fallback due to PYTEST_CURRENT_TEST being set
with mock.patch("backend.src.scripts.clean_release_tui.curses.wrapper") as curses_wrapper_mock:
with mock.patch("sys.stdout.isatty", return_value=False):
exit_code = main()
# Ensures wrapper wasn't used
curses_wrapper_mock.assert_not_called()
# Verify it still exits 0
assert exit_code == 0
# Verify headless info is printed
assert exit_code == 2
captured = capsys.readouterr()
assert "Enterprise Clean Release Validator (Headless Mode)" in captured.out
assert "FINAL STATUS: READY" in captured.out
assert "TTY is required for TUI mode" in captured.err
assert "Use CLI/API workflow instead" in captured.err
@patch("backend.src.scripts.clean_release_tui.curses")

View File

@@ -0,0 +1,97 @@
# [DEF:test_clean_release_tui_v2:Module]
# @TIER: STANDARD
# @PURPOSE: Smoke tests for thin-client TUI action dispatch and blocked transition behavior.
# @LAYER: Domain
# @RELATION: TESTS -> backend.src.scripts.clean_release_tui
"""Smoke tests for the redesigned clean release TUI."""
from __future__ import annotations
import curses
from unittest.mock import MagicMock, patch
from backend.src.models.clean_release import CheckFinalStatus
from backend.src.scripts.clean_release_tui import CleanReleaseTUI, main
def _build_mock_stdscr() -> MagicMock:
stdscr = MagicMock()
stdscr.getmaxyx.return_value = (40, 120)
stdscr.getch.return_value = curses.KEY_F10
return stdscr
@patch("backend.src.scripts.clean_release_tui.curses")
def test_tui_f5_dispatches_run_action(mock_curses_module: MagicMock) -> None:
"""F5 should dispatch run action from TUI loop."""
mock_curses_module.KEY_F10 = curses.KEY_F10
mock_curses_module.KEY_F5 = curses.KEY_F5
mock_curses_module.color_pair.side_effect = lambda value: value
mock_curses_module.A_BOLD = 0
stdscr = _build_mock_stdscr()
app = CleanReleaseTUI(stdscr)
stdscr.getch.side_effect = [curses.KEY_F5, curses.KEY_F10]
with patch.object(app, "run_checks", autospec=True) as run_checks_mock:
app.loop()
run_checks_mock.assert_called_once_with()
@patch("backend.src.scripts.clean_release_tui.curses")
def test_tui_f5_run_smoke_reports_blocked_state(mock_curses_module: MagicMock) -> None:
"""F5 smoke test should expose blocked outcome state after run action."""
mock_curses_module.KEY_F10 = curses.KEY_F10
mock_curses_module.KEY_F5 = curses.KEY_F5
mock_curses_module.color_pair.side_effect = lambda value: value
mock_curses_module.A_BOLD = 0
stdscr = _build_mock_stdscr()
app = CleanReleaseTUI(stdscr)
stdscr.getch.side_effect = [curses.KEY_F5, curses.KEY_F10]
def _set_blocked_state() -> None:
app.status = CheckFinalStatus.BLOCKED
app.report_id = "CCR-smoke-blocked"
app.violations_list = [object()]
with patch.object(app, "run_checks", side_effect=_set_blocked_state, autospec=True):
app.loop()
assert app.status == CheckFinalStatus.BLOCKED
assert app.report_id == "CCR-smoke-blocked"
assert app.violations_list
def test_tui_non_tty_refuses_startup(capsys) -> None:
"""Non-TTY startup must refuse TUI mode and redirect operator to CLI/API flow."""
with patch("sys.stdout.isatty", return_value=False):
exit_code = main()
captured = capsys.readouterr()
assert exit_code == 2
assert "TTY is required for TUI mode" in captured.err
assert "Use CLI/API workflow instead" in captured.err
@patch("backend.src.scripts.clean_release_tui.curses")
def test_tui_f8_blocked_without_facade_binding(mock_curses_module: MagicMock) -> None:
"""F8 should not perform hidden state mutation when facade action is not bound."""
mock_curses_module.KEY_F10 = curses.KEY_F10
mock_curses_module.KEY_F8 = curses.KEY_F8
mock_curses_module.color_pair.side_effect = lambda value: value
mock_curses_module.A_BOLD = 0
stdscr = _build_mock_stdscr()
app = CleanReleaseTUI(stdscr)
stdscr.getch.side_effect = [curses.KEY_F8, curses.KEY_F10]
app.loop()
assert app.last_error is not None
assert "F8 disabled" in app.last_error
# [/DEF:test_clean_release_tui_v2:Module]

View File

@@ -0,0 +1,199 @@
# [DEF:backend.tests.services.clean_release.test_approval_service:Module]
# @TIER: CRITICAL
# @SEMANTICS: tests, clean-release, approval, lifecycle, gate
# @PURPOSE: Define approval gate contracts for approve/reject operations over immutable compliance evidence.
# @LAYER: Tests
# @RELATION: TESTS -> src.services.clean_release.approval_service
# @RELATION: TESTS -> src.services.clean_release.enums
# @RELATION: TESTS -> src.services.clean_release.repository
# @INVARIANT: Approval is allowed only for PASSED report bound to candidate; duplicate approve and foreign report must be rejected.
from __future__ import annotations
from datetime import datetime, timezone
import pytest
from src.models.clean_release import ComplianceReport, ReleaseCandidate
from src.services.clean_release.enums import ApprovalDecisionType, CandidateStatus, ComplianceDecision
from src.services.clean_release.exceptions import ApprovalGateError
from src.services.clean_release.repository import CleanReleaseRepository
# [DEF:_seed_candidate_with_report:Function]
# @PURPOSE: Seed candidate and report fixtures for approval gate tests.
# @PRE: candidate_id and report_id are non-empty.
# @POST: Repository contains candidate and report linked by candidate_id.
def _seed_candidate_with_report(
*,
candidate_id: str = "cand-approve-1",
report_id: str = "CCR-approve-1",
report_status: ComplianceDecision = ComplianceDecision.PASSED,
) -> tuple[CleanReleaseRepository, str, str]:
repository = CleanReleaseRepository()
repository.save_candidate(
ReleaseCandidate(
id=candidate_id,
version="1.0.0",
source_snapshot_ref="git:sha-approve-1",
created_by="tester",
created_at=datetime.now(timezone.utc),
status=CandidateStatus.CHECK_PASSED.value,
)
)
repository.save_report(
ComplianceReport(
id=report_id,
run_id="run-approve-1",
candidate_id=candidate_id,
final_status=report_status.value,
summary_json={
"operator_summary": "seed",
"violations_count": 0,
"blocking_violations_count": 0 if report_status == ComplianceDecision.PASSED else 1,
},
generated_at=datetime.now(timezone.utc),
immutable=True,
)
)
return repository, candidate_id, report_id
# [/DEF:_seed_candidate_with_report:Function]
# [DEF:test_approve_rejects_blocked_report:Function]
# @PURPOSE: Ensure approve is rejected when latest report final status is not PASSED.
# @PRE: Candidate has BLOCKED report.
# @POST: approve_candidate raises ApprovalGateError.
def test_approve_rejects_blocked_report():
from src.services.clean_release.approval_service import approve_candidate
repository, candidate_id, report_id = _seed_candidate_with_report(
report_status=ComplianceDecision.BLOCKED,
)
with pytest.raises(ApprovalGateError, match="PASSED"):
approve_candidate(
repository=repository,
candidate_id=candidate_id,
report_id=report_id,
decided_by="approver",
comment="blocked report cannot be approved",
)
# [/DEF:test_approve_rejects_blocked_report:Function]
# [DEF:test_approve_rejects_foreign_report:Function]
# @PURPOSE: Ensure approve is rejected when report belongs to another candidate.
# @PRE: Candidate exists, report candidate_id differs.
# @POST: approve_candidate raises ApprovalGateError.
def test_approve_rejects_foreign_report():
from src.services.clean_release.approval_service import approve_candidate
repository, candidate_id, _ = _seed_candidate_with_report()
foreign_report = ComplianceReport(
id="CCR-foreign-1",
run_id="run-foreign-1",
candidate_id="cand-foreign-1",
final_status=ComplianceDecision.PASSED.value,
summary_json={"operator_summary": "foreign", "violations_count": 0, "blocking_violations_count": 0},
generated_at=datetime.now(timezone.utc),
immutable=True,
)
repository.save_report(foreign_report)
with pytest.raises(ApprovalGateError, match="belongs to another candidate"):
approve_candidate(
repository=repository,
candidate_id=candidate_id,
report_id=foreign_report.id,
decided_by="approver",
comment="foreign report",
)
# [/DEF:test_approve_rejects_foreign_report:Function]
# [DEF:test_approve_rejects_duplicate_approve:Function]
# @PURPOSE: Ensure repeated approve decision for same candidate is blocked.
# @PRE: Candidate has already been approved once.
# @POST: Second approve_candidate call raises ApprovalGateError.
def test_approve_rejects_duplicate_approve():
from src.services.clean_release.approval_service import approve_candidate
repository, candidate_id, report_id = _seed_candidate_with_report()
first = approve_candidate(
repository=repository,
candidate_id=candidate_id,
report_id=report_id,
decided_by="approver",
comment="first approval",
)
assert first.decision == ApprovalDecisionType.APPROVED.value
assert repository.get_candidate(candidate_id).status == CandidateStatus.APPROVED.value
with pytest.raises(ApprovalGateError, match="already approved"):
approve_candidate(
repository=repository,
candidate_id=candidate_id,
report_id=report_id,
decided_by="approver",
comment="duplicate approval",
)
# [/DEF:test_approve_rejects_duplicate_approve:Function]
# [DEF:test_reject_persists_decision_without_promoting_candidate_state:Function]
# @PURPOSE: Ensure reject decision is immutable and does not promote candidate to APPROVED.
# @PRE: Candidate has PASSED report and CHECK_PASSED lifecycle state.
# @POST: reject_candidate persists REJECTED decision; candidate status remains unchanged.
def test_reject_persists_decision_without_promoting_candidate_state():
from src.services.clean_release.approval_service import reject_candidate
repository, candidate_id, report_id = _seed_candidate_with_report()
decision = reject_candidate(
repository=repository,
candidate_id=candidate_id,
report_id=report_id,
decided_by="approver",
comment="manual rejection",
)
candidate = repository.get_candidate(candidate_id)
assert decision.decision == ApprovalDecisionType.REJECTED.value
assert candidate is not None
assert candidate.status == CandidateStatus.CHECK_PASSED.value
# [/DEF:test_reject_persists_decision_without_promoting_candidate_state:Function]
# [DEF:test_reject_then_publish_is_blocked:Function]
# @PURPOSE: Ensure latest REJECTED decision blocks publication gate.
# @PRE: Candidate is rejected for passed report.
# @POST: publish_candidate raises PublicationGateError.
def test_reject_then_publish_is_blocked():
from src.services.clean_release.approval_service import reject_candidate
from src.services.clean_release.publication_service import publish_candidate
from src.services.clean_release.exceptions import PublicationGateError
repository, candidate_id, report_id = _seed_candidate_with_report()
reject_candidate(
repository=repository,
candidate_id=candidate_id,
report_id=report_id,
decided_by="approver",
comment="rejected before publish",
)
with pytest.raises(PublicationGateError, match="APPROVED"):
publish_candidate(
repository=repository,
candidate_id=candidate_id,
report_id=report_id,
published_by="publisher",
target_channel="stable",
publication_ref="rel-blocked",
)
# [/DEF:test_reject_then_publish_is_blocked:Function]
# [/DEF:backend.tests.services.clean_release.test_approval_service:Module]

View File

@@ -0,0 +1,203 @@
# [DEF:test_candidate_manifest_services:Module]
# @TIER: STANDARD
# @PURPOSE: Test lifecycle and manifest versioning for release candidates.
# @LAYER: Tests
import pytest
from datetime import datetime, timezone
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from src.core.database import Base
from src.models.clean_release import ReleaseCandidate, DistributionManifest, CandidateArtifact
from backend.src.services.clean_release.enums import CandidateStatus
from backend.src.services.clean_release.candidate_service import register_candidate
from backend.src.services.clean_release.manifest_service import build_manifest_snapshot
from backend.src.services.clean_release.repository import CleanReleaseRepository
@pytest.fixture
def db_session():
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
yield session
session.close()
def test_candidate_lifecycle_transitions(db_session):
"""
@PURPOSE: Verify legal state transitions for ReleaseCandidate.
"""
candidate = ReleaseCandidate(
id="test-candidate-1",
name="Test Candidate",
version="1.0.0",
source_snapshot_ref="ref-1",
created_by="operator",
status=CandidateStatus.DRAFT
)
db_session.add(candidate)
db_session.commit()
# Valid transition: DRAFT -> PREPARED
candidate.transition_to(CandidateStatus.PREPARED)
assert candidate.status == CandidateStatus.PREPARED
# Invalid transition: PREPARED -> DRAFT (should raise IllegalTransitionError)
from backend.src.services.clean_release.exceptions import IllegalTransitionError
with pytest.raises(IllegalTransitionError, match="Forbidden transition"):
candidate.transition_to(CandidateStatus.DRAFT)
def test_manifest_versioning_and_immutability(db_session):
"""
@PURPOSE: Verify manifest versioning and immutability invariants.
"""
candidate_id = "test-candidate-2"
# Create version 1
m1 = DistributionManifest(
id="manifest-v1",
candidate_id=candidate_id,
manifest_version=1,
manifest_digest="hash1",
artifacts_digest="hash1",
source_snapshot_ref="ref1",
content_json={},
created_at=datetime.now(timezone.utc),
created_by="operator"
)
db_session.add(m1)
# Create version 2
m2 = DistributionManifest(
id="manifest-v2",
candidate_id=candidate_id,
manifest_version=2,
manifest_digest="hash2",
artifacts_digest="hash2",
source_snapshot_ref="ref1",
content_json={},
created_at=datetime.now(timezone.utc),
created_by="operator"
)
db_session.add(m2)
db_session.commit()
latest = db_session.query(DistributionManifest).filter_by(candidate_id=candidate_id).order_by(DistributionManifest.manifest_version.desc()).first()
assert latest.manifest_version == 2
assert latest.id == "manifest-v2"
all_manifests = db_session.query(DistributionManifest).filter_by(candidate_id=candidate_id).all()
assert len(all_manifests) == 2
def _valid_artifacts():
return [
{
"id": "art-1",
"path": "bin/app",
"sha256": "abc123",
"size": 42,
}
]
def test_register_candidate_rejects_duplicate_candidate_id():
repository = CleanReleaseRepository()
register_candidate(
repository=repository,
candidate_id="dup-1",
version="1.0.0",
source_snapshot_ref="git:sha1",
created_by="operator",
artifacts=_valid_artifacts(),
)
with pytest.raises(ValueError, match="already exists"):
register_candidate(
repository=repository,
candidate_id="dup-1",
version="1.0.0",
source_snapshot_ref="git:sha1",
created_by="operator",
artifacts=_valid_artifacts(),
)
def test_register_candidate_rejects_malformed_artifact_input():
repository = CleanReleaseRepository()
bad_artifacts = [{"id": "art-1", "path": "bin/app", "size": 42}] # missing sha256
with pytest.raises(ValueError, match="missing required field 'sha256'"):
register_candidate(
repository=repository,
candidate_id="bad-art-1",
version="1.0.0",
source_snapshot_ref="git:sha2",
created_by="operator",
artifacts=bad_artifacts,
)
def test_register_candidate_rejects_empty_artifact_set():
repository = CleanReleaseRepository()
with pytest.raises(ValueError, match="artifacts must not be empty"):
register_candidate(
repository=repository,
candidate_id="empty-art-1",
version="1.0.0",
source_snapshot_ref="git:sha3",
created_by="operator",
artifacts=[],
)
def test_manifest_service_rebuild_creates_new_version():
repository = CleanReleaseRepository()
register_candidate(
repository=repository,
candidate_id="manifest-version-1",
version="1.0.0",
source_snapshot_ref="git:sha10",
created_by="operator",
artifacts=_valid_artifacts(),
)
first = build_manifest_snapshot(repository=repository, candidate_id="manifest-version-1", created_by="operator")
second = build_manifest_snapshot(repository=repository, candidate_id="manifest-version-1", created_by="operator")
assert first.manifest_version == 1
assert second.manifest_version == 2
assert first.id != second.id
def test_manifest_service_existing_manifest_cannot_be_mutated():
repository = CleanReleaseRepository()
register_candidate(
repository=repository,
candidate_id="manifest-immutable-1",
version="1.0.0",
source_snapshot_ref="git:sha11",
created_by="operator",
artifacts=_valid_artifacts(),
)
created = build_manifest_snapshot(repository=repository, candidate_id="manifest-immutable-1", created_by="operator")
original_digest = created.manifest_digest
rebuilt = build_manifest_snapshot(repository=repository, candidate_id="manifest-immutable-1", created_by="operator")
old_manifest = repository.get_manifest(created.id)
assert old_manifest is not None
assert old_manifest.manifest_digest == original_digest
assert old_manifest.id == created.id
assert rebuilt.id != created.id
def test_manifest_service_rejects_missing_candidate():
repository = CleanReleaseRepository()
with pytest.raises(ValueError, match="not found"):
build_manifest_snapshot(repository=repository, candidate_id="missing-candidate", created_by="operator")
# [/DEF:test_candidate_manifest_services:Module]

View File

@@ -0,0 +1,173 @@
# [DEF:backend.tests.services.clean_release.test_compliance_execution_service:Module]
# @TIER: CRITICAL
# @SEMANTICS: tests, clean-release, compliance, pipeline, run-finalization
# @PURPOSE: Validate stage pipeline and run finalization contracts for compliance execution.
# @LAYER: Tests
# @RELATION: TESTS -> backend.src.services.clean_release.compliance_orchestrator
# @RELATION: TESTS -> backend.src.services.clean_release.report_builder
# @INVARIANT: Missing manifest prevents run startup; failed execution cannot finalize as PASSED.
from __future__ import annotations
from datetime import datetime, timezone
import pytest
from backend.src.models.clean_release import (
CleanPolicySnapshot,
ComplianceDecision,
DistributionManifest,
ReleaseCandidate,
SourceRegistrySnapshot,
)
from backend.src.services.clean_release.compliance_orchestrator import CleanComplianceOrchestrator
from backend.src.services.clean_release.enums import CandidateStatus, RunStatus
from backend.src.services.clean_release.report_builder import ComplianceReportBuilder
from backend.src.services.clean_release.repository import CleanReleaseRepository
# [DEF:_seed_with_candidate_policy_registry:Function]
# @PURPOSE: Build deterministic repository state for run startup tests.
# @PRE: candidate_id and snapshot ids are non-empty.
# @POST: Returns repository with candidate, policy and registry; manifest is optional.
def _seed_with_candidate_policy_registry(
*,
with_manifest: bool,
prohibited_detected_count: int = 0,
) -> tuple[CleanReleaseRepository, str, str, str]:
repository = CleanReleaseRepository()
candidate_id = "cand-us2-1"
policy_id = "policy-us2-1"
registry_id = "registry-us2-1"
manifest_id = "manifest-us2-1"
repository.save_candidate(
ReleaseCandidate(
id=candidate_id,
version="1.0.0",
source_snapshot_ref="git:sha-us2",
created_by="tester",
created_at=datetime.now(timezone.utc),
status=CandidateStatus.MANIFEST_BUILT.value,
)
)
repository.save_registry(
SourceRegistrySnapshot(
id=registry_id,
registry_id="trusted-registry",
registry_version="1.0.0",
allowed_hosts=["repo.internal.local"],
allowed_schemes=["https"],
allowed_source_types=["repo"],
immutable=True,
)
)
repository.save_policy(
CleanPolicySnapshot(
id=policy_id,
policy_id="trusted-policy",
policy_version="1.0.0",
content_json={"rules": []},
registry_snapshot_id=registry_id,
immutable=True,
)
)
if with_manifest:
repository.save_manifest(
DistributionManifest(
id=manifest_id,
candidate_id=candidate_id,
manifest_version=1,
manifest_digest="digest-us2-1",
artifacts_digest="digest-us2-1",
source_snapshot_ref="git:sha-us2",
content_json={
"summary": {
"included_count": 1,
"excluded_count": 0 if prohibited_detected_count == 0 else prohibited_detected_count,
"prohibited_detected_count": prohibited_detected_count,
}
},
created_by="tester",
created_at=datetime.now(timezone.utc),
immutable=True,
)
)
return repository, candidate_id, policy_id, manifest_id
# [/DEF:_seed_with_candidate_policy_registry:Function]
# [DEF:test_run_without_manifest_rejected:Function]
# @PURPOSE: Ensure compliance run cannot start when manifest is unresolved.
# @PRE: Candidate/policy exist but manifest is missing.
# @POST: start_check_run raises ValueError and no run is persisted.
def test_run_without_manifest_rejected():
repository, candidate_id, policy_id, manifest_id = _seed_with_candidate_policy_registry(with_manifest=False)
orchestrator = CleanComplianceOrchestrator(repository)
with pytest.raises(ValueError, match="Manifest or Policy not found"):
orchestrator.start_check_run(
candidate_id=candidate_id,
policy_id=policy_id,
requested_by="tester",
manifest_id=manifest_id,
)
assert len(repository.check_runs) == 0
# [/DEF:test_run_without_manifest_rejected:Function]
# [DEF:test_task_crash_mid_run_marks_failed:Function]
# @PURPOSE: Ensure execution crash conditions force FAILED run status.
# @PRE: Run exists, then required dependency becomes unavailable before execute_stages.
# @POST: execute_stages persists run with FAILED status.
def test_task_crash_mid_run_marks_failed():
repository, candidate_id, policy_id, manifest_id = _seed_with_candidate_policy_registry(with_manifest=True)
orchestrator = CleanComplianceOrchestrator(repository)
run = orchestrator.start_check_run(
candidate_id=candidate_id,
policy_id=policy_id,
requested_by="tester",
manifest_id=manifest_id,
)
# Simulate mid-run crash dependency loss: registry snapshot disappears.
repository.registries.clear()
failed = orchestrator.execute_stages(run)
assert failed.status == RunStatus.FAILED
# [/DEF:test_task_crash_mid_run_marks_failed:Function]
# [DEF:test_blocked_run_finalization_blocks_report_builder:Function]
# @PURPOSE: Ensure blocked runs require blocking violations before report creation.
# @PRE: Manifest contains prohibited artifacts leading to BLOCKED decision.
# @POST: finalize keeps BLOCKED and report_builder rejects zero blocking violations.
def test_blocked_run_finalization_blocks_report_builder():
repository, candidate_id, policy_id, manifest_id = _seed_with_candidate_policy_registry(
with_manifest=True,
prohibited_detected_count=1,
)
orchestrator = CleanComplianceOrchestrator(repository)
builder = ComplianceReportBuilder(repository)
run = orchestrator.start_check_run(
candidate_id=candidate_id,
policy_id=policy_id,
requested_by="tester",
manifest_id=manifest_id,
)
run = orchestrator.execute_stages(run)
run = orchestrator.finalize_run(run)
assert run.final_status == ComplianceDecision.BLOCKED
assert run.status == RunStatus.SUCCEEDED
with pytest.raises(ValueError, match="Blocked run requires at least one blocking violation"):
builder.build_report_payload(run, [])
# [/DEF:test_blocked_run_finalization_blocks_report_builder:Function]
# [/DEF:backend.tests.services.clean_release.test_compliance_execution_service:Module]

View File

@@ -0,0 +1,250 @@
# [DEF:backend.tests.services.clean_release.test_compliance_task_integration:Module]
# @TIER: CRITICAL
# @SEMANTICS: tests, clean-release, compliance, task-manager, integration
# @PURPOSE: Verify clean release compliance runs execute through TaskManager lifecycle with observable success/failure outcomes.
# @LAYER: Tests
# @RELATION: TESTS -> backend.src.core.task_manager.manager.TaskManager
# @RELATION: TESTS -> backend.src.services.clean_release.compliance_orchestrator.CleanComplianceOrchestrator
# @INVARIANT: Compliance execution triggered as task produces terminal task status and persists run evidence.
from __future__ import annotations
import asyncio
from datetime import datetime, timezone
from typing import Any, Dict
from unittest.mock import MagicMock, patch
import pytest
from src.core.task_manager.manager import TaskManager
from src.core.task_manager.models import TaskStatus
from src.models.clean_release import (
CleanPolicySnapshot,
DistributionManifest,
ReleaseCandidate,
SourceRegistrySnapshot,
)
from src.services.clean_release.compliance_orchestrator import CleanComplianceOrchestrator
from src.services.clean_release.enums import CandidateStatus, RunStatus
from src.services.clean_release.repository import CleanReleaseRepository
# [DEF:_seed_repository:Function]
# @PURPOSE: Prepare deterministic candidate/policy/registry/manifest fixtures for task integration tests.
# @PRE: with_manifest controls manifest availability.
# @POST: Returns initialized repository and identifiers for compliance run startup.
def _seed_repository(*, with_manifest: bool) -> tuple[CleanReleaseRepository, str, str, str]:
repository = CleanReleaseRepository()
candidate_id = "cand-task-int-1"
policy_id = "policy-task-int-1"
manifest_id = "manifest-task-int-1"
repository.save_candidate(
ReleaseCandidate(
id=candidate_id,
version="1.0.0",
source_snapshot_ref="git:sha-task-int",
created_by="tester",
created_at=datetime.now(timezone.utc),
status=CandidateStatus.MANIFEST_BUILT.value,
)
)
repository.save_registry(
SourceRegistrySnapshot(
id="registry-task-int-1",
registry_id="trusted-registry",
registry_version="1.0.0",
allowed_hosts=["repo.internal.local"],
allowed_schemes=["https"],
allowed_source_types=["repo"],
immutable=True,
)
)
repository.save_policy(
CleanPolicySnapshot(
id=policy_id,
policy_id="trusted-policy",
policy_version="1.0.0",
content_json={"rules": []},
registry_snapshot_id="registry-task-int-1",
immutable=True,
)
)
if with_manifest:
repository.save_manifest(
DistributionManifest(
id=manifest_id,
candidate_id=candidate_id,
manifest_version=1,
manifest_digest="digest-task-int",
artifacts_digest="digest-task-int",
source_snapshot_ref="git:sha-task-int",
content_json={
"summary": {
"included_count": 1,
"excluded_count": 0,
"prohibited_detected_count": 0,
}
},
created_by="tester",
created_at=datetime.now(timezone.utc),
immutable=True,
)
)
return repository, candidate_id, policy_id, manifest_id
# [/DEF:_seed_repository:Function]
# [DEF:CleanReleaseCompliancePlugin:Class]
# @PURPOSE: TaskManager plugin shim that executes clean release compliance orchestration.
class CleanReleaseCompliancePlugin:
@property
def id(self) -> str:
return "clean-release-compliance"
@property
def name(self) -> str:
return "clean_release_compliance"
def execute(self, params: Dict[str, Any], context=None):
orchestrator = CleanComplianceOrchestrator(params["repository"])
run = orchestrator.start_check_run(
candidate_id=params["candidate_id"],
policy_id=params["policy_id"],
requested_by=params.get("requested_by", "tester"),
manifest_id=params["manifest_id"],
)
run.task_id = params["_task_id"]
params["repository"].save_check_run(run)
run = orchestrator.execute_stages(run)
run = orchestrator.finalize_run(run)
if context is not None:
context.logger.info("Compliance run completed via TaskManager plugin")
return {"run_id": run.id, "run_status": run.status, "final_status": run.final_status}
# [/DEF:CleanReleaseCompliancePlugin:Class]
# [DEF:_PluginLoaderStub:Class]
# @PURPOSE: Provide minimal plugin loader contract used by TaskManager in integration tests.
class _PluginLoaderStub:
def __init__(self, plugin: CleanReleaseCompliancePlugin):
self._plugin = plugin
def has_plugin(self, plugin_id: str) -> bool:
return plugin_id == self._plugin.id
def get_plugin(self, plugin_id: str):
if plugin_id != self._plugin.id:
raise ValueError("Plugin not found")
return self._plugin
# [/DEF:_PluginLoaderStub:Class]
# [DEF:_make_task_manager:Function]
# @PURPOSE: Build TaskManager with mocked persistence services for isolated integration tests.
# @POST: Returns TaskManager ready for async task execution.
def _make_task_manager() -> TaskManager:
plugin_loader = _PluginLoaderStub(CleanReleaseCompliancePlugin())
with patch("backend.src.core.task_manager.manager.TaskPersistenceService") as mock_persistence, patch(
"backend.src.core.task_manager.manager.TaskLogPersistenceService"
) as mock_log_persistence:
mock_persistence.return_value.load_tasks.return_value = []
mock_persistence.return_value.persist_task = MagicMock()
mock_log_persistence.return_value.add_logs = MagicMock()
mock_log_persistence.return_value.get_logs = MagicMock(return_value=[])
mock_log_persistence.return_value.get_log_stats = MagicMock()
mock_log_persistence.return_value.get_sources = MagicMock(return_value=[])
return TaskManager(plugin_loader)
# [/DEF:_make_task_manager:Function]
# [DEF:_wait_for_terminal_task:Function]
# @PURPOSE: Poll task registry until target task reaches terminal status.
# @PRE: task_id exists in manager registry.
# @POST: Returns task with SUCCESS or FAILED status, otherwise raises TimeoutError.
async def _wait_for_terminal_task(manager: TaskManager, task_id: str, timeout_seconds: float = 3.0):
started = asyncio.get_running_loop().time()
while True:
task = manager.get_task(task_id)
if task and task.status in {TaskStatus.SUCCESS, TaskStatus.FAILED}:
return task
if asyncio.get_running_loop().time() - started > timeout_seconds:
raise TimeoutError(f"Task {task_id} did not reach terminal status")
await asyncio.sleep(0.05)
# [/DEF:_wait_for_terminal_task:Function]
# [DEF:test_compliance_run_executes_as_task_manager_task:Function]
# @PURPOSE: Verify successful compliance execution is observable as TaskManager SUCCESS task.
# @PRE: Candidate, policy and manifest are available in repository.
# @POST: Task ends with SUCCESS; run is persisted with SUCCEEDED status and task binding.
@pytest.mark.asyncio
async def test_compliance_run_executes_as_task_manager_task():
repository, candidate_id, policy_id, manifest_id = _seed_repository(with_manifest=True)
manager = _make_task_manager()
try:
task = await manager.create_task(
"clean-release-compliance",
{
"repository": repository,
"candidate_id": candidate_id,
"policy_id": policy_id,
"manifest_id": manifest_id,
"requested_by": "integration-tester",
},
)
finished = await _wait_for_terminal_task(manager, task.id)
assert finished.status == TaskStatus.SUCCESS
assert isinstance(finished.result, dict)
run_id = finished.result["run_id"]
run = repository.get_check_run(run_id)
assert run is not None
assert run.status == RunStatus.SUCCEEDED
assert run.task_id == task.id
finally:
manager._flusher_stop_event.set()
manager._flusher_thread.join(timeout=2)
# [/DEF:test_compliance_run_executes_as_task_manager_task:Function]
# [DEF:test_compliance_run_missing_manifest_marks_task_failed:Function]
# @PURPOSE: Verify missing manifest startup failure is surfaced as TaskManager FAILED task.
# @PRE: Candidate/policy exist but manifest is absent.
# @POST: Task ends with FAILED and run history remains empty.
@pytest.mark.asyncio
async def test_compliance_run_missing_manifest_marks_task_failed():
repository, candidate_id, policy_id, manifest_id = _seed_repository(with_manifest=False)
manager = _make_task_manager()
try:
task = await manager.create_task(
"clean-release-compliance",
{
"repository": repository,
"candidate_id": candidate_id,
"policy_id": policy_id,
"manifest_id": manifest_id,
"requested_by": "integration-tester",
},
)
finished = await _wait_for_terminal_task(manager, task.id)
assert finished.status == TaskStatus.FAILED
assert len(repository.check_runs) == 0
assert any("Manifest or Policy not found" in log.message for log in finished.logs)
finally:
manager._flusher_stop_event.set()
manager._flusher_thread.join(timeout=2)
# [/DEF:test_compliance_run_missing_manifest_marks_task_failed:Function]
# [/DEF:backend.tests.services.clean_release.test_compliance_task_integration:Module]

View File

@@ -0,0 +1,87 @@
# [DEF:backend.tests.services.clean_release.test_demo_mode_isolation:Module]
# @TIER: STANDARD
# @SEMANTICS: clean-release, demo-mode, isolation, namespace, repository
# @PURPOSE: Verify demo and real mode namespace isolation contracts before TUI integration.
# @LAYER: Tests
# @RELATION: DEPENDS_ON -> backend.src.services.clean_release.demo_data_service
from __future__ import annotations
from datetime import datetime, timezone
from backend.src.models.clean_release import ReleaseCandidate
from backend.src.services.clean_release.demo_data_service import (
build_namespaced_id,
create_isolated_repository,
resolve_namespace,
)
# [DEF:test_resolve_namespace_separates_demo_and_real:Function]
# @PURPOSE: Ensure namespace resolver returns deterministic and distinct namespaces.
# @PRE: Mode names are provided as user/runtime strings.
# @POST: Demo and real namespaces are different and stable.
def test_resolve_namespace_separates_demo_and_real() -> None:
demo = resolve_namespace("demo")
real = resolve_namespace("real")
assert demo == "clean-release:demo"
assert real == "clean-release:real"
assert demo != real
# [/DEF:test_resolve_namespace_separates_demo_and_real:Function]
# [DEF:test_build_namespaced_id_prevents_cross_mode_collisions:Function]
# @PURPOSE: Ensure ID generation prevents demo/real collisions for identical logical IDs.
# @PRE: Same logical candidate id is used in two different namespaces.
# @POST: Produced physical IDs differ by namespace prefix.
def test_build_namespaced_id_prevents_cross_mode_collisions() -> None:
logical_id = "2026.03.09-rc1"
demo_id = build_namespaced_id(resolve_namespace("demo"), logical_id)
real_id = build_namespaced_id(resolve_namespace("real"), logical_id)
assert demo_id != real_id
assert demo_id.startswith("clean-release:demo::")
assert real_id.startswith("clean-release:real::")
# [/DEF:test_build_namespaced_id_prevents_cross_mode_collisions:Function]
# [DEF:test_create_isolated_repository_keeps_mode_data_separate:Function]
# @PURPOSE: Verify demo and real repositories do not leak state across mode boundaries.
# @PRE: Two repositories are created for distinct modes.
# @POST: Candidate mutations in one mode are not visible in the other mode.
def test_create_isolated_repository_keeps_mode_data_separate() -> None:
demo_repo = create_isolated_repository("demo")
real_repo = create_isolated_repository("real")
demo_candidate_id = build_namespaced_id(resolve_namespace("demo"), "candidate-1")
real_candidate_id = build_namespaced_id(resolve_namespace("real"), "candidate-1")
demo_repo.save_candidate(
ReleaseCandidate(
id=demo_candidate_id,
version="1.0.0",
source_snapshot_ref="git:sha-demo",
created_by="demo-operator",
created_at=datetime.now(timezone.utc),
status="DRAFT",
)
)
real_repo.save_candidate(
ReleaseCandidate(
id=real_candidate_id,
version="1.0.0",
source_snapshot_ref="git:sha-real",
created_by="real-operator",
created_at=datetime.now(timezone.utc),
status="DRAFT",
)
)
assert demo_repo.get_candidate(demo_candidate_id) is not None
assert demo_repo.get_candidate(real_candidate_id) is None
assert real_repo.get_candidate(real_candidate_id) is not None
assert real_repo.get_candidate(demo_candidate_id) is None
# [/DEF:test_create_isolated_repository_keeps_mode_data_separate:Function]
# [/DEF:backend.tests.services.clean_release.test_demo_mode_isolation:Module]

View File

@@ -0,0 +1,105 @@
# [DEF:backend.tests.services.clean_release.test_policy_resolution_service:Module]
# @TIER: CRITICAL
# @SEMANTICS: clean-release, policy-resolution, trusted-snapshots, contracts
# @PURPOSE: Verify trusted policy snapshot resolution contract and error guards.
# @LAYER: Tests
# @RELATION: DEPENDS_ON -> backend.src.services.clean_release.policy_resolution_service
# @RELATION: DEPENDS_ON -> backend.src.services.clean_release.repository
# @RELATION: DEPENDS_ON -> backend.src.services.clean_release.exceptions
# @INVARIANT: Resolution uses only ConfigManager active IDs and rejects runtime override attempts.
from __future__ import annotations
from types import SimpleNamespace
import pytest
from backend.src.models.clean_release import CleanPolicySnapshot, SourceRegistrySnapshot
from backend.src.services.clean_release.exceptions import PolicyResolutionError
from backend.src.services.clean_release.policy_resolution_service import resolve_trusted_policy_snapshots
from backend.src.services.clean_release.repository import CleanReleaseRepository
# [DEF:_config_manager:Function]
# @PURPOSE: Build deterministic ConfigManager-like stub for tests.
# @PRE: policy_id and registry_id may be None or non-empty strings.
# @POST: Returns object exposing get_config().settings.clean_release active IDs.
def _config_manager(policy_id, registry_id):
clean_release = SimpleNamespace(active_policy_id=policy_id, active_registry_id=registry_id)
settings = SimpleNamespace(clean_release=clean_release)
config = SimpleNamespace(settings=settings)
return SimpleNamespace(get_config=lambda: config)
# [/DEF:_config_manager:Function]
# [DEF:test_resolve_trusted_policy_snapshots_missing_profile:Function]
# @PURPOSE: Ensure resolution fails when trusted profile is not configured.
# @PRE: active_policy_id is None.
# @POST: Raises PolicyResolutionError with missing trusted profile reason.
def test_resolve_trusted_policy_snapshots_missing_profile():
repository = CleanReleaseRepository()
config_manager = _config_manager(policy_id=None, registry_id="registry-1")
with pytest.raises(PolicyResolutionError, match="missing trusted profile"):
resolve_trusted_policy_snapshots(
config_manager=config_manager,
repository=repository,
)
# [/DEF:test_resolve_trusted_policy_snapshots_missing_profile:Function]
# [DEF:test_resolve_trusted_policy_snapshots_missing_registry:Function]
# @PURPOSE: Ensure resolution fails when trusted registry is not configured.
# @PRE: active_registry_id is None and active_policy_id is set.
# @POST: Raises PolicyResolutionError with missing trusted registry reason.
def test_resolve_trusted_policy_snapshots_missing_registry():
repository = CleanReleaseRepository()
config_manager = _config_manager(policy_id="policy-1", registry_id=None)
with pytest.raises(PolicyResolutionError, match="missing trusted registry"):
resolve_trusted_policy_snapshots(
config_manager=config_manager,
repository=repository,
)
# [/DEF:test_resolve_trusted_policy_snapshots_missing_registry:Function]
# [DEF:test_resolve_trusted_policy_snapshots_rejects_override_attempt:Function]
# @PURPOSE: Ensure runtime override attempt is rejected even if snapshots exist.
# @PRE: valid trusted snapshots exist in repository and override is provided.
# @POST: Raises PolicyResolutionError with override forbidden reason.
def test_resolve_trusted_policy_snapshots_rejects_override_attempt():
repository = CleanReleaseRepository()
repository.save_policy(
CleanPolicySnapshot(
id="policy-1",
policy_id="baseline",
policy_version="1.0.0",
content_json={"rules": []},
registry_snapshot_id="registry-1",
immutable=True,
)
)
repository.save_registry(
SourceRegistrySnapshot(
id="registry-1",
registry_id="trusted",
registry_version="1.0.0",
allowed_hosts=["internal.local"],
allowed_schemes=["https"],
allowed_source_types=["repo"],
immutable=True,
)
)
config_manager = _config_manager(policy_id="policy-1", registry_id="registry-1")
with pytest.raises(PolicyResolutionError, match="override attempt is forbidden"):
resolve_trusted_policy_snapshots(
config_manager=config_manager,
repository=repository,
policy_id_override="policy-override",
)
# [/DEF:test_resolve_trusted_policy_snapshots_rejects_override_attempt:Function]
# [/DEF:backend.tests.services.clean_release.test_policy_resolution_service:Module]

View File

@@ -0,0 +1,148 @@
# [DEF:backend.tests.services.clean_release.test_publication_service:Module]
# @TIER: CRITICAL
# @SEMANTICS: tests, clean-release, publication, revoke, gate
# @PURPOSE: Define publication gate contracts over approved candidates and immutable publication records.
# @LAYER: Tests
# @RELATION: TESTS -> src.services.clean_release.publication_service
# @RELATION: TESTS -> src.services.clean_release.approval_service
# @RELATION: TESTS -> src.services.clean_release.repository
# @INVARIANT: Publish requires approval; revoke requires existing publication; republish after revoke is allowed as a new record.
from __future__ import annotations
from datetime import datetime, timezone
import pytest
from src.models.clean_release import ComplianceReport, ReleaseCandidate
from src.services.clean_release.enums import CandidateStatus, ComplianceDecision, PublicationStatus
from src.services.clean_release.exceptions import PublicationGateError
from src.services.clean_release.repository import CleanReleaseRepository
# [DEF:_seed_candidate_with_passed_report:Function]
# @PURPOSE: Seed candidate/report fixtures for publication gate scenarios.
# @PRE: candidate_id and report_id are non-empty.
# @POST: Repository contains candidate and PASSED report.
def _seed_candidate_with_passed_report(
*,
candidate_id: str = "cand-publish-1",
report_id: str = "CCR-publish-1",
candidate_status: CandidateStatus = CandidateStatus.CHECK_PASSED,
) -> tuple[CleanReleaseRepository, str, str]:
repository = CleanReleaseRepository()
repository.save_candidate(
ReleaseCandidate(
id=candidate_id,
version="1.0.0",
source_snapshot_ref="git:sha-publish-1",
created_by="tester",
created_at=datetime.now(timezone.utc),
status=candidate_status.value,
)
)
repository.save_report(
ComplianceReport(
id=report_id,
run_id="run-publish-1",
candidate_id=candidate_id,
final_status=ComplianceDecision.PASSED.value,
summary_json={"operator_summary": "seed", "violations_count": 0, "blocking_violations_count": 0},
generated_at=datetime.now(timezone.utc),
immutable=True,
)
)
return repository, candidate_id, report_id
# [/DEF:_seed_candidate_with_passed_report:Function]
# [DEF:test_publish_without_approval_rejected:Function]
# @PURPOSE: Ensure publish action is blocked until candidate is approved.
# @PRE: Candidate has PASSED report but status is not APPROVED.
# @POST: publish_candidate raises PublicationGateError.
def test_publish_without_approval_rejected():
from src.services.clean_release.publication_service import publish_candidate
repository, candidate_id, report_id = _seed_candidate_with_passed_report(
candidate_status=CandidateStatus.CHECK_PASSED,
)
with pytest.raises(PublicationGateError, match="APPROVED"):
publish_candidate(
repository=repository,
candidate_id=candidate_id,
report_id=report_id,
published_by="publisher",
target_channel="stable",
publication_ref="rel-1",
)
# [/DEF:test_publish_without_approval_rejected:Function]
# [DEF:test_revoke_unknown_publication_rejected:Function]
# @PURPOSE: Ensure revocation is rejected for unknown publication id.
# @PRE: Repository has no matching publication record.
# @POST: revoke_publication raises PublicationGateError.
def test_revoke_unknown_publication_rejected():
from src.services.clean_release.publication_service import revoke_publication
repository, _, _ = _seed_candidate_with_passed_report()
with pytest.raises(PublicationGateError, match="not found"):
revoke_publication(
repository=repository,
publication_id="missing-publication",
revoked_by="publisher",
comment="unknown publication id",
)
# [/DEF:test_revoke_unknown_publication_rejected:Function]
# [DEF:test_republish_after_revoke_creates_new_active_record:Function]
# @PURPOSE: Ensure republish after revoke is allowed and creates a new ACTIVE record.
# @PRE: Candidate is APPROVED and first publication has been revoked.
# @POST: New publish call returns distinct publication id with ACTIVE status.
def test_republish_after_revoke_creates_new_active_record():
from src.services.clean_release.approval_service import approve_candidate
from src.services.clean_release.publication_service import publish_candidate, revoke_publication
repository, candidate_id, report_id = _seed_candidate_with_passed_report(
candidate_status=CandidateStatus.CHECK_PASSED,
)
approve_candidate(
repository=repository,
candidate_id=candidate_id,
report_id=report_id,
decided_by="approver",
comment="approval before publication",
)
first = publish_candidate(
repository=repository,
candidate_id=candidate_id,
report_id=report_id,
published_by="publisher",
target_channel="stable",
publication_ref="release-1",
)
revoked = revoke_publication(
repository=repository,
publication_id=first.id,
revoked_by="publisher",
comment="rollback",
)
second = publish_candidate(
repository=repository,
candidate_id=candidate_id,
report_id=report_id,
published_by="publisher",
target_channel="stable",
publication_ref="release-2",
)
assert first.id != second.id
assert revoked.status == PublicationStatus.REVOKED.value
assert second.status == PublicationStatus.ACTIVE.value
# [/DEF:test_republish_after_revoke_creates_new_active_record:Function]
# [/DEF:backend.tests.services.clean_release.test_publication_service:Module]

View File

@@ -0,0 +1,114 @@
# [DEF:backend.tests.services.clean_release.test_report_audit_immutability:Module]
# @TIER: CRITICAL
# @SEMANTICS: tests, clean-release, report, audit, immutability, append-only
# @PURPOSE: Validate report snapshot immutability expectations and append-only audit hook behavior for US2.
# @LAYER: Tests
# @RELATION: TESTS -> src.services.clean_release.report_builder.ComplianceReportBuilder
# @RELATION: TESTS -> src.services.clean_release.audit_service
# @RELATION: TESTS -> src.services.clean_release.repository.CleanReleaseRepository
# @INVARIANT: Built reports are immutable snapshots; audit hooks produce append-only event traces.
from __future__ import annotations
from datetime import datetime, timezone
from unittest.mock import patch
import pytest
from src.models.clean_release import ComplianceReport, ComplianceRun, ComplianceViolation
from src.services.clean_release.audit_service import audit_check_run, audit_preparation, audit_report, audit_violation
from src.services.clean_release.enums import ComplianceDecision, RunStatus
from src.services.clean_release.report_builder import ComplianceReportBuilder
from src.services.clean_release.repository import CleanReleaseRepository
# [DEF:_terminal_run:Function]
# @PURPOSE: Build deterministic terminal run fixture for report snapshot tests.
# @PRE: final_status is a valid ComplianceDecision value.
# @POST: Returns a terminal ComplianceRun suitable for report generation.
def _terminal_run(final_status: ComplianceDecision = ComplianceDecision.PASSED) -> ComplianceRun:
return ComplianceRun(
id="run-immut-1",
candidate_id="cand-immut-1",
manifest_id="manifest-immut-1",
manifest_digest="digest-immut-1",
policy_snapshot_id="policy-immut-1",
registry_snapshot_id="registry-immut-1",
requested_by="tester",
requested_at=datetime.now(timezone.utc),
started_at=datetime.now(timezone.utc),
finished_at=datetime.now(timezone.utc),
status=RunStatus.SUCCEEDED,
final_status=final_status,
)
# [/DEF:_terminal_run:Function]
# [DEF:test_report_builder_sets_immutable_snapshot_flag:Function]
# @PURPOSE: Ensure generated report payload is marked immutable and persisted as snapshot.
# @PRE: Terminal run exists.
# @POST: Built report has immutable=True and repository stores same immutable object.
def test_report_builder_sets_immutable_snapshot_flag():
repository = CleanReleaseRepository()
builder = ComplianceReportBuilder(repository)
run = _terminal_run()
report = builder.build_report_payload(run, [])
persisted = builder.persist_report(report)
assert report.immutable is True
assert persisted.immutable is True
assert repository.get_report(report.id) is persisted
# [/DEF:test_report_builder_sets_immutable_snapshot_flag:Function]
# [DEF:test_repository_rejects_report_overwrite_for_same_report_id:Function]
# @PURPOSE: Define immutability contract that report snapshots cannot be overwritten by same identifier.
# @PRE: Existing report with id is already persisted.
# @POST: Second save for same report id is rejected with explicit immutability error.
def test_repository_rejects_report_overwrite_for_same_report_id():
repository = CleanReleaseRepository()
original = ComplianceReport(
id="CCR-immut-fixed-id",
run_id="run-immut-1",
candidate_id="cand-immut-1",
final_status=ComplianceDecision.PASSED,
summary_json={"operator_summary": "original", "violations_count": 0, "blocking_violations_count": 0},
generated_at=datetime.now(timezone.utc),
immutable=True,
)
mutated = ComplianceReport(
id="CCR-immut-fixed-id",
run_id="run-immut-2",
candidate_id="cand-immut-2",
final_status=ComplianceDecision.ERROR,
summary_json={"operator_summary": "mutated", "violations_count": 1, "blocking_violations_count": 1},
generated_at=datetime.now(timezone.utc),
immutable=True,
)
repository.save_report(original)
with pytest.raises(ValueError, match="immutable"):
repository.save_report(mutated)
# [/DEF:test_repository_rejects_report_overwrite_for_same_report_id:Function]
# [DEF:test_audit_hooks_emit_append_only_event_stream:Function]
# @PURPOSE: Verify audit hooks emit one event per action call and preserve call order.
# @PRE: Logger backend is patched.
# @POST: Three calls produce three ordered info entries with molecular prefixes.
@patch("src.services.clean_release.audit_service.logger")
def test_audit_hooks_emit_append_only_event_stream(mock_logger):
audit_preparation("cand-immut-1", "PREPARED")
audit_check_run("run-immut-1", "PASSED")
audit_report("CCR-immut-1", "cand-immut-1")
assert mock_logger.info.call_count == 3
logged_messages = [call.args[0] for call in mock_logger.info.call_args_list]
assert logged_messages[0].startswith("[REASON]")
assert logged_messages[1].startswith("[REFLECT]")
assert logged_messages[2].startswith("[EXPLORE]")
# [/DEF:test_audit_hooks_emit_append_only_event_stream:Function]
# [/DEF:backend.tests.services.clean_release.test_report_audit_immutability:Module]