Files
ss-tools/backend/src/core/task_manager/cleanup.py
2026-01-13 17:33:57 +03:00

47 lines
2.1 KiB
Python

# [DEF:TaskCleanupModule:Module]
# @SEMANTICS: task, cleanup, retention
# @PURPOSE: Implements task cleanup and retention policies.
# @LAYER: Core
# @RELATION: Uses TaskPersistenceService to delete old tasks.
from datetime import datetime, timedelta
from .persistence import TaskPersistenceService
from ..logger import logger, belief_scope
from ..config_manager import ConfigManager
# [DEF:TaskCleanupService:Class]
# @PURPOSE: Provides methods to clean up old task records.
class TaskCleanupService:
    """Prunes persisted task records according to the configured retention settings."""

    # Upper bound on how many surplus tasks a single cleanup pass fetches
    # (and therefore deletes) on top of the tasks that are retained.
    _CLEANUP_BATCH_SIZE = 1000

    # [DEF:__init__:Function]
    # @PURPOSE: Initializes the cleanup service with dependencies.
    # @PRE: persistence_service and config_manager are valid.
    # @POST: Cleanup service is ready.
    def __init__(
        self,
        persistence_service: TaskPersistenceService,
        config_manager: ConfigManager,
    ):
        """Store the collaborators used by run_cleanup.

        Args:
            persistence_service: Object providing ``load_tasks(limit=...)`` and
                ``delete_tasks(ids)``.
            config_manager: Object whose ``get_config().settings`` exposes
                ``task_retention_days`` and ``task_retention_limit``.
        """
        self.persistence_service = persistence_service
        self.config_manager = config_manager
    # [/DEF:__init__:Function]

    # [DEF:run_cleanup:Function]
    # @PURPOSE: Deletes tasks exceeding the configured retention limit.
    # @PRE: Config manager has valid settings.
    # @POST: Tasks beyond task_retention_limit are deleted from persistence.
    def run_cleanup(self):
        """Delete the tasks that exceed ``settings.task_retention_limit``.

        At most ``_CLEANUP_BATCH_SIZE`` surplus tasks are removed per call.
        A missing (``None``) or negative limit skips the pass instead of
        deleting an arbitrary tail of the loaded tasks.

        NOTE(review): ``task_retention_days`` is only logged here; age-based
        deletion is not implemented in this method.
        """
        with belief_scope("TaskCleanupService.run_cleanup"):
            settings = self.config_manager.get_config().settings
            retention_days = settings.task_retention_days
            retention_limit = settings.task_retention_limit

            logger.info(f"Cleaning up tasks older than {retention_days} days.")

            # A negative limit would turn the slice below into "the last N
            # tasks" and delete records on every run; None cannot be compared.
            if retention_limit is None or retention_limit < 0:
                logger.warning(
                    f"Skipping task cleanup: invalid task_retention_limit {retention_limit!r}"
                )
                return

            # Fetch the retained tasks plus one batch of surplus. A fixed
            # window of 1000 could never exceed a limit of 1000 or more, so
            # such limits were silently never enforced.
            # Assumes load_tasks returns tasks in keep-first order (presumably
            # newest first) -- TODO confirm against TaskPersistenceService.
            tasks = self.persistence_service.load_tasks(
                limit=retention_limit + self._CLEANUP_BATCH_SIZE
            )

            to_delete = [task.id for task in tasks[retention_limit:]]
            if not to_delete:
                return

            self.persistence_service.delete_tasks(to_delete)
            logger.info(f"Deleted {len(to_delete)} tasks exceeding limit of {retention_limit}")
    # [/DEF:run_cleanup:Function]
# [/DEF:TaskCleanupService:Class]
# [/DEF:TaskCleanupModule:Module]