Files
ss-tools/backend/tests/services/clean_release/test_compliance_task_integration.py
busya 274510fc38 refactor(semantics): migrate legacy @TIER to @COMPLEXITY annotations
- Replaced @TIER: TRIVIAL with @COMPLEXITY: 1
- Replaced @TIER: STANDARD with @COMPLEXITY: 3
- Replaced @TIER: CRITICAL with @COMPLEXITY: 5
- Manually elevated specific critical/complex components to levels 2 and 4
- Ignored legacy, specs, and node_modules directories
- Updated generated semantic map
2026-03-16 10:06:44 +03:00

251 lines
9.7 KiB
Python

# [DEF:backend.tests.services.clean_release.test_compliance_task_integration:Module]
# @COMPLEXITY: 5
# @SEMANTICS: tests, clean-release, compliance, task-manager, integration
# @PURPOSE: Verify clean release compliance runs execute through TaskManager lifecycle with observable success/failure outcomes.
# @LAYER: Tests
# @RELATION: TESTS -> backend.src.core.task_manager.manager.TaskManager
# @RELATION: TESTS -> backend.src.services.clean_release.compliance_orchestrator.CleanComplianceOrchestrator
# @INVARIANT: Compliance execution triggered as task produces terminal task status and persists run evidence.
from __future__ import annotations
import asyncio
from datetime import datetime, timezone
from typing import Any, Dict
from unittest.mock import MagicMock, patch
import pytest
from src.core.task_manager.manager import TaskManager
from src.core.task_manager.models import TaskStatus
from src.models.clean_release import (
CleanPolicySnapshot,
DistributionManifest,
ReleaseCandidate,
SourceRegistrySnapshot,
)
from src.services.clean_release.compliance_orchestrator import CleanComplianceOrchestrator
from src.services.clean_release.enums import CandidateStatus, RunStatus
from src.services.clean_release.repository import CleanReleaseRepository
# [DEF:_seed_repository:Function]
# @PURPOSE: Prepare deterministic candidate/policy/registry/manifest fixtures for task integration tests.
# @PRE: with_manifest controls manifest availability.
# @POST: Returns initialized repository and identifiers for compliance run startup.
def _seed_repository(*, with_manifest: bool) -> tuple[CleanReleaseRepository, str, str, str]:
repository = CleanReleaseRepository()
candidate_id = "cand-task-int-1"
policy_id = "policy-task-int-1"
manifest_id = "manifest-task-int-1"
repository.save_candidate(
ReleaseCandidate(
id=candidate_id,
version="1.0.0",
source_snapshot_ref="git:sha-task-int",
created_by="tester",
created_at=datetime.now(timezone.utc),
status=CandidateStatus.MANIFEST_BUILT.value,
)
)
repository.save_registry(
SourceRegistrySnapshot(
id="registry-task-int-1",
registry_id="trusted-registry",
registry_version="1.0.0",
allowed_hosts=["repo.internal.local"],
allowed_schemes=["https"],
allowed_source_types=["repo"],
immutable=True,
)
)
repository.save_policy(
CleanPolicySnapshot(
id=policy_id,
policy_id="trusted-policy",
policy_version="1.0.0",
content_json={"rules": []},
registry_snapshot_id="registry-task-int-1",
immutable=True,
)
)
if with_manifest:
repository.save_manifest(
DistributionManifest(
id=manifest_id,
candidate_id=candidate_id,
manifest_version=1,
manifest_digest="digest-task-int",
artifacts_digest="digest-task-int",
source_snapshot_ref="git:sha-task-int",
content_json={
"summary": {
"included_count": 1,
"excluded_count": 0,
"prohibited_detected_count": 0,
}
},
created_by="tester",
created_at=datetime.now(timezone.utc),
immutable=True,
)
)
return repository, candidate_id, policy_id, manifest_id
# [/DEF:_seed_repository:Function]
# [DEF:CleanReleaseCompliancePlugin:Class]
# @PURPOSE: TaskManager plugin shim that executes clean release compliance orchestration.
class CleanReleaseCompliancePlugin:
@property
def id(self) -> str:
return "clean-release-compliance"
@property
def name(self) -> str:
return "clean_release_compliance"
def execute(self, params: Dict[str, Any], context=None):
orchestrator = CleanComplianceOrchestrator(params["repository"])
run = orchestrator.start_check_run(
candidate_id=params["candidate_id"],
policy_id=params["policy_id"],
requested_by=params.get("requested_by", "tester"),
manifest_id=params["manifest_id"],
)
run.task_id = params["_task_id"]
params["repository"].save_check_run(run)
run = orchestrator.execute_stages(run)
run = orchestrator.finalize_run(run)
if context is not None:
context.logger.info("Compliance run completed via TaskManager plugin")
return {"run_id": run.id, "run_status": run.status, "final_status": run.final_status}
# [/DEF:CleanReleaseCompliancePlugin:Class]
# [DEF:_PluginLoaderStub:Class]
# @PURPOSE: Provide minimal plugin loader contract used by TaskManager in integration tests.
class _PluginLoaderStub:
def __init__(self, plugin: CleanReleaseCompliancePlugin):
self._plugin = plugin
def has_plugin(self, plugin_id: str) -> bool:
return plugin_id == self._plugin.id
def get_plugin(self, plugin_id: str):
if plugin_id != self._plugin.id:
raise ValueError("Plugin not found")
return self._plugin
# [/DEF:_PluginLoaderStub:Class]
# [DEF:_make_task_manager:Function]
# @PURPOSE: Build TaskManager with mocked persistence services for isolated integration tests.
# @POST: Returns TaskManager ready for async task execution.
def _make_task_manager() -> TaskManager:
plugin_loader = _PluginLoaderStub(CleanReleaseCompliancePlugin())
with patch("src.core.task_manager.manager.TaskPersistenceService") as mock_persistence, patch(
"src.core.task_manager.manager.TaskLogPersistenceService"
) as mock_log_persistence:
mock_persistence.return_value.load_tasks.return_value = []
mock_persistence.return_value.persist_task = MagicMock()
mock_log_persistence.return_value.add_logs = MagicMock()
mock_log_persistence.return_value.get_logs = MagicMock(return_value=[])
mock_log_persistence.return_value.get_log_stats = MagicMock()
mock_log_persistence.return_value.get_sources = MagicMock(return_value=[])
return TaskManager(plugin_loader)
# [/DEF:_make_task_manager:Function]
# [DEF:_wait_for_terminal_task:Function]
# @PURPOSE: Poll task registry until target task reaches terminal status.
# @PRE: task_id exists in manager registry.
# @POST: Returns task with SUCCESS or FAILED status, otherwise raises TimeoutError.
async def _wait_for_terminal_task(manager: TaskManager, task_id: str, timeout_seconds: float = 3.0):
started = asyncio.get_running_loop().time()
while True:
task = manager.get_task(task_id)
if task and task.status in {TaskStatus.SUCCESS, TaskStatus.FAILED}:
return task
if asyncio.get_running_loop().time() - started > timeout_seconds:
raise TimeoutError(f"Task {task_id} did not reach terminal status")
await asyncio.sleep(0.05)
# [/DEF:_wait_for_terminal_task:Function]
# [DEF:test_compliance_run_executes_as_task_manager_task:Function]
# @PURPOSE: Verify successful compliance execution is observable as TaskManager SUCCESS task.
# @PRE: Candidate, policy and manifest are available in repository.
# @POST: Task ends with SUCCESS; run is persisted with SUCCEEDED status and task binding.
@pytest.mark.asyncio
async def test_compliance_run_executes_as_task_manager_task():
repository, candidate_id, policy_id, manifest_id = _seed_repository(with_manifest=True)
manager = _make_task_manager()
try:
task = await manager.create_task(
"clean-release-compliance",
{
"repository": repository,
"candidate_id": candidate_id,
"policy_id": policy_id,
"manifest_id": manifest_id,
"requested_by": "integration-tester",
},
)
finished = await _wait_for_terminal_task(manager, task.id)
assert finished.status == TaskStatus.SUCCESS
assert isinstance(finished.result, dict)
run_id = finished.result["run_id"]
run = repository.get_check_run(run_id)
assert run is not None
assert run.status == RunStatus.SUCCEEDED
assert run.task_id == task.id
finally:
manager._flusher_stop_event.set()
manager._flusher_thread.join(timeout=2)
# [/DEF:test_compliance_run_executes_as_task_manager_task:Function]
# [DEF:test_compliance_run_missing_manifest_marks_task_failed:Function]
# @PURPOSE: Verify missing manifest startup failure is surfaced as TaskManager FAILED task.
# @PRE: Candidate/policy exist but manifest is absent.
# @POST: Task ends with FAILED and run history remains empty.
@pytest.mark.asyncio
async def test_compliance_run_missing_manifest_marks_task_failed():
repository, candidate_id, policy_id, manifest_id = _seed_repository(with_manifest=False)
manager = _make_task_manager()
try:
task = await manager.create_task(
"clean-release-compliance",
{
"repository": repository,
"candidate_id": candidate_id,
"policy_id": policy_id,
"manifest_id": manifest_id,
"requested_by": "integration-tester",
},
)
finished = await _wait_for_terminal_task(manager, task.id)
assert finished.status == TaskStatus.FAILED
assert len(repository.check_runs) == 0
assert any("Manifest or Policy not found" in log.message for log in finished.logs)
finally:
manager._flusher_stop_event.set()
manager._flusher_thread.join(timeout=2)
# [/DEF:test_compliance_run_missing_manifest_marks_task_failed:Function]
# [/DEF:backend.tests.services.clean_release.test_compliance_task_integration:Module]