# [DEF:backend.src.services.rbac_permission_catalog:Module] # # @COMPLEXITY: 3 # @SEMANTICS: rbac, permissions, catalog, sync, discovery # @PURPOSE: Discovers declared RBAC permissions from API routes/plugins and synchronizes them into auth database. # @LAYER: Service # @RELATION: CALLS -> backend.src.core.plugin_loader.PluginLoader.get_all_plugin_configs # @RELATION: DEPENDS_ON -> backend.src.models.auth.Permission # @INVARIANT: Synchronization is idempotent for existing (resource, action) permission pairs. # [SECTION: IMPORTS] import re from functools import lru_cache from pathlib import Path from typing import Iterable, Set, Tuple from sqlalchemy.orm import Session from ..core.logger import belief_scope, logger from ..models.auth import Permission # [/SECTION: IMPORTS] # [DEF:HAS_PERMISSION_PATTERN:Constant] # @PURPOSE: Regex pattern for extracting has_permission("resource", "ACTION") declarations. HAS_PERMISSION_PATTERN = re.compile( r"""has_permission\(\s*['"]([^'"]+)['"]\s*,\s*['"]([A-Z]+)['"]\s*\)""" ) # [/DEF:HAS_PERMISSION_PATTERN:Constant] # [DEF:ROUTES_DIR:Constant] # @PURPOSE: Absolute directory path where API route RBAC declarations are defined. ROUTES_DIR = Path(__file__).resolve().parent.parent / "api" / "routes" # [/DEF:ROUTES_DIR:Constant] # [DEF:_iter_route_files:Function] # @PURPOSE: Iterates API route files that may contain RBAC declarations. # @PRE: ROUTES_DIR points to backend/src/api/routes. # @POST: Yields Python files excluding test and cache directories. # @RETURN: Iterable[Path] - Route file paths for permission extraction. def _iter_route_files() -> Iterable[Path]: with belief_scope("rbac_permission_catalog._iter_route_files"): if not ROUTES_DIR.exists(): return [] files = [] for file_path in ROUTES_DIR.rglob("*.py"): path_parts = set(file_path.parts) if "__tests__" in path_parts or "__pycache__" in path_parts: continue files.append(file_path) return files # [/DEF:_iter_route_files:Function] # [DEF:_discover_route_permissions:Function] # @PURPOSE: Extracts explicit has_permission declarations from API route source code. # @PRE: Route files are readable UTF-8 text files. # @POST: Returns unique set of (resource, action) pairs declared in route guards. # @RETURN: Set[Tuple[str, str]] - Permission pairs from route-level RBAC declarations. def _discover_route_permissions() -> Set[Tuple[str, str]]: with belief_scope("rbac_permission_catalog._discover_route_permissions"): discovered: Set[Tuple[str, str]] = set() for route_file in _iter_route_files(): try: source = route_file.read_text(encoding="utf-8") except OSError as read_error: logger.warning( "[rbac_permission_catalog][EXPLORE] Failed to read route file %s: %s", route_file, read_error, ) continue for resource, action in HAS_PERMISSION_PATTERN.findall(source): normalized_resource = str(resource or "").strip() normalized_action = str(action or "").strip().upper() if normalized_resource and normalized_action: discovered.add((normalized_resource, normalized_action)) return discovered # [/DEF:_discover_route_permissions:Function] # [DEF:_discover_route_permissions_cached:Function] # @PURPOSE: Cache route permission discovery because route source files are static during normal runtime. # @PRE: None. # @POST: Returns stable discovered route permission pairs without repeated filesystem scans. @lru_cache(maxsize=1) def _discover_route_permissions_cached() -> Tuple[Tuple[str, str], ...]: with belief_scope("rbac_permission_catalog._discover_route_permissions_cached"): return tuple(sorted(_discover_route_permissions())) # [/DEF:_discover_route_permissions_cached:Function] # [DEF:_discover_plugin_execute_permissions:Function] # @PURPOSE: Derives dynamic task permissions of form plugin:{plugin_id}:EXECUTE from plugin registry. # @PRE: plugin_loader is optional and may expose get_all_plugin_configs. # @POST: Returns unique plugin EXECUTE permissions if loader is available. # @RETURN: Set[Tuple[str, str]] - Permission pairs derived from loaded plugin IDs. def _discover_plugin_execute_permissions(plugin_loader=None) -> Set[Tuple[str, str]]: with belief_scope("rbac_permission_catalog._discover_plugin_execute_permissions"): discovered: Set[Tuple[str, str]] = set() if plugin_loader is None: return discovered try: plugin_configs = plugin_loader.get_all_plugin_configs() except Exception as plugin_error: logger.warning( "[rbac_permission_catalog][EXPLORE] Failed to read plugin configs for RBAC discovery: %s", plugin_error, ) return discovered for plugin_config in plugin_configs: plugin_id = str(getattr(plugin_config, "id", "") or "").strip() if plugin_id: discovered.add((f"plugin:{plugin_id}", "EXECUTE")) return discovered # [/DEF:_discover_plugin_execute_permissions:Function] # [DEF:_discover_plugin_execute_permissions_cached:Function] # @PURPOSE: Cache dynamic plugin EXECUTE permission pairs by normalized plugin id tuple. # @PRE: plugin_ids is a deterministic tuple of plugin ids. # @POST: Returns stable permission tuple without repeated plugin catalog expansion. @lru_cache(maxsize=8) def _discover_plugin_execute_permissions_cached( plugin_ids: Tuple[str, ...], ) -> Tuple[Tuple[str, str], ...]: with belief_scope("rbac_permission_catalog._discover_plugin_execute_permissions_cached"): return tuple((f"plugin:{plugin_id}", "EXECUTE") for plugin_id in plugin_ids) # [/DEF:_discover_plugin_execute_permissions_cached:Function] # [DEF:discover_declared_permissions:Function] # @PURPOSE: Builds canonical RBAC permission catalog from routes and plugin registry. # @PRE: plugin_loader may be provided for dynamic task plugin permission discovery. # @POST: Returns union of route-declared and dynamic plugin EXECUTE permissions. # @RETURN: Set[Tuple[str, str]] - Complete discovered permission set. def discover_declared_permissions(plugin_loader=None) -> Set[Tuple[str, str]]: with belief_scope("rbac_permission_catalog.discover_declared_permissions"): permissions = set(_discover_route_permissions_cached()) plugin_ids = tuple( sorted( { str(getattr(plugin_config, "id", "") or "").strip() for plugin_config in (plugin_loader.get_all_plugin_configs() if plugin_loader else []) if str(getattr(plugin_config, "id", "") or "").strip() } ) ) permissions.update(_discover_plugin_execute_permissions_cached(plugin_ids)) return permissions # [/DEF:discover_declared_permissions:Function] # [DEF:sync_permission_catalog:Function] # @PURPOSE: Persists missing RBAC permission pairs into auth database. # @PRE: db is a valid SQLAlchemy session bound to auth database. # @PRE: declared_permissions is an iterable of (resource, action) tuples. # @POST: Missing permissions are inserted; existing permissions remain untouched. # @SIDE_EFFECT: Commits auth database transaction when new permissions are added. # @RETURN: int - Number of inserted permission records. def sync_permission_catalog( db: Session, declared_permissions: Iterable[Tuple[str, str]], ) -> int: with belief_scope("rbac_permission_catalog.sync_permission_catalog"): normalized_declared: Set[Tuple[str, str]] = set() for resource, action in declared_permissions: normalized_resource = str(resource or "").strip() normalized_action = str(action or "").strip().upper() if normalized_resource and normalized_action: normalized_declared.add((normalized_resource, normalized_action)) existing_permissions = db.query(Permission).all() existing_pairs = {(perm.resource, perm.action.upper()) for perm in existing_permissions} missing_pairs = sorted(normalized_declared - existing_pairs) for resource, action in missing_pairs: db.add(Permission(resource=resource, action=action)) if missing_pairs: db.commit() return len(missing_pairs) # [/DEF:sync_permission_catalog:Function] # [/DEF:backend.src.services.rbac_permission_catalog:Module]