# [DEF:backend.src.scripts.clean_release_cli:Module] # @TIER: STANDARD # @SEMANTICS: cli, clean-release, candidate, artifacts, manifest # @PURPOSE: Provide headless CLI commands for candidate registration, artifact import and manifest build. # @LAYER: Scripts from __future__ import annotations import argparse import json from datetime import date, datetime, timezone from typing import Any, Dict, List, Optional from ..models.clean_release import CandidateArtifact, ReleaseCandidate from ..services.clean_release.approval_service import approve_candidate, reject_candidate from ..services.clean_release.compliance_execution_service import ComplianceExecutionService from ..services.clean_release.enums import CandidateStatus from ..services.clean_release.publication_service import publish_candidate, revoke_publication # [DEF:build_parser:Function] # @PURPOSE: Build argparse parser for clean release CLI. def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(prog="clean-release-cli") subparsers = parser.add_subparsers(dest="command", required=True) register = subparsers.add_parser("candidate-register") register.add_argument("--candidate-id", required=True) register.add_argument("--version", required=True) register.add_argument("--source-snapshot-ref", required=True) register.add_argument("--created-by", default="cli-operator") artifact_import = subparsers.add_parser("artifact-import") artifact_import.add_argument("--candidate-id", required=True) artifact_import.add_argument("--artifact-id", required=True) artifact_import.add_argument("--path", required=True) artifact_import.add_argument("--sha256", required=True) artifact_import.add_argument("--size", type=int, required=True) manifest_build = subparsers.add_parser("manifest-build") manifest_build.add_argument("--candidate-id", required=True) manifest_build.add_argument("--created-by", default="cli-operator") compliance_run = subparsers.add_parser("compliance-run") 
compliance_run.add_argument("--candidate-id", required=True) compliance_run.add_argument("--manifest-id", required=False, default=None) compliance_run.add_argument("--actor", default="cli-operator") compliance_run.add_argument("--json", action="store_true") compliance_status = subparsers.add_parser("compliance-status") compliance_status.add_argument("--run-id", required=True) compliance_status.add_argument("--json", action="store_true") compliance_report = subparsers.add_parser("compliance-report") compliance_report.add_argument("--run-id", required=True) compliance_report.add_argument("--json", action="store_true") compliance_violations = subparsers.add_parser("compliance-violations") compliance_violations.add_argument("--run-id", required=True) compliance_violations.add_argument("--json", action="store_true") approve = subparsers.add_parser("approve") approve.add_argument("--candidate-id", required=True) approve.add_argument("--report-id", required=True) approve.add_argument("--actor", default="cli-operator") approve.add_argument("--comment", required=False, default=None) approve.add_argument("--json", action="store_true") reject = subparsers.add_parser("reject") reject.add_argument("--candidate-id", required=True) reject.add_argument("--report-id", required=True) reject.add_argument("--actor", default="cli-operator") reject.add_argument("--comment", required=False, default=None) reject.add_argument("--json", action="store_true") publish = subparsers.add_parser("publish") publish.add_argument("--candidate-id", required=True) publish.add_argument("--report-id", required=True) publish.add_argument("--actor", default="cli-operator") publish.add_argument("--target-channel", required=True) publish.add_argument("--publication-ref", required=False, default=None) publish.add_argument("--json", action="store_true") revoke = subparsers.add_parser("revoke") revoke.add_argument("--publication-id", required=True) revoke.add_argument("--actor", default="cli-operator") 
revoke.add_argument("--comment", required=False, default=None) revoke.add_argument("--json", action="store_true") return parser # [/DEF:build_parser:Function] # [DEF:run_candidate_register:Function] # @PURPOSE: Register candidate in repository via CLI command. # @PRE: Candidate ID must be unique. # @POST: Candidate is persisted in DRAFT status. def run_candidate_register(args: argparse.Namespace) -> int: from ..dependencies import get_clean_release_repository repository = get_clean_release_repository() existing = repository.get_candidate(args.candidate_id) if existing is not None: print(json.dumps({"status": "error", "message": "candidate already exists"})) return 1 candidate = ReleaseCandidate( id=args.candidate_id, version=args.version, source_snapshot_ref=args.source_snapshot_ref, created_by=args.created_by, created_at=datetime.now(timezone.utc), status=CandidateStatus.DRAFT.value, ) repository.save_candidate(candidate) print(json.dumps({"status": "ok", "candidate_id": candidate.id})) return 0 # [/DEF:run_candidate_register:Function] # [DEF:run_artifact_import:Function] # @PURPOSE: Import single artifact for existing candidate. # @PRE: Candidate must exist. # @POST: Artifact is persisted for candidate. 
def run_artifact_import(args: argparse.Namespace) -> int:
    """Attach one artifact record to an existing candidate.

    Args:
        args: Parsed namespace providing ``candidate_id``, ``artifact_id``,
            ``path``, ``sha256`` and ``size``.

    Returns:
        ``0`` after the artifact is saved; ``1`` when the candidate does not
        exist. A JSON status line is printed either way.
    """
    from ..dependencies import get_clean_release_repository

    store = get_clean_release_repository()
    owner = store.get_candidate(args.candidate_id)
    if owner is None:
        failure = {"status": "error", "message": "candidate not found"}
        print(json.dumps(failure))
        return 1

    imported = CandidateArtifact(
        id=args.artifact_id,
        candidate_id=args.candidate_id,
        path=args.path,
        sha256=args.sha256,
        size=args.size,
    )
    store.save_artifact(imported)

    # A candidate still in DRAFT is moved to PREPARED once an artifact is
    # imported; candidates in any other status are left untouched.
    still_draft = owner.status == CandidateStatus.DRAFT.value
    if still_draft:
        owner.transition_to(CandidateStatus.PREPARED)
        store.save_candidate(owner)

    success = {"status": "ok", "artifact_id": imported.id}
    print(json.dumps(success))
    return 0
