fix: finalize semantic repair and test updates

This commit is contained in:
2026-03-21 15:07:06 +03:00
parent 005797334b
commit 9b47b9b667
99 changed files with 2484 additions and 985 deletions

View File

@@ -22,6 +22,7 @@ from src.core.migration.archive_parser import MigrationArchiveParser
# [DEF:test_extract_objects_from_zip_collects_all_types:Function]
# @RELATION: BINDS_TO -> TestArchiveParser
# @PURPOSE: Verify archive parser collects dashboard/chart/dataset YAML objects into typed buckets.
def test_extract_objects_from_zip_collects_all_types():
parser = MigrationArchiveParser()
with tempfile.TemporaryDirectory() as td:
@@ -33,11 +34,19 @@ def test_extract_objects_from_zip_collects_all_types():
(src_dir / "datasets").mkdir(parents=True)
with open(src_dir / "dashboards" / "dash.yaml", "w") as file_obj:
yaml.dump({"uuid": "dash-u1", "dashboard_title": "D1", "json_metadata": "{}"}, file_obj)
yaml.dump(
{"uuid": "dash-u1", "dashboard_title": "D1", "json_metadata": "{}"},
file_obj,
)
with open(src_dir / "charts" / "chart.yaml", "w") as file_obj:
yaml.dump({"uuid": "chart-u1", "slice_name": "C1", "viz_type": "bar"}, file_obj)
yaml.dump(
{"uuid": "chart-u1", "slice_name": "C1", "viz_type": "bar"}, file_obj
)
with open(src_dir / "datasets" / "dataset.yaml", "w") as file_obj:
yaml.dump({"uuid": "ds-u1", "table_name": "orders", "database_uuid": "db-u1"}, file_obj)
yaml.dump(
{"uuid": "ds-u1", "table_name": "orders", "database_uuid": "db-u1"},
file_obj,
)
with zipfile.ZipFile(zip_path, "w") as zip_obj:
for root, _, files in os.walk(src_dir):
@@ -61,5 +70,5 @@ def test_extract_objects_from_zip_collects_all_types():
raise AssertionError("dataset uuid mismatch")
# [/DEF:TestArchiveParser:Module]
# [/DEF:test_extract_objects_from_zip_collects_all_types:Function]
# [/DEF:TestArchiveParser:Module]

View File

@@ -3,7 +3,7 @@
# @COMPLEXITY: 3
# @PURPOSE: Unit tests for MigrationDryRunService diff and risk computation contracts.
# @LAYER: Domain
# @RELATION: VERIFIES -> backend.src.core.migration.dry_run_orchestrator
# @RELATION: VERIFIES -> [MigrationDryRunOrchestratorModule]
#
import json
import sys
@@ -24,16 +24,21 @@ from src.models.mapping import Base
# [DEF:_load_fixture:Function]
# @RELATION: BINDS_TO -> TestDryRunOrchestrator
# @RELATION: BINDS_TO -> [TestDryRunOrchestrator]
# @PURPOSE: Load canonical migration dry-run fixture payload used by deterministic orchestration assertions.
def _load_fixture() -> dict:
fixture_path = Path(__file__).parents[2] / "fixtures" / "migration_dry_run_fixture.json"
fixture_path = (
Path(__file__).parents[2] / "fixtures" / "migration_dry_run_fixture.json"
)
return json.loads(fixture_path.read_text())
# [/DEF:_load_fixture:Function]
# [DEF:_make_session:Function]
# @RELATION: BINDS_TO -> TestDryRunOrchestrator
# @RELATION: BINDS_TO -> [TestDryRunOrchestrator]
# @PURPOSE: Build isolated in-memory SQLAlchemy session for dry-run service tests.
def _make_session():
engine = create_engine(
"sqlite:///:memory:",
@@ -47,8 +52,10 @@ def _make_session():
# [/DEF:_make_session:Function]
# [DEF:test_migration_dry_run_service_builds_diff_and_risk:Function]
# @RELATION: BINDS_TO -> TestDryRunOrchestrator
# @RELATION: BINDS_TO -> [TestDryRunOrchestrator]
# @PURPOSE: Verify dry-run orchestration returns stable diff summary and required risk codes.
def test_migration_dry_run_service_builds_diff_and_risk():
# @TEST_CONTRACT: dry_run_result_contract -> {
# required_fields: {diff: object, summary: object, risk: object},
@@ -68,7 +75,9 @@ def test_migration_dry_run_service_builds_diff_and_risk():
)
source_client = MagicMock()
source_client.get_dashboards_summary.return_value = fixture["source_dashboard_summary"]
source_client.get_dashboards_summary.return_value = fixture[
"source_dashboard_summary"
]
source_client.export_dashboard.return_value = (b"PK\x03\x04", "source.zip")
target_client = MagicMock()
@@ -117,5 +126,5 @@ def test_migration_dry_run_service_builds_diff_and_risk():
raise AssertionError("breaking_reference risk is not detected")
# [/DEF:TestDryRunOrchestrator:Module]
# [/DEF:test_migration_dry_run_service_builds_diff_and_risk:Function]
# [/DEF:TestDryRunOrchestrator:Module]

View File

