# Source file: ss-tools/backend/tests/test_auth.py
# Listing metadata: 237 lines, 8.1 KiB, Python

import sys
from pathlib import Path
# Add src to path
sys.path.append(str(Path(__file__).parent.parent / "src"))
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from cryptography.fernet import Fernet
from src.core.database import Base
from src.models.auth import User, Role, Permission, ADGroupMapping
from src.services.auth_service import AuthService
from src.core.auth.repository import AuthRepository
from src.core.auth.security import verify_password, get_password_hash
from src.scripts.create_admin import create_admin
from src.scripts.init_auth_db import ensure_encryption_key
# In-memory SQLite database used by every test in this module.
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},
)
TestingSessionLocal = sessionmaker(
    bind=engine,
    autoflush=False,
    autocommit=False,
)
# Create every table declared on Base.metadata once, at import time.
Base.metadata.create_all(bind=engine)
@pytest.fixture
def db_session():
    """Yield a session bound to a connection that has an open transaction.

    After the test finishes, the session is closed, the transaction is
    rolled back, and the connection is closed, in that order.
    """
    conn = engine.connect()
    outer_txn = conn.begin()
    test_session = TestingSessionLocal(bind=conn)

    yield test_session

    # Teardown: release the session first, then discard the transaction.
    test_session.close()
    outer_txn.rollback()
    conn.close()
@pytest.fixture
def auth_service(db_session):
    """Provide an AuthService constructed with the per-test session."""
    service = AuthService(db_session)
    return service
@pytest.fixture
def auth_repo(db_session):
    """Provide an AuthRepository constructed with the per-test session."""
    repository = AuthRepository(db_session)
    return repository
def test_create_user(auth_repo):
    """A committed local user can be fetched by username with matching fields."""
    new_user = User(
        username="testuser",
        email="test@example.com",
        password_hash=get_password_hash("testpassword123"),
        auth_source="LOCAL",
    )
    auth_repo.db.add(new_user)
    auth_repo.db.commit()

    fetched = auth_repo.get_user_by_username("testuser")

    assert fetched is not None
    assert fetched.username == "testuser"
    assert fetched.email == "test@example.com"
    # The stored hash must verify against the original plaintext.
    assert verify_password("testpassword123", fetched.password_hash)
def test_authenticate_user(auth_service, auth_repo):
    """Check authenticate_user with good credentials, a bad password, and an unknown user."""
    account = User(
        username="testuser",
        email="test@example.com",
        password_hash=get_password_hash("testpassword123"),
        auth_source="LOCAL",
    )
    auth_repo.db.add(account)
    auth_repo.db.commit()

    # Correct username and password return the stored user.
    matched = auth_service.authenticate_user("testuser", "testpassword123")
    assert matched is not None
    assert matched.username == "testuser"

    # A wrong password for an existing user returns None.
    wrong_password_result = auth_service.authenticate_user("testuser", "wrongpassword")
    assert wrong_password_result is None

    # An unknown username returns None.
    unknown_user_result = auth_service.authenticate_user("nonexistent", "testpassword123")
    assert unknown_user_result is None
def test_create_session(auth_service, auth_repo):
    """create_session returns a non-empty access token with token type "bearer"."""
    account = User(
        username="testuser",
        email="test@example.com",
        password_hash=get_password_hash("testpassword123"),
        auth_source="LOCAL",
    )
    auth_repo.db.add(account)
    auth_repo.db.commit()

    token_payload = auth_service.create_session(account)

    # Both keys must be present before their values are inspected.
    for required_key in ("access_token", "token_type"):
        assert required_key in token_payload
    assert token_payload["token_type"] == "bearer"
    assert len(token_payload["access_token"]) > 0
def test_role_permission_association(auth_repo):
    """Permissions attached to a role are persisted and loaded back with it."""
    admin_role = Role(name="Admin", description="System administrator")
    read_perm = Permission(resource="admin:users", action="READ")
    write_perm = Permission(resource="admin:users", action="WRITE")
    admin_role.permissions.extend([read_perm, write_perm])
    auth_repo.db.add(admin_role)
    auth_repo.db.commit()

    loaded_role = auth_repo.get_role_by_name("Admin")

    assert loaded_role is not None
    assert len(loaded_role.permissions) == 2
    # Compare on "resource:action" strings rather than on ORM objects.
    granted = {f"{p.resource}:{p.action}" for p in loaded_role.permissions}
    assert "admin:users:READ" in granted
    assert "admin:users:WRITE" in granted