# [/DEF:run_artifact_import:Function]


# [DEF:run_manifest_build:Function]
# @PURPOSE: Build immutable manifest snapshot for candidate.
# @PRE: Candidate must exist.
# @POST: New manifest version is persisted.
def run_manifest_build(args: argparse.Namespace) -> int:
    """Build a manifest snapshot for a candidate and report its id/version.

    Args:
        args: Parsed namespace providing ``candidate_id`` and ``created_by``.

    Returns:
        ``0`` when the snapshot is built; ``1`` when
        ``build_manifest_snapshot`` raises ``ValueError`` (its message is
        printed in the JSON error payload).
    """
    from ..dependencies import get_clean_release_repository
    from ..services.clean_release.manifest_service import build_manifest_snapshot

    store = get_clean_release_repository()
    try:
        snapshot = build_manifest_snapshot(
            repository=store,
            candidate_id=args.candidate_id,
            created_by=args.created_by,
        )
    except ValueError as problem:
        print(json.dumps({"status": "error", "message": str(problem)}))
        return 1
    else:
        summary = {
            "status": "ok",
            "manifest_id": snapshot.id,
            "version": snapshot.manifest_version,
        }
        print(json.dumps(summary))
        return 0
# [/DEF:run_manifest_build:Function]


# [DEF:run_compliance_run:Function]
# @PURPOSE: Execute compliance run for candidate with optional manifest fallback.
# @PRE: Candidate exists and trusted snapshots are configured.
# @POST: Returns run payload and exit code 0 on success.
def run_compliance_run(args: argparse.Namespace) -> int:
    """Execute a compliance run for a candidate and print its summary.

    Args:
        args: Parsed namespace providing ``candidate_id``, ``actor`` and
            ``manifest_id`` (may be ``None``).

    Returns:
        ``0`` when the run executes; ``2`` when ``execute_run`` raises any
        ``Exception`` (its message is printed in the JSON error payload).
    """
    from ..dependencies import get_clean_release_repository, get_config_manager

    store = get_clean_release_repository()
    settings = get_config_manager()
    runner = ComplianceExecutionService(repository=store, config_manager=settings)

    try:
        outcome = runner.execute_run(
            candidate_id=args.candidate_id,
            requested_by=args.actor,
            manifest_id=args.manifest_id,
        )
    except Exception as problem:  # noqa: BLE001
        # CLI boundary: any failure is reported as JSON, not as a traceback.
        print(json.dumps({"status": "error", "message": str(problem)}))
        return 2

    summary = {
        "status": "ok",
        "run_id": outcome.run.id,
        "candidate_id": outcome.run.candidate_id,
        "run_status": outcome.run.status,
        "final_status": outcome.run.final_status,
        # These two attributes are read defensively; absent ones become null.
        "task_id": getattr(outcome.run, "task_id", None),
        "report_id": getattr(outcome.run, "report_id", None),
    }
    print(json.dumps(summary))
    return 0
