# [DEF:TaskPersistenceModule:Module]
# @SEMANTICS: persistence, sqlite, task, storage
# @PURPOSE: Handles the persistence of tasks, specifically those awaiting user input, to a SQLite database.
# @LAYER: Core
# @RELATION: Used by TaskManager to save and load tasks.
# @INVARIANT: Database schema must match the Task model structure.
# @CONSTRAINT: Uses synchronous SQLite operations (blocking), should be used carefully.

# [SECTION: IMPORTS]
import json
import sqlite3
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any

from .models import Task, TaskStatus
from ..logger import logger, belief_scope
# [/SECTION]


# [DEF:TaskPersistenceService:Class]
# @SEMANTICS: persistence, service, database
# @PURPOSE: Provides methods to save and load tasks from a local SQLite database.
class TaskPersistenceService:
    # Maximum number of "?" placeholders bound in a single DELETE statement.
    # SQLite caps bound parameters per statement (999 by default on older
    # builds), so large id lists are deleted in batches of this size.
    _DELETE_BATCH_SIZE = 500

    # [DEF:TaskPersistenceService.__init__:Function]
    # @PURPOSE: Resolves the database location and makes sure the schema exists.
    # @PARAM: db_path (Optional[Path]) - Database file; when None, "migrations.db"
    #         four directory levels above this module is used.
    # @POST: self.db_path is a Path and the persistent_tasks table exists.
    def __init__(self, db_path: Optional[Path] = None):
        if db_path is None:
            self.db_path = Path(__file__).parent.parent.parent.parent / "migrations.db"
        else:
            # Coerce so that a plain string path also works with `.parent` / `.exists()`.
            self.db_path = Path(db_path)
        self._ensure_db_exists()
    # [/DEF:TaskPersistenceService.__init__:Function]

    # [DEF:TaskPersistenceService._ensure_db_exists:Function]
    # @PURPOSE: Ensures the database directory and table exist.
    # @PRE: None.
    # @POST: Database file and table are created if they didn't exist.
    def _ensure_db_exists(self) -> None:
        with belief_scope("TaskPersistenceService._ensure_db_exists"):
            self.db_path.parent.mkdir(parents=True, exist_ok=True)
            # closing() guarantees the connection is released even if the DDL fails;
            # a sqlite3 connection used directly as a context manager does not close itself.
            with closing(sqlite3.connect(str(self.db_path))) as conn:
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS persistent_tasks (
                        id TEXT PRIMARY KEY,
                        plugin_id TEXT NOT NULL,
                        status TEXT NOT NULL,
                        created_at TEXT NOT NULL,
                        updated_at TEXT NOT NULL,
                        input_request JSON,
                        context JSON
                    )
                """)
                conn.commit()
    # [/DEF:TaskPersistenceService._ensure_db_exists:Function]

    # [DEF:TaskPersistenceService.persist_tasks:Function]
    # @PURPOSE: Persists a list of tasks to the database.
    # @PRE: Tasks list contains valid Task objects.
    # @POST: Tasks matching the criteria (AWAITING_INPUT) are saved/updated in the DB.
    # @PARAM: tasks (List[Task]) - The list of tasks to check and persist.
    def persist_tasks(self, tasks: List[Task]) -> None:
        with belief_scope("TaskPersistenceService.persist_tasks"):
            # One timestamp per batch keeps created_at/updated_at consistent
            # across the rows written by this call.
            # NOTE(review): datetime.utcnow() yields a naive UTC value and is
            # deprecated since Python 3.12; kept so stored timestamps stay in
            # the same (offset-less) format as rows that already exist.
            now = datetime.utcnow().isoformat()

            # Serialize everything before touching the database so that a
            # non-JSON-serializable payload fails before a connection is opened.
            rows = [
                (
                    task.id,
                    task.plugin_id,
                    task.status.value,
                    task.started_at.isoformat() if task.started_at else now,
                    now,
                    json.dumps(task.input_request) if task.input_request else None,
                    json.dumps(task.params),
                )
                for task in tasks
                if task.status == TaskStatus.AWAITING_INPUT
            ]

            if rows:
                with closing(sqlite3.connect(str(self.db_path))) as conn:
                    conn.executemany("""
                        INSERT OR REPLACE INTO persistent_tasks
                        (id, plugin_id, status, created_at, updated_at, input_request, context)
                        VALUES (?, ?, ?, ?, ?, ?, ?)
                    """, rows)
                    # Closing without committing discards the writes, so an
                    # error above leaves the table untouched.
                    conn.commit()

            logger.info(f"Persisted {len(rows)} tasks awaiting input.")
    # [/DEF:TaskPersistenceService.persist_tasks:Function]

    # [DEF:TaskPersistenceService.load_tasks:Function]
    # @PURPOSE: Loads persisted tasks from the database.
    # @PRE: Database exists.
    # @POST: Returns a list of Task objects reconstructed from the DB.
    # @RETURN: List[Task] - The loaded tasks. Rows that cannot be turned back
    #          into a Task are logged and skipped.
    def load_tasks(self) -> List[Task]:
        with belief_scope("TaskPersistenceService.load_tasks"):
            if not self.db_path.exists():
                return []

            with closing(sqlite3.connect(str(self.db_path))) as conn:
                cursor = conn.cursor()

                # Check if plugin_id column exists (migration for existing db)
                cursor.execute("PRAGMA table_info(persistent_tasks)")
                columns = [info[1] for info in cursor.fetchall()]
                if not columns:
                    # table_info returns no rows when the table itself is
                    # missing; there is nothing to load in that case.
                    return []
                has_plugin_id = "plugin_id" in columns

                if has_plugin_id:
                    cursor.execute("SELECT id, plugin_id, status, created_at, input_request, context FROM persistent_tasks")
                else:
                    cursor.execute("SELECT id, status, created_at, input_request, context FROM persistent_tasks")
                rows = cursor.fetchall()

            # The connection is already released here; rebuilding Task objects
            # needs only the fetched rows.
            loaded_tasks = []
            for row in rows:
                if has_plugin_id:
                    task_id, plugin_id, status, created_at, input_request_json, context_json = row
                else:
                    task_id, status, created_at, input_request_json, context_json = row
                    plugin_id = "superset-migration"  # Default fallback

                try:
                    task = Task(
                        id=task_id,
                        plugin_id=plugin_id,
                        status=TaskStatus(status),
                        started_at=datetime.fromisoformat(created_at),
                        input_required=True,
                        input_request=json.loads(input_request_json) if input_request_json else None,
                        params=json.loads(context_json) if context_json else {},
                    )
                    loaded_tasks.append(task)
                except Exception as e:
                    # Best effort: one corrupt row must not prevent the
                    # remaining tasks from being restored.
                    logger.error(f"Failed to load task {task_id}: {e}")

            return loaded_tasks
    # [/DEF:TaskPersistenceService.load_tasks:Function]

    # [DEF:TaskPersistenceService.delete_tasks:Function]
    # @PURPOSE: Deletes specific tasks from the database.
    # @PARAM: task_ids (List[str]) - List of task IDs to delete.
    # @POST: Rows whose id is in task_ids are removed; an empty list is a no-op.
    def delete_tasks(self, task_ids: List[str]) -> None:
        if not task_ids:
            return
        with belief_scope("TaskPersistenceService.delete_tasks"):
            ids = list(task_ids)
            with closing(sqlite3.connect(str(self.db_path))) as conn:
                # Delete in bounded batches so the number of bound parameters
                # never exceeds SQLite's per-statement limit; all batches are
                # committed together as a single transaction.
                for start in range(0, len(ids), self._DELETE_BATCH_SIZE):
                    batch = ids[start:start + self._DELETE_BATCH_SIZE]
                    placeholders = ', '.join('?' for _ in batch)
                    conn.execute(f"DELETE FROM persistent_tasks WHERE id IN ({placeholders})", batch)
                conn.commit()
    # [/DEF:TaskPersistenceService.delete_tasks:Function]
# [/DEF:TaskPersistenceService:Class]
# [/DEF:TaskPersistenceModule:Module]