# Repository viewer header (kept as comments so the module remains valid Python):
# Files
# ss-tools/backend/src/core/database.py
#
# 253 lines
# 9.3 KiB
# Python

# [DEF:backend.src.core.database:Module]
#
# @TIER: STANDARD
# @SEMANTICS: database, postgresql, sqlalchemy, session, persistence
# @PURPOSE: Configures database connection and session management (PostgreSQL-first).
# @LAYER: Core
# @RELATION: DEPENDS_ON -> sqlalchemy
# @RELATION: DEPENDS_ON -> backend.src.models.mapping
# @RELATION: DEPENDS_ON -> backend.src.core.auth.config
#
# @INVARIANT: Each database (main, tasks, auth) has exactly one module-level engine, built once at import time and reused by its session factory.
# [SECTION: IMPORTS]
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.orm import sessionmaker
from ..models.mapping import Base
# Import models to ensure they're registered with Base
from ..models import task as _task_models # noqa: F401
from ..models import auth as _auth_models # noqa: F401
from ..models import config as _config_models # noqa: F401
from ..models import llm as _llm_models # noqa: F401
from ..models import assistant as _assistant_models # noqa: F401
from ..models import profile as _profile_models # noqa: F401
from .logger import belief_scope, logger
from .auth.config import auth_config
import os
from pathlib import Path
# [/SECTION]
# [DEF:BASE_DIR:Variable]
# @PURPOSE: Base directory for the backend.
# Computed as the third ancestor directory of this module's resolved (absolute) path.
_MODULE_FILE = Path(__file__).resolve()
BASE_DIR = _MODULE_FILE.parent.parent.parent
# [/DEF:BASE_DIR:Variable]
# [DEF:DATABASE_URL:Constant]
# @PURPOSE: URL for the main application database.
# Resolution order: DATABASE_URL, then POSTGRES_URL, then the built-in local default.
# An unset OR empty environment variable falls through to the next candidate, so a
# blank value (e.g. `DATABASE_URL=` in an env file) never becomes an empty database URL.
# NOTE(review): the built-in default embeds postgres:postgres credentials; it looks
# intended for local development only - confirm that deployments always override it.
DEFAULT_POSTGRES_URL = (
    os.getenv("POSTGRES_URL")
    or "postgresql+psycopg2://postgres:postgres@localhost:5432/ss_tools"
)
DATABASE_URL = os.getenv("DATABASE_URL") or DEFAULT_POSTGRES_URL
# [/DEF:DATABASE_URL:Constant]
# [DEF:TASKS_DATABASE_URL:Constant]
# @PURPOSE: URL for the tasks execution database.
# Defaults to DATABASE_URL so task logs live in the main application database.
# An empty TASKS_DATABASE_URL is treated the same as an unset one, so a blank
# value in the environment cannot produce an empty database URL.
TASKS_DATABASE_URL = os.getenv("TASKS_DATABASE_URL") or DATABASE_URL
# [/DEF:TASKS_DATABASE_URL:Constant]
# [DEF:AUTH_DATABASE_URL:Constant]
# @PURPOSE: URL for the authentication database.
# Falls back to auth_config.AUTH_DATABASE_URL when the environment variable is
# unset or empty, so a blank value cannot produce an empty database URL.
AUTH_DATABASE_URL = os.getenv("AUTH_DATABASE_URL") or auth_config.AUTH_DATABASE_URL
# [/DEF:AUTH_DATABASE_URL:Constant]
# [DEF:engine:Variable]
def _build_engine(db_url: str):
    """Create a SQLAlchemy engine with options chosen from the URL prefix.

    Args:
        db_url: Database URL handed to ``create_engine``.

    Returns:
        The engine object returned by ``create_engine``.
    """
    with belief_scope("_build_engine"):
        if db_url.startswith("sqlite"):
            # SQLite URLs: turn off the driver's same-thread check on connections.
            engine_options = {"connect_args": {"check_same_thread": False}}
        else:
            # Every other URL: enable the pool's pre-ping option.
            engine_options = {"pool_pre_ping": True}
        return create_engine(db_url, **engine_options)