# [/DEF:run_compliance_run:Function]


# [DEF:run_compliance_status:Function]
# @PURPOSE: Read run status by run id.
# @PRE: Run exists.
# @POST: Returns run status payload.
def run_compliance_status(args: argparse.Namespace) -> int:
    """Print the status summary of a previously started compliance run.

    Args:
        args: Parsed namespace providing ``run_id``.

    Returns:
        ``0`` when the run is found; ``2`` when it is not.
    """
    from ..dependencies import get_clean_release_repository

    store = get_clean_release_repository()
    found = store.get_check_run(args.run_id)
    if found is None:
        print(json.dumps({"status": "error", "message": "run not found"}))
        return 2

    # First stored report that points back at this run, if any.
    linked_report = None
    for candidate_report in store.reports.values():
        if candidate_report.run_id == found.id:
            linked_report = candidate_report
            break

    # Prefer the id recorded on the run itself; fall back to the linked report.
    fallback_report_id = linked_report.id if linked_report else None
    summary = {
        "status": "ok",
        "run_id": found.id,
        "candidate_id": found.candidate_id,
        "run_status": found.status,
        "final_status": found.final_status,
        "task_id": getattr(found, "task_id", None),
        "report_id": getattr(found, "report_id", None) or fallback_report_id,
    }
    print(json.dumps(summary))
    return 0
# [/DEF:run_compliance_status:Function]


# [DEF:_to_payload:Function]
# @PURPOSE: Serialize domain models for CLI JSON output across SQLAlchemy/Pydantic variants.
# @PRE: value is serializable model or primitive object.
# @POST: Returns dictionary payload without mutating value.
def _to_payload(value: Any) -> Dict[str, Any]:
    """Convert a domain model into a dictionary that ``json.dumps`` accepts.

    Two model shapes are supported: objects exposing ``model_dump()`` and
    objects exposing ``__table__`` with iterable ``columns`` (each column's
    ``name`` is read from the object via ``getattr``).

    Values are normalized recursively:
      * ``Enum`` members become their (normalized) ``value``;
      * ``datetime`` / ``date`` become ISO-8601 strings;
      * ``UUID`` becomes its string form;
      * ``dict`` keys are stringified and values normalized;
      * ``list`` / ``tuple`` / ``set`` / ``frozenset`` become lists.

    Args:
        value: The model instance to serialize.

    Returns:
        A new dictionary; ``value`` itself is not modified.

    Raises:
        TypeError: If ``value`` has neither ``model_dump`` nor ``__table__``.
    """
    # Function-scope imports: the module's import header is left untouched.
    from enum import Enum
    from uuid import UUID

    def _normalize(raw: Any) -> Any:
        # Enum first: str/int-mixin enums would otherwise slip through as-is,
        # and plain Enum members make json.dumps raise TypeError.
        if isinstance(raw, Enum):
            return _normalize(raw.value)
        # datetime is a subclass of date; both serialize via isoformat().
        if isinstance(raw, (datetime, date)):
            return raw.isoformat()
        if isinstance(raw, UUID):
            return str(raw)
        if isinstance(raw, dict):
            return {str(key): _normalize(item) for key, item in raw.items()}
        # JSON has no set type; emit sets as lists like the other sequences.
        if isinstance(raw, (list, tuple, set, frozenset)):
            return [_normalize(item) for item in raw]
        return raw

    if hasattr(value, "model_dump"):
        return _normalize(value.model_dump())

    table = getattr(value, "__table__", None)
    if table is not None:
        row = {column.name: getattr(value, column.name) for column in table.columns}
        return _normalize(row)

    raise TypeError(f"unsupported payload type: {type(value)!r}")
# [/DEF:_to_payload:Function]


