db + docker

This commit is contained in:
2026-02-20 20:47:39 +03:00
parent 01efc9dae1
commit d67d24e7e6
18 changed files with 2711 additions and 588 deletions

View File

@@ -0,0 +1,350 @@
# [DEF:backend.src.scripts.migrate_sqlite_to_postgres:Module]
#
# @SEMANTICS: migration, sqlite, postgresql, config, task_logs, task_records
# @PURPOSE: Migrates legacy config and task history from SQLite/file storage to PostgreSQL.
# @LAYER: Scripts
# @RELATION: READS_FROM -> backend/tasks.db
# @RELATION: READS_FROM -> backend/config.json
# @RELATION: WRITES_TO -> postgresql.task_records
# @RELATION: WRITES_TO -> postgresql.task_logs
# @RELATION: WRITES_TO -> postgresql.app_configurations
#
# @INVARIANT: Script is idempotent for task_records and app_configurations.
# [SECTION: IMPORTS]
import argparse
import json
import os
import sqlite3
from pathlib import Path
from typing import Any, Dict, Iterable, Optional
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from src.core.logger import belief_scope, logger
# [/SECTION]
# [DEF:Constants:Section]
DEFAULT_TARGET_URL = os.getenv(
"DATABASE_URL",
os.getenv("POSTGRES_URL", "postgresql+psycopg2://postgres:postgres@localhost:5432/ss_tools"),
)
# [/DEF:Constants:Section]
# [DEF:_json_load_if_needed:Function]
# @PURPOSE: Parses JSON-like values from SQLite TEXT/JSON columns to Python objects.
def _json_load_if_needed(value: Any) -> Any:
with belief_scope("_json_load_if_needed"):
if value is None:
return None
if isinstance(value, (dict, list)):
return value
if isinstance(value, str):
raw = value.strip()
if not raw:
return None
if raw[0] in "{[":
try:
return json.loads(raw)
except json.JSONDecodeError:
return value
return value
# [DEF:_find_legacy_config_path:Function]
# @PURPOSE: Resolves the existing legacy config.json path from candidates.
def _find_legacy_config_path(explicit_path: Optional[str]) -> Optional[Path]:
with belief_scope("_find_legacy_config_path"):
if explicit_path:
p = Path(explicit_path)
return p if p.exists() else None
candidates = [
Path("backend/config.json"),
Path("config.json"),
]
for candidate in candidates:
if candidate.exists():
return candidate
return None
# [DEF:_connect_sqlite:Function]
# @PURPOSE: Opens a SQLite connection with row factory.
def _connect_sqlite(path: Path) -> sqlite3.Connection:
with belief_scope("_connect_sqlite"):
conn = sqlite3.connect(str(path))
conn.row_factory = sqlite3.Row
return conn
# [DEF:_ensure_target_schema:Function]
# @PURPOSE: Ensures required PostgreSQL tables exist before migration.
def _ensure_target_schema(engine) -> None:
with belief_scope("_ensure_target_schema"):
stmts: Iterable[str] = (
"""
CREATE TABLE IF NOT EXISTS app_configurations (
id TEXT PRIMARY KEY,
payload JSONB NOT NULL,
updated_at TIMESTAMPTZ DEFAULT NOW()
)
""",
"""
CREATE TABLE IF NOT EXISTS task_records (
id TEXT PRIMARY KEY,
type TEXT NOT NULL,
status TEXT NOT NULL,
environment_id TEXT NULL,
started_at TIMESTAMPTZ NULL,
finished_at TIMESTAMPTZ NULL,
logs JSONB NULL,
error TEXT NULL,
result JSONB NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
params JSONB NULL
)
""",
"""
CREATE TABLE IF NOT EXISTS task_logs (
id INTEGER PRIMARY KEY,
task_id TEXT NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
level VARCHAR(16) NOT NULL,
source VARCHAR(64) NOT NULL DEFAULT 'system',
message TEXT NOT NULL,
metadata_json TEXT NULL,
CONSTRAINT fk_task_logs_task
FOREIGN KEY(task_id)
REFERENCES task_records(id)
ON DELETE CASCADE
)
""",
"CREATE INDEX IF NOT EXISTS ix_task_logs_task_timestamp ON task_logs (task_id, timestamp)",
"CREATE INDEX IF NOT EXISTS ix_task_logs_task_level ON task_logs (task_id, level)",
"CREATE INDEX IF NOT EXISTS ix_task_logs_task_source ON task_logs (task_id, source)",
"""
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM pg_class WHERE relkind = 'S' AND relname = 'task_logs_id_seq'
) THEN
PERFORM 1;
ELSE
CREATE SEQUENCE task_logs_id_seq OWNED BY task_logs.id;
END IF;
END $$;
""",
"ALTER TABLE task_logs ALTER COLUMN id SET DEFAULT nextval('task_logs_id_seq')",
)
with engine.begin() as conn:
for stmt in stmts:
conn.execute(text(stmt))
# [DEF:_migrate_config:Function]
# @PURPOSE: Migrates legacy config.json into app_configurations(global).
def _migrate_config(engine, legacy_config_path: Optional[Path]) -> int:
with belief_scope("_migrate_config"):
if legacy_config_path is None:
logger.info("[_migrate_config][Action] No legacy config.json found, skipping")
return 0
payload = json.loads(legacy_config_path.read_text(encoding="utf-8"))
with engine.begin() as conn:
conn.execute(
text(
"""
INSERT INTO app_configurations (id, payload, updated_at)
VALUES ('global', CAST(:payload AS JSONB), NOW())
ON CONFLICT (id)
DO UPDATE SET payload = EXCLUDED.payload, updated_at = NOW()
"""
),
{"payload": json.dumps(payload, ensure_ascii=True)},
)
logger.info("[_migrate_config][Coherence:OK] Config migrated from %s", legacy_config_path)
return 1
# [DEF:_migrate_tasks_and_logs:Function]
# @PURPOSE: Migrates task_records and task_logs from SQLite into PostgreSQL.
def _migrate_tasks_and_logs(engine, sqlite_conn: sqlite3.Connection) -> Dict[str, int]:
with belief_scope("_migrate_tasks_and_logs"):
stats = {"task_records_total": 0, "task_records_inserted": 0, "task_logs_total": 0, "task_logs_inserted": 0}
rows = sqlite_conn.execute(
"""
SELECT id, type, status, environment_id, started_at, finished_at, logs, error, result, created_at, params
FROM task_records
ORDER BY created_at ASC
"""
).fetchall()
stats["task_records_total"] = len(rows)
with engine.begin() as conn:
existing_env_ids = {
row[0]
for row in conn.execute(text("SELECT id FROM environments")).fetchall()
}
for row in rows:
params_obj = _json_load_if_needed(row["params"])
result_obj = _json_load_if_needed(row["result"])
logs_obj = _json_load_if_needed(row["logs"])
environment_id = row["environment_id"]
if environment_id and environment_id not in existing_env_ids:
# Legacy task may reference environments that were not migrated; keep task row and drop FK value.
environment_id = None
res = conn.execute(
text(
"""
INSERT INTO task_records (
id, type, status, environment_id, started_at, finished_at,
logs, error, result, created_at, params
) VALUES (
:id, :type, :status, :environment_id, :started_at, :finished_at,
CAST(:logs AS JSONB), :error, CAST(:result AS JSONB), :created_at, CAST(:params AS JSONB)
)
ON CONFLICT (id) DO NOTHING
"""
),
{
"id": row["id"],
"type": row["type"],
"status": row["status"],
"environment_id": environment_id,
"started_at": row["started_at"],
"finished_at": row["finished_at"],
"logs": json.dumps(logs_obj, ensure_ascii=True) if logs_obj is not None else None,
"error": row["error"],
"result": json.dumps(result_obj, ensure_ascii=True) if result_obj is not None else None,
"created_at": row["created_at"],
"params": json.dumps(params_obj, ensure_ascii=True) if params_obj is not None else None,
},
)
if res.rowcount and res.rowcount > 0:
stats["task_records_inserted"] += int(res.rowcount)
log_rows = sqlite_conn.execute(
"""
SELECT id, task_id, timestamp, level, source, message, metadata_json
FROM task_logs
ORDER BY id ASC
"""
).fetchall()
stats["task_logs_total"] = len(log_rows)
with engine.begin() as conn:
for row in log_rows:
# Preserve original IDs to keep migration idempotent.
res = conn.execute(
text(
"""
INSERT INTO task_logs (id, task_id, timestamp, level, source, message, metadata_json)
VALUES (:id, :task_id, :timestamp, :level, :source, :message, :metadata_json)
ON CONFLICT (id) DO NOTHING
"""
),
{
"id": row["id"],
"task_id": row["task_id"],
"timestamp": row["timestamp"],
"level": row["level"],
"source": row["source"] or "system",
"message": row["message"],
"metadata_json": row["metadata_json"],
},
)
if res.rowcount and res.rowcount > 0:
stats["task_logs_inserted"] += int(res.rowcount)
# Ensure sequence is aligned after explicit id inserts.
conn.execute(
text(
"""
SELECT setval(
'task_logs_id_seq',
COALESCE((SELECT MAX(id) FROM task_logs), 1),
TRUE
)
"""
)
)
logger.info(
"[_migrate_tasks_and_logs][Coherence:OK] task_records=%s/%s task_logs=%s/%s",
stats["task_records_inserted"],
stats["task_records_total"],
stats["task_logs_inserted"],
stats["task_logs_total"],
)
return stats
# [DEF:run_migration:Function]
# @PURPOSE: Orchestrates migration from SQLite/file to PostgreSQL.
def run_migration(sqlite_path: Path, target_url: str, legacy_config_path: Optional[Path]) -> Dict[str, int]:
with belief_scope("run_migration"):
logger.info("[run_migration][Entry] sqlite=%s target=%s", sqlite_path, target_url)
if not sqlite_path.exists():
raise FileNotFoundError(f"SQLite source not found: {sqlite_path}")
sqlite_conn = _connect_sqlite(sqlite_path)
engine = create_engine(target_url, pool_pre_ping=True)
try:
_ensure_target_schema(engine)
config_upserted = _migrate_config(engine, legacy_config_path)
stats = _migrate_tasks_and_logs(engine, sqlite_conn)
stats["config_upserted"] = config_upserted
return stats
finally:
sqlite_conn.close()
# [DEF:main:Function]
# @PURPOSE: CLI entrypoint.
def main() -> int:
with belief_scope("main"):
parser = argparse.ArgumentParser(
description="Migrate legacy config.json and task logs from SQLite to PostgreSQL.",
)
parser.add_argument(
"--sqlite-path",
default="backend/tasks.db",
help="Path to source SQLite DB with task_records/task_logs (default: backend/tasks.db).",
)
parser.add_argument(
"--target-url",
default=DEFAULT_TARGET_URL,
help="Target PostgreSQL SQLAlchemy URL (default: DATABASE_URL/POSTGRES_URL env).",
)
parser.add_argument(
"--config-path",
default=None,
help="Optional path to legacy config.json (auto-detected when omitted).",
)
args = parser.parse_args()
sqlite_path = Path(args.sqlite_path)
legacy_config_path = _find_legacy_config_path(args.config_path)
try:
stats = run_migration(sqlite_path=sqlite_path, target_url=args.target_url, legacy_config_path=legacy_config_path)
print("Migration completed.")
print(json.dumps(stats, indent=2))
return 0
except (SQLAlchemyError, OSError, sqlite3.Error, ValueError) as e:
logger.error("[main][Coherence:Failed] Migration failed: %s", e)
print(f"Migration failed: {e}")
return 1
if __name__ == "__main__":
raise SystemExit(main())
# [/DEF:main:Function]
# [/DEF:backend.src.scripts.migrate_sqlite_to_postgres:Module]