@@ -3,7 +3,7 @@
# @COMPLEXITY: 3
# @PURPOSE: Unit tests for MigrationEngine's cross-filter patching algorithms.
# @LAYER: Domain
# @RELATION: VERIFIES -> backend.src.core.migration_engine
# @RELATION: VERIFIES -> [src.core.migration_engine:Module]
#
import pytest
import tempfile
@@ -28,7 +28,7 @@ from src.models.mapping import ResourceType
# [DEF:MockMappingService:Class]
# @RELATION: BINDS_TO ->[TestMigrationEngine]
# @RELATION: BINDS_TO -> [TestMigrationEngine:Module]
# @COMPLEXITY: 2
# @PURPOSE: Deterministic mapping service double for native filter ID remapping scenarios.
# @INVARIANT: Returns mappings only for requested UUID keys present in seeded map.
@@ -50,7 +50,8 @@ class MockMappingService:
# [DEF:_write_dashboard_yaml:Function]
# @RELATION: BINDS_TO -> TestMigrationEngine
# @RELATION: BINDS_TO -> [TestMigrationEngine:Module]
# @PURPOSE: Serialize dashboard metadata into YAML fixture with json_metadata payload for patch tests.
def _write_dashboard_yaml(dir_path: Path, metadata: dict) -> Path:
"""Helper: writes a dashboard YAML file with json_metadata."""
file_path = dir_path / "dash.yaml"
@@ -65,7 +66,8 @@ def _write_dashboard_yaml(dir_path: Path, metadata: dict) -> Path:
# [DEF:test_patch_dashboard_metadata_replaces_chart_ids:Function]
# @RELATION: BINDS_TO -> TestMigrationEngine
# @RELATION: BINDS_TO -> [TestMigrationEngine:Module]
# @PURPOSE: Verify native filter target chartId values are remapped via mapping service results.
def test_patch_dashboard_metadata_replaces_chart_ids():
"""Verifies that chartId values are replaced using the mapping service."""
mock_service = MockMappingService({"uuid-chart-A": 999})
@@ -91,7 +93,8 @@ def test_patch_dashboard_metadata_replaces_chart_ids():
# [DEF:test_patch_dashboard_metadata_replaces_dataset_ids:Function]
# @RELATION: BINDS_TO -> TestMigrationEngine
# @RELATION: BINDS_TO -> [TestMigrationEngine:Module]
# @PURPOSE: Verify native filter target datasetId values are remapped via mapping service results.
def test_patch_dashboard_metadata_replaces_dataset_ids():
"""Verifies that datasetId values are replaced using the mapping service."""
mock_service = MockMappingService({"uuid-ds-B": 500})
@@ -118,7 +121,8 @@ def test_patch_dashboard_metadata_replaces_dataset_ids():
# [DEF:test_patch_dashboard_metadata_skips_when_no_metadata:Function]
# @RELATION: BINDS_TO -> TestMigrationEngine
# @RELATION: BINDS_TO -> [TestMigrationEngine:Module]
# @PURPOSE: Ensure dashboard files without json_metadata are left unchanged by metadata patching.
def test_patch_dashboard_metadata_skips_when_no_metadata():
"""Verifies early return when json_metadata key is absent."""
mock_service = MockMappingService({})
@@ -140,7 +144,8 @@ def test_patch_dashboard_metadata_skips_when_no_metadata():
# [DEF:test_patch_dashboard_metadata_handles_missing_targets:Function]
# @RELATION: BINDS_TO -> TestMigrationEngine
# @RELATION: BINDS_TO -> [TestMigrationEngine:Module]
# @PURPOSE: Verify patching updates mapped targets while preserving unmapped native filter IDs.
def test_patch_dashboard_metadata_handles_missing_targets():
"""When some source IDs have no target mapping, patches what it can and leaves the rest."""
mock_service = MockMappingService({"uuid-A": 100}) # Only uuid-A maps
@@ -173,7 +178,8 @@ def test_patch_dashboard_metadata_handles_missing_targets():
# [DEF:test_extract_chart_uuids_from_archive:Function]
# @RELATION: BINDS_TO -> TestMigrationEngine
# @RELATION: BINDS_TO -> [TestMigrationEngine:Module]
# @PURPOSE: Verify chart archive scan returns complete local chart id-to-uuid mapping.
def test_extract_chart_uuids_from_archive():
"""Verifies that chart YAML files are parsed for id->uuid mappings."""
engine = MigrationEngine()
@@ -201,7 +207,8 @@ def test_extract_chart_uuids_from_archive():
# [DEF:test_transform_yaml_replaces_database_uuid:Function]
# @RELATION: BINDS_TO -> TestMigrationEngine
# @RELATION: BINDS_TO -> [TestMigrationEngine:Module]
# @PURPOSE: Ensure dataset YAML database_uuid fields are replaced when source UUID mapping exists.
def test_transform_yaml_replaces_database_uuid():
"""Verifies that database_uuid in a dataset YAML is replaced."""
engine = MigrationEngine()
@@ -223,7 +230,8 @@ def test_transform_yaml_replaces_database_uuid():
# [DEF:test_transform_yaml_ignores_unmapped_uuid:Function]
# @RELATION: BINDS_TO -> TestMigrationEngine
# @RELATION: BINDS_TO -> [TestMigrationEngine:Module]
# @PURPOSE: Ensure transform_yaml leaves dataset files untouched when database_uuid is not mapped.
def test_transform_yaml_ignores_unmapped_uuid():
"""Verifies no changes when UUID is not in the mapping."""
engine = MigrationEngine()
@@ -247,7 +255,8 @@ def test_transform_yaml_ignores_unmapped_uuid():
# [DEF:test_transform_zip_end_to_end:Function]
# @RELATION: BINDS_TO -> TestMigrationEngine
# @RELATION: BINDS_TO -> [TestMigrationEngine:Module]
# @PURPOSE: Validate full ZIP transform pipeline remaps datasets and dashboard cross-filter chart IDs.
def test_transform_zip_end_to_end():
"""Verifies full orchestration: extraction, transformation, patching, and re-packaging."""
mock_service = MockMappingService({"char-uuid": 101, "ds-uuid": 202})
@@ -327,7 +336,8 @@ def test_transform_zip_end_to_end():
# [DEF:test_transform_zip_invalid_path:Function]
# @RELATION: BINDS_TO -> TestMigrationEngine
# @RELATION: BINDS_TO -> [TestMigrationEngine:Module]
# @PURPOSE: Verify transform_zip returns False when source archive path does not exist.
def test_transform_zip_invalid_path():
"""@PRE: Verify behavior (False) on invalid ZIP path."""
engine = MigrationEngine()
@@ -339,7 +349,8 @@ def test_transform_zip_invalid_path():
# [DEF:test_transform_yaml_nonexistent_file:Function]
# @RELATION: BINDS_TO -> TestMigrationEngine
# @RELATION: BINDS_TO -> [TestMigrationEngine:Module]
# @PURPOSE: Verify transform_yaml raises FileNotFoundError for missing YAML source files.
def test_transform_yaml_nonexistent_file():
"""@PRE: Verify behavior on non-existent YAML file."""
engine = MigrationEngine()
@@ -349,5 +360,5 @@ def test_transform_yaml_nonexistent_file():
engine._transform_yaml(Path("non_existent.yaml"), {})
# [/DEF:TestMigrationEngine:Module]
# [/DEF:test_transform_yaml_nonexistent_file:Function]
# [/DEF:TestMigrationEngine:Module]