# [DEF:run_compliance_report:Function]
# @PURPOSE: Read immutable report by run id.
# @PRE: Run and report exist.
# @POST: Returns report payload.
def run_compliance_report(args: argparse.Namespace) -> int:
    """Print the stored report that belongs to a compliance run.

    Args:
        args: Parsed namespace providing ``run_id``.

    Returns:
        ``0`` when the report is printed; ``2`` when either the run or its
        report cannot be found.
    """
    from ..dependencies import get_clean_release_repository

    repository = get_clean_release_repository()
    run = repository.get_check_run(args.run_id)
    if run is None:
        print(json.dumps({"status": "error", "message": "run not found"}))
        return 2

    # First stored report whose run_id matches this run.
    report = next((item for item in repository.reports.values() if item.run_id == run.id), None)
    if report is None:
        print(json.dumps({"status": "error", "message": "report not found"}))
        return 2

    print(json.dumps({"status": "ok", "report": _to_payload(report)}))
    return 0
# [/DEF:run_compliance_report:Function]


# [DEF:run_compliance_violations:Function]
# @PURPOSE: Read run violations by run id.
# @PRE: Run exists.
# @POST: Returns violations payload.
def run_compliance_violations(args: argparse.Namespace) -> int:
    """Print every violation recorded for a compliance run.

    Args:
        args: Parsed namespace providing ``run_id``.

    Returns:
        ``0`` when the run is found (the list may be empty); ``2`` when the
        run does not exist.
    """
    from ..dependencies import get_clean_release_repository

    store = get_clean_release_repository()
    if store.get_check_run(args.run_id) is None:
        print(json.dumps({"status": "error", "message": "run not found"}))
        return 2

    serialized = []
    for violation in store.get_violations_by_run(args.run_id):
        serialized.append(_to_payload(violation))

    print(json.dumps({"status": "ok", "items": serialized}))
    return 0
# [/DEF:run_compliance_violations:Function]


# [DEF:run_approve:Function]
# @PURPOSE: Approve candidate based on immutable PASSED report.
# @PRE: Candidate and report exist; report is PASSED.
# @POST: Persists APPROVED decision and returns success payload.
def run_approve(args: argparse.Namespace) -> int:
    """Record an approval decision for a candidate via ``approve_candidate``.

    Args:
        args: Parsed namespace providing ``candidate_id``, ``report_id``,
            ``actor`` and ``comment`` (may be ``None``).

    Returns:
        ``0`` when the decision is recorded; ``2`` when ``approve_candidate``
        raises any ``Exception`` (its message is printed as a JSON error).
    """
    from ..dependencies import get_clean_release_repository

    store = get_clean_release_repository()
    try:
        verdict = approve_candidate(
            repository=store,
            candidate_id=args.candidate_id,
            report_id=args.report_id,
            decided_by=args.actor,
            comment=args.comment,
        )
    except Exception as problem:  # noqa: BLE001
        # CLI boundary: any failure is reported as JSON, not as a traceback.
        print(json.dumps({"status": "error", "message": str(problem)}))
        return 2
    else:
        summary = {
            "status": "ok",
            "decision": verdict.decision,
            "decision_id": verdict.id,
        }
        print(json.dumps(summary))
        return 0
# [/DEF:run_approve:Function]


# [DEF:run_reject:Function]
# @PURPOSE: Reject candidate without mutating compliance evidence.
# @PRE: Candidate and report exist.
# @POST: Persists REJECTED decision and returns success payload.
def run_reject(args: argparse.Namespace) -> int:
    """Record a rejection decision for a candidate via ``reject_candidate``.

    Args:
        args: Parsed namespace providing ``candidate_id``, ``report_id``,
            ``actor`` and ``comment`` (may be ``None``).

    Returns:
        ``0`` when the decision is recorded; ``2`` when ``reject_candidate``
        raises any ``Exception`` (its message is printed as a JSON error).
    """
    from ..dependencies import get_clean_release_repository

    store = get_clean_release_repository()
    try:
        verdict = reject_candidate(
            repository=store,
            candidate_id=args.candidate_id,
            report_id=args.report_id,
            decided_by=args.actor,
            comment=args.comment,
        )
    except Exception as problem:  # noqa: BLE001
        # CLI boundary: any failure is reported as JSON, not as a traceback.
        print(json.dumps({"status": "error", "message": str(problem)}))
        return 2
    else:
        summary = {
            "status": "ok",
            "decision": verdict.decision,
            "decision_id": verdict.id,
        }
        print(json.dumps(summary))
        return 0
