Files
ss-tools/backend/src/api/routes/git.py

1214 lines
48 KiB
Python

# [DEF:backend.src.api.routes.git:Module]
#
# @TIER: STANDARD
# @SEMANTICS: git, routes, api, fastapi, repository, deployment
# @PURPOSE: Provides FastAPI endpoints for Git integration operations.
# @LAYER: API
# @RELATION: USES -> src.services.git_service.GitService
# @RELATION: USES -> src.api.routes.git_schemas
# @RELATION: USES -> src.models.git
#
# @INVARIANT: All Git operations must be routed through GitService.
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List, Optional
import typing
import os
from src.dependencies import get_config_manager, get_current_user, has_permission
from src.core.database import get_db
from src.models.auth import User
from src.models.git import GitServerConfig, GitRepository, GitProvider
from src.models.profile import UserDashboardPreference
from src.api.routes.git_schemas import (
GitServerConfigSchema, GitServerConfigCreate, GitServerConfigUpdate,
BranchSchema, BranchCreate,
BranchCheckout, CommitSchema, CommitCreate,
DeploymentEnvironmentSchema, DeployRequest, RepoInitRequest,
RepositoryBindingSchema,
RepoStatusBatchRequest, RepoStatusBatchResponse,
GiteaRepoCreateRequest, GiteaRepoSchema,
RemoteRepoCreateRequest, RemoteRepoSchema,
PromoteRequest, PromoteResponse,
)
from src.services.git_service import GitService
from src.core.superset_client import SupersetClient
from src.core.logger import logger, belief_scope
from ...services.llm_prompt_templates import (
DEFAULT_LLM_PROMPTS,
normalize_llm_settings,
resolve_bound_provider_id,
)
router = APIRouter(tags=["git"])
git_service = GitService()
MAX_REPOSITORY_STATUS_BATCH = 50
# [DEF:_build_no_repo_status_payload:Function]
# @PURPOSE: Build a consistent status payload for dashboards without initialized repositories.
# @PRE: None.
# @POST: Returns a stable payload compatible with frontend repository status parsing.
# @RETURN: dict
def _build_no_repo_status_payload() -> dict:
return {
"is_dirty": False,
"untracked_files": [],
"modified_files": [],
"staged_files": [],
"current_branch": None,
"upstream_branch": None,
"has_upstream": False,
"ahead_count": 0,
"behind_count": 0,
"is_diverged": False,
"sync_state": "NO_REPO",
"sync_status": "NO_REPO",
"has_repo": False,
}
# [/DEF:_build_no_repo_status_payload:Function]
# [DEF:_handle_unexpected_git_route_error:Function]
# @PURPOSE: Convert unexpected route-level exceptions to stable 500 API responses.
# @PRE: `error` is a non-HTTPException instance.
# @POST: Raises HTTPException(500) with route-specific context.
# @PARAM: route_name (str)
# @PARAM: error (Exception)
def _handle_unexpected_git_route_error(route_name: str, error: Exception) -> None:
logger.error(f"[{route_name}][Coherence:Failed] {error}")
raise HTTPException(status_code=500, detail=f"{route_name} failed: {str(error)}")
# [/DEF:_handle_unexpected_git_route_error:Function]
# [DEF:_resolve_repository_status:Function]
# @PURPOSE: Resolve repository status for one dashboard with graceful NO_REPO semantics.
# @PRE: `dashboard_id` is a valid integer.
# @POST: Returns standard status payload or `NO_REPO` payload when repository path is absent.
# @PARAM: dashboard_id (int)
# @RETURN: dict
def _resolve_repository_status(dashboard_id: int) -> dict:
repo_path = git_service._get_repo_path(dashboard_id)
if not os.path.exists(repo_path):
logger.debug(
f"[get_repository_status][Action] Repository is not initialized for dashboard {dashboard_id}"
)
return _build_no_repo_status_payload()
try:
return git_service.get_status(dashboard_id)
except HTTPException as e:
if e.status_code == 404:
logger.debug(
f"[get_repository_status][Action] Repository is not initialized for dashboard {dashboard_id}"
)
return _build_no_repo_status_payload()
raise
# [/DEF:_resolve_repository_status:Function]
# [DEF:_get_git_config_or_404:Function]
# @PURPOSE: Resolve GitServerConfig by id or raise 404.
# @PRE: db session is available.
# @POST: Returns GitServerConfig model.
def _get_git_config_or_404(db: Session, config_id: str) -> GitServerConfig:
config = db.query(GitServerConfig).filter(GitServerConfig.id == config_id).first()
if not config:
raise HTTPException(status_code=404, detail="Git configuration not found")
return config
# [/DEF:_get_git_config_or_404:Function]
# [DEF:_find_dashboard_id_by_slug:Function]
# @PURPOSE: Resolve dashboard numeric ID by slug in a specific environment.
# @PRE: dashboard_slug is non-empty.
# @POST: Returns dashboard ID or None when not found.
def _find_dashboard_id_by_slug(
client: SupersetClient,
dashboard_slug: str,
) -> Optional[int]:
query_variants = [
{"filters": [{"col": "slug", "opr": "eq", "value": dashboard_slug}], "page": 0, "page_size": 1},
{"filters": [{"col": "slug", "op": "eq", "value": dashboard_slug}], "page": 0, "page_size": 1},
]
for query in query_variants:
try:
_count, dashboards = client.get_dashboards_page(query=query)
if dashboards:
resolved_id = dashboards[0].get("id")
if resolved_id is not None:
return int(resolved_id)
except Exception:
continue
return None
# [/DEF:_find_dashboard_id_by_slug:Function]
# [DEF:_resolve_dashboard_id_from_ref:Function]
# @PURPOSE: Resolve dashboard ID from slug-or-id reference for Git routes.
# @PRE: dashboard_ref is provided; env_id is required for slug values.
# @POST: Returns numeric dashboard ID or raises HTTPException.
def _resolve_dashboard_id_from_ref(
dashboard_ref: str,
config_manager,
env_id: Optional[str] = None,
) -> int:
normalized_ref = str(dashboard_ref or "").strip()
if not normalized_ref:
raise HTTPException(status_code=400, detail="dashboard_ref is required")
if normalized_ref.isdigit():
return int(normalized_ref)
if not env_id:
raise HTTPException(
status_code=400,
detail="env_id is required for slug-based Git operations",
)
environments = config_manager.get_environments()
env = next((e for e in environments if e.id == env_id), None)
if not env:
raise HTTPException(status_code=404, detail="Environment not found")
dashboard_id = _find_dashboard_id_by_slug(SupersetClient(env), normalized_ref)
if dashboard_id is None:
raise HTTPException(status_code=404, detail=f"Dashboard slug '{normalized_ref}' not found")
return dashboard_id
# [/DEF:_resolve_dashboard_id_from_ref:Function]
# [DEF:_resolve_repo_key_from_ref:Function]
# @PURPOSE: Resolve repository folder key with slug-first strategy and deterministic fallback.
# @PRE: dashboard_id is resolved and valid.
# @POST: Returns safe key to be used in local repository path.
# @RETURN: str
def _resolve_repo_key_from_ref(
dashboard_ref: str,
dashboard_id: int,
config_manager,
env_id: Optional[str] = None,
) -> str:
normalized_ref = str(dashboard_ref or "").strip()
if normalized_ref and not normalized_ref.isdigit():
return normalized_ref
if env_id:
try:
environments = config_manager.get_environments()
env = next((e for e in environments if e.id == env_id), None)
if env:
payload = SupersetClient(env).get_dashboard(dashboard_id)
dashboard_data = payload.get("result", payload) if isinstance(payload, dict) else {}
dashboard_slug = dashboard_data.get("slug")
if dashboard_slug:
return str(dashboard_slug)
except Exception:
pass
return f"dashboard-{dashboard_id}"
# [/DEF:_resolve_repo_key_from_ref:Function]
# [DEF:_sanitize_optional_identity_value:Function]
# @PURPOSE: Normalize optional identity value into trimmed string or None.
# @PRE: value may be None or blank.
# @POST: Returns sanitized value suitable for git identity configuration.
# @RETURN: Optional[str]
def _sanitize_optional_identity_value(value: Optional[str]) -> Optional[str]:
normalized = str(value or "").strip()
if not normalized:
return None
return normalized
# [/DEF:_sanitize_optional_identity_value:Function]
# [DEF:_resolve_current_user_git_identity:Function]
# @PURPOSE: Resolve configured Git username/email from current user's profile preferences.
# @PRE: `db` may be stubbed in tests; `current_user` may be absent for direct handler invocations.
# @POST: Returns tuple(username, email) only when both values are configured.
# @RETURN: Optional[tuple[str, str]]
def _resolve_current_user_git_identity(
db: Session,
current_user: Optional[User],
) -> Optional[tuple[str, str]]:
if db is None or not hasattr(db, "query"):
return None
user_id = _sanitize_optional_identity_value(getattr(current_user, "id", None))
if not user_id:
return None
try:
preference = (
db.query(UserDashboardPreference)
.filter(UserDashboardPreference.user_id == user_id)
.first()
)
except Exception as resolve_error:
logger.warning(
"[_resolve_current_user_git_identity][Action] Failed to load profile preference for user %s: %s",
user_id,
resolve_error,
)
return None
if not preference:
return None
git_username = _sanitize_optional_identity_value(getattr(preference, "git_username", None))
git_email = _sanitize_optional_identity_value(getattr(preference, "git_email", None))
if not git_username or not git_email:
return None
return git_username, git_email
# [/DEF:_resolve_current_user_git_identity:Function]
# [DEF:_apply_git_identity_from_profile:Function]
# @PURPOSE: Apply user-scoped Git identity to repository-local config before write/pull operations.
# @PRE: dashboard_id is resolved; db/current_user may be missing in direct test invocation context.
# @POST: git_service.configure_identity is called only when identity and method are available.
# @RETURN: None
def _apply_git_identity_from_profile(
dashboard_id: int,
db: Session,
current_user: Optional[User],
) -> None:
identity = _resolve_current_user_git_identity(db, current_user)
if not identity:
return
configure_identity = getattr(git_service, "configure_identity", None)
if not callable(configure_identity):
return
git_username, git_email = identity
configure_identity(dashboard_id, git_username, git_email)
# [/DEF:_apply_git_identity_from_profile:Function]
# [DEF:get_git_configs:Function]
# @PURPOSE: List all configured Git servers.
# @PRE: Database session `db` is available.
# @POST: Returns a list of all GitServerConfig objects from the database.
# @RETURN: List[GitServerConfigSchema]
@router.get("/config", response_model=List[GitServerConfigSchema])
async def get_git_configs(
db: Session = Depends(get_db),
_ = Depends(has_permission("git_config", "READ"))
):
with belief_scope("get_git_configs"):
configs = db.query(GitServerConfig).all()
result = []
for config in configs:
schema = GitServerConfigSchema.from_orm(config)
schema.pat = "********"
result.append(schema)
return result
# [/DEF:get_git_configs:Function]
# [DEF:create_git_config:Function]
# @PURPOSE: Register a new Git server configuration.
# @PRE: `config` contains valid GitServerConfigCreate data.
# @POST: A new GitServerConfig record is created in the database.
# @PARAM: config (GitServerConfigCreate)
# @RETURN: GitServerConfigSchema
@router.post("/config", response_model=GitServerConfigSchema)
async def create_git_config(
config: GitServerConfigCreate,
db: Session = Depends(get_db),
_ = Depends(has_permission("admin:settings", "WRITE"))
):
with belief_scope("create_git_config"):
config_dict = config.dict(exclude={"config_id"})
db_config = GitServerConfig(**config_dict)
db.add(db_config)
db.commit()
db.refresh(db_config)
return db_config
# [/DEF:create_git_config:Function]
# [DEF:update_git_config:Function]
# @PURPOSE: Update an existing Git server configuration.
# @PRE: `config_id` corresponds to an existing configuration.
# @POST: The configuration record is updated in the database.
# @PARAM: config_id (str)
# @PARAM: config_update (GitServerConfigUpdate)
# @RETURN: GitServerConfigSchema
@router.put("/config/{config_id}", response_model=GitServerConfigSchema)
async def update_git_config(
config_id: str,
config_update: GitServerConfigUpdate,
db: Session = Depends(get_db),
_ = Depends(has_permission("admin:settings", "WRITE"))
):
with belief_scope("update_git_config"):
db_config = db.query(GitServerConfig).filter(GitServerConfig.id == config_id).first()
if not db_config:
raise HTTPException(status_code=404, detail="Configuration not found")
update_data = config_update.dict(exclude_unset=True)
if update_data.get("pat") == "********":
update_data.pop("pat")
for key, value in update_data.items():
setattr(db_config, key, value)
db.commit()
db.refresh(db_config)
result_schema = GitServerConfigSchema.from_orm(db_config)
result_schema.pat = "********"
return result_schema
# [/DEF:update_git_config:Function]
# [DEF:delete_git_config:Function]
# @PURPOSE: Remove a Git server configuration.
# @PRE: `config_id` corresponds to an existing configuration.
# @POST: The configuration record is removed from the database.
# @PARAM: config_id (str)
@router.delete("/config/{config_id}")
async def delete_git_config(
config_id: str,
db: Session = Depends(get_db),
_ = Depends(has_permission("admin:settings", "WRITE"))
):
with belief_scope("delete_git_config"):
db_config = db.query(GitServerConfig).filter(GitServerConfig.id == config_id).first()
if not db_config:
raise HTTPException(status_code=404, detail="Configuration not found")
db.delete(db_config)
db.commit()
return {"status": "success", "message": "Configuration deleted"}
# [/DEF:delete_git_config:Function]
# [DEF:test_git_config:Function]
# @PURPOSE: Validate connection to a Git server using provided credentials.
# @PRE: `config` contains provider, url, and pat.
# @POST: Returns success if the connection is validated via GitService.
# @PARAM: config (GitServerConfigCreate)
@router.post("/config/test")
async def test_git_config(
config: GitServerConfigCreate,
db: Session = Depends(get_db),
_ = Depends(has_permission("git_config", "READ"))
):
with belief_scope("test_git_config"):
pat_to_use = config.pat
if pat_to_use == "********":
if config.config_id:
db_config = db.query(GitServerConfig).filter(GitServerConfig.id == config.config_id).first()
if db_config:
pat_to_use = db_config.pat
else:
db_config = db.query(GitServerConfig).filter(GitServerConfig.url == config.url).first()
if db_config:
pat_to_use = db_config.pat
success = await git_service.test_connection(config.provider, config.url, pat_to_use)
if success:
return {"status": "success", "message": "Connection successful"}
else:
raise HTTPException(status_code=400, detail="Connection failed")
# [/DEF:test_git_config:Function]
# [DEF:list_gitea_repositories:Function]
# @PURPOSE: List repositories in Gitea for a saved Gitea config.
# @PRE: config_id exists and provider is GITEA.
# @POST: Returns repositories visible to PAT user.
@router.get("/config/{config_id}/gitea/repos", response_model=List[GiteaRepoSchema])
async def list_gitea_repositories(
config_id: str,
db: Session = Depends(get_db),
_ = Depends(has_permission("git_config", "READ"))
):
with belief_scope("list_gitea_repositories"):
config = _get_git_config_or_404(db, config_id)
if config.provider != GitProvider.GITEA:
raise HTTPException(status_code=400, detail="This endpoint supports GITEA provider only")
repos = await git_service.list_gitea_repositories(config.url, config.pat)
return [
GiteaRepoSchema(
name=repo.get("name", ""),
full_name=repo.get("full_name", ""),
private=bool(repo.get("private", False)),
clone_url=repo.get("clone_url"),
html_url=repo.get("html_url"),
ssh_url=repo.get("ssh_url"),
default_branch=repo.get("default_branch"),
)
for repo in repos
]
# [/DEF:list_gitea_repositories:Function]
# [DEF:create_gitea_repository:Function]
# @PURPOSE: Create a repository in Gitea for a saved Gitea config.
# @PRE: config_id exists and provider is GITEA.
# @POST: Returns created repository payload.
@router.post("/config/{config_id}/gitea/repos", response_model=GiteaRepoSchema)
async def create_gitea_repository(
config_id: str,
request: GiteaRepoCreateRequest,
db: Session = Depends(get_db),
_ = Depends(has_permission("admin:settings", "WRITE"))
):
with belief_scope("create_gitea_repository"):
config = _get_git_config_or_404(db, config_id)
if config.provider != GitProvider.GITEA:
raise HTTPException(status_code=400, detail="This endpoint supports GITEA provider only")
repo = await git_service.create_gitea_repository(
server_url=config.url,
pat=config.pat,
name=request.name,
private=request.private,
description=request.description,
auto_init=request.auto_init,
default_branch=request.default_branch,
)
return GiteaRepoSchema(
name=repo.get("name", ""),
full_name=repo.get("full_name", ""),
private=bool(repo.get("private", False)),
clone_url=repo.get("clone_url"),
html_url=repo.get("html_url"),
ssh_url=repo.get("ssh_url"),
default_branch=repo.get("default_branch"),
)
# [/DEF:create_gitea_repository:Function]
# [DEF:create_remote_repository:Function]
# @PURPOSE: Create repository on remote Git server using selected provider config.
# @PRE: config_id exists and PAT has creation permissions.
# @POST: Returns normalized remote repository payload.
@router.post("/config/{config_id}/repositories", response_model=RemoteRepoSchema)
async def create_remote_repository(
config_id: str,
request: RemoteRepoCreateRequest,
db: Session = Depends(get_db),
_ = Depends(has_permission("admin:settings", "WRITE"))
):
with belief_scope("create_remote_repository"):
config = _get_git_config_or_404(db, config_id)
if config.provider == GitProvider.GITEA:
repo = await git_service.create_gitea_repository(
server_url=config.url,
pat=config.pat,
name=request.name,
private=request.private,
description=request.description,
auto_init=request.auto_init,
default_branch=request.default_branch,
)
elif config.provider == GitProvider.GITHUB:
repo = await git_service.create_github_repository(
server_url=config.url,
pat=config.pat,
name=request.name,
private=request.private,
description=request.description,
auto_init=request.auto_init,
default_branch=request.default_branch,
)
elif config.provider == GitProvider.GITLAB:
repo = await git_service.create_gitlab_repository(
server_url=config.url,
pat=config.pat,
name=request.name,
private=request.private,
description=request.description,
auto_init=request.auto_init,
default_branch=request.default_branch,
)
else:
raise HTTPException(status_code=501, detail=f"Provider {config.provider} is not supported")
return RemoteRepoSchema(
provider=config.provider,
name=repo.get("name", ""),
full_name=repo.get("full_name", repo.get("name", "")),
private=bool(repo.get("private", False)),
clone_url=repo.get("clone_url"),
html_url=repo.get("html_url"),
ssh_url=repo.get("ssh_url"),
default_branch=repo.get("default_branch"),
)
# [/DEF:create_remote_repository:Function]
# [DEF:delete_gitea_repository:Function]
# @PURPOSE: Delete repository in Gitea for a saved Gitea config.
# @PRE: config_id exists and provider is GITEA.
# @POST: Target repository is deleted on Gitea.
@router.delete("/config/{config_id}/gitea/repos/{owner}/{repo_name}")
async def delete_gitea_repository(
config_id: str,
owner: str,
repo_name: str,
db: Session = Depends(get_db),
_ = Depends(has_permission("admin:settings", "WRITE"))
):
with belief_scope("delete_gitea_repository"):
config = _get_git_config_or_404(db, config_id)
if config.provider != GitProvider.GITEA:
raise HTTPException(status_code=400, detail="This endpoint supports GITEA provider only")
await git_service.delete_gitea_repository(
server_url=config.url,
pat=config.pat,
owner=owner,
repo_name=repo_name,
)
return {"status": "success", "message": "Repository deleted"}
# [/DEF:delete_gitea_repository:Function]
# [DEF:init_repository:Function]
# @PURPOSE: Link a dashboard to a Git repository and perform initial clone/init.
# @PRE: `dashboard_ref` exists and `init_data` contains valid config_id and remote_url.
# @POST: Repository is initialized on disk and a GitRepository record is saved in DB.
# @PARAM: dashboard_ref (str)
# @PARAM: init_data (RepoInitRequest)
@router.post("/repositories/{dashboard_ref}/init")
async def init_repository(
dashboard_ref: str,
init_data: RepoInitRequest,
env_id: Optional[str] = None,
config_manager=Depends(get_config_manager),
db: Session = Depends(get_db),
_ = Depends(has_permission("plugin:git", "EXECUTE"))
):
with belief_scope("init_repository"):
dashboard_id = _resolve_dashboard_id_from_ref(dashboard_ref, config_manager, env_id)
repo_key = _resolve_repo_key_from_ref(dashboard_ref, dashboard_id, config_manager, env_id)
# 1. Get config
config = db.query(GitServerConfig).filter(GitServerConfig.id == init_data.config_id).first()
if not config:
raise HTTPException(status_code=404, detail="Git configuration not found")
try:
# 2. Perform Git clone/init
logger.info(f"[init_repository][Action] Initializing repo for dashboard {dashboard_id}")
git_service.init_repo(dashboard_id, init_data.remote_url, config.pat, repo_key=repo_key, default_branch=config.default_branch)
# 3. Save to DB
repo_path = git_service._get_repo_path(dashboard_id, repo_key=repo_key)
db_repo = db.query(GitRepository).filter(GitRepository.dashboard_id == dashboard_id).first()
if not db_repo:
db_repo = GitRepository(
dashboard_id=dashboard_id,
config_id=config.id,
remote_url=init_data.remote_url,
local_path=repo_path,
current_branch="dev",
)
db.add(db_repo)
else:
db_repo.config_id = config.id
db_repo.remote_url = init_data.remote_url
db_repo.local_path = repo_path
db_repo.current_branch = "dev"
db.commit()
logger.info(f"[init_repository][Coherence:OK] Repository initialized for dashboard {dashboard_id}")
return {"status": "success", "message": "Repository initialized"}
except Exception as e:
db.rollback()
logger.error(f"[init_repository][Coherence:Failed] Failed to init repository: {e}")
if isinstance(e, HTTPException):
raise
_handle_unexpected_git_route_error("init_repository", e)
# [/DEF:init_repository:Function]
# [DEF:get_repository_binding:Function]
# @PURPOSE: Return repository binding with provider metadata for selected dashboard.
# @PRE: `dashboard_ref` resolves to a valid dashboard and repository is initialized.
# @POST: Returns dashboard repository binding and linked provider.
# @PARAM: dashboard_ref (str)
# @RETURN: RepositoryBindingSchema
@router.get("/repositories/{dashboard_ref}", response_model=RepositoryBindingSchema)
async def get_repository_binding(
dashboard_ref: str,
env_id: Optional[str] = None,
config_manager=Depends(get_config_manager),
db: Session = Depends(get_db),
_ = Depends(has_permission("plugin:git", "EXECUTE"))
):
with belief_scope("get_repository_binding"):
try:
dashboard_id = _resolve_dashboard_id_from_ref(dashboard_ref, config_manager, env_id)
db_repo = db.query(GitRepository).filter(GitRepository.dashboard_id == dashboard_id).first()
if not db_repo:
raise HTTPException(status_code=404, detail="Repository not initialized")
config = _get_git_config_or_404(db, db_repo.config_id)
return RepositoryBindingSchema(
dashboard_id=db_repo.dashboard_id,
config_id=db_repo.config_id,
provider=config.provider,
remote_url=db_repo.remote_url,
local_path=db_repo.local_path,
)
except HTTPException:
raise
except Exception as e:
_handle_unexpected_git_route_error("get_repository_binding", e)
# [/DEF:get_repository_binding:Function]
# [DEF:delete_repository:Function]
# @PURPOSE: Delete local repository workspace and DB binding for selected dashboard.
# @PRE: `dashboard_ref` resolves to a valid dashboard.
# @POST: Repository files and binding record are removed when present.
# @PARAM: dashboard_ref (str)
# @RETURN: dict
@router.delete("/repositories/{dashboard_ref}")
async def delete_repository(
dashboard_ref: str,
env_id: Optional[str] = None,
config_manager=Depends(get_config_manager),
_ = Depends(has_permission("plugin:git", "EXECUTE"))
):
with belief_scope("delete_repository"):
try:
dashboard_id = _resolve_dashboard_id_from_ref(dashboard_ref, config_manager, env_id)
git_service.delete_repo(dashboard_id)
return {"status": "success"}
except HTTPException:
raise
except Exception as e:
_handle_unexpected_git_route_error("delete_repository", e)
# [/DEF:delete_repository:Function]
# [DEF:get_branches:Function]
# @PURPOSE: List all branches for a dashboard's repository.
# @PRE: Repository for `dashboard_ref` is initialized.
# @POST: Returns a list of branches from the local repository.
# @PARAM: dashboard_ref (str)
# @RETURN: List[BranchSchema]
@router.get("/repositories/{dashboard_ref}/branches", response_model=List[BranchSchema])
async def get_branches(
dashboard_ref: str,
env_id: Optional[str] = None,
config_manager=Depends(get_config_manager),
_ = Depends(has_permission("plugin:git", "EXECUTE"))
):
with belief_scope("get_branches"):
try:
dashboard_id = _resolve_dashboard_id_from_ref(dashboard_ref, config_manager, env_id)
return git_service.list_branches(dashboard_id)
except HTTPException:
raise
except Exception as e:
_handle_unexpected_git_route_error("get_branches", e)
# [/DEF:get_branches:Function]
# [DEF:create_branch:Function]
# @PURPOSE: Create a new branch in the dashboard's repository.
# @PRE: `dashboard_ref` repository exists and `branch_data` has name and from_branch.
# @POST: A new branch is created in the local repository.
# @PARAM: dashboard_ref (str)
# @PARAM: branch_data (BranchCreate)
@router.post("/repositories/{dashboard_ref}/branches")
async def create_branch(
dashboard_ref: str,
branch_data: BranchCreate,
env_id: Optional[str] = None,
config_manager=Depends(get_config_manager),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
_ = Depends(has_permission("plugin:git", "EXECUTE"))
):
with belief_scope("create_branch"):
try:
dashboard_id = _resolve_dashboard_id_from_ref(dashboard_ref, config_manager, env_id)
_apply_git_identity_from_profile(dashboard_id, db, current_user)
git_service.create_branch(dashboard_id, branch_data.name, branch_data.from_branch)
return {"status": "success"}
except HTTPException:
raise
except Exception as e:
_handle_unexpected_git_route_error("create_branch", e)
# [/DEF:create_branch:Function]
# [DEF:checkout_branch:Function]
# @PURPOSE: Switch the dashboard's repository to a specific branch.
# @PRE: `dashboard_ref` repository exists and branch `checkout_data.name` exists.
# @POST: The local repository HEAD is moved to the specified branch.
# @PARAM: dashboard_ref (str)
# @PARAM: checkout_data (BranchCheckout)
@router.post("/repositories/{dashboard_ref}/checkout")
async def checkout_branch(
dashboard_ref: str,
checkout_data: BranchCheckout,
env_id: Optional[str] = None,
config_manager=Depends(get_config_manager),
_ = Depends(has_permission("plugin:git", "EXECUTE"))
):
with belief_scope("checkout_branch"):
try:
dashboard_id = _resolve_dashboard_id_from_ref(dashboard_ref, config_manager, env_id)
git_service.checkout_branch(dashboard_id, checkout_data.name)
return {"status": "success"}
except HTTPException:
raise
except Exception as e:
_handle_unexpected_git_route_error("checkout_branch", e)
# [/DEF:checkout_branch:Function]
# [DEF:commit_changes:Function]
# @PURPOSE: Stage and commit changes in the dashboard's repository.
# @PRE: `dashboard_ref` repository exists and `commit_data` has message and files.
# @POST: Specified files are staged and a new commit is created.
# @PARAM: dashboard_ref (str)
# @PARAM: commit_data (CommitCreate)
@router.post("/repositories/{dashboard_ref}/commit")
async def commit_changes(
dashboard_ref: str,
commit_data: CommitCreate,
env_id: Optional[str] = None,
config_manager=Depends(get_config_manager),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
_ = Depends(has_permission("plugin:git", "EXECUTE"))
):
with belief_scope("commit_changes"):
try:
dashboard_id = _resolve_dashboard_id_from_ref(dashboard_ref, config_manager, env_id)
_apply_git_identity_from_profile(dashboard_id, db, current_user)
git_service.commit_changes(dashboard_id, commit_data.message, commit_data.files)
return {"status": "success"}
except HTTPException:
raise
except Exception as e:
_handle_unexpected_git_route_error("commit_changes", e)
# [/DEF:commit_changes:Function]
# [DEF:push_changes:Function]
# @PURPOSE: Push local commits to the remote repository.
# @PRE: `dashboard_ref` repository exists and has a remote configured.
# @POST: Local commits are pushed to the remote repository.
# @PARAM: dashboard_ref (str)
@router.post("/repositories/{dashboard_ref}/push")
async def push_changes(
dashboard_ref: str,
env_id: Optional[str] = None,
config_manager=Depends(get_config_manager),
_ = Depends(has_permission("plugin:git", "EXECUTE"))
):
with belief_scope("push_changes"):
try:
dashboard_id = _resolve_dashboard_id_from_ref(dashboard_ref, config_manager, env_id)
git_service.push_changes(dashboard_id)
return {"status": "success"}
except HTTPException:
raise
except Exception as e:
_handle_unexpected_git_route_error("push_changes", e)
# [/DEF:push_changes:Function]
# [DEF:pull_changes:Function]
# @PURPOSE: Pull changes from the remote repository.
# @PRE: `dashboard_ref` repository exists and has a remote configured.
# @POST: Remote changes are fetched and merged into the local branch.
# @PARAM: dashboard_ref (str)
@router.post("/repositories/{dashboard_ref}/pull")
async def pull_changes(
dashboard_ref: str,
env_id: Optional[str] = None,
config_manager=Depends(get_config_manager),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
_ = Depends(has_permission("plugin:git", "EXECUTE"))
):
with belief_scope("pull_changes"):
try:
dashboard_id = _resolve_dashboard_id_from_ref(dashboard_ref, config_manager, env_id)
_apply_git_identity_from_profile(dashboard_id, db, current_user)
git_service.pull_changes(dashboard_id)
return {"status": "success"}
except HTTPException:
raise
except Exception as e:
_handle_unexpected_git_route_error("pull_changes", e)
# [/DEF:pull_changes:Function]
# [DEF:sync_dashboard:Function]
# @PURPOSE: Sync dashboard state from Superset to Git using the GitPlugin.
# @PRE: `dashboard_ref` is valid; GitPlugin is available.
# @POST: Dashboard YAMLs are exported from Superset and committed to Git.
# @PARAM: dashboard_ref (str)
# @PARAM: source_env_id (Optional[str])
@router.post("/repositories/{dashboard_ref}/sync")
async def sync_dashboard(
dashboard_ref: str,
env_id: Optional[str] = None,
source_env_id: typing.Optional[str] = None,
config_manager=Depends(get_config_manager),
_ = Depends(has_permission("plugin:git", "EXECUTE"))
):
with belief_scope("sync_dashboard"):
try:
dashboard_id = _resolve_dashboard_id_from_ref(dashboard_ref, config_manager, env_id)
from src.plugins.git_plugin import GitPlugin
plugin = GitPlugin()
return await plugin.execute({
"operation": "sync",
"dashboard_id": dashboard_id,
"source_env_id": source_env_id
})
except HTTPException:
raise
except Exception as e:
_handle_unexpected_git_route_error("sync_dashboard", e)
# [/DEF:sync_dashboard:Function]
# [DEF:promote_dashboard:Function]
# @PURPOSE: Promote changes between branches via MR or direct merge.
# @PRE: dashboard repository is initialized and Git config is valid.
# @POST: Returns promotion result metadata.
@router.post("/repositories/{dashboard_ref}/promote", response_model=PromoteResponse)
async def promote_dashboard(
dashboard_ref: str,
payload: PromoteRequest,
env_id: Optional[str] = None,
config_manager=Depends(get_config_manager),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
_ = Depends(has_permission("plugin:git", "EXECUTE"))
):
with belief_scope("promote_dashboard"):
dashboard_id = _resolve_dashboard_id_from_ref(dashboard_ref, config_manager, env_id)
db_repo = db.query(GitRepository).filter(GitRepository.dashboard_id == dashboard_id).first()
if not db_repo:
raise HTTPException(status_code=404, detail=f"Repository for dashboard {dashboard_ref} is not initialized")
config = _get_git_config_or_404(db, db_repo.config_id)
from_branch = payload.from_branch.strip()
to_branch = payload.to_branch.strip()
if not from_branch or not to_branch:
raise HTTPException(status_code=400, detail="from_branch and to_branch are required")
if from_branch == to_branch:
raise HTTPException(status_code=400, detail="from_branch and to_branch must be different")
mode = (payload.mode or "mr").strip().lower()
if mode == "direct":
reason = (payload.reason or "").strip()
if not reason:
raise HTTPException(status_code=400, detail="Direct promote requires non-empty reason")
logger.warning(
"[promote_dashboard][PolicyViolation] Direct promote without MR by actor=unknown dashboard_ref=%s from=%s to=%s reason=%s",
dashboard_ref,
from_branch,
to_branch,
reason,
)
_apply_git_identity_from_profile(dashboard_id, db, current_user)
result = git_service.promote_direct_merge(
dashboard_id=dashboard_id,
from_branch=from_branch,
to_branch=to_branch,
)
return PromoteResponse(
mode="direct",
from_branch=from_branch,
to_branch=to_branch,
status=result.get("status", "merged"),
policy_violation=True,
)
title = (payload.title or "").strip() or f"Promote {from_branch} -> {to_branch}"
description = payload.description
if config.provider == GitProvider.GITEA:
pr = await git_service.create_gitea_pull_request(
server_url=config.url,
pat=config.pat,
remote_url=db_repo.remote_url,
from_branch=from_branch,
to_branch=to_branch,
title=title,
description=description,
)
elif config.provider == GitProvider.GITHUB:
pr = await git_service.create_github_pull_request(
server_url=config.url,
pat=config.pat,
remote_url=db_repo.remote_url,
from_branch=from_branch,
to_branch=to_branch,
title=title,
description=description,
draft=payload.draft,
)
elif config.provider == GitProvider.GITLAB:
pr = await git_service.create_gitlab_merge_request(
server_url=config.url,
pat=config.pat,
remote_url=db_repo.remote_url,
from_branch=from_branch,
to_branch=to_branch,
title=title,
description=description,
remove_source_branch=payload.remove_source_branch,
)
else:
raise HTTPException(status_code=501, detail=f"Provider {config.provider} does not support promotion API")
return PromoteResponse(
mode="mr",
from_branch=from_branch,
to_branch=to_branch,
status=pr.get("status", "opened"),
url=pr.get("url"),
reference_id=str(pr.get("id")) if pr.get("id") is not None else None,
policy_violation=False,
)
# [/DEF:promote_dashboard:Function]
# [DEF:get_environments:Function]
# @PURPOSE: List all deployment environments.
# @PRE: Config manager is accessible.
# @POST: Returns a list of DeploymentEnvironmentSchema objects.
# @RETURN: List[DeploymentEnvironmentSchema]
@router.get("/environments", response_model=List[DeploymentEnvironmentSchema])
async def get_environments(
config_manager=Depends(get_config_manager),
_ = Depends(has_permission("environments", "READ"))
):
with belief_scope("get_environments"):
envs = config_manager.get_environments()
return [
DeploymentEnvironmentSchema(
id=e.id,
name=e.name,
superset_url=e.url,
is_active=True
) for e in envs
]
# [/DEF:get_environments:Function]
# [DEF:deploy_dashboard:Function]
# @PURPOSE: Deploy dashboard from Git to a target environment.
# @PRE: `dashboard_ref` and `deploy_data.environment_id` are valid.
# @POST: Dashboard YAMLs are read from Git and imported into the target Superset.
# @PARAM: dashboard_ref (str)
# @PARAM: deploy_data (DeployRequest)
@router.post("/repositories/{dashboard_ref}/deploy")
async def deploy_dashboard(
dashboard_ref: str,
deploy_data: DeployRequest,
env_id: Optional[str] = None,
config_manager=Depends(get_config_manager),
_ = Depends(has_permission("plugin:git", "EXECUTE"))
):
with belief_scope("deploy_dashboard"):
try:
dashboard_id = _resolve_dashboard_id_from_ref(dashboard_ref, config_manager, env_id)
from src.plugins.git_plugin import GitPlugin
plugin = GitPlugin()
return await plugin.execute({
"operation": "deploy",
"dashboard_id": dashboard_id,
"environment_id": deploy_data.environment_id
})
except HTTPException:
raise
except Exception as e:
_handle_unexpected_git_route_error("deploy_dashboard", e)
# [/DEF:deploy_dashboard:Function]
# [DEF:get_history:Function]
# @PURPOSE: View commit history for a dashboard's repository.
# @PRE: `dashboard_ref` repository exists.
# @POST: Returns a list of recent commits from the repository.
# @PARAM: dashboard_ref (str)
# @PARAM: limit (int)
# @RETURN: List[CommitSchema]
@router.get("/repositories/{dashboard_ref}/history", response_model=List[CommitSchema])
async def get_history(
dashboard_ref: str,
limit: int = 50,
env_id: Optional[str] = None,
config_manager=Depends(get_config_manager),
_ = Depends(has_permission("plugin:git", "EXECUTE"))
):
with belief_scope("get_history"):
try:
dashboard_id = _resolve_dashboard_id_from_ref(dashboard_ref, config_manager, env_id)
return git_service.get_commit_history(dashboard_id, limit)
except HTTPException:
raise
except Exception as e:
_handle_unexpected_git_route_error("get_history", e)
# [/DEF:get_history:Function]
# [DEF:get_repository_status:Function]
# @PURPOSE: Get current Git status for a dashboard repository.
# @PRE: `dashboard_ref` resolves to a valid dashboard.
# @POST: Returns repository status; if repo is not initialized, returns `NO_REPO` payload.
# @PARAM: dashboard_ref (str)
# @RETURN: dict
@router.get("/repositories/{dashboard_ref}/status")
async def get_repository_status(
dashboard_ref: str,
env_id: Optional[str] = None,
config_manager=Depends(get_config_manager),
_ = Depends(has_permission("plugin:git", "EXECUTE"))
):
with belief_scope("get_repository_status"):
try:
dashboard_id = _resolve_dashboard_id_from_ref(dashboard_ref, config_manager, env_id)
return _resolve_repository_status(dashboard_id)
except HTTPException:
raise
except Exception as e:
_handle_unexpected_git_route_error("get_repository_status", e)
# [/DEF:get_repository_status:Function]
# [DEF:get_repository_status_batch:Function]
# @PURPOSE: Get Git statuses for multiple dashboard repositories in one request.
# @PRE: `request.dashboard_ids` is provided.
# @POST: Returns `statuses` map where each key is dashboard ID and value is repository status payload.
# @PARAM: request (RepoStatusBatchRequest)
# @RETURN: RepoStatusBatchResponse
@router.post("/repositories/status/batch", response_model=RepoStatusBatchResponse)
async def get_repository_status_batch(
request: RepoStatusBatchRequest,
_ = Depends(has_permission("plugin:git", "EXECUTE"))
):
with belief_scope("get_repository_status_batch"):
dashboard_ids = list(dict.fromkeys(request.dashboard_ids))
if len(dashboard_ids) > MAX_REPOSITORY_STATUS_BATCH:
logger.warning(
"[get_repository_status_batch][Action] Batch size %s exceeds limit %s. Truncating request.",
len(dashboard_ids),
MAX_REPOSITORY_STATUS_BATCH,
)
dashboard_ids = dashboard_ids[:MAX_REPOSITORY_STATUS_BATCH]
statuses = {}
for dashboard_id in dashboard_ids:
try:
statuses[str(dashboard_id)] = _resolve_repository_status(dashboard_id)
except HTTPException:
statuses[str(dashboard_id)] = {
**_build_no_repo_status_payload(),
"sync_state": "ERROR",
"sync_status": "ERROR",
}
except Exception as e:
logger.error(
f"[get_repository_status_batch][Coherence:Failed] Failed for dashboard {dashboard_id}: {e}"
)
statuses[str(dashboard_id)] = {
**_build_no_repo_status_payload(),
"sync_state": "ERROR",
"sync_status": "ERROR",
}
return RepoStatusBatchResponse(statuses=statuses)
# [/DEF:get_repository_status_batch:Function]
# [DEF:get_repository_diff:Function]
# @PURPOSE: Get Git diff for a dashboard repository.
# @PRE: `dashboard_ref` repository exists.
# @POST: Returns the diff text for the specified file or all changes.
# @PARAM: dashboard_ref (str)
# @PARAM: file_path (Optional[str])
# @PARAM: staged (bool)
# @RETURN: str
@router.get("/repositories/{dashboard_ref}/diff")
async def get_repository_diff(
dashboard_ref: str,
file_path: Optional[str] = None,
staged: bool = False,
env_id: Optional[str] = None,
config_manager=Depends(get_config_manager),
_ = Depends(has_permission("plugin:git", "EXECUTE"))
):
with belief_scope("get_repository_diff"):
try:
dashboard_id = _resolve_dashboard_id_from_ref(dashboard_ref, config_manager, env_id)
diff_text = git_service.get_diff(dashboard_id, file_path, staged)
return diff_text
except HTTPException:
raise
except Exception as e:
_handle_unexpected_git_route_error("get_repository_diff", e)
# [/DEF:get_repository_diff:Function]
# [DEF:generate_commit_message:Function]
# @PURPOSE: Generate a suggested commit message using LLM.
# @PRE: Repository for `dashboard_ref` is initialized.
# @POST: Returns a suggested commit message string.
@router.post("/repositories/{dashboard_ref}/generate-message")
async def generate_commit_message(
dashboard_ref: str,
env_id: Optional[str] = None,
config_manager = Depends(get_config_manager),
db: Session = Depends(get_db),
_ = Depends(has_permission("plugin:git", "EXECUTE"))
):
with belief_scope("generate_commit_message"):
try:
dashboard_id = _resolve_dashboard_id_from_ref(dashboard_ref, config_manager, env_id)
# 1. Get Diff
diff = git_service.get_diff(dashboard_id, staged=True)
if not diff:
diff = git_service.get_diff(dashboard_id, staged=False)
if not diff:
return {"message": "No changes detected"}
# 2. Get History
history_objs = git_service.get_commit_history(dashboard_id, limit=5)
history = [h.message for h in history_objs if hasattr(h, 'message')]
# 3. Get LLM Client
from ...services.llm_provider import LLMProviderService
from ...plugins.llm_analysis.service import LLMClient
from ...plugins.llm_analysis.models import LLMProviderType
llm_service = LLMProviderService(db)
providers = llm_service.get_all_providers()
llm_settings = normalize_llm_settings(config_manager.get_config().settings.llm)
bound_provider_id = resolve_bound_provider_id(llm_settings, "git_commit")
provider = next((p for p in providers if p.id == bound_provider_id), None)
if not provider:
provider = next((p for p in providers if p.is_active), None)
if not provider:
raise HTTPException(status_code=400, detail="No active LLM provider found")
api_key = llm_service.get_decrypted_api_key(provider.id)
client = LLMClient(
provider_type=LLMProviderType(provider.provider_type),
api_key=api_key,
base_url=provider.base_url,
default_model=provider.default_model
)
# 4. Generate Message
from ...plugins.git.llm_extension import GitLLMExtension
extension = GitLLMExtension(client)
git_prompt = llm_settings["prompts"].get(
"git_commit_prompt",
DEFAULT_LLM_PROMPTS["git_commit_prompt"],
)
message = await extension.suggest_commit_message(
diff,
history,
prompt_template=git_prompt,
)
return {"message": message}
except HTTPException:
raise
except Exception as e:
_handle_unexpected_git_route_error("generate_commit_message", e)
# [/DEF:generate_commit_message:Function]
# [/DEF:backend.src.api.routes.git:Module]