View File

@@ -13,13 +13,19 @@ from src.dependencies import get_clean_release_repository, get_config_manager
from datetime import datetime, timezone
from uuid import uuid4
from src.models.clean_release import CleanPolicySnapshot, ComplianceReport, ReleaseCandidate, SourceRegistrySnapshot
from src.models.clean_release import (
CleanPolicySnapshot,
ComplianceReport,
ReleaseCandidate,
SourceRegistrySnapshot,
)
from src.services.clean_release.enums import CandidateStatus, ComplianceDecision
from src.scripts.clean_release_cli import main as cli_main
# [DEF:test_cli_candidate_register_scaffold:Function]
# @RELATION: BINDS_TO -> test_clean_release_cli
# @PURPOSE: Verify candidate-register command exits successfully for valid required arguments.
def test_cli_candidate_register_scaffold() -> None:
"""Candidate register CLI command smoke test."""
exit_code = cli_main(
@@ -40,8 +46,10 @@ def test_cli_candidate_register_scaffold() -> None:
# [/DEF:test_cli_candidate_register_scaffold:Function]
# [DEF:test_cli_manifest_build_scaffold:Function]
# @RELATION: BINDS_TO -> test_clean_release_cli
# @PURPOSE: Verify candidate-register/artifact-import/manifest-build smoke path succeeds end-to-end.
def test_cli_manifest_build_scaffold() -> None:
"""Manifest build CLI command smoke test."""
register_exit = cli_main(
@@ -90,8 +98,10 @@ def test_cli_manifest_build_scaffold() -> None:
# [/DEF:test_cli_manifest_build_scaffold:Function]
# [DEF:test_cli_compliance_run_scaffold:Function]
# @RELATION: BINDS_TO -> test_clean_release_cli
# @PURPOSE: Verify compliance run/status/violations/report commands complete for prepared candidate.
def test_cli_compliance_run_scaffold() -> None:
"""Compliance CLI command smoke test for run/status/report/violations."""
repository = get_clean_release_repository()
@@ -119,6 +129,7 @@ def test_cli_compliance_run_scaffold() -> None:
config = config_manager.get_config()
if getattr(config, "settings", None) is None:
# @INVARIANT: SimpleNamespace substitutes for GlobalSettings — any field rename in GlobalSettings will silently not propagate here; re-verify on GlobalSettings schema changes.
config.settings = SimpleNamespace()
config.settings.clean_release = SimpleNamespace(
active_policy_id=policy.id,
@@ -180,7 +191,11 @@ def test_cli_compliance_run_scaffold() -> None:
)
assert run_exit == 0
run_id = next(run.id for run in repository.check_runs.values() if run.candidate_id == "cli-candidate-3")
run_id = next(
run.id
for run in repository.check_runs.values()
if run.candidate_id == "cli-candidate-3"
)
status_exit = cli_main(["compliance-status", "--run-id", run_id, "--json"])
assert status_exit == 0
@@ -194,8 +209,10 @@ def test_cli_compliance_run_scaffold() -> None:
# [/DEF:test_cli_compliance_run_scaffold:Function]
# [DEF:test_cli_release_gate_commands_scaffold:Function]
# @RELATION: BINDS_TO -> test_clean_release_cli
# @PURPOSE: Verify approve/reject/publish/revoke release-gate commands execute with valid fixtures.
def test_cli_release_gate_commands_scaffold() -> None:
"""Release gate CLI smoke test for approve/reject/publish/revoke commands."""
repository = get_clean_release_repository()
@@ -231,7 +248,11 @@ def test_cli_release_gate_commands_scaffold() -> None:
run_id=f"run-{uuid4()}",
candidate_id=approved_candidate_id,
final_status=ComplianceDecision.PASSED.value,
summary_json={"operator_summary": "ok", "violations_count": 0, "blocking_violations_count": 0},
summary_json={
"operator_summary": "ok",
"violations_count": 0,
"blocking_violations_count": 0,
},
generated_at=datetime.now(timezone.utc),
immutable=True,
)
@@ -242,7 +263,11 @@ def test_cli_release_gate_commands_scaffold() -> None:
run_id=f"run-{uuid4()}",
candidate_id=rejected_candidate_id,
final_status=ComplianceDecision.PASSED.value,
summary_json={"operator_summary": "ok", "violations_count": 0, "blocking_violations_count": 0},
summary_json={
"operator_summary": "ok",
"violations_count": 0,
"blocking_violations_count": 0,
},
generated_at=datetime.now(timezone.utc),
immutable=True,
)
@@ -317,5 +342,5 @@ def test_cli_release_gate_commands_scaffold() -> None:
assert revoke_exit == 0
# [/DEF:test_clean_release_cli:Module]
# [/DEF:test_cli_release_gate_commands_scaffold:Function]
# [/DEF:test_clean_release_cli:Module]

View File

