feat(rbac): auto-sync permission catalog from declared route/plugin guards
This commit is contained in:
140
backend/src/services/__tests__/test_rbac_permission_catalog.py
Normal file
140
backend/src/services/__tests__/test_rbac_permission_catalog.py
Normal file
@@ -0,0 +1,140 @@
|
||||
# [DEF:backend.src.services.__tests__.test_rbac_permission_catalog:Module]
|
||||
# @TIER: STANDARD
|
||||
# @SEMANTICS: tests, rbac, permissions, catalog, discovery, sync
|
||||
# @PURPOSE: Verifies RBAC permission catalog discovery and idempotent synchronization behavior.
|
||||
# @LAYER: Service Tests
|
||||
# @RELATION: TESTS -> backend.src.services.rbac_permission_catalog
|
||||
# @INVARIANT: Synchronization adds only missing normalized permission pairs.
|
||||
|
||||
# [SECTION: IMPORTS]
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import src.services.rbac_permission_catalog as catalog
|
||||
# [/SECTION: IMPORTS]
|
||||
|
||||
|
||||
# [DEF:test_discover_route_permissions_extracts_declared_pairs_and_ignores_tests:Function]
|
||||
# @PURPOSE: Ensures route-scanner extracts has_permission pairs from route files and skips __tests__.
|
||||
# @PRE: Temporary route directory contains route and test files.
|
||||
# @POST: Returned set includes production route permissions and excludes test-only declarations.
|
||||
def test_discover_route_permissions_extracts_declared_pairs_and_ignores_tests(tmp_path, monkeypatch):
|
||||
routes_dir = tmp_path / "routes"
|
||||
routes_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
(routes_dir / "dashboards.py").write_text(
|
||||
'\n'.join(
|
||||
[
|
||||
'_ = Depends(has_permission("plugin:migration", "READ"))',
|
||||
'_ = Depends(has_permission("plugin:migration", "EXECUTE"))',
|
||||
'_ = Depends(has_permission("tasks", "WRITE"))',
|
||||
]
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
tests_dir = routes_dir / "__tests__"
|
||||
tests_dir.mkdir(parents=True, exist_ok=True)
|
||||
(tests_dir / "test_fake.py").write_text(
|
||||
'_ = Depends(has_permission("plugin:ignored", "READ"))',
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
monkeypatch.setattr(catalog, "ROUTES_DIR", routes_dir)
|
||||
|
||||
discovered = catalog._discover_route_permissions()
|
||||
|
||||
assert ("plugin:migration", "READ") in discovered
|
||||
assert ("plugin:migration", "EXECUTE") in discovered
|
||||
assert ("tasks", "WRITE") in discovered
|
||||
assert ("plugin:ignored", "READ") not in discovered
|
||||
# [/DEF:test_discover_route_permissions_extracts_declared_pairs_and_ignores_tests:Function]
|
||||
|
||||
|
||||
# [DEF:test_discover_declared_permissions_unions_route_and_plugin_permissions:Function]
|
||||
# @PURPOSE: Ensures full catalog includes route-level permissions plus dynamic plugin EXECUTE rights.
|
||||
# @PRE: Route discovery and plugin loader both return permission sources.
|
||||
# @POST: Result set contains union of both sources.
|
||||
def test_discover_declared_permissions_unions_route_and_plugin_permissions(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
catalog,
|
||||
"_discover_route_permissions",
|
||||
lambda: {("tasks", "READ"), ("plugin:migration", "READ")},
|
||||
)
|
||||
|
||||
plugin_loader = MagicMock()
|
||||
plugin_loader.get_all_plugin_configs.return_value = [
|
||||
SimpleNamespace(id="superset-backup"),
|
||||
SimpleNamespace(id="llm_dashboard_validation"),
|
||||
]
|
||||
|
||||
discovered = catalog.discover_declared_permissions(plugin_loader=plugin_loader)
|
||||
|
||||
assert ("tasks", "READ") in discovered
|
||||
assert ("plugin:migration", "READ") in discovered
|
||||
assert ("plugin:superset-backup", "EXECUTE") in discovered
|
||||
assert ("plugin:llm_dashboard_validation", "EXECUTE") in discovered
|
||||
# [/DEF:test_discover_declared_permissions_unions_route_and_plugin_permissions:Function]
|
||||
|
||||
|
||||
# [DEF:test_sync_permission_catalog_inserts_only_missing_normalized_pairs:Function]
|
||||
# @PURPOSE: Ensures synchronization inserts only missing pairs and normalizes action/resource tokens.
|
||||
# @PRE: DB already contains subset of permissions.
|
||||
# @POST: Only missing normalized pairs are inserted and commit is executed once.
|
||||
def test_sync_permission_catalog_inserts_only_missing_normalized_pairs():
|
||||
db = MagicMock()
|
||||
db.query.return_value.all.return_value = [
|
||||
SimpleNamespace(resource="tasks", action="READ"),
|
||||
SimpleNamespace(resource="plugin:migration", action="EXECUTE"),
|
||||
]
|
||||
|
||||
declared_permissions = {
|
||||
("tasks", "read"),
|
||||
("plugin:migration", "execute"),
|
||||
("plugin:migration", "READ"),
|
||||
("", "WRITE"),
|
||||
("plugin:migration", ""),
|
||||
}
|
||||
|
||||
inserted_count = catalog.sync_permission_catalog(
|
||||
db=db,
|
||||
declared_permissions=declared_permissions,
|
||||
)
|
||||
|
||||
assert inserted_count == 1
|
||||
assert db.add.call_count == 1
|
||||
inserted_permission = db.add.call_args[0][0]
|
||||
assert inserted_permission.resource == "plugin:migration"
|
||||
assert inserted_permission.action == "READ"
|
||||
db.commit.assert_called_once()
|
||||
# [/DEF:test_sync_permission_catalog_inserts_only_missing_normalized_pairs:Function]
|
||||
|
||||
|
||||
# [DEF:test_sync_permission_catalog_is_noop_when_all_permissions_exist:Function]
|
||||
# @PURPOSE: Ensures synchronization is idempotent when all declared pairs already exist.
|
||||
# @PRE: DB contains full declared permission set.
|
||||
# @POST: No inserts are added and commit is not called.
|
||||
def test_sync_permission_catalog_is_noop_when_all_permissions_exist():
|
||||
db = MagicMock()
|
||||
db.query.return_value.all.return_value = [
|
||||
SimpleNamespace(resource="tasks", action="READ"),
|
||||
SimpleNamespace(resource="plugin:migration", action="READ"),
|
||||
]
|
||||
|
||||
declared_permissions = {
|
||||
("tasks", "READ"),
|
||||
("plugin:migration", "READ"),
|
||||
}
|
||||
|
||||
inserted_count = catalog.sync_permission_catalog(
|
||||
db=db,
|
||||
declared_permissions=declared_permissions,
|
||||
)
|
||||
|
||||
assert inserted_count == 0
|
||||
db.add.assert_not_called()
|
||||
db.commit.assert_not_called()
|
||||
# [/DEF:test_sync_permission_catalog_is_noop_when_all_permissions_exist:Function]
|
||||
|
||||
|
||||
# [/DEF:backend.src.services.__tests__.test_rbac_permission_catalog:Module]
|
||||
Reference in New Issue
Block a user