# @PURPOSE: SQLAlchemy engine for mappings database.
engine = _build_engine(db_url=DATABASE_URL)
# [/DEF:engine:Variable]
# [DEF:tasks_engine:Variable]
# @PURPOSE: SQLAlchemy engine for tasks database.
# Built once at import time from TASKS_DATABASE_URL.
tasks_engine = _build_engine(db_url=TASKS_DATABASE_URL)
# [/DEF:tasks_engine:Variable]
# [DEF:auth_engine:Variable]
# @PURPOSE: SQLAlchemy engine for authentication database.
# Built once at import time from AUTH_DATABASE_URL.
auth_engine = _build_engine(db_url=AUTH_DATABASE_URL)
# [/DEF:auth_engine:Variable]
# [DEF:SessionLocal:Class]
# @TIER: TRIVIAL
# @PURPOSE: A session factory for the main mappings database.
# @PRE: engine is initialized.
# Sessions are bound to `engine`, with autoflush and autocommit both disabled.
SessionLocal = sessionmaker(
    bind=engine,
    autoflush=False,
    autocommit=False,
)
# [/DEF:SessionLocal:Class]
# [DEF:TasksSessionLocal:Class]
# @TIER: TRIVIAL
# @PURPOSE: A session factory for the tasks execution database.
# @PRE: tasks_engine is initialized.
# Sessions are bound to `tasks_engine`, with autoflush and autocommit both disabled.
TasksSessionLocal = sessionmaker(
    bind=tasks_engine,
    autoflush=False,
    autocommit=False,
)
# [/DEF:TasksSessionLocal:Class]
# [DEF:AuthSessionLocal:Class]
# @TIER: TRIVIAL
# @PURPOSE: A session factory for the authentication database.
# @PRE: auth_engine is initialized.
# Sessions are bound to `auth_engine`, with autoflush and autocommit both disabled.
AuthSessionLocal = sessionmaker(
    bind=auth_engine,
    autoflush=False,
    autocommit=False,
)
# [/DEF:AuthSessionLocal:Class]
# [DEF:_ensure_user_dashboard_preferences_columns:Function]
# @PURPOSE: Applies additive schema upgrades for user_dashboard_preferences table.
# @PRE: bind_engine points to application database where profile table is stored.
# @POST: Missing columns are added without data loss (best-effort: a failed upgrade is logged, not raised).
def _ensure_user_dashboard_preferences_columns(bind_engine):
    """Add any expected columns missing from ``user_dashboard_preferences``.

    Returns without touching the database when the table does not exist or
    when every expected column is already present. Otherwise all pending
    ``ALTER TABLE`` statements are executed inside one ``bind_engine.begin()``
    block; an exception raised while executing them is logged as a warning
    and not re-raised.

    Args:
        bind_engine: Engine used both for schema inspection and for running
            the ``ALTER TABLE`` statements.
    """
    with belief_scope("_ensure_user_dashboard_preferences_columns"):
        target_table = "user_dashboard_preferences"
        schema_inspector = inspect(bind_engine)
        if target_table not in schema_inspector.get_table_names():
            return

        present_columns = {
            str(column_info.get("name") or "").strip()
            for column_info in schema_inspector.get_columns(target_table)
        }

        # (column name, DDL adding it); the tuple order is the execution order.
        column_upgrades = (
            (
                "git_username",
                "ALTER TABLE user_dashboard_preferences ADD COLUMN git_username VARCHAR",
            ),
            (
                "git_email",
                "ALTER TABLE user_dashboard_preferences ADD COLUMN git_email VARCHAR",
            ),
            (
                "git_personal_access_token_encrypted",
                "ALTER TABLE user_dashboard_preferences "
                "ADD COLUMN git_personal_access_token_encrypted VARCHAR",
            ),
            (
                "start_page",
                "ALTER TABLE user_dashboard_preferences "
                "ADD COLUMN start_page VARCHAR NOT NULL DEFAULT 'dashboards'",
            ),
            (
                "auto_open_task_drawer",
                "ALTER TABLE user_dashboard_preferences "
                "ADD COLUMN auto_open_task_drawer BOOLEAN NOT NULL DEFAULT TRUE",
            ),
            (
                "dashboards_table_density",
                "ALTER TABLE user_dashboard_preferences "
                "ADD COLUMN dashboards_table_density VARCHAR NOT NULL DEFAULT 'comfortable'",
            ),
        )
        pending_ddl = [
            ddl
            for column_name, ddl in column_upgrades
            if column_name not in present_columns
        ]

        if pending_ddl:
            try:
                with bind_engine.begin() as conn:
                    for ddl_statement in pending_ddl:
                        conn.execute(text(ddl_statement))
            except Exception as migration_error:
                # Deliberately best-effort: report the failure and carry on.
                logger.warning(
                    "[database][EXPLORE] Profile preference additive migration failed: %s",
                    migration_error,
                )
