601 lines
24 KiB
Python
601 lines
24 KiB
Python
# [DEF:backend.src.scripts.clean_release_tui:Module]
|
|
# @TIER: STANDARD
|
|
# @SEMANTICS: clean-release, tui, ncurses, interactive-validator
|
|
# @PURPOSE: Interactive terminal interface for Enterprise Clean Release compliance validation.
|
|
# @LAYER: UI
|
|
# @RELATION: DEPENDS_ON -> backend.src.services.clean_release.compliance_orchestrator
|
|
# @RELATION: DEPENDS_ON -> backend.src.services.clean_release.repository
|
|
# @INVARIANT: TUI refuses startup in non-TTY environments; headless flow is CLI/API only.
|
|
|
|
import curses
|
|
import json
|
|
import os
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from types import SimpleNamespace
|
|
from typing import List, Optional, Any, Dict
|
|
|
|
# Standardize sys.path for direct execution from project root or scripts dir.
|
|
# Directory holding this script, and the backend root two levels above it.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
BACKEND_ROOT = os.path.abspath(os.path.join(SCRIPT_DIR, os.pardir, os.pardir))

# Put the backend root first on sys.path (once) so the ``src.*`` imports
# below resolve when the script is executed directly.
if BACKEND_ROOT not in sys.path:
    sys.path.insert(0, BACKEND_ROOT)