@@ -17,6 +17,7 @@ from src.scripts.clean_release_tui import CleanReleaseTUI, main
# [DEF:_build_mock_stdscr:Function]
# @RELATION: BINDS_TO -> test_clean_release_tui_v2
# @PURPOSE: Build deterministic curses screen mock with default terminal geometry and exit key.
def _build_mock_stdscr() -> MagicMock:
stdscr = MagicMock()
stdscr.getmaxyx.return_value = (40, 120)
@@ -26,9 +27,11 @@ def _build_mock_stdscr() -> MagicMock:
# [/DEF:_build_mock_stdscr:Function]
@patch("src.scripts.clean_release_tui.curses")
# [DEF:test_tui_f5_dispatches_run_action:Function]
# @RELATION: BINDS_TO -> test_clean_release_tui_v2
# @PURPOSE: Verify F5 key dispatch invokes run_checks exactly once before graceful exit.
def test_tui_f5_dispatches_run_action(mock_curses_module: MagicMock) -> None:
"""F5 should dispatch run action from TUI loop."""
mock_curses_module.KEY_F10 = curses.KEY_F10
@@ -48,9 +51,11 @@ def test_tui_f5_dispatches_run_action(mock_curses_module: MagicMock) -> None:
# [/DEF:test_tui_f5_dispatches_run_action:Function]
@patch("src.scripts.clean_release_tui.curses")
# [DEF:test_tui_f5_run_smoke_reports_blocked_state:Function]
# @RELATION: BINDS_TO -> test_clean_release_tui_v2
# @PURPOSE: Verify blocked compliance state is surfaced after F5-triggered run action.
def test_tui_f5_run_smoke_reports_blocked_state(mock_curses_module: MagicMock) -> None:
"""F5 smoke test should expose blocked outcome state after run action."""
mock_curses_module.KEY_F10 = curses.KEY_F10
@@ -77,8 +82,10 @@ def test_tui_f5_run_smoke_reports_blocked_state(mock_curses_module: MagicMock) -
# [/DEF:test_tui_f5_run_smoke_reports_blocked_state:Function]
# [DEF:test_tui_non_tty_refuses_startup:Function]
# @RELATION: BINDS_TO -> test_clean_release_tui_v2
# @PURPOSE: Verify non-TTY execution returns exit code 2 with actionable stderr guidance.
def test_tui_non_tty_refuses_startup(capsys) -> None:
"""Non-TTY startup must refuse TUI mode and redirect operator to CLI/API flow."""
with patch("sys.stdout.isatty", return_value=False):
@@ -92,9 +99,11 @@ def test_tui_non_tty_refuses_startup(capsys) -> None:
# [/DEF:test_tui_non_tty_refuses_startup:Function]
@patch("src.scripts.clean_release_tui.curses")
# [DEF:test_tui_f8_blocked_without_facade_binding:Function]
# @RELATION: BINDS_TO -> test_clean_release_tui_v2
# @PURPOSE: Verify F8 path reports disabled action instead of mutating hidden facade state.
def test_tui_f8_blocked_without_facade_binding(mock_curses_module: MagicMock) -> None:
"""F8 should not perform hidden state mutation when facade action is not bound."""
mock_curses_module.KEY_F10 = curses.KEY_F10
@@ -112,5 +121,5 @@ def test_tui_f8_blocked_without_facade_binding(mock_curses_module: MagicMock) ->
assert "F8 disabled" in app.last_error
# [/DEF:test_clean_release_tui_v2:Module]
# [/DEF:test_tui_f8_blocked_without_facade_binding:Function]
# [/DEF:test_clean_release_tui_v2:Module]

View File