# [/DEF:_ensure_user_dashboard_preferences_columns:Function]
# [DEF:_ensure_git_server_configs_columns:Function]
# @PURPOSE: Applies additive schema upgrades for git_server_configs table.
# @PRE: bind_engine points to application database.
# @POST: Missing columns are added without data loss (best-effort: a failed upgrade is logged, not raised).
def _ensure_git_server_configs_columns(bind_engine):
    """Add any expected columns missing from ``git_server_configs``.

    Returns without touching the database when the table does not exist or
    when every expected column is already present. Otherwise the pending
    ``ALTER TABLE`` statements are executed inside one ``bind_engine.begin()``
    block; an exception raised while executing them is logged as a warning
    and not re-raised.

    Args:
        bind_engine: Engine used both for schema inspection and for running
            the ``ALTER TABLE`` statements.
    """
    with belief_scope("_ensure_git_server_configs_columns"):
        target_table = "git_server_configs"
        schema_inspector = inspect(bind_engine)
        if target_table not in schema_inspector.get_table_names():
            return

        present_columns = {
            str(column_info.get("name") or "").strip()
            for column_info in schema_inspector.get_columns(target_table)
        }

        # (column name, DDL adding it); the tuple order is the execution order.
        column_upgrades = (
            (
                "default_branch",
                "ALTER TABLE git_server_configs ADD COLUMN default_branch VARCHAR NOT NULL DEFAULT 'main'",
            ),
        )
        pending_ddl = [
            ddl
            for column_name, ddl in column_upgrades
            if column_name not in present_columns
        ]

        if pending_ddl:
            try:
                with bind_engine.begin() as conn:
                    for ddl_statement in pending_ddl:
                        conn.execute(text(ddl_statement))
            except Exception as migration_error:
                # Deliberately best-effort: report the failure and carry on.
                # NOTE(review): the word "preference" in this message looks copied
                # from the profile-preferences upgrade - confirm before rewording,
                # since log consumers may match on the exact text.
                logger.warning(
                    "[database][EXPLORE] GitServerConfig preference additive migration failed: %s",
                    migration_error,
                )
# [/DEF:_ensure_git_server_configs_columns:Function]
# [DEF:init_db:Function]
# @PURPOSE: Initializes the database by creating all tables and applying additive column upgrades.
# @PRE: engine, tasks_engine and auth_engine are initialized.
# @POST: Database tables created in all databases; additive column upgrades attempted on the main database.
# @SIDE_EFFECT: Creates physical database files if they don't exist (presumably relevant to file-backed URLs such as SQLite only).
def init_db():
    """Create all tables on every engine, then run the additive column upgrades.

    The same ``Base.metadata`` is applied to ``engine``, ``tasks_engine`` and
    ``auth_engine`` in that order. The two column-upgrade helpers are then run
    against ``engine`` only.
    """
    with belief_scope("init_db"):
        # Same metadata for every engine; the order matches the original sequence.
        for target_engine in (engine, tasks_engine, auth_engine):
            Base.metadata.create_all(bind=target_engine)

        # Column upgrades only target the main application engine.
        for ensure_columns in (
            _ensure_user_dashboard_preferences_columns,
            _ensure_git_server_configs_columns,
        ):
            ensure_columns(engine)
# [/DEF:init_db:Function]
# [DEF:get_db:Function]
# @PURPOSE: Dependency for getting a database session.
# @PRE: SessionLocal is initialized.
# @POST: Session is closed after use.
# @RETURN: Generator[Session, None, None]
def get_db():
    """Yield one ``SessionLocal`` session and close it when the generator ends.

    Yields:
        A new session created by ``SessionLocal()``.
    """
    with belief_scope("get_db"):
        session = SessionLocal()
        try:
            yield session
        finally:
            # Runs whether the consumer finished normally or raised.
            session.close()
# [/DEF:get_db:Function]
# [DEF:get_tasks_db:Function]
# @PURPOSE: Dependency for getting a tasks database session.
# @PRE: TasksSessionLocal is initialized.
# @POST: Session is closed after use.
# @RETURN: Generator[Session, None, None]
def get_tasks_db():
    """Yield one ``TasksSessionLocal`` session and close it when the generator ends.

    Yields:
        A new session created by ``TasksSessionLocal()``.
    """
    with belief_scope("get_tasks_db"):
        session = TasksSessionLocal()
        try:
            yield session
        finally:
            # Runs whether the consumer finished normally or raised.
            session.close()
# [/DEF:get_tasks_db:Function]
# [DEF:get_auth_db:Function]
# @PURPOSE: Dependency for getting an authentication database session.
# @PRE: AuthSessionLocal is initialized.
# @POST: Session is closed after use.
# @RETURN: Generator[Session, None, None]
def get_auth_db():
    """Yield one ``AuthSessionLocal`` session and close it when the generator ends.

    Yields:
        A new session created by ``AuthSessionLocal()``.
    """
    with belief_scope("get_auth_db"):
        session = AuthSessionLocal()
        try:
            yield session
        finally:
            # Runs whether the consumer finished normally or raised.
            session.close()
# [/DEF:get_auth_db:Function]
# [/DEF:backend.src.core.database:Module]