|
|
|
|
from src.models.clean_release import (
|
|
CandidateArtifact,
|
|
CheckFinalStatus,
|
|
CheckStageName,
|
|
CheckStageStatus,
|
|
CleanProfilePolicy,
|
|
ComplianceViolation,
|
|
ProfileType,
|
|
ReleaseCandidate,
|
|
ResourceSourceEntry,
|
|
ResourceSourceRegistry,
|
|
RegistryStatus,
|
|
ReleaseCandidateStatus,
|
|
)
|
|
from src.services.clean_release.approval_service import approve_candidate
|
|
from src.services.clean_release.artifact_catalog_loader import load_bootstrap_artifacts
|
|
from src.services.clean_release.compliance_execution_service import ComplianceExecutionService
|
|
from src.services.clean_release.enums import CandidateStatus
|
|
from src.services.clean_release.manifest_service import build_manifest_snapshot
|
|
from src.services.clean_release.publication_service import publish_candidate
|
|
from src.services.clean_release.repository import CleanReleaseRepository
|
|
|
|
# [DEF:TuiFacadeAdapter:Class]
|
|
# @PURPOSE: Thin TUI adapter that routes business mutations through application services.
|
|
# @PRE: repository contains candidate and trusted policy/registry snapshots for execution.
|
|
# @POST: Business actions return service results/errors without direct TUI-owned mutations.
|
|
class TuiFacadeAdapter:
    """Route TUI actions to the clean-release services.

    The adapter only keeps a reference to the repository it is given; every
    business mutation is delegated to a service function or service object.
    """

    def __init__(self, repository: CleanReleaseRepository):
        self.repository = repository

    @staticmethod
    def _newest(items, key):
        """Return the item with the greatest ``key``, or ``None`` when empty.

        On ties the first item in iteration order wins, which is the same
        element a stable descending sort followed by ``[0]`` would select.
        """
        return max(items, key=key, default=None)

    def _latest_manifest(self, candidate_id: str):
        """Return the candidate's manifest with the highest version, if any."""
        manifests = self.repository.get_manifests_by_candidate(candidate_id)
        return self._newest(manifests or [], key=lambda item: item.manifest_version)

    def _latest_report(self, candidate_id: str, purpose: str):
        """Return the candidate's most recently generated compliance report.

        Raises:
            ValueError: no report exists for the candidate; ``purpose`` is
                appended to the message ("approval" / "publication").
        """
        reports = [
            item
            for item in self.repository.reports.values()
            if item.candidate_id == candidate_id
        ]
        report = self._newest(reports, key=lambda item: item.generated_at)
        if report is None:
            raise ValueError(f"No compliance report available for {purpose}")
        return report

    def _build_config_manager(self):
        """Build a minimal object exposing ``get_config()`` for the execution service.

        Only ``settings.clean_release.active_policy_id`` and
        ``settings.clean_release.active_registry_id`` are populated, both taken
        from the repository's active policy.

        Raises:
            ValueError: the repository has no active policy.
        """
        policy = self.repository.get_active_policy()
        if policy is None:
            raise ValueError("Active policy not found")
        clean_release = SimpleNamespace(
            active_policy_id=policy.id,
            active_registry_id=policy.registry_snapshot_id,
        )
        settings = SimpleNamespace(clean_release=clean_release)
        config = SimpleNamespace(settings=settings)
        return SimpleNamespace(get_config=lambda: config)

    def run_compliance(self, *, candidate_id: str, actor: str):
        """Execute a compliance run against the candidate's latest manifest.

        Raises:
            ValueError: the candidate has no manifest, or no active policy exists.
        """
        latest_manifest = self._latest_manifest(candidate_id)
        if latest_manifest is None:
            raise ValueError("Manifest required before compliance run")
        service = ComplianceExecutionService(
            repository=self.repository,
            config_manager=self._build_config_manager(),
        )
        return service.execute_run(
            candidate_id=candidate_id,
            requested_by=actor,
            manifest_id=latest_manifest.id,
        )

    def approve_latest(self, *, candidate_id: str, actor: str):
        """Approve the candidate using its most recent compliance report.

        Raises:
            ValueError: the candidate has no compliance report.
        """
        report = self._latest_report(candidate_id, "approval")
        return approve_candidate(
            repository=self.repository,
            candidate_id=candidate_id,
            report_id=report.id,
            decided_by=actor,
            comment="Approved from TUI",
        )

    def publish_latest(self, *, candidate_id: str, actor: str):
        """Publish the candidate to the "stable" channel using its latest report.

        Raises:
            ValueError: the candidate has no compliance report.
        """
        report = self._latest_report(candidate_id, "publication")
        return publish_candidate(
            repository=self.repository,
            candidate_id=candidate_id,
            report_id=report.id,
            published_by=actor,
            target_channel="stable",
            publication_ref=None,
        )

    def build_manifest(self, *, candidate_id: str, actor: str):
        """Build a new manifest snapshot for the candidate via the manifest service."""
        return build_manifest_snapshot(
            repository=self.repository,
            candidate_id=candidate_id,
            created_by=actor,
        )

    def get_overview(self, *, candidate_id: str) -> Dict[str, Any]:
        """Collect the latest known state for a candidate.

        Returns:
            A dict with the keys ``candidate``, ``manifest``, ``run``,
            ``report``, ``approval``, ``publication``, ``policy`` and
            ``registry``; entries with nothing to show are ``None``.
        """
        repo = self.repository
        candidate = repo.get_candidate(candidate_id)
        latest_manifest = self._latest_manifest(candidate_id)

        runs = [item for item in repo.check_runs.values() if item.candidate_id == candidate_id]
        latest_run = self._newest(runs, key=lambda item: item.requested_at)

        # The report shown is the one produced by the latest run, if it has one.
        latest_report = None
        if latest_run is not None:
            latest_report = next(
                (item for item in repo.reports.values() if item.run_id == latest_run.id),
                None,
            )

        # Approval/publication stores are optional on the repository.
        approvals = getattr(repo, "approval_decisions", [])
        latest_approval = self._newest(
            [item for item in approvals if item.candidate_id == candidate_id],
            key=lambda item: item.decided_at,
        )
        publications = getattr(repo, "publication_records", [])
        latest_publication = self._newest(
            [item for item in publications if item.candidate_id == candidate_id],
            key=lambda item: item.published_at,
        )

        policy = repo.get_active_policy()
        # NOTE(review): _build_config_manager reads policy.registry_snapshot_id
        # while this lookup uses internal_source_registry_ref -- confirm both
        # attributes exist on the policy model.
        registry = repo.get_registry(policy.internal_source_registry_ref) if policy else None

        return {
            "candidate": candidate,
            "manifest": latest_manifest,
            "run": latest_run,
            "report": latest_report,
            "approval": latest_approval,
            "publication": latest_publication,
            "policy": policy,
            "registry": registry,
        }
