# [DEF:backend.tests.services.clean_release.test_compliance_task_integration:Module] # @COMPLEXITY: 5 # @SEMANTICS: tests, clean-release, compliance, task-manager, integration # @PURPOSE: Verify clean release compliance runs execute through TaskManager lifecycle with observable success/failure outcomes. # @LAYER: Tests # @RELATION: TESTS -> backend.src.core.task_manager.manager.TaskManager # @RELATION: TESTS -> backend.src.services.clean_release.compliance_orchestrator.CleanComplianceOrchestrator # @INVARIANT: Compliance execution triggered as task produces terminal task status and persists run evidence. from __future__ import annotations import asyncio from datetime import datetime, timezone from typing import Any, Dict from unittest.mock import MagicMock, patch import pytest from src.core.task_manager.manager import TaskManager from src.core.task_manager.models import TaskStatus from src.models.clean_release import ( CleanPolicySnapshot, DistributionManifest, ReleaseCandidate, SourceRegistrySnapshot, ) from src.services.clean_release.compliance_orchestrator import CleanComplianceOrchestrator from src.services.clean_release.enums import CandidateStatus, RunStatus from src.services.clean_release.repository import CleanReleaseRepository # [DEF:_seed_repository:Function] # @PURPOSE: Prepare deterministic candidate/policy/registry/manifest fixtures for task integration tests. # @PRE: with_manifest controls manifest availability. # @POST: Returns initialized repository and identifiers for compliance run startup. def _seed_repository(*, with_manifest: bool) -> tuple[CleanReleaseRepository, str, str, str]: repository = CleanReleaseRepository() candidate_id = "cand-task-int-1" policy_id = "policy-task-int-1" manifest_id = "manifest-task-int-1" repository.save_candidate( ReleaseCandidate( id=candidate_id, version="1.0.0", source_snapshot_ref="git:sha-task-int", created_by="tester", created_at=datetime.now(timezone.utc), status=CandidateStatus.MANIFEST_BUILT.value, ) ) repository.save_registry( SourceRegistrySnapshot( id="registry-task-int-1", registry_id="trusted-registry", registry_version="1.0.0", allowed_hosts=["repo.internal.local"], allowed_schemes=["https"], allowed_source_types=["repo"], immutable=True, ) ) repository.save_policy( CleanPolicySnapshot( id=policy_id, policy_id="trusted-policy", policy_version="1.0.0", content_json={"rules": []}, registry_snapshot_id="registry-task-int-1", immutable=True, ) ) if with_manifest: repository.save_manifest( DistributionManifest( id=manifest_id, candidate_id=candidate_id, manifest_version=1, manifest_digest="digest-task-int", artifacts_digest="digest-task-int", source_snapshot_ref="git:sha-task-int", content_json={ "summary": { "included_count": 1, "excluded_count": 0, "prohibited_detected_count": 0, } }, created_by="tester", created_at=datetime.now(timezone.utc), immutable=True, ) ) return repository, candidate_id, policy_id, manifest_id # [/DEF:_seed_repository:Function] # [DEF:CleanReleaseCompliancePlugin:Class] # @PURPOSE: TaskManager plugin shim that executes clean release compliance orchestration. class CleanReleaseCompliancePlugin: @property def id(self) -> str: return "clean-release-compliance" @property def name(self) -> str: return "clean_release_compliance" def execute(self, params: Dict[str, Any], context=None): orchestrator = CleanComplianceOrchestrator(params["repository"]) run = orchestrator.start_check_run( candidate_id=params["candidate_id"], policy_id=params["policy_id"], requested_by=params.get("requested_by", "tester"), manifest_id=params["manifest_id"], ) run.task_id = params["_task_id"] params["repository"].save_check_run(run) run = orchestrator.execute_stages(run) run = orchestrator.finalize_run(run) if context is not None: context.logger.info("Compliance run completed via TaskManager plugin") return {"run_id": run.id, "run_status": run.status, "final_status": run.final_status} # [/DEF:CleanReleaseCompliancePlugin:Class] # [DEF:_PluginLoaderStub:Class] # @PURPOSE: Provide minimal plugin loader contract used by TaskManager in integration tests. class _PluginLoaderStub: def __init__(self, plugin: CleanReleaseCompliancePlugin): self._plugin = plugin def has_plugin(self, plugin_id: str) -> bool: return plugin_id == self._plugin.id def get_plugin(self, plugin_id: str): if plugin_id != self._plugin.id: raise ValueError("Plugin not found") return self._plugin # [/DEF:_PluginLoaderStub:Class] # [DEF:_make_task_manager:Function] # @PURPOSE: Build TaskManager with mocked persistence services for isolated integration tests. # @POST: Returns TaskManager ready for async task execution. def _make_task_manager() -> TaskManager: plugin_loader = _PluginLoaderStub(CleanReleaseCompliancePlugin()) with patch("src.core.task_manager.manager.TaskPersistenceService") as mock_persistence, patch( "src.core.task_manager.manager.TaskLogPersistenceService" ) as mock_log_persistence: mock_persistence.return_value.load_tasks.return_value = [] mock_persistence.return_value.persist_task = MagicMock() mock_log_persistence.return_value.add_logs = MagicMock() mock_log_persistence.return_value.get_logs = MagicMock(return_value=[]) mock_log_persistence.return_value.get_log_stats = MagicMock() mock_log_persistence.return_value.get_sources = MagicMock(return_value=[]) return TaskManager(plugin_loader) # [/DEF:_make_task_manager:Function] # [DEF:_wait_for_terminal_task:Function] # @PURPOSE: Poll task registry until target task reaches terminal status. # @PRE: task_id exists in manager registry. # @POST: Returns task with SUCCESS or FAILED status, otherwise raises TimeoutError. async def _wait_for_terminal_task(manager: TaskManager, task_id: str, timeout_seconds: float = 3.0): started = asyncio.get_running_loop().time() while True: task = manager.get_task(task_id) if task and task.status in {TaskStatus.SUCCESS, TaskStatus.FAILED}: return task if asyncio.get_running_loop().time() - started > timeout_seconds: raise TimeoutError(f"Task {task_id} did not reach terminal status") await asyncio.sleep(0.05) # [/DEF:_wait_for_terminal_task:Function] # [DEF:test_compliance_run_executes_as_task_manager_task:Function] # @PURPOSE: Verify successful compliance execution is observable as TaskManager SUCCESS task. # @PRE: Candidate, policy and manifest are available in repository. # @POST: Task ends with SUCCESS; run is persisted with SUCCEEDED status and task binding. @pytest.mark.asyncio async def test_compliance_run_executes_as_task_manager_task(): repository, candidate_id, policy_id, manifest_id = _seed_repository(with_manifest=True) manager = _make_task_manager() try: task = await manager.create_task( "clean-release-compliance", { "repository": repository, "candidate_id": candidate_id, "policy_id": policy_id, "manifest_id": manifest_id, "requested_by": "integration-tester", }, ) finished = await _wait_for_terminal_task(manager, task.id) assert finished.status == TaskStatus.SUCCESS assert isinstance(finished.result, dict) run_id = finished.result["run_id"] run = repository.get_check_run(run_id) assert run is not None assert run.status == RunStatus.SUCCEEDED assert run.task_id == task.id finally: manager._flusher_stop_event.set() manager._flusher_thread.join(timeout=2) # [/DEF:test_compliance_run_executes_as_task_manager_task:Function] # [DEF:test_compliance_run_missing_manifest_marks_task_failed:Function] # @PURPOSE: Verify missing manifest startup failure is surfaced as TaskManager FAILED task. # @PRE: Candidate/policy exist but manifest is absent. # @POST: Task ends with FAILED and run history remains empty. @pytest.mark.asyncio async def test_compliance_run_missing_manifest_marks_task_failed(): repository, candidate_id, policy_id, manifest_id = _seed_repository(with_manifest=False) manager = _make_task_manager() try: task = await manager.create_task( "clean-release-compliance", { "repository": repository, "candidate_id": candidate_id, "policy_id": policy_id, "manifest_id": manifest_id, "requested_by": "integration-tester", }, ) finished = await _wait_for_terminal_task(manager, task.id) assert finished.status == TaskStatus.FAILED assert len(repository.check_runs) == 0 assert any("Manifest or Policy not found" in log.message for log in finished.logs) finally: manager._flusher_stop_event.set() manager._flusher_thread.join(timeout=2) # [/DEF:test_compliance_run_missing_manifest_marks_task_failed:Function] # [/DEF:backend.tests.services.clean_release.test_compliance_task_integration:Module]