@@ -1,5 +1,5 @@
# [DEF:test_candidate_manifest_services:Module]
# @RELATION: BELONGS_TO -> SrcRoot
# @RELATION: BELONGS_TO -> [SrcRoot:Module]
# @COMPLEXITY: 3
# @PURPOSE: Test lifecycle and manifest versioning for release candidates.
# @LAYER: Tests
@@ -9,12 +9,17 @@ from datetime import datetime, timezone
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from src.core.database import Base
from src.models.clean_release import ReleaseCandidate, DistributionManifest, CandidateArtifact
from src.models.clean_release import (
ReleaseCandidate,
DistributionManifest,
CandidateArtifact,
)
from src.services.clean_release.enums import CandidateStatus
from src.services.clean_release.candidate_service import register_candidate
from src.services.clean_release.manifest_service import build_manifest_snapshot
from src.services.clean_release.repository import CleanReleaseRepository
@pytest.fixture
def db_session():
engine = create_engine("sqlite:///:memory:")
@@ -24,8 +29,10 @@ def db_session():
yield session
session.close()
# [DEF:test_candidate_lifecycle_transitions:Function]
# @RELATION: BINDS_TO -> test_candidate_manifest_services
# @RELATION: BINDS_TO -> [test_candidate_manifest_services:Module]
# @PURPOSE: Verify release candidate allows legal status transitions and rejects forbidden back-transitions.
def test_candidate_lifecycle_transitions(db_session):
"""
@PURPOSE: Verify legal state transitions for ReleaseCandidate.
@@ -36,7 +43,7 @@ def test_candidate_lifecycle_transitions(db_session):
version="1.0.0",
source_snapshot_ref="ref-1",
created_by="operator",
status=CandidateStatus.DRAFT
status=CandidateStatus.DRAFT,
)
db_session.add(candidate)
db_session.commit()
@@ -47,13 +54,17 @@ def test_candidate_lifecycle_transitions(db_session):
# Invalid transition: PREPARED -> DRAFT (should raise IllegalTransitionError)
from src.services.clean_release.exceptions import IllegalTransitionError
with pytest.raises(IllegalTransitionError, match="Forbidden transition"):
candidate.transition_to(CandidateStatus.DRAFT)
# [/DEF:test_candidate_lifecycle_transitions:Function]
# [DEF:test_manifest_versioning_and_immutability:Function]
# @RELATION: BINDS_TO -> test_candidate_manifest_services
# @RELATION: BINDS_TO -> [test_candidate_manifest_services:Module]
# @PURPOSE: Verify manifest versions increment monotonically and older snapshots remain queryable.
def test_manifest_versioning_and_immutability(db_session):
"""
@PURPOSE: Verify manifest versioning and immutability invariants.
@@ -70,7 +81,7 @@ def test_manifest_versioning_and_immutability(db_session):
source_snapshot_ref="ref1",
content_json={},
created_at=datetime.now(timezone.utc),
created_by="operator"
created_by="operator",
)
db_session.add(m1)
@@ -84,23 +95,34 @@ def test_manifest_versioning_and_immutability(db_session):
source_snapshot_ref="ref1",
content_json={},
created_at=datetime.now(timezone.utc),
created_by="operator"
created_by="operator",
)
db_session.add(m2)
db_session.commit()
latest = db_session.query(DistributionManifest).filter_by(candidate_id=candidate_id).order_by(DistributionManifest.manifest_version.desc()).first()
latest = (
db_session.query(DistributionManifest)
.filter_by(candidate_id=candidate_id)
.order_by(DistributionManifest.manifest_version.desc())
.first()
)
assert latest.manifest_version == 2
assert latest.id == "manifest-v2"
all_manifests = db_session.query(DistributionManifest).filter_by(candidate_id=candidate_id).all()
all_manifests = (
db_session.query(DistributionManifest)
.filter_by(candidate_id=candidate_id)
.all()
)
assert len(all_manifests) == 2
# [/DEF:test_manifest_versioning_and_immutability:Function]
# [DEF:_valid_artifacts:Function]
# @RELATION: BINDS_TO -> test_candidate_manifest_services
# @RELATION: BINDS_TO -> [test_candidate_manifest_services:Module]
# @PURPOSE: Provide canonical valid artifact payload used by candidate registration tests.
def _valid_artifacts():
return [
{
@@ -114,8 +136,10 @@ def _valid_artifacts():
# [/DEF:_valid_artifacts:Function]
# [DEF:test_register_candidate_rejects_duplicate_candidate_id:Function]
# @RELATION: BINDS_TO -> test_candidate_manifest_services
# @RELATION: BINDS_TO -> [test_candidate_manifest_services:Module]
# @PURPOSE: Verify duplicate candidate_id registration is rejected by service invariants.
def test_register_candidate_rejects_duplicate_candidate_id():
repository = CleanReleaseRepository()
register_candidate(
@@ -140,8 +164,10 @@ def test_register_candidate_rejects_duplicate_candidate_id():
# [/DEF:test_register_candidate_rejects_duplicate_candidate_id:Function]
# [DEF:test_register_candidate_rejects_malformed_artifact_input:Function]
# @RELATION: BINDS_TO -> test_candidate_manifest_services
# @RELATION: BINDS_TO -> [test_candidate_manifest_services:Module]
# @PURPOSE: Verify candidate registration rejects artifact payloads missing required fields.
def test_register_candidate_rejects_malformed_artifact_input():
repository = CleanReleaseRepository()
bad_artifacts = [{"id": "art-1", "path": "bin/app", "size": 42}] # missing sha256
@@ -159,8 +185,10 @@ def test_register_candidate_rejects_malformed_artifact_input():
# [/DEF:test_register_candidate_rejects_malformed_artifact_input:Function]
# [DEF:test_register_candidate_rejects_empty_artifact_set:Function]
# @RELATION: BINDS_TO -> test_candidate_manifest_services
# @RELATION: BINDS_TO -> [test_candidate_manifest_services:Module]
# @PURPOSE: Verify candidate registration rejects empty artifact collections.
def test_register_candidate_rejects_empty_artifact_set():
repository = CleanReleaseRepository()
@@ -177,8 +205,10 @@ def test_register_candidate_rejects_empty_artifact_set():
# [/DEF:test_register_candidate_rejects_empty_artifact_set:Function]
# [DEF:test_manifest_service_rebuild_creates_new_version:Function]
# @RELATION: BINDS_TO -> test_candidate_manifest_services
# @RELATION: BINDS_TO -> [test_candidate_manifest_services:Module]
# @PURPOSE: Verify repeated manifest build creates a new incremented immutable version.
def test_manifest_service_rebuild_creates_new_version():
repository = CleanReleaseRepository()
register_candidate(
@@ -190,8 +220,12 @@ def test_manifest_service_rebuild_creates_new_version():
artifacts=_valid_artifacts(),
)
first = build_manifest_snapshot(repository=repository, candidate_id="manifest-version-1", created_by="operator")
second = build_manifest_snapshot(repository=repository, candidate_id="manifest-version-1", created_by="operator")
first = build_manifest_snapshot(
repository=repository, candidate_id="manifest-version-1", created_by="operator"
)
second = build_manifest_snapshot(
repository=repository, candidate_id="manifest-version-1", created_by="operator"
)
assert first.manifest_version == 1
assert second.manifest_version == 2
@@ -200,8 +234,10 @@ def test_manifest_service_rebuild_creates_new_version():
# [/DEF:test_manifest_service_rebuild_creates_new_version:Function]
# [DEF:test_manifest_service_existing_manifest_cannot_be_mutated:Function]
# @RELATION: BINDS_TO -> test_candidate_manifest_services
# @RELATION: BINDS_TO -> [test_candidate_manifest_services:Module]
# @PURPOSE: Verify existing manifest snapshot remains immutable when rebuilding newer manifest version.
def test_manifest_service_existing_manifest_cannot_be_mutated():
repository = CleanReleaseRepository()
register_candidate(
@@ -213,10 +249,18 @@ def test_manifest_service_existing_manifest_cannot_be_mutated():
artifacts=_valid_artifacts(),
)
created = build_manifest_snapshot(repository=repository, candidate_id="manifest-immutable-1", created_by="operator")
created = build_manifest_snapshot(
repository=repository,
candidate_id="manifest-immutable-1",
created_by="operator",
)
original_digest = created.manifest_digest
rebuilt = build_manifest_snapshot(repository=repository, candidate_id="manifest-immutable-1", created_by="operator")
rebuilt = build_manifest_snapshot(
repository=repository,
candidate_id="manifest-immutable-1",
created_by="operator",
)
old_manifest = repository.get_manifest(created.id)
assert old_manifest is not None
@@ -227,13 +271,20 @@ def test_manifest_service_existing_manifest_cannot_be_mutated():
# [/DEF:test_manifest_service_existing_manifest_cannot_be_mutated:Function]
# [DEF:test_manifest_service_rejects_missing_candidate:Function]
# @RELATION: BINDS_TO -> test_candidate_manifest_services
# @RELATION: BINDS_TO -> [test_candidate_manifest_services:Module]
# @PURPOSE: Verify manifest build fails with missing candidate identifier.
def test_manifest_service_rejects_missing_candidate():
repository = CleanReleaseRepository()
with pytest.raises(ValueError, match="not found"):
build_manifest_snapshot(repository=repository, candidate_id="missing-candidate", created_by="operator")
build_manifest_snapshot(
repository=repository,
candidate_id="missing-candidate",
created_by="operator",
)
# [/DEF:test_candidate_manifest_services:Module]
# [/DEF:test_manifest_service_rejects_missing_candidate:Function]
# [/DEF:test_candidate_manifest_services:Module]

View File

@@ -3,9 +3,9 @@
# @SEMANTICS: clean-release, policy-resolution, trusted-snapshots, contracts
# @PURPOSE: Verify trusted policy snapshot resolution contract and error guards.
# @LAYER: Tests
# @RELATION: DEPENDS_ON -> backend.src.services.clean_release.policy_resolution_service
# @RELATION: DEPENDS_ON -> backend.src.services.clean_release.repository
# @RELATION: DEPENDS_ON -> backend.src.services.clean_release.exceptions
# @RELATION: DEPENDS_ON -> [policy_resolution_service]
# @RELATION: DEPENDS_ON -> [repository]
# @RELATION: DEPENDS_ON -> [clean_release_exceptions]
# @INVARIANT: Resolution uses only ConfigManager active IDs and rejects runtime override attempts.
from __future__ import annotations
@@ -16,25 +16,33 @@ import pytest
from src.models.clean_release import CleanPolicySnapshot, SourceRegistrySnapshot
from src.services.clean_release.exceptions import PolicyResolutionError
from src.services.clean_release.policy_resolution_service import resolve_trusted_policy_snapshots
from src.services.clean_release.policy_resolution_service import (
resolve_trusted_policy_snapshots,
)
from src.services.clean_release.repository import CleanReleaseRepository
# [DEF:_config_manager:Function]
# @RELATION: BINDS_TO -> TestPolicyResolutionService
# @RELATION: BINDS_TO -> [TestPolicyResolutionService]
# @COMPLEXITY: 1
# @PURPOSE: Build deterministic ConfigManager-like stub for tests.
# @INVARIANT: Only settings.clean_release.active_policy_id and active_registry_id are populated; any other settings field access raises AttributeError.
# @PRE: policy_id and registry_id may be None or non-empty strings.
# @POST: Returns object exposing get_config().settings.clean_release active IDs.
def _config_manager(policy_id, registry_id):
clean_release = SimpleNamespace(active_policy_id=policy_id, active_registry_id=registry_id)
clean_release = SimpleNamespace(
active_policy_id=policy_id, active_registry_id=registry_id
)
settings = SimpleNamespace(clean_release=clean_release)
config = SimpleNamespace(settings=settings)
return SimpleNamespace(get_config=lambda: config)
# [/DEF:_config_manager:Function]
# [DEF:test_resolve_trusted_policy_snapshots_missing_profile:Function]
# @RELATION: BINDS_TO -> TestPolicyResolutionService
# @RELATION: BINDS_TO -> [TestPolicyResolutionService]
# @PURPOSE: Ensure resolution fails when trusted profile is not configured.
# @PRE: active_policy_id is None.
# @POST: Raises PolicyResolutionError with missing trusted profile reason.
@@ -47,11 +55,13 @@ def test_resolve_trusted_policy_snapshots_missing_profile():
config_manager=config_manager,
repository=repository,
)
# [/DEF:test_resolve_trusted_policy_snapshots_missing_profile:Function]
# [DEF:test_resolve_trusted_policy_snapshots_missing_registry:Function]
# @RELATION: BINDS_TO -> TestPolicyResolutionService
# @RELATION: BINDS_TO -> [TestPolicyResolutionService]
# @PURPOSE: Ensure resolution fails when trusted registry is not configured.
# @PRE: active_registry_id is None and active_policy_id is set.
# @POST: Raises PolicyResolutionError with missing trusted registry reason.
@@ -64,11 +74,13 @@ def test_resolve_trusted_policy_snapshots_missing_registry():
config_manager=config_manager,
repository=repository,
)
# [/DEF:test_resolve_trusted_policy_snapshots_missing_registry:Function]
# [DEF:test_resolve_trusted_policy_snapshots_rejects_override_attempt:Function]
# @RELATION: BINDS_TO -> TestPolicyResolutionService
# @RELATION: BINDS_TO -> [TestPolicyResolutionService]
# @PURPOSE: Ensure runtime override attempt is rejected even if snapshots exist.
# @PRE: valid trusted snapshots exist in repository and override is provided.
# @POST: Raises PolicyResolutionError with override forbidden reason.
@@ -104,6 +116,8 @@ def test_resolve_trusted_policy_snapshots_rejects_override_attempt():
repository=repository,
policy_id_override="policy-override",
)
# [/DEF:test_resolve_trusted_policy_snapshots_rejects_override_attempt:Function]
# [/DEF:TestPolicyResolutionService:Module]