|
|
# [/DEF:TuiFacadeAdapter:Class]
|
|
|
|
|
|
# [DEF:CleanReleaseTUI:Class]
|
|
# @PURPOSE: Curses-based application for compliance monitoring.
|
|
# @UX_STATE: READY -> Waiting for operator to start checks (F5).
|
|
# @UX_STATE: RUNNING -> Executing compliance stages with progress feedback.
|
|
# @UX_STATE: COMPLIANT -> Release candidate passed all checks.
|
|
# @UX_STATE: BLOCKED -> Violations detected, release forbidden.
|
|
# @UX_FEEDBACK: Red alerts for BLOCKED status, Green for COMPLIANT.
|
|
class CleanReleaseTUI:
    """Curses screen for running and reviewing clean-release compliance checks.

    The instance owns an in-memory repository (demo data or data bootstrapped
    from JSON files named by environment variables), a ``TuiFacadeAdapter``
    that performs all business actions, and the UI state rendered by the
    ``draw_*`` methods.
    """

    # Rows 13-17 are available for registry hosts before the status area (row 18).
    _MAX_SOURCE_ROWS = 5

    def __init__(self, stdscr: curses.window):
        """Build the repository for ``CLEAN_TUI_MODE`` (default "demo") and set up colours."""
        self.stdscr = stdscr
        self.mode = os.getenv("CLEAN_TUI_MODE", "demo").strip().lower()
        self.repo = self._build_repository(self.mode)
        self.facade = TuiFacadeAdapter(self.repo)
        self.candidate_id = self._resolve_candidate_id()
        self.status: Any = "READY"
        self.checks_progress: List[Dict[str, Any]] = []
        self.violations_list: List[ComplianceViolation] = []
        self.report_id: Optional[str] = None
        self.last_error: Optional[str] = None
        self.overview: Dict[str, Any] = {}
        self.refresh_overview()

        curses.start_color()
        curses.use_default_colors()
        curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLUE)  # Header/Footer
        curses.init_pair(2, curses.COLOR_GREEN, -1)  # PASS
        curses.init_pair(3, curses.COLOR_RED, -1)  # FAIL/BLOCKED
        curses.init_pair(4, curses.COLOR_YELLOW, -1)  # RUNNING
        curses.init_pair(5, curses.COLOR_CYAN, -1)  # Text

    @staticmethod
    def _plain(value: Any) -> str:
        """Return ``value`` as text, unwrapping enum-like objects to their ``.value``.

        ``str()`` of an enum member yields "Class.MEMBER", which never matches
        the plain status strings this screen compares against.
        """
        return str(getattr(value, "value", value))

    def _build_repository(self, mode: str) -> CleanReleaseRepository:
        """Create a repository seeded with demo data ("demo") or bootstrap files (any other mode)."""
        repo = CleanReleaseRepository()
        if mode == "demo":
            self._bootstrap_demo_repository(repo)
        else:
            self._bootstrap_real_repository(repo)
        return repo

    def _bootstrap_demo_repository(self, repository: CleanReleaseRepository) -> None:
        """Seed a policy, registry, prepared candidate, two artifacts and a manifest.

        One artifact is categorised "test-data", which the demo policy
        prohibits, and the manifest summary is marked accordingly.
        """
        now = datetime.now(timezone.utc)
        policy = CleanProfilePolicy(
            policy_id="POL-ENT-CLEAN",
            policy_version="1",
            profile=ProfileType.ENTERPRISE_CLEAN,
            active=True,
            internal_source_registry_ref="REG-1",
            prohibited_artifact_categories=["test-data"],
            effective_from=now,
        )
        setattr(policy, "immutable", True)
        repository.save_policy(policy)

        registry = ResourceSourceRegistry(
            registry_id="REG-1",
            name="Default Internal Registry",
            entries=[
                ResourceSourceEntry(
                    source_id="S1",
                    host="internal-repo.company.com",
                    protocol="https",
                    purpose="artifactory",
                )
            ],
            updated_at=now,
            updated_by="system",
        )
        setattr(registry, "immutable", True)
        setattr(registry, "allowed_hosts", ["internal-repo.company.com"])
        setattr(registry, "allowed_schemes", ["https"])
        setattr(registry, "allowed_source_types", ["artifactory"])
        repository.save_registry(registry)

        candidate = ReleaseCandidate(
            id="2026.03.03-rc1",
            version="1.0.0",
            source_snapshot_ref="v1.0.0-rc1",
            created_at=now,
            created_by="system",
            status=CandidateStatus.DRAFT.value,
        )
        candidate.transition_to(CandidateStatus.PREPARED)
        repository.save_candidate(candidate)
        repository.save_artifact(
            CandidateArtifact(
                id="demo-art-1",
                candidate_id=candidate.id,
                path="src/main.py",
                sha256="sha256-demo-core",
                size=128,
                detected_category="core",
            )
        )
        repository.save_artifact(
            CandidateArtifact(
                id="demo-art-2",
                candidate_id=candidate.id,
                path="test/data.csv",
                sha256="sha256-demo-test",
                size=64,
                detected_category="test-data",
            )
        )
        manifest = build_manifest_snapshot(
            repository=repository,
            candidate_id=candidate.id,
            created_by="system",
            policy_id="POL-ENT-CLEAN",
        )
        # Mark the prohibited demo artifact in the manifest summary. The
        # manifest is mutated in place and not saved again here.
        summary = dict(manifest.content_json.get("summary", {}))
        summary["prohibited_detected_count"] = 1
        manifest.content_json["summary"] = summary

    def _bootstrap_real_repository(self, repository: CleanReleaseRepository) -> None:
        """Seed the repository from the JSON file named by ``CLEAN_TUI_BOOTSTRAP_JSON``.

        Does nothing when the variable is unset or blank. Artifacts are loaded
        through ``load_bootstrap_artifacts`` from ``CLEAN_TUI_ARTIFACTS_JSON``.
        A registry and an active policy are saved only when the payload lists
        at least one non-blank allowed host.
        """
        bootstrap_path = os.getenv("CLEAN_TUI_BOOTSTRAP_JSON", "").strip()
        if not bootstrap_path:
            return

        with open(bootstrap_path, "r", encoding="utf-8") as bootstrap_file:
            payload = json.load(bootstrap_file)

        now = datetime.now(timezone.utc)
        candidate = ReleaseCandidate(
            id=payload.get("candidate_id", "candidate-1"),
            version=payload.get("version", "1.0.0"),
            source_snapshot_ref=payload.get("source_snapshot_ref", "snapshot-ref"),
            created_at=now,
            created_by=payload.get("created_by", "operator"),
            status=ReleaseCandidateStatus.DRAFT,
        )
        repository.save_candidate(candidate)
        imported_artifacts = load_bootstrap_artifacts(
            os.getenv("CLEAN_TUI_ARTIFACTS_JSON", "").strip(),
            candidate.id,
        )
        for artifact in imported_artifacts:
            repository.save_artifact(artifact)
        if imported_artifacts:
            candidate.transition_to(CandidateStatus.PREPARED)
            repository.save_candidate(candidate)

        registry_id = payload.get("registry_id", "REG-1")
        entries = [
            ResourceSourceEntry(
                source_id=f"S-{index + 1}",
                host=host,
                protocol="https",
                purpose="bootstrap",
                enabled=True,
            )
            for index, host in enumerate(payload.get("allowed_hosts", []))
            if str(host).strip()
        ]
        if not entries:
            return

        repository.save_registry(
            ResourceSourceRegistry(
                registry_id=registry_id,
                name=payload.get("registry_name", "Bootstrap Internal Registry"),
                entries=entries,
                updated_at=now,
                updated_by=payload.get("created_by", "operator"),
                status=RegistryStatus.ACTIVE,
            )
        )
        repository.save_policy(
            CleanProfilePolicy(
                policy_id=payload.get("policy_id", "POL-ENT-CLEAN"),
                policy_version=payload.get("policy_version", "1"),
                profile=ProfileType.ENTERPRISE_CLEAN,
                active=True,
                internal_source_registry_ref=registry_id,
                prohibited_artifact_categories=payload.get(
                    "prohibited_artifact_categories",
                    ["test-data", "demo", "load-test"],
                ),
                required_system_categories=payload.get("required_system_categories", ["core"]),
                effective_from=now,
            )
        )

    def _resolve_candidate_id(self) -> str:
        """Return ``CLEAN_TUI_CANDIDATE_ID`` if set, else the first repository candidate, else ""."""
        env_candidate = os.getenv("CLEAN_TUI_CANDIDATE_ID", "").strip()
        if env_candidate:
            return env_candidate
        return next(iter(self.repo.candidates.keys()), "")

    def draw_header(self, max_y: int, max_x: int):
        """Draw the title bar (row 0) and the candidate/profile/lifecycle line (row 2)."""
        header_text = " Enterprise Clean Release Validator (TUI) "
        title_attr = curses.color_pair(1) | curses.A_BOLD
        self.stdscr.attron(title_attr)
        centered = header_text.center(max_x)
        self.stdscr.addstr(0, 0, centered[:max_x])
        self.stdscr.attroff(title_attr)

        candidate = self.overview.get("candidate")
        candidate_text = self.candidate_id or "not-set"
        profile_text = "enterprise-clean"
        lifecycle = self._plain(getattr(candidate, "status", "UNKNOWN"))
        info_line_text = (
            f" │ Candidate: [{candidate_text}] Profile: [{profile_text}] "
            f"Lifecycle: [{lifecycle}] Mode: [{self.mode}]"
        ).ljust(max_x)
        self.stdscr.addstr(2, 0, info_line_text[:max_x])

    def draw_checks(self):
        """Draw the four compliance stages (rows 5-8) with their PASS/FAIL/running markers."""
        self.stdscr.addstr(4, 3, "Checks:")
        check_defs = [
            (CheckStageName.DATA_PURITY, "Data Purity (no test/demo payloads)"),
            (CheckStageName.INTERNAL_SOURCES_ONLY, "Internal Sources Only (company servers)"),
            (CheckStageName.NO_EXTERNAL_ENDPOINTS, "No External Internet Endpoints"),
            (CheckStageName.MANIFEST_CONSISTENCY, "Release Manifest Consistency"),
        ]
        drawn_checks = {c["stage"]: c for c in self.checks_progress}

        for row, (stage, desc) in enumerate(check_defs, start=5):
            status_text = " "
            color = curses.color_pair(5)

            if stage in drawn_checks:
                stage_status = drawn_checks[stage]["status"]
                if stage_status == "RUNNING":
                    status_text = "..."
                    color = curses.color_pair(4)
                elif stage_status == CheckStageStatus.PASS:
                    status_text = "PASS"
                    color = curses.color_pair(2)
                elif stage_status == CheckStageStatus.FAIL:
                    status_text = "FAIL"
                    color = curses.color_pair(3)

            self.stdscr.addstr(row, 4, f"[{status_text:^4}] {desc}")
            if status_text != " ":
                self.stdscr.addstr(row, 50, f"{status_text:>10}", color | curses.A_BOLD)

    def draw_sources(self):
        """List the registry hosts from the overview, starting at row 13.

        At most ``_MAX_SOURCE_ROWS`` rows are used so the list cannot run into
        the status area; extra hosts are summarised on the last row.
        """
        self.stdscr.addstr(12, 3, "Allowed Internal Sources:", curses.A_BOLD)
        reg = self.overview.get("registry")
        if not reg:
            self.stdscr.addstr(13, 3, " - (none)")
            return

        entries = list(reg.entries)
        limit = self._MAX_SOURCE_ROWS
        hidden = 0
        if len(entries) > limit:
            # Reserve the final row for the overflow summary.
            hidden = len(entries) - (limit - 1)
            entries = entries[: limit - 1]
        row = 13
        for entry in entries:
            self.stdscr.addstr(row, 3, f" - {entry.host}")
            row += 1
        if hidden:
            self.stdscr.addstr(row, 3, f" - ... ({hidden} more)")

    def draw_status(self):
        """Draw final status, report id, approval/publication, violations and the last message."""
        if self.status == CheckFinalStatus.COMPLIANT:
            color = curses.color_pair(2)
        elif self.status == CheckFinalStatus.BLOCKED:
            color = curses.color_pair(3)
        else:
            color = curses.color_pair(5)

        stat_str = self._plain(self.status)
        self.stdscr.addstr(18, 3, f"FINAL STATUS: {stat_str.upper()}", color | curses.A_BOLD)

        if self.report_id:
            self.stdscr.addstr(19, 3, f"Report ID: {self.report_id}")

        approval = self.overview.get("approval")
        publication = self.overview.get("publication")
        if approval:
            self.stdscr.addstr(20, 3, f"Approval: {self._plain(approval.decision)}")
        if publication:
            self.stdscr.addstr(20, 32, f"Publication: {self._plain(publication.status)}")

        if self.violations_list:
            self.stdscr.addstr(
                21,
                3,
                f"Violations Details ({len(self.violations_list)} total):",
                curses.color_pair(3) | curses.A_BOLD,
            )
            # Only the first five violations fit between the heading and row 27.
            for offset, violation in enumerate(self.violations_list[:5]):
                v_cat = str(getattr(violation, "code", "VIOLATION"))
                msg = str(getattr(violation, "message", "Violation detected"))
                location = getattr(violation, "artifact_path", "")
                if not location:
                    # Fall back to evidence_json["location"] when it is dict-like.
                    evidence = getattr(violation, "evidence_json", {})
                    location = evidence.get("location", "") if hasattr(evidence, "get") else ""
                msg_text = f"[{v_cat}] {msg} (Loc: {str(location)})"
                self.stdscr.addstr(22 + offset, 5, msg_text[:70], curses.color_pair(3))
        if self.last_error:
            self.stdscr.addstr(
                27,
                3,
                f"Error: {self.last_error}"[:100],
                curses.color_pair(3) | curses.A_BOLD,
            )

    def draw_footer(self, max_y: int, max_x: int):
        """Draw the key-binding bar on the last screen row."""
        footer_text = " F5 Run F6 Manifest F7 Refresh F8 Approve F9 Publish F10 Exit ".center(max_x)
        bar_attr = curses.color_pair(1)
        self.stdscr.attron(bar_attr)
        try:
            self.stdscr.addstr(max_y - 1, 0, footer_text[:max_x])
        except curses.error:
            # Writing into the lower-right cell raises after the text has been
            # drawn; that is expected for a full-width bar on the last row.
            pass
        finally:
            # Always restore attributes so later text is not drawn in bar colours.
            self.stdscr.attroff(bar_attr)

    def _reset_run_state(self, status: str) -> None:
        """Forget the previous run's results and set the headline status."""
        self.status = status
        self.report_id = None
        self.violations_list = []
        self.checks_progress = []
        self.last_error = None

    # [DEF:run_checks:Function]
    # @PURPOSE: Execute compliance run via facade adapter and update UI state.
    # @PRE: Candidate and policy snapshots are present in repository.
    # @POST: UI reflects final run/report/violation state from service result.
    def run_checks(self):
        """Run compliance through the facade and mirror the result into UI state.

        Any exception from the facade sets the status to FAILED and is shown
        as the error line instead of propagating.
        """
        self._reset_run_state("RUNNING")
        self.refresh_screen()

        try:
            result = self.facade.run_compliance(candidate_id=self.candidate_id, actor="operator")
        except Exception as exc:  # noqa: BLE001
            self.status = CheckFinalStatus.FAILED
            self.last_error = str(exc)
            self.refresh_screen()
            return

        self.checks_progress = [
            {
                "stage": stage.stage_name,
                "status": (
                    CheckStageStatus.PASS
                    if self._plain(stage.decision).upper() == "PASSED"
                    else CheckStageStatus.FAIL
                ),
            }
            for stage in result.stage_runs
        ]
        self.violations_list = result.violations
        self.report_id = result.report.id if result.report is not None else None

        # Compare on the unwrapped, upper-cased value so both plain strings and
        # enum members returned by the service are classified correctly.
        final_status = self._plain(result.run.final_status or "").upper()
        blocked = {"BLOCKED", self._plain(CheckFinalStatus.BLOCKED).upper()}
        compliant = {"COMPLIANT", "PASSED", self._plain(CheckFinalStatus.COMPLIANT).upper()}
        if final_status in blocked:
            self.status = CheckFinalStatus.BLOCKED
        elif final_status in compliant:
            self.status = CheckFinalStatus.COMPLIANT
        else:
            self.status = CheckFinalStatus.FAILED
        self.refresh_overview()
        self.refresh_screen()
    # [/DEF:run_checks:Function]

    def build_manifest(self):
        """Build a new manifest and reset run state; failures go to the error line."""
        try:
            manifest = self.facade.build_manifest(candidate_id=self.candidate_id, actor="operator")
            self._reset_run_state("READY")
            # NOTE(review): the success notice reuses last_error, so it is
            # rendered with the "Error:" prefix by draw_status.
            self.last_error = f"Manifest built: {manifest.id}"
        except Exception as exc:  # noqa: BLE001
            self.last_error = str(exc)
        self.refresh_overview()
        self.refresh_screen()

    def clear_history(self):
        """Reset run state to READY and reload the overview (bound to F7)."""
        self._reset_run_state("READY")
        self.refresh_overview()
        self.refresh_screen()

    def _run_report_action(self, key_label: str, action) -> None:
        """Run a facade action that needs a report from the current session.

        ``action`` is called with ``candidate_id`` and ``actor`` keywords; its
        exception text, if any, becomes the error line.
        """
        if not self.report_id:
            self.last_error = f"{key_label} disabled: no compliance report available"
            self.refresh_screen()
            return
        try:
            action(candidate_id=self.candidate_id, actor="operator")
            self.last_error = None
        except Exception as exc:  # noqa: BLE001
            self.last_error = str(exc)
        self.refresh_overview()
        self.refresh_screen()

    def approve_latest(self):
        """Approve the candidate via the facade (bound to F8)."""
        self._run_report_action("F8", self.facade.approve_latest)

    def publish_latest(self):
        """Publish the candidate via the facade (bound to F9)."""
        self._run_report_action("F9", self.facade.publish_latest)

    def refresh_overview(self):
        """Reload ``self.overview`` from the facade; empty when no candidate is selected."""
        if not self.candidate_id:
            self.overview = {}
            return
        self.overview = self.facade.get_overview(candidate_id=self.candidate_id)

    def refresh_screen(self):
        """Clear and redraw every section, then flush to the terminal.

        Sections are drawn independently so one that does not fit the window
        does not prevent the remaining ones (notably the footer) from showing.
        """
        max_y, max_x = self.stdscr.getmaxyx()
        self.stdscr.clear()
        sections = (
            lambda: self.draw_header(max_y, max_x),
            self.draw_checks,
            self.draw_sources,
            self.draw_status,
            lambda: self.draw_footer(max_y, max_x),
        )
        for draw in sections:
            try:
                draw()
            except curses.error:
                # Section does not fit the current window size; skip it.
                continue
            except Exception as exc:  # noqa: BLE001
                # Rendering stays best-effort, but the failure is surfaced on
                # the error line instead of being dropped silently.
                self.last_error = f"Render error: {exc}"
        self.stdscr.refresh()

    def loop(self):
        """Dispatch function keys to actions until F10 is pressed."""
        handlers = {
            curses.KEY_F5: self.run_checks,
            curses.KEY_F6: self.build_manifest,
            curses.KEY_F7: self.clear_history,
            curses.KEY_F8: self.approve_latest,
            curses.KEY_F9: self.publish_latest,
        }
        self.refresh_screen()
        while True:
            char = self.stdscr.getch()
            if char == curses.KEY_F10:
                break
            handler = handlers.get(char)
            if handler is not None:
                handler()
