security: rotate bootstrap and clean workspace
This commit is contained in:
@@ -8,8 +8,10 @@
|
||||
# @INVARIANT: Safe to run multiple times (idempotent).
|
||||
|
||||
# [SECTION: IMPORTS]
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
# Add src to path
|
||||
sys.path.append(str(Path(__file__).parent.parent.parent))
|
||||
@@ -19,6 +21,41 @@ from src.core.logger import logger, belief_scope
|
||||
from src.scripts.seed_permissions import seed_permissions
|
||||
# [/SECTION]
|
||||
|
||||
ENV_FILE_PATH = Path(__file__).resolve().parents[2] / ".env"
|
||||
|
||||
|
||||
# [DEF:ensure_encryption_key:Function]
|
||||
# @PURPOSE: Ensure backend runtime has a persistent Fernet encryption key during first-time installation.
|
||||
# @PRE: Backend root is writable or ENCRYPTION_KEY is already provided via environment.
|
||||
# @POST: ENCRYPTION_KEY exists in process environment or backend/.env.
|
||||
def ensure_encryption_key(env_file_path: Path = ENV_FILE_PATH) -> str:
|
||||
existing_key = os.getenv("ENCRYPTION_KEY", "").strip()
|
||||
if existing_key:
|
||||
Fernet(existing_key.encode())
|
||||
logger.info("ENCRYPTION_KEY already provided via environment; skipping generation.")
|
||||
return existing_key
|
||||
|
||||
if env_file_path.exists():
|
||||
for raw_line in env_file_path.read_text(encoding="utf-8").splitlines():
|
||||
if raw_line.startswith("ENCRYPTION_KEY="):
|
||||
persisted_key = raw_line.partition("=")[2].strip()
|
||||
if persisted_key:
|
||||
Fernet(persisted_key.encode())
|
||||
os.environ["ENCRYPTION_KEY"] = persisted_key
|
||||
logger.info(f"Loaded existing ENCRYPTION_KEY from {env_file_path}.")
|
||||
return persisted_key
|
||||
|
||||
generated_key = Fernet.generate_key().decode()
|
||||
with env_file_path.open("a", encoding="utf-8") as env_file:
|
||||
if env_file.tell() > 0:
|
||||
env_file.write("\n")
|
||||
env_file.write(f"ENCRYPTION_KEY={generated_key}\n")
|
||||
|
||||
os.environ["ENCRYPTION_KEY"] = generated_key
|
||||
logger.info(f"Generated ENCRYPTION_KEY and persisted it to {env_file_path}.")
|
||||
return generated_key
|
||||
# [/DEF:ensure_encryption_key:Function]
|
||||
|
||||
# [DEF:run_init:Function]
|
||||
# @PURPOSE: Main entry point for the initialization script.
|
||||
# @POST: auth.db is initialized with the correct schema and seeded permissions.
|
||||
@@ -26,6 +63,7 @@ def run_init():
|
||||
with belief_scope("init_auth_db"):
|
||||
logger.info("Initializing authentication database...")
|
||||
try:
|
||||
ensure_encryption_key()
|
||||
init_db()
|
||||
logger.info("Authentication database initialized successfully.")
|
||||
|
||||
@@ -40,4 +78,4 @@ def run_init():
|
||||
if __name__ == "__main__":
|
||||
run_init()
|
||||
|
||||
# [/DEF:backend.src.scripts.init_auth_db:Module]
|
||||
# [/DEF:backend.src.scripts.init_auth_db:Module]
|
||||
|
||||
@@ -26,8 +26,7 @@ class TestEncryptionManager:
|
||||
"""Construct EncryptionManager directly using Fernet (avoids relative import chain)."""
|
||||
# Re-implement the same logic as EncryptionManager to avoid import issues
|
||||
# with the llm_provider module's relative imports
|
||||
import os
|
||||
key = os.getenv("ENCRYPTION_KEY", "REMOVED_HISTORICAL_SECRET_DO_NOT_USE").encode()
|
||||
key = Fernet.generate_key()
|
||||
fernet = Fernet(key)
|
||||
|
||||
class EncryptionManager:
|
||||
@@ -99,6 +98,18 @@ class TestEncryptionManager:
|
||||
assert decrypted == ""
|
||||
# [/DEF:test_encrypt_empty_string:Function]
|
||||
|
||||
# [DEF:test_missing_key_fails_fast:Function]
|
||||
# @PURPOSE: Missing ENCRYPTION_KEY must abort initialization instead of using a fallback secret.
|
||||
# @PRE: ENCRYPTION_KEY is unset.
|
||||
# @POST: RuntimeError raised during EncryptionManager construction.
|
||||
def test_missing_key_fails_fast(self):
|
||||
from src.services.llm_provider import EncryptionManager
|
||||
|
||||
with patch.dict("os.environ", {}, clear=True):
|
||||
with pytest.raises(RuntimeError, match="ENCRYPTION_KEY must be set"):
|
||||
EncryptionManager()
|
||||
# [/DEF:test_missing_key_fails_fast:Function]
|
||||
|
||||
# [DEF:test_custom_key_roundtrip:Function]
|
||||
# @PURPOSE: Custom Fernet key produces valid roundtrip.
|
||||
# @PRE: Generated Fernet key.
|
||||
|
||||
@@ -6,18 +6,35 @@
|
||||
# @RELATION: DEPENDS_ON -> backend.src.core.database
|
||||
# @RELATION: DEPENDS_ON -> backend.src.models.llm
|
||||
|
||||
from typing import List, Optional
|
||||
from typing import List, Optional, TYPE_CHECKING
|
||||
from sqlalchemy.orm import Session
|
||||
from ..models.llm import LLMProvider
|
||||
from ..plugins.llm_analysis.models import LLMProviderConfig
|
||||
from ..core.logger import belief_scope, logger
|
||||
from cryptography.fernet import Fernet
|
||||
import os
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..plugins.llm_analysis.models import LLMProviderConfig
|
||||
|
||||
# [DEF:_require_fernet_key:Function]
|
||||
# @TIER: CRITICAL
|
||||
# @PURPOSE: Load and validate the Fernet key used for secret encryption.
|
||||
# @PRE: ENCRYPTION_KEY environment variable must be set to a valid Fernet key.
|
||||
# @POST: Returns validated key bytes ready for Fernet initialization.
|
||||
def _require_fernet_key() -> bytes:
|
||||
raw_key = os.getenv("ENCRYPTION_KEY", "").strip()
|
||||
if not raw_key:
|
||||
raise RuntimeError("ENCRYPTION_KEY must be set to a valid Fernet key")
|
||||
|
||||
key = raw_key.encode()
|
||||
Fernet(key)
|
||||
return key
|
||||
# [/DEF:_require_fernet_key:Function]
|
||||
|
||||
# [DEF:EncryptionManager:Class]
|
||||
# @TIER: CRITICAL
|
||||
# @PURPOSE: Handles encryption and decryption of sensitive data like API keys.
|
||||
# @INVARIANT: Uses a secret key from environment or a default one (fallback only for dev).
|
||||
# @INVARIANT: Uses only a validated secret key from environment.
|
||||
#
|
||||
# @TEST_CONTRACT: EncryptionManagerModel ->
|
||||
# {
|
||||
@@ -33,10 +50,10 @@ import os
|
||||
class EncryptionManager:
|
||||
# [DEF:EncryptionManager.__init__:Function]
|
||||
# @PURPOSE: Initialize the encryption manager with a Fernet key.
|
||||
# @PRE: ENCRYPTION_KEY env var must be set or use default dev key.
|
||||
# @PRE: ENCRYPTION_KEY env var must be set to a valid Fernet key.
|
||||
# @POST: Fernet instance ready for encryption/decryption.
|
||||
def __init__(self):
|
||||
self.key = os.getenv("ENCRYPTION_KEY", "REMOVED_HISTORICAL_SECRET_DO_NOT_USE").encode()
|
||||
self.key = _require_fernet_key()
|
||||
self.fernet = Fernet(self.key)
|
||||
# [/DEF:EncryptionManager.__init__:Function]
|
||||
|
||||
@@ -97,7 +114,7 @@ class LLMProviderService:
|
||||
# @PURPOSE: Creates a new LLM provider with encrypted API key.
|
||||
# @PRE: config must contain valid provider configuration.
|
||||
# @POST: New provider created and persisted to database.
|
||||
def create_provider(self, config: LLMProviderConfig) -> LLMProvider:
|
||||
def create_provider(self, config: "LLMProviderConfig") -> LLMProvider:
|
||||
with belief_scope("create_provider"):
|
||||
encrypted_key = self.encryption.encrypt(config.api_key)
|
||||
db_provider = LLMProvider(
|
||||
@@ -119,7 +136,7 @@ class LLMProviderService:
|
||||
# @PURPOSE: Updates an existing LLM provider.
|
||||
# @PRE: provider_id must exist, config must be valid.
|
||||
# @POST: Provider updated and persisted to database.
|
||||
def update_provider(self, provider_id: str, config: LLMProviderConfig) -> Optional[LLMProvider]:
|
||||
def update_provider(self, provider_id: str, config: "LLMProviderConfig") -> Optional[LLMProvider]:
|
||||
with belief_scope("update_provider"):
|
||||
db_provider = self.get_provider(provider_id)
|
||||
if not db_provider:
|
||||
@@ -180,4 +197,4 @@ class LLMProviderService:
|
||||
|
||||
# [/DEF:LLMProviderService:Class]
|
||||
|
||||
# [/DEF:backend.src.services.llm_provider:Module]
|
||||
# [/DEF:backend.src.services.llm_provider:Module]
|
||||
|
||||
Reference in New Issue
Block a user