View File

@@ -1,42 +1,71 @@
# [DEF:TestResourceHubs:Module]
# @RELATION: BELONGS_TO -> SrcRoot
# @COMPLEXITY: 3
# @SEMANTICS: tests, resource-hubs, dashboards, datasets, pagination, api
# @PURPOSE: Contract tests for resource hub dashboards/datasets listing and pagination boundary validation.
# @LAYER: Domain (Tests)
import pytest
from fastapi.testclient import TestClient
from unittest.mock import MagicMock, AsyncMock
from src.app import app
from src.dependencies import get_config_manager, get_task_manager, get_resource_service, has_permission
from src.dependencies import (
get_config_manager,
get_task_manager,
get_resource_service,
has_permission,
)
client = TestClient(app)
# [DEF:test_dashboards_api:Test]
# @RELATION: BINDS_TO -> SrcRoot
# @RELATION: BINDS_TO -> TestResourceHubs
# @PURPOSE: Verify GET /api/dashboards contract compliance
# @TEST: Valid env_id returns 200 and dashboard list
# @TEST: Invalid env_id returns 404
# @TEST: Search filter works
@pytest.fixture
def mock_deps():
# @INVARIANT: unconstrained mock — no spec= enforced; attribute typos will silently pass
config_manager = MagicMock()
# @INVARIANT: unconstrained mock — no spec= enforced; attribute typos will silently pass
task_manager = MagicMock()
# @INVARIANT: unconstrained mock — no spec= enforced; attribute typos will silently pass
resource_service = MagicMock()
# Mock environment
env = MagicMock()
env.id = "env1"
config_manager.get_environments.return_value = [env]
# Mock tasks
task_manager.get_all_tasks.return_value = []
# Mock dashboards
resource_service.get_dashboards_with_status = AsyncMock(return_value=[
{"id": 1, "title": "Sales", "slug": "sales", "git_status": {"branch": "main", "sync_status": "OK"}, "last_task": None},
{"id": 2, "title": "Marketing", "slug": "mkt", "git_status": None, "last_task": {"task_id": "t1", "status": "SUCCESS"}}
])
resource_service.get_dashboards_with_status = AsyncMock(
return_value=[
{
"id": 1,
"title": "Sales",
"slug": "sales",
"git_status": {"branch": "main", "sync_status": "OK"},
"last_task": None,
},
{
"id": 2,
"title": "Marketing",
"slug": "mkt",
"git_status": None,
"last_task": {"task_id": "t1", "status": "SUCCESS"},
},
]
)
app.dependency_overrides[get_config_manager] = lambda: config_manager
app.dependency_overrides[get_task_manager] = lambda: task_manager
app.dependency_overrides[get_resource_service] = lambda: resource_service
# Bypass permission check
mock_user = MagicMock()
mock_user.username = "testadmin"
@@ -44,24 +73,25 @@ def mock_deps():
admin_role = MagicMock()
admin_role.name = "Admin"
mock_user.roles.append(admin_role)
# Override both get_current_user and has_permission
from src.dependencies import get_current_user
app.dependency_overrides[get_current_user] = lambda: mock_user
# We need to override the specific instance returned by has_permission
app.dependency_overrides[has_permission("plugin:migration", "READ")] = lambda: mock_user
yield {
"config": config_manager,
"task": task_manager,
"resource": resource_service
}
app.dependency_overrides[has_permission("plugin:migration", "READ")] = (
lambda: mock_user
)
yield {"config": config_manager, "task": task_manager, "resource": resource_service}
app.dependency_overrides.clear()
# [DEF:test_get_dashboards_success:Function]
# @RELATION: BINDS_TO -> UnknownModule
# @RELATION: BINDS_TO -> test_dashboards_api
# @PURPOSE: Verify dashboards endpoint returns 200 with expected dashboard payload fields.
def test_get_dashboards_success(mock_deps):
response = client.get("/api/dashboards?env_id=env1")
assert response.status_code == 200
@@ -71,18 +101,24 @@ def test_get_dashboards_success(mock_deps):
assert data["dashboards"][0]["title"] == "Sales"
assert data["dashboards"][0]["git_status"]["sync_status"] == "OK"
# [/DEF:test_get_dashboards_success:Function]
# [DEF:test_get_dashboards_not_found:Function]
# @RELATION: BINDS_TO -> test_dashboards_api
# @PURPOSE: Verify dashboards endpoint returns 404 for unknown environment identifier.
def test_get_dashboards_not_found(mock_deps):
    # An env_id that is not registered in the config manager must yield 404.
    resp = client.get("/api/dashboards", params={"env_id": "invalid"})
    assert resp.status_code == 404
