# [DEF:SchedulerModule:Module] # @TIER: STANDARD # @SEMANTICS: scheduler, apscheduler, cron, backup # @PURPOSE: Manages scheduled tasks using APScheduler. # @LAYER: Core # @RELATION: Uses TaskManager to run scheduled backups. # [SECTION: IMPORTS] from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.date import DateTrigger from .logger import logger, belief_scope from .config_manager import ConfigManager from .database import SessionLocal from ..models.llm import ValidationPolicy import asyncio from datetime import datetime, time, timedelta, date # [/SECTION] # [DEF:SchedulerService:Class] # @TIER: STANDARD # @SEMANTICS: scheduler, service, apscheduler # @PURPOSE: Provides a service to manage scheduled backup tasks. class SchedulerService: # [DEF:__init__:Function] # @PURPOSE: Initializes the scheduler service with task and config managers. # @PRE: task_manager and config_manager must be provided. # @POST: Scheduler instance is created but not started. def __init__(self, task_manager, config_manager: ConfigManager): with belief_scope("SchedulerService.__init__"): self.task_manager = task_manager self.config_manager = config_manager self.scheduler = BackgroundScheduler() self.loop = asyncio.get_event_loop() # [/DEF:__init__:Function] # [DEF:start:Function] # @PURPOSE: Starts the background scheduler and loads initial schedules. # @PRE: Scheduler should be initialized. # @POST: Scheduler is running and schedules are loaded. def start(self): with belief_scope("SchedulerService.start"): if not self.scheduler.running: self.scheduler.start() logger.info("Scheduler started.") self.load_schedules() # [/DEF:start:Function] # [DEF:stop:Function] # @PURPOSE: Stops the background scheduler. # @PRE: Scheduler should be running. # @POST: Scheduler is shut down. 
def stop(self):
    """Stop the background scheduler if it is running."""
    with belief_scope("SchedulerService.stop"):
        if self.scheduler.running:
            self.scheduler.shutdown()
            logger.info("Scheduler stopped.")
# [/DEF:stop:Function]

# [DEF:load_schedules:Function]
# @PURPOSE: Loads backup schedules from configuration and registers them.
# @PRE: config_manager must have valid configuration.
# @POST: All enabled backup jobs are added to the scheduler.
def load_schedules(self):
    with belief_scope("SchedulerService.load_schedules"):
        # Clear existing jobs first so removed or disabled schedules do not
        # linger after a config reload.
        self.scheduler.remove_all_jobs()
        config = self.config_manager.get_config()
        for env in config.environments:
            # Only environments with a present *and* enabled schedule get a job.
            if env.backup_schedule and env.backup_schedule.enabled:
                self.add_backup_job(env.id, env.backup_schedule.cron_expression)
# [/DEF:load_schedules:Function]

# [DEF:add_backup_job:Function]
# @PURPOSE: Adds a scheduled backup job for an environment.
# @PRE: env_id and cron_expression must be valid strings.
# @POST: A new job is added to the scheduler or replaced if it already exists.
# @PARAM: env_id (str) - The ID of the environment.
# @PARAM: cron_expression (str) - The cron expression for the schedule
#         (standard 5-field crontab syntax, parsed by CronTrigger.from_crontab).
def add_backup_job(self, env_id: str, cron_expression: str):
    with belief_scope("SchedulerService.add_backup_job", f"env_id={env_id}, cron={cron_expression}"):
        # Deterministic job id per environment so replace_existing=True
        # updates a prior schedule instead of duplicating it.
        job_id = f"backup_{env_id}"
        try:
            self.scheduler.add_job(
                self._trigger_backup,
                CronTrigger.from_crontab(cron_expression),
                id=job_id,
                args=[env_id],
                replace_existing=True
            )
            logger.info(f"Scheduled backup job added for environment {env_id}: {cron_expression}")
        except Exception as e:
            # Broad catch is deliberate: one invalid cron expression must not
            # abort registration of the remaining schedules.
            logger.error(f"Failed to add backup job for environment {env_id}: {e}")
# [/DEF:add_backup_job:Function]

