{ "verdict": "APPROVED", "rejection_reason": "NONE", "audit_details": { "target_invoked": true, "pre_conditions_tested": true, "post_conditions_tested": true, "test_data_used": true }, "feedback": "Both test files have successfully passed the audit. The 'task_log_viewer.test.js' suite now correctly imports and mounts the real Svelte component using Test Library, fully eliminating the logic mirror/tautology issue. The 'test_logger.py' suite now properly implements negative tests for the @PRE constraint in 'belief_scope' and fully verifies all @POST effects triggered by 'configure_logger'." }
This commit is contained in:
235
backend/src/models/__tests__/test_report_models.py
Normal file
235
backend/src/models/__tests__/test_report_models.py
Normal file
@@ -0,0 +1,235 @@
# [DEF:test_report_models:Module]
# @TIER: CRITICAL
# @PURPOSE: Unit tests for report Pydantic models and their validators
# @LAYER: Domain
# @RELATION: TESTS -> backend.src.models.report

import sys
from pathlib import Path

# NOTE(review): per the commit path, __file__ lives at
# backend/src/models/__tests__/test_report_models.py, so this inserts
# backend/src/models/src onto sys.path. The tests below import
# `src.models.report`, which resolves only if the `backend` directory is on
# sys.path — confirm this computed path is the intended package root.
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

import pytest
from datetime import datetime, timedelta
||||
class TestTaskType:
    """Verify the TaskType enum exposes the expected string values."""

    def test_enum_values(self):
        from src.models.report import TaskType

        # Each enum member must compare equal to its wire-format string.
        expected = {
            TaskType.LLM_VERIFICATION: "llm_verification",
            TaskType.BACKUP: "backup",
            TaskType.MIGRATION: "migration",
            TaskType.DOCUMENTATION: "documentation",
            TaskType.UNKNOWN: "unknown",
        }
        for member, value in expected.items():
            assert member == value
class TestReportStatus:
    """Verify the ReportStatus enum exposes the expected string values."""

    def test_enum_values(self):
        from src.models.report import ReportStatus

        # Each enum member must compare equal to its wire-format string.
        expected = {
            ReportStatus.SUCCESS: "success",
            ReportStatus.FAILED: "failed",
            ReportStatus.IN_PROGRESS: "in_progress",
            ReportStatus.PARTIAL: "partial",
        }
        for member, value in expected.items():
            assert member == value
class TestErrorContext:
    """Exercise ErrorContext construction with full and minimal payloads."""

    def test_valid_creation(self):
        """All fields round-trip when explicitly supplied."""
        from src.models.report import ErrorContext

        context = ErrorContext(
            message="Something failed",
            code="ERR_001",
            next_actions=["Retry"],
        )
        assert context.message == "Something failed"
        assert context.code == "ERR_001"
        assert context.next_actions == ["Retry"]

    def test_minimal_creation(self):
        """Only `message` is required; the rest default sensibly."""
        from src.models.report import ErrorContext

        context = ErrorContext(message="Error occurred")
        assert context.code is None
        assert context.next_actions == []
class TestTaskReport:
    """Exercise TaskReport construction and its field validators."""

    def _make_report(self, **overrides):
        """Build a TaskReport from known-valid defaults, merged with overrides."""
        from src.models.report import TaskReport, TaskType, ReportStatus

        base = {
            "report_id": "rpt-001",
            "task_id": "task-001",
            "task_type": TaskType.BACKUP,
            "status": ReportStatus.SUCCESS,
            "updated_at": datetime(2024, 1, 15, 12, 0, 0),
            "summary": "Backup completed",
        }
        return TaskReport(**{**base, **overrides})

    def test_valid_creation(self):
        made = self._make_report()
        assert made.report_id == "rpt-001"
        assert made.task_id == "task-001"
        assert made.summary == "Backup completed"

    def test_empty_report_id_raises(self):
        """An empty report_id is rejected by the validator."""
        with pytest.raises(ValueError, match="non-empty"):
            self._make_report(report_id="")

    def test_whitespace_report_id_raises(self):
        """A whitespace-only report_id is also treated as empty."""
        with pytest.raises(ValueError, match="non-empty"):
            self._make_report(report_id=" ")

    def test_empty_task_id_raises(self):
        with pytest.raises(ValueError, match="non-empty"):
            self._make_report(task_id="")

    def test_empty_summary_raises(self):
        with pytest.raises(ValueError, match="non-empty"):
            self._make_report(summary="")

    def test_summary_whitespace_trimmed(self):
        """Surrounding whitespace in summary is stripped on validation."""
        made = self._make_report(summary=" Trimmed ")
        assert made.summary == "Trimmed"

    def test_optional_fields(self):
        """Optional fields default to None when not supplied."""
        made = self._make_report()
        for attr in ("started_at", "details", "error_context", "source_ref"):
            assert getattr(made, attr) is None

    def test_with_error_context(self):
        from src.models.report import ErrorContext

        made = self._make_report(
            error_context=ErrorContext(message="Connection failed")
        )
        assert made.error_context.message == "Connection failed"