# [/DEF:test_get_dashboards_not_found:Function]
# [DEF:test_get_dashboards_search:Function]
# @RELATION: BINDS_TO -> UnknownModule
# @RELATION: BINDS_TO -> test_dashboards_api
# @PURPOSE: Verify dashboards endpoint search filter returns matching subset.
def test_get_dashboards_search(mock_deps):
response = client.get("/api/dashboards?env_id=env1&search=Sales")
assert response.status_code == 200
@@ -90,25 +126,37 @@ def test_get_dashboards_search(mock_deps):
assert len(data["dashboards"]) == 1
assert data["dashboards"][0]["title"] == "Sales"
# [/DEF:test_get_dashboards_search:Function]
# [/DEF:test_dashboards_api:Test]
# [DEF:test_datasets_api:Test]
# @RELATION: BINDS_TO -> SrcRoot
# @RELATION: BINDS_TO -> TestResourceHubs
# @PURPOSE: Verify GET /api/datasets contract compliance
# @TEST: Valid env_id returns 200 and dataset list
# @TEST: Invalid env_id returns 404
# @TEST: Search filter works
# @TEST: Negative - Service failure returns 503
# [/DEF:test_get_dashboards_search:Function]
# [DEF:test_get_datasets_success:Function]
# @RELATION: BINDS_TO -> UnknownModule
# @RELATION: BINDS_TO -> test_datasets_api
# @PURPOSE: Verify datasets endpoint returns 200 with mapped fields payload.
def test_get_datasets_success(mock_deps):
mock_deps["resource"].get_datasets_with_status = AsyncMock(return_value=[
{"id": 1, "table_name": "orders", "schema": "public", "database": "db1", "mapped_fields": {"total": 10, "mapped": 5}, "last_task": None}
])
mock_deps["resource"].get_datasets_with_status = AsyncMock(
return_value=[
{
"id": 1,
"table_name": "orders",
"schema": "public",
"database": "db1",
"mapped_fields": {"total": 10, "mapped": 5},
"last_task": None,
}
]
)
response = client.get("/api/datasets?env_id=env1")
assert response.status_code == 200
data = response.json()
@@ -117,86 +165,126 @@ def test_get_datasets_success(mock_deps):
assert data["datasets"][0]["table_name"] == "orders"
assert data["datasets"][0]["mapped_fields"]["mapped"] == 5
# [/DEF:test_get_datasets_success:Function]
# [DEF:test_get_datasets_not_found:Function]
# @RELATION: BINDS_TO -> test_datasets_api
# @PURPOSE: Verify datasets endpoint returns 404 for unknown environment identifier.
def test_get_datasets_not_found(mock_deps):
    # An env_id absent from the mocked environment list must yield 404.
    resp = client.get("/api/datasets", params={"env_id": "invalid"})
    assert resp.status_code == 404