# [DEF:_trigger_backup:Function]
# @PURPOSE: Triggered by the scheduler to start a backup task.
# @PRE: env_id must be a valid environment ID.
# @POST: A new backup task is created in the task manager if not already running.
# @PARAM: env_id (str) - The ID of the environment.
def _trigger_backup(self, env_id: str):
    """Scheduler callback: start a backup task for *env_id* unless one is active.

    Runs on an APScheduler worker thread, so the async create_task call is
    handed to the service's event loop via run_coroutine_threadsafe.
    """
    with belief_scope("SchedulerService._trigger_backup", f"env_id={env_id}"):
        logger.info(f"Triggering scheduled backup for environment {env_id}")
        # Skip if a backup is already pending/running for this environment.
        # NOTE(review): only the most recent 100 tasks are inspected; an active
        # task outside that window would be missed -- confirm get_tasks(limit=100)
        # is guaranteed to cover all active tasks.
        active_tasks = self.task_manager.get_tasks(limit=100)
        for task in active_tasks:
            if (task.plugin_id == "superset-backup"
                    and task.status in ["PENDING", "RUNNING"]
                    and task.params.get("environment_id") == env_id):
                logger.warning(f"Backup already running for environment {env_id}. Skipping scheduled run.")
                return

        # create_task is a coroutine, so it must run on the event loop.
        future = asyncio.run_coroutine_threadsafe(
            self.task_manager.create_task("superset-backup", {"environment_id": env_id}),
            self.loop
        )

        # FIX: the Future was previously discarded, so any exception raised
        # inside create_task vanished silently. Observe it and log instead.
        def _log_failure(fut):
            # Runs once the coroutine finishes (on the loop's thread).
            exc = fut.exception()
            if exc is not None:
                logger.error(f"Scheduled backup for environment {env_id} failed to start: {exc}")

        future.add_done_callback(_log_failure)
# [/DEF:_trigger_backup:Function]
# [/DEF:SchedulerService:Class]


# [DEF:ThrottledSchedulerConfigurator:Class]
# @TIER: CRITICAL
# @SEMANTICS: scheduler, throttling, distribution
# @PURPOSE: Distributes validation tasks evenly within an execution window.
class ThrottledSchedulerConfigurator:

    # [DEF:calculate_schedule:Function]
    # @PURPOSE: Calculates execution times for N tasks within a window.
    # @PRE: window_start, window_end (time), dashboard_ids (List), current_date (date).
    # @POST: Returns List[datetime] of scheduled times (naive datetimes on
    #        current_date, or current_date + 1 day past midnight).
    # @INVARIANT: Tasks are distributed with near-even spacing; the first time
    #             is exactly window_start and the last exactly window_end.
    @staticmethod
    def calculate_schedule(
        window_start: time,
        window_end: time,
        dashboard_ids: list,
        current_date: date
    ) -> list:
        with belief_scope("ThrottledSchedulerConfigurator.calculate_schedule"):
            n = len(dashboard_ids)
            if n == 0:
                return []

            start_dt = datetime.combine(current_date, window_start)
            end_dt = datetime.combine(current_date, window_end)
            # A window that "ends" before it starts crosses midnight.
            if end_dt < start_dt:
                end_dt += timedelta(days=1)

            total_seconds = (end_dt - start_dt).total_seconds()
            # Zero-width window: explicit fallback, everything at start time.
            if total_seconds <= 0:
                logger.warning(f"[calculate_schedule] Window size is zero or negative. Falling back to start time for all {n} tasks.")
                return [start_dt] * n

            # A single task always runs at the window start.
            if n == 1:
                return [start_dt]

            # FIX: removed the dead `if n > 1 else 0` guard -- the n == 1 case
            # has already returned above, so n >= 2 is guaranteed here.
            interval = total_seconds / (n - 1)
            # Too-small windows still get a schedule, just a concentrated one;
            # the spec asks only for a warning, not a failure.
            if interval < 1:
                logger.warning(f"[calculate_schedule] Window too small for {n} tasks (interval {interval:.2f}s). Tasks will be highly concentrated.")

            # i = 0 lands exactly on start_dt; i = n - 1 lands exactly on end_dt.
            return [start_dt + timedelta(seconds=i * interval) for i in range(n)]
    # [/DEF:calculate_schedule:Function]
# [/DEF:ThrottledSchedulerConfigurator:Class]
# [/DEF:SchedulerModule:Module]