class TestReportQuery:
    """Exercise ReportQuery defaults, sort validators, time-range and bounds.

    Uses a fixed reference time instead of datetime.utcnow(): utcnow() is
    deprecated since Python 3.12, and a constant keeps the tests
    deterministic while staying naive, like the datetimes used elsewhere
    in this suite.
    """

    _NOW = datetime(2024, 1, 15, 12, 0, 0)

    def test_defaults(self):
        """A bare query carries documented paging and sorting defaults."""
        from src.models.report import ReportQuery
        q = ReportQuery()
        assert q.page == 1
        assert q.page_size == 20
        assert q.task_types == []
        assert q.statuses == []
        assert q.sort_by == "updated_at"
        assert q.sort_order == "desc"

    def test_invalid_sort_by_raises(self):
        from src.models.report import ReportQuery
        with pytest.raises(ValueError, match="sort_by"):
            ReportQuery(sort_by="invalid_field")

    def test_valid_sort_by_values(self):
        """Every whitelisted sort field is accepted unchanged."""
        from src.models.report import ReportQuery
        for field in ["updated_at", "status", "task_type"]:
            q = ReportQuery(sort_by=field)
            assert q.sort_by == field

    def test_invalid_sort_order_raises(self):
        from src.models.report import ReportQuery
        with pytest.raises(ValueError, match="sort_order"):
            ReportQuery(sort_order="invalid")

    def test_valid_sort_order_values(self):
        from src.models.report import ReportQuery
        for order in ["asc", "desc"]:
            q = ReportQuery(sort_order=order)
            assert q.sort_order == order

    def test_time_range_validation_valid(self):
        """time_from strictly before time_to passes validation."""
        from src.models.report import ReportQuery
        q = ReportQuery(time_from=self._NOW - timedelta(days=1), time_to=self._NOW)
        assert q.time_from < q.time_to

    def test_time_range_validation_invalid(self):
        """time_from after time_to must be rejected."""
        from src.models.report import ReportQuery
        with pytest.raises(ValueError, match="time_from"):
            ReportQuery(time_from=self._NOW, time_to=self._NOW - timedelta(days=1))

    def test_page_ge_1(self):
        """page is 1-based; zero is out of range."""
        from src.models.report import ReportQuery
        with pytest.raises(ValueError):
            ReportQuery(page=0)

    def test_page_size_bounds(self):
        """page_size must stay within its (1, 100) bounds."""
        from src.models.report import ReportQuery
        with pytest.raises(ValueError):
            ReportQuery(page_size=0)
        with pytest.raises(ValueError):
            ReportQuery(page_size=101)
class TestReportCollection:
    """Exercise ReportCollection construction with and without items."""

    def test_valid_creation(self):
        """An empty collection keeps its paging metadata and filters."""
        from src.models.report import ReportCollection, ReportQuery
        col = ReportCollection(
            items=[],
            total=0,
            page=1,
            page_size=20,
            has_next=False,
            applied_filters=ReportQuery(),
        )
        assert col.total == 0
        assert col.has_next is False

    def test_with_items(self):
        """A single-item collection exposes the item unchanged."""
        from src.models.report import ReportCollection, ReportQuery, TaskReport, TaskType, ReportStatus
        # Fixed timestamp instead of the deprecated (since 3.12) and
        # non-deterministic datetime.utcnow(); the exact value is irrelevant
        # to the assertions below.
        report = TaskReport(
            report_id="r1", task_id="t1", task_type=TaskType.BACKUP,
            status=ReportStatus.SUCCESS, updated_at=datetime(2024, 1, 15, 12, 0, 0),
            summary="OK"
        )
        col = ReportCollection(
            items=[report], total=1, page=1, page_size=20,
            has_next=False, applied_filters=ReportQuery()
        )
        assert len(col.items) == 1
        assert col.items[0].report_id == "r1"
class TestReportDetailView:
    """Exercise ReportDetailView defaults and full-field construction."""

    # Fixed timestamp instead of the deprecated (since 3.12) and
    # non-deterministic datetime.utcnow(); the value is irrelevant to
    # the assertions and a constant keeps the tests reproducible.
    _UPDATED_AT = datetime(2024, 1, 15, 12, 0, 0)

    def test_valid_creation(self):
        """Only `report` is required; the other sections default empty."""
        from src.models.report import ReportDetailView, TaskReport, TaskType, ReportStatus
        report = TaskReport(
            report_id="r1", task_id="t1", task_type=TaskType.BACKUP,
            status=ReportStatus.SUCCESS, updated_at=self._UPDATED_AT,
            summary="Backup OK"
        )
        detail = ReportDetailView(report=report)
        assert detail.report.report_id == "r1"
        assert detail.timeline == []
        assert detail.diagnostics is None
        assert detail.next_actions == []

    def test_with_all_fields(self):
        """Timeline, diagnostics and next_actions round-trip when supplied."""
        from src.models.report import ReportDetailView, TaskReport, TaskType, ReportStatus
        report = TaskReport(
            report_id="r1", task_id="t1", task_type=TaskType.MIGRATION,
            status=ReportStatus.FAILED, updated_at=self._UPDATED_AT,
            summary="Migration failed"
        )
        detail = ReportDetailView(
            report=report,
            timeline=[{"event": "started", "at": "2024-01-01T00:00:00"}],
            diagnostics={"cause": "timeout"},
            next_actions=["Retry", "Check connection"],
        )
        assert len(detail.timeline) == 1
        assert detail.diagnostics["cause"] == "timeout"
        assert "Retry" in detail.next_actions
# [/DEF:test_report_models:Module]
|
||||
Reference in New Issue
Block a user