# [DEF:AppDependencies:Module] # @COMPLEXITY: 3 # @SEMANTICS: dependency, injection, singleton, factory, auth, jwt # @PURPOSE: Manages creation and provision of shared application dependencies, such as PluginLoader and TaskManager, to avoid circular imports. # @LAYER: Core # @RELATION: Used by main app and API routers to get access to shared instances. # @RELATION: CALLS ->[CleanReleaseRepository] # @RELATION: CALLS ->[ConfigManager] # @RELATION: CALLS ->[PluginLoader] # @RELATION: CALLS ->[SchedulerService] # @RELATION: CALLS ->[TaskManager] # @RELATION: CALLS ->[get_all_plugin_configs] # @RELATION: CALLS ->[get_db] # @RELATION: CALLS ->[info] # @RELATION: CALLS ->[init_db] from pathlib import Path from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import JWTError from .core.plugin_loader import PluginLoader from .core.task_manager import TaskManager from .core.config_manager import ConfigManager from .core.scheduler import SchedulerService from .services.resource_service import ResourceService from .services.mapping_service import MappingService from .services.clean_release.repositories import ( CandidateRepository, ArtifactRepository, ManifestRepository, PolicyRepository, ComplianceRepository, ReportRepository, ApprovalRepository, PublicationRepository, AuditRepository, CleanReleaseAuditLog ) from .services.clean_release.repository import CleanReleaseRepository from .services.clean_release.facade import CleanReleaseFacade from .services.reports.report_service import ReportsService from .core.database import init_db, get_auth_db, get_db from .core.logger import logger from .core.auth.jwt import decode_token from .core.auth.repository import AuthRepository from .models.auth import User # Initialize singletons # Use absolute path relative to this file to ensure plugins are found regardless of CWD project_root = Path(__file__).parent.parent.parent config_path = project_root / "config.json" # Initialize database before services that use persisted configuration. init_db() config_manager = ConfigManager(config_path=str(config_path)) # [DEF:get_config_manager:Function] # @COMPLEXITY: 1 # @PURPOSE: Dependency injector for ConfigManager. # @PRE: Global config_manager must be initialized. # @POST: Returns shared ConfigManager instance. # @RETURN: ConfigManager - The shared config manager instance. def get_config_manager() -> ConfigManager: """Dependency injector for ConfigManager.""" return config_manager # [/DEF:get_config_manager:Function] plugin_dir = Path(__file__).parent / "plugins" plugin_loader = PluginLoader(plugin_dir=str(plugin_dir)) logger.info(f"PluginLoader initialized with directory: {plugin_dir}") logger.info(f"Available plugins: {[config.name for config in plugin_loader.get_all_plugin_configs()]}") task_manager = TaskManager(plugin_loader) logger.info("TaskManager initialized") scheduler_service = SchedulerService(task_manager, config_manager) logger.info("SchedulerService initialized") resource_service = ResourceService() logger.info("ResourceService initialized") # Clean Release Redesign Singletons # Note: These use get_db() which is a generator, so we need a way to provide a session. # For singletons in dependencies.py, we might need a different approach or # initialize them inside the dependency functions. # [DEF:get_plugin_loader:Function] # @COMPLEXITY: 1 # @PURPOSE: Dependency injector for PluginLoader. # @PRE: Global plugin_loader must be initialized. # @POST: Returns shared PluginLoader instance. # @RETURN: PluginLoader - The shared plugin loader instance. def get_plugin_loader() -> PluginLoader: """Dependency injector for PluginLoader.""" return plugin_loader # [/DEF:get_plugin_loader:Function] # [DEF:get_task_manager:Function] # @COMPLEXITY: 1 # @PURPOSE: Dependency injector for TaskManager. # @PRE: Global task_manager must be initialized. # @POST: Returns shared TaskManager instance. # @RETURN: TaskManager - The shared task manager instance. def get_task_manager() -> TaskManager: """Dependency injector for TaskManager.""" return task_manager # [/DEF:get_task_manager:Function] # [DEF:get_scheduler_service:Function] # @COMPLEXITY: 1 # @PURPOSE: Dependency injector for SchedulerService. # @PRE: Global scheduler_service must be initialized. # @POST: Returns shared SchedulerService instance. # @RETURN: SchedulerService - The shared scheduler service instance. def get_scheduler_service() -> SchedulerService: """Dependency injector for SchedulerService.""" return scheduler_service # [/DEF:get_scheduler_service:Function] # [DEF:get_resource_service:Function] # @COMPLEXITY: 1 # @PURPOSE: Dependency injector for ResourceService. # @PRE: Global resource_service must be initialized. # @POST: Returns shared ResourceService instance. # @RETURN: ResourceService - The shared resource service instance. def get_resource_service() -> ResourceService: """Dependency injector for ResourceService.""" return resource_service # [/DEF:get_resource_service:Function] # [DEF:get_mapping_service:Function] # @COMPLEXITY: 1 # @PURPOSE: Dependency injector for MappingService. # @PRE: Global config_manager must be initialized. # @POST: Returns new MappingService instance. # @RETURN: MappingService - A new mapping service instance. def get_mapping_service() -> MappingService: """Dependency injector for MappingService.""" return MappingService(config_manager) # [/DEF:get_mapping_service:Function] _clean_release_repository = CleanReleaseRepository() # [DEF:get_clean_release_repository:Function] # @COMPLEXITY: 1 # @PURPOSE: Legacy compatibility shim for CleanReleaseRepository. # @POST: Returns a shared CleanReleaseRepository instance. def get_clean_release_repository() -> CleanReleaseRepository: """Legacy compatibility shim for CleanReleaseRepository.""" return _clean_release_repository # [/DEF:get_clean_release_repository:Function] # [DEF:get_clean_release_facade:Function] # @COMPLEXITY: 1 # @PURPOSE: Dependency injector for CleanReleaseFacade. # @POST: Returns a facade instance with a fresh DB session. def get_clean_release_facade(db = Depends(get_db)) -> CleanReleaseFacade: candidate_repo = CandidateRepository(db) artifact_repo = ArtifactRepository(db) manifest_repo = ManifestRepository(db) policy_repo = PolicyRepository(db) compliance_repo = ComplianceRepository(db) report_repo = ReportRepository(db) approval_repo = ApprovalRepository(db) publication_repo = PublicationRepository(db) audit_repo = AuditRepository(db) return CleanReleaseFacade( candidate_repo=candidate_repo, artifact_repo=artifact_repo, manifest_repo=manifest_repo, policy_repo=policy_repo, compliance_repo=compliance_repo, report_repo=report_repo, approval_repo=approval_repo, publication_repo=publication_repo, audit_repo=audit_repo, config_manager=config_manager ) # [/DEF:get_clean_release_facade:Function] # [DEF:oauth2_scheme:Variable] # @COMPLEXITY: 1 # @PURPOSE: OAuth2 password bearer scheme for token extraction. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login") # [/DEF:oauth2_scheme:Variable] # [DEF:get_current_user:Function] # @COMPLEXITY: 3 # @PURPOSE: Dependency for retrieving currently authenticated user from a JWT. # @PRE: JWT token provided in Authorization header. # @POST: Returns User object if token is valid. # @THROW: HTTPException 401 if token is invalid or user not found. # @PARAM: token (str) - Extracted JWT token. # @PARAM: db (Session) - Auth database session. # @RETURN: User - The authenticated user. def get_current_user(token: str = Depends(oauth2_scheme), db = Depends(get_auth_db)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = decode_token(token) username: str = payload.get("sub") if username is None: raise credentials_exception except JWTError: raise credentials_exception repo = AuthRepository(db) user = repo.get_user_by_username(username) if user is None: raise credentials_exception return user # [/DEF:get_current_user:Function] # [DEF:has_permission:Function] # @COMPLEXITY: 3 # @PURPOSE: Dependency for checking if the current user has a specific permission. # @PRE: User is authenticated. # @POST: Returns True if user has permission. # @THROW: HTTPException 403 if permission is denied. # @PARAM: resource (str) - The resource identifier. # @PARAM: action (str) - The action identifier (READ, EXECUTE, WRITE). # @RETURN: User - The authenticated user if permission granted. def has_permission(resource: str, action: str): def permission_checker(current_user: User = Depends(get_current_user)): # Union of all permissions across all roles for role in current_user.roles: for perm in role.permissions: if perm.resource == resource and perm.action == action: return current_user # Special case for Admin role (full access) if any(role.name == "Admin" for role in current_user.roles): return current_user from .core.auth.logger import log_security_event log_security_event("PERMISSION_DENIED", current_user.username, {"resource": resource, "action": action}) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Permission denied for {resource}:{action}" ) return permission_checker # [/DEF:has_permission:Function] # [/DEF:AppDependencies:Module]