# [/DEF:test_get_datasets_not_found:Function]
# [DEF:test_get_datasets_search:Function]
# @RELATION: BINDS_TO -> test_datasets_api
# @PURPOSE: Verify datasets endpoint search filter returns matching dataset subset.
def test_get_datasets_search(mock_deps):
    # Fix: the original block assigned the same AsyncMock twice (stale
    # pre-reformat duplicate); the first assignment was dead code and is removed.
    # Stub two datasets so the search filter has a non-matching entry to drop.
    mock_deps["resource"].get_datasets_with_status = AsyncMock(
        return_value=[
            {
                "id": 1,
                "table_name": "orders",
                "schema": "public",
                "database": "db1",
                "mapped_fields": {"total": 10, "mapped": 5},
                "last_task": None,
            },
            {
                "id": 2,
                "table_name": "users",
                "schema": "public",
                "database": "db1",
                "mapped_fields": {"total": 5, "mapped": 5},
                "last_task": None,
            },
        ]
    )
    response = client.get("/api/datasets?env_id=env1&search=orders")
    assert response.status_code == 200
    data = response.json()
    # Only the "orders" dataset matches the search term.
    assert len(data["datasets"]) == 1
    assert data["datasets"][0]["table_name"] == "orders"
# [/DEF:test_get_datasets_search:Function]
# [DEF:test_get_datasets_service_failure:Function]
# @RELATION: BINDS_TO -> test_datasets_api
# @PURPOSE: Verify datasets endpoint surfaces backend fetch failure as HTTP 503.
def test_get_datasets_service_failure(mock_deps):
    # Fix: the original block assigned the same AsyncMock twice (stale
    # pre-reformat duplicate); the first assignment was dead code and is removed.
    # Simulate the upstream service raising; the API must map it to 503.
    mock_deps["resource"].get_datasets_with_status = AsyncMock(
        side_effect=Exception("Superset down")
    )
    response = client.get("/api/datasets?env_id=env1")
    assert response.status_code == 503
    assert "Failed to fetch datasets" in response.json()["detail"]
# [/DEF:test_get_datasets_service_failure:Function]
# [/DEF:test_datasets_api:Test]
# [DEF:test_pagination_boundaries:Test]
# @RELATION: BINDS_TO -> SrcRoot
# @RELATION: BINDS_TO -> TestResourceHubs
# @PURPOSE: Verify pagination validation for GET endpoints
# @TEST: page<1 and page_size>100 return 400
# [/DEF:test_get_datasets_service_failure:Function]
# [DEF:test_get_dashboards_pagination_zero_page:Function]
# @RELATION: BINDS_TO -> test_pagination_boundaries
# @PURPOSE: Verify dashboards endpoint rejects page=0 with HTTP 400 validation error.
def test_get_dashboards_pagination_zero_page(mock_deps):
    """@TEST_EDGE: pagination_zero_page -> {page:0, status:400}"""
    resp = client.get("/api/dashboards", params={"env_id": "env1", "page": 0})
    assert resp.status_code == 400
    assert "Page must be >= 1" in resp.json()["detail"]
# [/DEF:test_get_dashboards_pagination_zero_page:Function]
# [DEF:test_get_dashboards_pagination_oversize:Function]
# @RELATION: BINDS_TO -> test_pagination_boundaries
# @PURPOSE: Verify dashboards endpoint rejects oversized page_size with HTTP 400.
def test_get_dashboards_pagination_oversize(mock_deps):
    """@TEST_EDGE: pagination_oversize -> {page_size:101, status:400}"""
    resp = client.get("/api/dashboards", params={"env_id": "env1", "page_size": 101})
    assert resp.status_code == 400
    assert "Page size must be between 1 and 100" in resp.json()["detail"]
# [/DEF:test_get_dashboards_pagination_oversize:Function]
# [DEF:test_get_datasets_pagination_zero_page:Function]
# @RELATION: BINDS_TO -> test_pagination_boundaries
# @PURPOSE: Verify datasets endpoint rejects page=0 with HTTP 400.
def test_get_datasets_pagination_zero_page(mock_deps):
    """@TEST_EDGE: pagination_zero_page on datasets"""
    resp = client.get("/api/datasets", params={"env_id": "env1", "page": 0})
    assert resp.status_code == 400
# [/DEF:test_get_datasets_pagination_zero_page:Function]
# [DEF:test_get_datasets_pagination_oversize:Function]
# @RELATION: BINDS_TO -> test_pagination_boundaries
# @PURPOSE: Verify datasets endpoint rejects oversized page_size with HTTP 400.
def test_get_datasets_pagination_oversize(mock_deps):
    """@TEST_EDGE: pagination_oversize on datasets"""
    resp = client.get("/api/datasets", params={"env_id": "env1", "page_size": 101})
    assert resp.status_code == 400
# [/DEF:test_get_datasets_pagination_oversize:Function]
# [/DEF:test_pagination_boundaries:Test]
# [/DEF:TestResourceHubs:Module]