# [/DEF:run_reject:Function]


# [DEF:run_publish:Function]
# @PURPOSE: Publish approved candidate to target channel.
# @PRE: Candidate is approved and report belongs to candidate.
# @POST: Appends ACTIVE publication record and returns payload.
def run_publish(args: argparse.Namespace) -> int:
    """Publish a candidate to a channel via ``publish_candidate``.

    Args:
        args: Parsed namespace providing ``candidate_id``, ``report_id``,
            ``actor``, ``target_channel`` and ``publication_ref`` (may be
            ``None``).

    Returns:
        ``0`` when the publication record is printed; ``2`` when
        ``publish_candidate`` raises any ``Exception`` (its message is
        printed as a JSON error).
    """
    from ..dependencies import get_clean_release_repository

    store = get_clean_release_repository()
    try:
        record = publish_candidate(
            repository=store,
            candidate_id=args.candidate_id,
            report_id=args.report_id,
            published_by=args.actor,
            target_channel=args.target_channel,
            publication_ref=args.publication_ref,
        )
    except Exception as problem:  # noqa: BLE001
        # CLI boundary: any failure is reported as JSON, not as a traceback.
        print(json.dumps({"status": "error", "message": str(problem)}))
        return 2

    # Serialization happens outside the try, so its errors are not swallowed.
    body = {"status": "ok", "publication": _to_payload(record)}
    print(json.dumps(body))
    return 0
# [/DEF:run_publish:Function]


# [DEF:run_revoke:Function]
# @PURPOSE: Revoke active publication record.
# @PRE: Publication id exists and is ACTIVE.
# @POST: Publication record status becomes REVOKED.
def run_revoke(args: argparse.Namespace) -> int:
    """Revoke a publication record via ``revoke_publication``.

    Args:
        args: Parsed namespace providing ``publication_id``, ``actor`` and
            ``comment`` (may be ``None``).

    Returns:
        ``0`` when the updated publication is printed; ``2`` when
        ``revoke_publication`` raises any ``Exception`` (its message is
        printed as a JSON error).
    """
    from ..dependencies import get_clean_release_repository

    store = get_clean_release_repository()
    try:
        record = revoke_publication(
            repository=store,
            publication_id=args.publication_id,
            revoked_by=args.actor,
            comment=args.comment,
        )
    except Exception as problem:  # noqa: BLE001
        # CLI boundary: any failure is reported as JSON, not as a traceback.
        print(json.dumps({"status": "error", "message": str(problem)}))
        return 2

    # Serialization happens outside the try, so its errors are not swallowed.
    body = {"status": "ok", "publication": _to_payload(record)}
    print(json.dumps(body))
    return 0
# [/DEF:run_revoke:Function]


# [DEF:main:Function]
# @PURPOSE: CLI entrypoint for clean release commands.
def main(argv: Optional[List[str]] = None) -> int:
    """Parse ``argv`` and dispatch to the handler of the chosen sub-command.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv[1:]``.

    Returns:
        The handler's exit code, or ``2`` when no handler matches the
        parsed command.
    """
    options = build_parser().parse_args(argv)

    # Built per call so the handler names are looked up at dispatch time.
    handlers = {
        "candidate-register": run_candidate_register,
        "artifact-import": run_artifact_import,
        "manifest-build": run_manifest_build,
        "compliance-run": run_compliance_run,
        "compliance-status": run_compliance_status,
        "compliance-report": run_compliance_report,
        "compliance-violations": run_compliance_violations,
        "approve": run_approve,
        "reject": run_reject,
        "publish": run_publish,
        "revoke": run_revoke,
    }

    handler = handlers.get(options.command)
    if handler is not None:
        return handler(options)

    print(json.dumps({"status": "error", "message": "unknown command"}))
    return 2
# [/DEF:main:Function]


if __name__ == "__main__":
    raise SystemExit(main())

# [/DEF:backend.src.scripts.clean_release_cli:Module]