|
|
# [/DEF:CleanReleaseTUI:Class]
|
|
|
|
|
|
def tui_main(stdscr: curses.window):
    """Callback for ``curses.wrapper``: hide the cursor, then run the TUI loop."""
    try:
        curses.curs_set(0)  # Hide cursor
    except curses.error:
        # Terminals that cannot change cursor visibility raise here; a visible
        # cursor is harmless, so startup continues.
        pass
    app = CleanReleaseTUI(stdscr)
    app.loop()
|
|
|
|
|
|
def main() -> int:
    """Start the TUI when attached to an interactive terminal.

    Returns:
        0 after a normal exit, 1 when the TUI raised, 2 when stdin or stdout
        is not a TTY (headless use must go through the CLI/API flow).
    """
    # curses reads keys from stdin and draws on stdout, so both must be
    # terminals; a stream may also be None when the process has no console.
    streams = (sys.stdin, sys.stdout)
    if not all(stream is not None and stream.isatty() for stream in streams):
        print(
            "TTY is required for TUI mode. Use CLI/API workflow instead.",
            file=sys.stderr,
        )
        return 2
    try:
        curses.wrapper(tui_main)
    except Exception as e:  # noqa: BLE001 - top-level boundary, reported to stderr
        print(f"Error starting TUI: {e}", file=sys.stderr)
        return 1
    return 0
|
|
|
|
|
|
# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
|
# [/DEF:backend.src.scripts.clean_release_tui:Module]
|