def test_user_role_association(auth_repo):
    """A role appended to a user is persisted and loaded back with that user."""
    admin_role = Role(name="Admin", description="System administrator")
    admin_user = User(
        username="adminuser",
        email="admin@example.com",
        password_hash=get_password_hash("adminpass123"),
        auth_source="LOCAL",
    )
    admin_user.roles.append(admin_role)
    # Same order as adding the role and then the user individually.
    auth_repo.db.add_all([admin_role, admin_user])
    auth_repo.db.commit()

    fetched = auth_repo.get_user_by_username("adminuser")

    assert fetched is not None
    # Exactly one role, and it is the one attached above.
    assert [r.name for r in fetched.roles] == ["Admin"]
def test_ad_group_mapping(auth_repo):
    """An ADGroupMapping row can be stored and looked up by its AD group name."""
    group_name = "DOMAIN\\ADFS_Admins"

    adfs_role = Role(name="ADFS_Admin", description="ADFS administrators")
    auth_repo.db.add(adfs_role)
    # Commit first so the role has an id to reference from the mapping.
    auth_repo.db.commit()

    auth_repo.db.add(ADGroupMapping(ad_group=group_name, role_id=adfs_role.id))
    auth_repo.db.commit()

    found = (
        auth_repo.db.query(ADGroupMapping)
        .filter_by(ad_group=group_name)
        .first()
    )
    assert found is not None
    assert found.role_id == adfs_role.id
def test_create_admin_creates_user_with_optional_email(monkeypatch, db_session):
    """create_admin reports "created" and stores the given email and an Admin role."""
    # Swap the script's AuthSessionLocal for a factory that returns the test session.
    monkeypatch.setattr("src.scripts.create_admin.AuthSessionLocal", lambda: db_session)

    outcome = create_admin("bootstrap-admin", "bootstrap-pass", "admin@example.com")
    stored = (
        db_session.query(User)
        .filter(User.username == "bootstrap-admin")
        .first()
    )

    assert outcome == "created"
    assert stored is not None
    assert stored.email == "admin@example.com"
    assert stored.roles[0].name == "Admin"
def test_create_admin_is_idempotent_for_existing_user(monkeypatch, db_session):
    """A second create_admin call for the same username reports "exists" and changes nothing."""
    # Swap the script's AuthSessionLocal for a factory that returns the test session.
    monkeypatch.setattr("src.scripts.create_admin.AuthSessionLocal", lambda: db_session)

    initial_outcome = create_admin("bootstrap-admin-2", "bootstrap-pass")
    repeat_outcome = create_admin("bootstrap-admin-2", "new-password", "changed@example.com")
    stored = (
        db_session.query(User)
        .filter(User.username == "bootstrap-admin-2")
        .first()
    )

    assert initial_outcome == "created"
    assert repeat_outcome == "exists"
    assert stored is not None
    # Email and password from the second call must not have been applied.
    assert stored.email is None
    assert verify_password("bootstrap-pass", stored.password_hash)
    assert not verify_password("new-password", stored.password_hash)
def test_ensure_encryption_key_generates_backend_env_file(monkeypatch, tmp_path):
    """With no env var and no file, a key is returned and written to the env file."""
    env_path = tmp_path / ".env"
    # Clear the variable so the process environment cannot supply a key.
    monkeypatch.delenv("ENCRYPTION_KEY", raising=False)

    new_key = ensure_encryption_key(env_path)

    assert new_key
    assert env_path.exists()
    written = env_path.read_text(encoding="utf-8").strip()
    assert written == f"ENCRYPTION_KEY={new_key}"
    assert verify_fernet_key(new_key)
def test_ensure_encryption_key_reuses_existing_env_file_value(monkeypatch, tmp_path):
    """A key already present in the env file is returned and the file is left unchanged."""
    env_path = tmp_path / ".env"
    stored_key = Fernet.generate_key().decode()
    original_contents = f"ENCRYPTION_KEY={stored_key}\nOTHER=value\n"
    env_path.write_text(original_contents, encoding="utf-8")
    # Clear the variable so the process environment cannot supply a key.
    monkeypatch.delenv("ENCRYPTION_KEY", raising=False)

    returned_key = ensure_encryption_key(env_path)

    assert returned_key == stored_key
    assert env_path.read_text(encoding="utf-8") == original_contents
def test_ensure_encryption_key_prefers_process_environment(monkeypatch, tmp_path):
    """A key set in the process environment is returned and no env file is created."""
    env_path = tmp_path / ".env"
    env_key = Fernet.generate_key().decode()
    monkeypatch.setenv("ENCRYPTION_KEY", env_key)

    returned_key = ensure_encryption_key(env_path)

    assert returned_key == env_key
    assert not env_path.exists()
def verify_fernet_key(value: str) -> bool:
    """Return True once a Fernet instance has been built from *value*.

    Any exception raised while constructing Fernet propagates to the caller;
    this helper never returns False.
    """
    key_bytes = value.encode()
    Fernet(key_bytes)
    return True