subagents

This commit is contained in:
2026-03-20 17:20:24 +03:00
parent b89b9a66f2
commit 1149e8df1d
36 changed files with 4313 additions and 327 deletions

View File

@@ -299,6 +299,16 @@ def _make_us3_session():
# [/DEF:_make_us3_session:Function]
# [DEF:_make_preview_ready_session:Function]
def _make_preview_ready_session():
    """Return a US3 session advanced into the compiled-preview-ready state."""
    preview_session = _make_us3_session()
    preview_session.current_phase = SessionPhase.PREVIEW
    preview_session.readiness_state = ReadinessState.COMPILED_PREVIEW_READY
    preview_session.recommended_action = RecommendedAction.GENERATE_SQL_PREVIEW
    return preview_session
# [/DEF:_make_preview_ready_session:Function]
# [DEF:dataset_review_api_dependencies:Function]
@pytest.fixture(autouse=True)
def dataset_review_api_dependencies():
@@ -605,7 +615,11 @@ def test_orchestrator_start_session_bootstraps_recovery_state(dataset_review_api
"filter_name": "country",
"display_name": "Country",
"raw_value": ["DE"],
"normalized_value": ["DE"],
"normalized_value": {
"filter_clauses": [{"col": "country_code", "op": "IN", "val": ["DE"]}],
"extra_form_data": {"filters": [{"col": "country_code", "op": "IN", "val": ["DE"]}]},
"value_origin": "extra_form_data.filters",
},
"source": "superset_url",
"confidence_state": "imported",
"requires_confirmation": False,
@@ -650,6 +664,11 @@ def test_orchestrator_start_session_bootstraps_recovery_state(dataset_review_api
saved_mappings = repository.save_recovery_state.call_args.args[4]
assert len(saved_filters) == 1
assert saved_filters[0].filter_name == "country"
assert saved_filters[0].normalized_value == {
"filter_clauses": [{"col": "country_code", "op": "IN", "val": ["DE"]}],
"extra_form_data": {"filters": [{"col": "country_code", "op": "IN", "val": ["DE"]}]},
"value_origin": "extra_form_data.filters",
}
assert len(saved_variables) == 1
assert saved_variables[0].variable_name == "country"
assert len(saved_mappings) == 1
@@ -1095,6 +1114,137 @@ def test_us3_preview_endpoint_returns_failed_preview_without_false_dashboard_not
# [/DEF:test_us3_preview_endpoint_returns_failed_preview_without_false_dashboard_not_found_contract_drift:Function]
# [DEF:test_execution_snapshot_includes_recovered_imported_filters_without_template_mapping:Function]
# @PURPOSE: Recovered imported filters with values should flow into preview filter context even when no template variable mapping exists.
def test_execution_snapshot_includes_recovered_imported_filters_without_template_mapping(
    dataset_review_api_dependencies,
):
    repository = MagicMock()
    repository.db = MagicMock()
    repository.event_logger = MagicMock(spec=SessionEventLogger)
    orchestrator = DatasetReviewOrchestrator(
        repository=repository,
        config_manager=dataset_review_api_dependencies["config_manager"],
        task_manager=None,
    )
    session = _make_preview_ready_session()
    # A fully recovered filter carrying values but deliberately no template
    # variable / execution mapping (those lists are emptied below).
    recovered_filter = MagicMock()
    recovered_filter.filter_id = "filter-country"
    recovered_filter.filter_name = "country"
    recovered_filter.display_name = "Country"
    recovered_filter.raw_value = ["DE", "FR"]
    recovered_filter.normalized_value = ["DE", "FR"]
    recovered_filter.requires_confirmation = False
    recovered_filter.recovery_status = "recovered"
    session.imported_filters = [recovered_filter]
    session.template_variables = []
    session.execution_mappings = []
    session.semantic_fields = []
    # First pass: legacy list-shaped normalized_value — no template params and
    # no preview blockers may be produced.
    snapshot = orchestrator._build_execution_snapshot(session)
    assert snapshot["template_params"] == {}
    assert snapshot["preview_blockers"] == []
    # Second pass: structured clause-level payload should surface verbatim as
    # an effective filter entry.
    recovered_filter.normalized_value = {
        "filter_clauses": [{"col": "country_code", "op": "IN", "val": ["DE", "FR"]}],
        "extra_form_data": {"filters": [{"col": "country_code", "op": "IN", "val": ["DE", "FR"]}]},
        "value_origin": "extra_form_data.filters",
    }
    snapshot = orchestrator._build_execution_snapshot(session)
    assert snapshot["template_params"] == {}
    assert snapshot["preview_blockers"] == []
    assert snapshot["effective_filters"] == [
        {
            "filter_id": "filter-country",
            "filter_name": "country",
            "display_name": "Country",
            "effective_value": ["DE", "FR"],
            "raw_input_value": ["DE", "FR"],
            "normalized_filter_payload": {
                "filter_clauses": [{"col": "country_code", "op": "IN", "val": ["DE", "FR"]}],
                "extra_form_data": {"filters": [{"col": "country_code", "op": "IN", "val": ["DE", "FR"]}]},
                "value_origin": "extra_form_data.filters",
            },
        }
    ]
# [/DEF:test_execution_snapshot_includes_recovered_imported_filters_without_template_mapping:Function]
# [DEF:test_execution_snapshot_preserves_mapped_template_variables_and_filter_context:Function]
# @PURPOSE: Mapped template variables should still populate template params while contributing their effective filter context.
def test_execution_snapshot_preserves_mapped_template_variables_and_filter_context(
    dataset_review_api_dependencies,
):
    repository = MagicMock()
    repository.db = MagicMock()
    repository.event_logger = MagicMock(spec=SessionEventLogger)
    orchestrator = DatasetReviewOrchestrator(
        repository=repository,
        config_manager=dataset_review_api_dependencies["config_manager"],
        task_manager=None,
    )
    # NOTE(review): the map-1/var-1/filter-1 fixtures asserted below presumably
    # come from the shared _make_us3_session() setup — confirm there.
    session = _make_preview_ready_session()
    snapshot = orchestrator._build_execution_snapshot(session)
    assert snapshot["template_params"] == {"country": "DE"}
    assert snapshot["preview_blockers"] == []
    assert snapshot["effective_filters"] == [
        {
            "mapping_id": "map-1",
            "filter_id": "filter-1",
            "filter_name": "country",
            "variable_id": "var-1",
            "variable_name": "country",
            "effective_value": "DE",
            "raw_input_value": "DE",
        }
    ]
    # The mapping with an open warning must be surfaced by reference.
    assert snapshot["open_warning_refs"] == ["map-1"]
# [/DEF:test_execution_snapshot_preserves_mapped_template_variables_and_filter_context:Function]
# [DEF:test_execution_snapshot_skips_partial_imported_filters_without_values:Function]
# @PURPOSE: Partial imported filters without raw or normalized values must not emit bogus active preview filters.
def test_execution_snapshot_skips_partial_imported_filters_without_values(
    dataset_review_api_dependencies,
):
    repository = MagicMock()
    repository.db = MagicMock()
    repository.event_logger = MagicMock(spec=SessionEventLogger)
    orchestrator = DatasetReviewOrchestrator(
        repository=repository,
        config_manager=dataset_review_api_dependencies["config_manager"],
        task_manager=None,
    )
    session = _make_preview_ready_session()
    # A "partial" recovery: the filter is known but carries no values at all.
    unresolved_filter = MagicMock()
    unresolved_filter.filter_id = "filter-region"
    unresolved_filter.filter_name = "region"
    unresolved_filter.display_name = "Region"
    unresolved_filter.raw_value = None
    unresolved_filter.normalized_value = None
    unresolved_filter.requires_confirmation = True
    unresolved_filter.recovery_status = "partial"
    session.imported_filters = [unresolved_filter]
    session.template_variables = []
    session.execution_mappings = []
    session.semantic_fields = []
    snapshot = orchestrator._build_execution_snapshot(session)
    # Value-less filters contribute nothing: no params, no filters, no blockers.
    assert snapshot["template_params"] == {}
    assert snapshot["effective_filters"] == []
    assert snapshot["preview_blockers"] == []
# [/DEF:test_execution_snapshot_skips_partial_imported_filters_without_values:Function]
# [DEF:test_us3_launch_endpoint_requires_launch_permission:Function]
# @PURPOSE: Launch endpoint should enforce the contract RBAC permission instead of the generic session-manage permission.
def test_us3_launch_endpoint_requires_launch_permission(dataset_review_api_dependencies):

View File

@@ -0,0 +1,594 @@
# [DEF:NativeFilterExtractionTests:Module]
# @COMPLEXITY: 3
# @SEMANTICS: tests, superset, native, filters, permalink, filter_state
# @PURPOSE: Verify native filter extraction from permalinks and native_filters_key URLs.
# @LAYER: Domain
# @RELATION: [BINDS_TO] ->[SupersetClient]
# @RELATION: [BINDS_TO] ->[AsyncSupersetClient]
# @RELATION: [BINDS_TO] ->[FilterState, ParsedNativeFilters, ExtraFormDataMerge]
import json
from unittest.mock import MagicMock
import pytest
from src.core.superset_client import SupersetClient
from src.core.async_superset_client import AsyncSupersetClient
from src.core.config_models import Environment
from src.core.utils.superset_context_extractor import (
SupersetContextExtractor,
SupersetParsedContext,
)
from src.models.filter_state import (
FilterState,
NativeFilterDataMask,
ParsedNativeFilters,
ExtraFormDataMerge,
)
# [DEF:_make_environment:Function]
def _make_environment() -> Environment:
    """Build a throwaway DEV environment pointing at a local Superset host."""
    env_fields = {
        "id": "env-1",
        "name": "DEV",
        "url": "http://superset.local",
        "username": "demo",
        "password": "secret",
    }
    return Environment(**env_fields)
# [/DEF:_make_environment:Function]
# [DEF:test_extract_native_filters_from_permalink:Function]
# @PURPOSE: Extract native filters from a permalink key.
def test_extract_native_filters_from_permalink():
    client = SupersetClient(_make_environment())
    # Stub the raw permalink API response: state payload nested under result.state.
    client.get_dashboard_permalink_state = MagicMock(
        return_value={
            "result": {
                "state": {
                    "dataMask": {
                        "filter_country": {
                            "extraFormData": {
                                "filters": [
                                    {"col": "country", "op": "IN", "val": ["DE", "FR"]}
                                ]
                            },
                            "filterState": {"value": ["DE", "FR"]},
                            "ownState": {},
                        },
                        "filter_date": {
                            "extraFormData": {
                                "time_range": "2020-01-01 : 2024-12-31"
                            },
                            "filterState": {"value": "2020-01-01 : 2024-12-31"},
                            "ownState": {},
                        },
                    },
                    "activeTabs": ["tab1", "tab2"],
                    "anchor": "SECTION1",
                    "chartStates": {"chart_1": {}},
                }
            }
        }
    )
    result = client.extract_native_filters_from_permalink("test-permalink-key")
    assert result["permalink_key"] == "test-permalink-key"
    assert "dataMask" in result
    assert "filter_country" in result["dataMask"]
    assert "filter_date" in result["dataMask"]
    assert result["dataMask"]["filter_country"]["extraFormData"]["filters"][0]["val"] == ["DE", "FR"]
    # Tab/anchor context must survive extraction alongside the data mask.
    assert result["activeTabs"] == ["tab1", "tab2"]
    assert result["anchor"] == "SECTION1"
# [/DEF:test_extract_native_filters_from_permalink:Function]
# [DEF:test_extract_native_filters_from_permalink_direct_response:Function]
# @PURPOSE: Handle permalink response without result wrapper.
def test_extract_native_filters_from_permalink_direct_response():
    client = SupersetClient(_make_environment())
    # Response shape variant: "state" at the top level, no "result" wrapper.
    client.get_dashboard_permalink_state = MagicMock(
        return_value={
            "state": {
                "dataMask": {
                    "filter_1": {
                        "extraFormData": {"filters": []},
                        "filterState": {},
                        "ownState": {},
                    }
                }
            }
        }
    )
    result = client.extract_native_filters_from_permalink("direct-key")
    assert result["permalink_key"] == "direct-key"
    assert "filter_1" in result["dataMask"]
# [/DEF:test_extract_native_filters_from_permalink_direct_response:Function]
# [DEF:test_extract_native_filters_from_key:Function]
# @PURPOSE: Extract native filters from a native_filters_key.
def test_extract_native_filters_from_key():
    client = SupersetClient(_make_environment())
    # The filter-state endpoint returns the state as a JSON *string* in result.value.
    client.get_native_filter_state = MagicMock(
        return_value={
            "result": {
                "value": json.dumps({
                    "filter_region": {
                        "id": "filter_region",
                        "extraFormData": {
                            "filters": [{"col": "region", "op": "IN", "val": ["EMEA"]}]
                        },
                        "filterState": {"value": ["EMEA"]},
                    }
                })
            }
        }
    )
    result = client.extract_native_filters_from_key(123, "filter-state-key")
    assert result["dashboard_id"] == 123
    assert result["filter_state_key"] == "filter-state-key"
    assert "dataMask" in result
    assert "filter_region" in result["dataMask"]
    assert result["dataMask"]["filter_region"]["extraFormData"]["filters"][0]["val"] == ["EMEA"]
# [/DEF:test_extract_native_filters_from_key:Function]
# [DEF:test_extract_native_filters_from_key_single_filter:Function]
# @PURPOSE: Handle single filter format in native filter state.
def test_extract_native_filters_from_key_single_filter():
    client = SupersetClient(_make_environment())
    # Variant: the state value is one bare filter object (with an "id" key)
    # rather than a map of filter-id -> state.
    client.get_native_filter_state = MagicMock(
        return_value={
            "result": {
                "value": json.dumps({
                    "id": "single_filter",
                    "extraFormData": {"filters": [{"col": "status", "op": "==", "val": "active"}]},
                    "filterState": {"value": "active"},
                })
            }
        }
    )
    result = client.extract_native_filters_from_key(456, "single-key")
    assert "dataMask" in result
    assert "single_filter" in result["dataMask"]
    assert result["dataMask"]["single_filter"]["extraFormData"]["filters"][0]["col"] == "status"
# [/DEF:test_extract_native_filters_from_key_single_filter:Function]
# [DEF:test_extract_native_filters_from_key_dict_value:Function]
# @PURPOSE: Handle filter state value as dict instead of JSON string.
def test_extract_native_filters_from_key_dict_value():
    client = SupersetClient(_make_environment())
    # Variant: result.value arrives pre-parsed as a dict, not a JSON string.
    client.get_native_filter_state = MagicMock(
        return_value={
            "result": {
                "value": {
                    "filter_id": {
                        "extraFormData": {"filters": []},
                        "filterState": {},
                    }
                }
            }
        }
    )
    result = client.extract_native_filters_from_key(789, "dict-key")
    assert "dataMask" in result
    assert "filter_id" in result["dataMask"]
# [/DEF:test_extract_native_filters_from_key_dict_value:Function]
# [DEF:test_parse_dashboard_url_for_filters_permalink:Function]
# @PURPOSE: Parse permalink URL format.
def test_parse_dashboard_url_for_filters_permalink():
    client = SupersetClient(_make_environment())
    stubbed_payload = {"dataMask": {"f1": {}}, "permalink_key": "abc123"}
    client.extract_native_filters_from_permalink = MagicMock(return_value=stubbed_payload)
    permalink_url = "http://superset.local/superset/dashboard/p/abc123/"
    parsed = client.parse_dashboard_url_for_filters(permalink_url)
    assert parsed["filter_type"] == "permalink"
    assert parsed["filters"]["dataMask"]["f1"] == {}
# [/DEF:test_parse_dashboard_url_for_filters_permalink:Function]
# [DEF:test_parse_dashboard_url_for_filters_native_key:Function]
# @PURPOSE: Parse native_filters_key URL format with numeric dashboard ID.
def test_parse_dashboard_url_for_filters_native_key():
    client = SupersetClient(_make_environment())
    client.extract_native_filters_from_key = MagicMock(
        return_value={"dataMask": {"f2": {}}, "dashboard_id": 42, "filter_state_key": "xyz"}
    )
    # Numeric dashboard reference in the path, filter state via query param.
    result = client.parse_dashboard_url_for_filters(
        "http://superset.local/dashboard/42/?native_filters_key=xyz"
    )
    assert result["filter_type"] == "native_filters_key"
    assert result["dashboard_id"] == 42
    assert result["filters"]["dataMask"]["f2"] == {}
# [/DEF:test_parse_dashboard_url_for_filters_native_key:Function]
# [DEF:test_parse_dashboard_url_for_filters_native_key_slug:Function]
# @PURPOSE: Parse native_filters_key URL format when dashboard reference is a slug, not a numeric ID.
def test_parse_dashboard_url_for_filters_native_key_slug():
    client = SupersetClient(_make_environment())
    # Simulate slug resolution: get_dashboard returns the dashboard with numeric ID
    client.get_dashboard = MagicMock(
        return_value={
            "result": {"id": 99, "dashboard_title": "COVID Dashboard", "slug": "covid"}
        }
    )
    client.extract_native_filters_from_key = MagicMock(
        return_value={"dataMask": {"f_slug": {}}, "dashboard_id": 99, "filter_state_key": "abc123"}
    )
    result = client.parse_dashboard_url_for_filters(
        "http://superset.local/superset/dashboard/covid/?native_filters_key=abc123"
    )
    assert result["filter_type"] == "native_filters_key"
    assert result["dashboard_id"] == 99
    assert result["filters"]["dataMask"]["f_slug"] == {}
    # The slug must be resolved exactly once and the resolved numeric id reused.
    client.get_dashboard.assert_called_once_with("covid")
    client.extract_native_filters_from_key.assert_called_once_with(99, "abc123")
# [/DEF:test_parse_dashboard_url_for_filters_native_key_slug:Function]
# [DEF:test_parse_dashboard_url_for_filters_native_key_slug_resolution_fails:Function]
# @PURPOSE: Gracefully handle slug resolution failure for native_filters_key URL.
def test_parse_dashboard_url_for_filters_native_key_slug_resolution_fails():
    client = SupersetClient(_make_environment())
    client.get_dashboard = MagicMock(side_effect=Exception("Not found"))
    failing_url = "http://superset.local/dashboard/unknownslug/?native_filters_key=key1"
    outcome = client.parse_dashboard_url_for_filters(failing_url)
    assert outcome["dashboard_id"] is None
    assert outcome["filter_type"] is None
# [/DEF:test_parse_dashboard_url_for_filters_native_key_slug_resolution_fails:Function]
# [DEF:test_parse_dashboard_url_for_filters_native_filters_direct:Function]
# @PURPOSE: Parse native_filters direct query param.
def test_parse_dashboard_url_for_filters_native_filters_direct():
    client = SupersetClient(_make_environment())
    # Filters embedded directly in the URL as a JSON-encoded query parameter.
    result = client.parse_dashboard_url_for_filters(
        "http://superset.local/dashboard/1/?native_filters="
        + json.dumps({"filter_1": {"col": "x", "op": "==", "val": "y"}})
    )
    assert result["filter_type"] == "native_filters"
    assert "dataMask" in result["filters"]
# [/DEF:test_parse_dashboard_url_for_filters_native_filters_direct:Function]
# [DEF:test_parse_dashboard_url_for_filters_no_filters:Function]
# @PURPOSE: Return empty result when no filters present.
def test_parse_dashboard_url_for_filters_no_filters():
    bare_url = "http://superset.local/dashboard/1/"
    client = SupersetClient(_make_environment())
    outcome = client.parse_dashboard_url_for_filters(bare_url)
    assert outcome["filters"] == {}
    assert outcome["filter_type"] is None
# [/DEF:test_parse_dashboard_url_for_filters_no_filters:Function]
# [DEF:test_extra_form_data_merge:Function]
# @PURPOSE: Test ExtraFormDataMerge correctly merges dictionaries.
def test_extra_form_data_merge():
    merger = ExtraFormDataMerge()
    original = {
        "filters": [{"col": "a", "op": "IN", "val": [1, 2]}],
        "time_range": "2020-01-01 : 2021-01-01",
        "extras": {"where": "x > 0"},
    }
    new = {
        "filters": [{"col": "b", "op": "==", "val": "test"}],
        "time_range": "2022-01-01 : 2023-01-01",
        "columns": ["col1", "col2"],
    }
    result = merger.merge(original, new)
    # Filters should be appended
    assert len(result["filters"]) == 2
    assert result["filters"][0]["col"] == "a"
    assert result["filters"][1]["col"] == "b"
    # Time range should be overridden
    assert result["time_range"] == "2022-01-01 : 2023-01-01"
    # Extras should remain
    assert result["extras"] == {"where": "x > 0"}
    # New columns should be added
    assert result["columns"] == ["col1", "col2"]
# [/DEF:test_extra_form_data_merge:Function]
# [DEF:test_filter_state_model:Function]
# @PURPOSE: Test FilterState Pydantic model.
def test_filter_state_model():
    # Field names intentionally mirror Superset's camelCase wire format.
    state = FilterState(
        extraFormData={"filters": [{"col": "x", "op": "==", "val": "y"}]},
        filterState={"value": "y"},
        ownState={"selectedValues": ["y"]},
    )
    assert state.extraFormData["filters"][0]["col"] == "x"
    assert state.filterState["value"] == "y"
    assert state.ownState["selectedValues"] == ["y"]
# [/DEF:test_filter_state_model:Function]
# [DEF:test_parsed_native_filters_model:Function]
# @PURPOSE: Test ParsedNativeFilters Pydantic model.
def test_parsed_native_filters_model():
    filters = ParsedNativeFilters(
        dataMask={"f1": {"extraFormData": {}, "filterState": {}}},
        filter_type="permalink",
        dashboard_id="42",
        permalink_key="abc",
    )
    assert filters.has_filters() is True
    assert filters.get_filter_count() == 1
    assert filters.filter_type == "permalink"
# [/DEF:test_parsed_native_filters_model:Function]
# [DEF:test_parsed_native_filters_empty:Function]
# @PURPOSE: Test ParsedNativeFilters with no filters.
def test_parsed_native_filters_empty():
    empty_model = ParsedNativeFilters()
    assert empty_model.get_filter_count() == 0
    assert empty_model.has_filters() is False
# [/DEF:test_parsed_native_filters_empty:Function]
# [DEF:test_native_filter_data_mask_model:Function]
# @PURPOSE: Test NativeFilterDataMask model.
def test_native_filter_data_mask_model():
    data_mask = NativeFilterDataMask(
        filters={
            "filter_1": FilterState(extraFormData={"filters": []}, filterState={}),
            "filter_2": FilterState(extraFormData={"time_range": "..."}, filterState={}),
        }
    )
    assert data_mask.get_filter_ids() == ["filter_1", "filter_2"]
    assert data_mask.get_extra_form_data("filter_1") == {"filters": []}
    # Unknown filter ids fall back to an empty dict rather than raising.
    assert data_mask.get_extra_form_data("nonexistent") == {}
# [/DEF:test_native_filter_data_mask_model:Function]
# [DEF:test_recover_imported_filters_reconciles_raw_native_filter_ids_to_metadata_names:Function]
# @PURPOSE: Reconcile raw native filter ids from state to canonical metadata filter names.
def test_recover_imported_filters_reconciles_raw_native_filter_ids_to_metadata_names():
    client = MagicMock()
    # Dashboard metadata maps the raw NATIVE_FILTER-* id to the name "Country".
    client.get_dashboard.return_value = {
        "result": {
            "json_metadata": json.dumps(
                {
                    "native_filter_configuration": [
                        {
                            "id": "NATIVE_FILTER-EWNH3M70z",
                            "name": "Country",
                            "label": "Country",
                        }
                    ]
                }
            )
        }
    }
    extractor = SupersetContextExtractor(_make_environment(), client=client)
    parsed_context = SupersetParsedContext(
        source_url="http://superset.local/dashboard/42/?native_filters_key=abc",
        dataset_ref="dataset:42",
        dashboard_id=42,
        imported_filters=[
            {
                "filter_name": "NATIVE_FILTER-EWNH3M70z",
                "display_name": "NATIVE_FILTER-EWNH3M70z",
                "raw_value": ["DE", "FR"],
                "normalized_value": {
                    "filter_clauses": [{"col": "country", "op": "IN", "val": ["DE", "FR"]}],
                    "extra_form_data": {"filters": [{"col": "country", "op": "IN", "val": ["DE", "FR"]}]},
                    "value_origin": "filter_state",
                },
                "source": "superset_native_filters_key",
                "recovery_status": "recovered",
                "requires_confirmation": False,
                "notes": "Recovered from Superset native_filters_key state",
            }
        ],
    )
    result = extractor.recover_imported_filters(parsed_context)
    assert len(result) == 1
    # Names are rewritten to canonical metadata names; values stay untouched.
    assert result[0]["filter_name"] == "Country"
    assert result[0]["display_name"] == "Country"
    assert result[0]["raw_value"] == ["DE", "FR"]
    assert result[0]["source"] == "superset_native_filters_key"
    assert result[0]["normalized_value"] == {
        "filter_clauses": [{"col": "country", "op": "IN", "val": ["DE", "FR"]}],
        "extra_form_data": {"filters": [{"col": "country", "op": "IN", "val": ["DE", "FR"]}]},
        "value_origin": "filter_state",
    }
# [/DEF:test_recover_imported_filters_reconciles_raw_native_filter_ids_to_metadata_names:Function]
# [DEF:test_recover_imported_filters_collapses_state_and_metadata_duplicates_into_one_canonical_filter:Function]
# @PURPOSE: Collapse raw-id state entries and metadata entries into one canonical filter.
def test_recover_imported_filters_collapses_state_and_metadata_duplicates_into_one_canonical_filter():
    client = MagicMock()
    # Two configured filters; only "Country" has a matching state entry below.
    client.get_dashboard.return_value = {
        "result": {
            "json_metadata": json.dumps(
                {
                    "native_filter_configuration": [
                        {
                            "id": "NATIVE_FILTER-EWNH3M70z",
                            "name": "Country",
                            "label": "Country",
                        },
                        {
                            "id": "NATIVE_FILTER-vv123",
                            "name": "Region",
                            "label": "Region",
                        },
                    ]
                }
            )
        }
    }
    extractor = SupersetContextExtractor(_make_environment(), client=client)
    parsed_context = SupersetParsedContext(
        source_url="http://superset.local/dashboard/42/?native_filters_key=abc",
        dataset_ref="dataset:42",
        dashboard_id=42,
        imported_filters=[
            {
                "filter_name": "NATIVE_FILTER-EWNH3M70z",
                "display_name": "Country",
                "raw_value": ["DE", "FR"],
                "source": "superset_native_filters_key",
                "recovery_status": "recovered",
                "requires_confirmation": False,
                "notes": "Recovered from Superset native_filters_key state",
            }
        ],
    )
    result = extractor.recover_imported_filters(parsed_context)
    # One canonical entry per configured filter — no duplicate for Country.
    assert len(result) == 2
    country_filter = next(item for item in result if item["filter_name"] == "Country")
    region_filter = next(item for item in result if item["filter_name"] == "Region")
    assert country_filter["raw_value"] == ["DE", "FR"]
    assert country_filter["recovery_status"] == "recovered"
    # Region had no state entry: present but value-less and only "partial".
    assert region_filter["raw_value"] is None
    assert region_filter["recovery_status"] == "partial"
# [/DEF:test_recover_imported_filters_collapses_state_and_metadata_duplicates_into_one_canonical_filter:Function]
# [DEF:test_recover_imported_filters_preserves_unmatched_raw_native_filter_ids:Function]
# @PURPOSE: Preserve unmatched raw native filter ids as fallback diagnostics when metadata mapping is unavailable.
def test_recover_imported_filters_preserves_unmatched_raw_native_filter_ids():
    client = MagicMock()
    client.get_dashboard.return_value = {
        "result": {
            "json_metadata": json.dumps(
                {
                    "native_filter_configuration": [
                        {
                            "id": "NATIVE_FILTER-EWNH3M70z",
                            "name": "Country",
                            "label": "Country",
                        }
                    ]
                }
            )
        }
    }
    extractor = SupersetContextExtractor(_make_environment(), client=client)
    parsed_context = SupersetParsedContext(
        source_url="http://superset.local/dashboard/42/?native_filters_key=abc",
        dataset_ref="dataset:42",
        dashboard_id=42,
        imported_filters=[
            {
                # Id that matches no metadata entry — must survive as-is.
                "filter_name": "UNKNOWN_NATIVE_FILTER",
                "display_name": "UNKNOWN_NATIVE_FILTER",
                "raw_value": ["orphan"],
                "source": "superset_native_filters_key",
                "recovery_status": "recovered",
                "requires_confirmation": False,
                "notes": "Recovered from Superset native_filters_key state",
            }
        ],
    )
    result = extractor.recover_imported_filters(parsed_context)
    # Both the value-less metadata filter and the orphaned raw id are kept.
    assert len(result) == 2
    assert any(item["filter_name"] == "Country" and item["recovery_status"] == "partial" for item in result)
    assert any(
        item["filter_name"] == "UNKNOWN_NATIVE_FILTER"
        and item["raw_value"] == ["orphan"]
        and item["source"] == "superset_native_filters_key"
        for item in result
    )
# [/DEF:test_recover_imported_filters_preserves_unmatched_raw_native_filter_ids:Function]
# [DEF:test_extract_imported_filters_preserves_clause_level_native_filter_payload_for_preview:Function]
# @PURPOSE: Recovered native filter state should preserve exact Superset clause payload and time extras for preview compilation.
def test_extract_imported_filters_preserves_clause_level_native_filter_payload_for_preview():
    extractor = SupersetContextExtractor(_make_environment(), client=MagicMock())
    imported_filters = extractor._extract_imported_filters(
        {
            "native_filter_state": {
                "NATIVE_FILTER-1": {
                    "id": "NATIVE_FILTER-1",
                    "filterState": {"label": "Country", "value": ["DE"]},
                    "extraFormData": {
                        "filters": [{"col": "country_code", "op": "IN", "val": ["DE"]}],
                        "time_range": "Last month",
                    },
                }
            }
        }
    )
    # Clause list AND time_range extras must round-trip byte-for-byte into
    # normalized_value so preview compilation sees the exact Superset payload.
    assert imported_filters == [
        {
            "filter_name": "NATIVE_FILTER-1",
            "raw_value": ["DE"],
            "display_name": "Country",
            "normalized_value": {
                "filter_clauses": [{"col": "country_code", "op": "IN", "val": ["DE"]}],
                "extra_form_data": {
                    "filters": [{"col": "country_code", "op": "IN", "val": ["DE"]}],
                    "time_range": "Last month",
                },
                "value_origin": "filter_state",
            },
            "source": "superset_native_filters_key",
            "recovery_status": "recovered",
            "requires_confirmation": False,
            "notes": "Recovered from Superset native_filters_key state",
        }
    ]
# [/DEF:test_extract_imported_filters_preserves_clause_level_native_filter_payload_for_preview:Function]
# [/DEF:NativeFilterExtractionTests:Module]

View File

@@ -52,9 +52,9 @@ def _make_httpx_status_error(status_code: int, url: str) -> httpx.HTTPStatusErro
# [/DEF:_make_httpx_status_error:Function]
# [DEF:test_compile_dataset_preview_uses_chart_data_and_result_query_sql:Function]
# @PURPOSE: Superset preview compilation should call the real chart-data endpoint and extract SQL from result[].query.
def test_compile_dataset_preview_uses_chart_data_and_result_query_sql():
# [DEF:test_compile_dataset_preview_prefers_legacy_explore_form_data_strategy:Function]
# @PURPOSE: Superset preview compilation should prefer the legacy form_data transport inferred from browser traffic before falling back to chart-data.
def test_compile_dataset_preview_prefers_legacy_explore_form_data_strategy():
client = SupersetClient(_make_environment())
client.get_dataset = MagicMock(
return_value={
@@ -69,11 +69,9 @@ def test_compile_dataset_preview_uses_chart_data_and_result_query_sql():
)
client.network = MagicMock()
client.network.request.return_value = {
"result": [
{
"query": "SELECT count(*) FROM public.sales WHERE country IN ('DE')",
}
]
"result": {
"query": "SELECT count(*) FROM public.sales WHERE country IN ('DE')",
}
}
result = client.compile_dataset_preview(
@@ -86,21 +84,295 @@ def test_compile_dataset_preview_uses_chart_data_and_result_query_sql():
client.network.request.assert_called_once()
request_call = client.network.request.call_args
assert request_call.kwargs["method"] == "POST"
assert request_call.kwargs["endpoint"] == "/chart/data"
assert request_call.kwargs["headers"] == {"Content-Type": "application/json"}
assert request_call.kwargs["endpoint"] == "/explore_json/form_data"
assert request_call.kwargs["params"] is not None
assert request_call.kwargs["params"].keys() == {"form_data"}
query_context = json.loads(request_call.kwargs["data"])
assert query_context["datasource"] == {"id": 42, "type": "table"}
assert query_context["queries"][0]["filters"] == [
legacy_form_data = json.loads(request_call.kwargs["params"]["form_data"])
assert "datasource" not in legacy_form_data
assert legacy_form_data["datasource_id"] == 42
assert legacy_form_data["datasource_type"] == "table"
assert legacy_form_data["extra_filters"] == [
{"col": "country", "op": "IN", "val": ["DE"]}
]
assert query_context["queries"][0]["url_params"] == {"country": "DE"}
assert legacy_form_data["extra_form_data"] == {
"filters": [{"col": "country", "op": "IN", "val": ["DE"]}]
}
assert legacy_form_data["url_params"] == {"country": "DE"}
assert legacy_form_data["result_type"] == "query"
assert legacy_form_data["result_format"] == "json"
assert legacy_form_data["force"] is True
assert result["endpoint"] == "/explore_json/form_data"
assert result["endpoint_kind"] == "legacy_explore_form_data"
assert result["dataset_id"] == 42
assert result["response_diagnostics"] == [
{"source": "query", "has_query": False},
{"source": "sql", "has_query": False},
{"source": "compiled_sql", "has_query": False},
{"source": "result.query", "has_query": True},
]
assert result["legacy_form_data"]["extra_filters"] == [
{"col": "country", "op": "IN", "val": ["DE"]}
]
assert result["query_context"]["datasource"] == {"id": 42, "type": "table"}
assert result["query_context"]["queries"][0]["filters"] == [
{"col": "country", "op": "IN", "val": ["DE"]}
assert result["strategy_attempts"] == [
{
"endpoint": "/explore_json/form_data",
"endpoint_kind": "legacy_explore_form_data",
"request_transport": "query_param_form_data",
"contains_root_datasource": False,
"contains_form_datasource": False,
"contains_query_object_datasource": False,
"request_param_keys": ["form_data"],
"request_payload_keys": [],
"success": True,
}
]
# [/DEF:test_compile_dataset_preview_uses_chart_data_and_result_query_sql:Function]
# [/DEF:test_compile_dataset_preview_prefers_legacy_explore_form_data_strategy:Function]
# [DEF:test_compile_dataset_preview_falls_back_to_chart_data_after_legacy_failures:Function]
# @PURPOSE: Superset preview compilation should fall back to chart-data when legacy form_data strategies are rejected.
def test_compile_dataset_preview_falls_back_to_chart_data_after_legacy_failures():
    client = SupersetClient(_make_environment())
    client.get_dataset = MagicMock(
        return_value={
            "result": {
                "id": 42,
                "schema": "public",
                "datasource": {"id": 42, "type": "table"},
            }
        }
    )
    client.network = MagicMock()
    # First two strategies (legacy explore, legacy data) fail; third succeeds.
    client.network.request.side_effect = [
        SupersetAPIError("legacy explore failed"),
        SupersetAPIError("legacy data failed"),
        {
            "result": [
                {
                    "query": "SELECT count(*) FROM public.sales",
                }
            ]
        },
    ]
    result = client.compile_dataset_preview(dataset_id=42)
    assert client.network.request.call_count == 3
    first_call = client.network.request.call_args_list[0]
    second_call = client.network.request.call_args_list[1]
    third_call = client.network.request.call_args_list[2]
    # Attempt order: legacy explore -> legacy data -> v1 chart-data.
    assert first_call.kwargs["endpoint"] == "/explore_json/form_data"
    assert second_call.kwargs["endpoint"] == "/data"
    assert third_call.kwargs["endpoint"] == "/chart/data"
    assert third_call.kwargs["headers"] == {"Content-Type": "application/json"}
    first_legacy_form_data = json.loads(first_call.kwargs["params"]["form_data"])
    second_legacy_form_data = json.loads(second_call.kwargs["params"]["form_data"])
    # Legacy transports must not carry a root "datasource" key.
    assert "datasource" not in first_legacy_form_data
    assert "datasource" not in second_legacy_form_data
    query_context = json.loads(third_call.kwargs["data"])
    assert query_context["datasource"] == {"id": 42, "type": "table"}
    assert result["endpoint"] == "/chart/data"
    assert result["endpoint_kind"] == "v1_chart_data"
    # All three attempts are recorded as diagnostics, failures included.
    assert len(result["strategy_attempts"]) == 3
    assert result["strategy_attempts"][0]["endpoint"] == "/explore_json/form_data"
    assert result["strategy_attempts"][0]["endpoint_kind"] == "legacy_explore_form_data"
    assert result["strategy_attempts"][0]["request_transport"] == "query_param_form_data"
    assert result["strategy_attempts"][0]["contains_root_datasource"] is False
    assert result["strategy_attempts"][0]["contains_form_datasource"] is False
    assert result["strategy_attempts"][0]["contains_query_object_datasource"] is False
    assert result["strategy_attempts"][0]["request_param_keys"] == ["form_data"]
    assert result["strategy_attempts"][0]["request_payload_keys"] == []
    assert result["strategy_attempts"][0]["success"] is False
    assert "legacy explore failed" in result["strategy_attempts"][0]["error"]
    assert result["strategy_attempts"][1]["endpoint"] == "/data"
    assert result["strategy_attempts"][1]["endpoint_kind"] == "legacy_data_form_data"
    assert result["strategy_attempts"][1]["request_transport"] == "query_param_form_data"
    assert result["strategy_attempts"][1]["contains_root_datasource"] is False
    assert result["strategy_attempts"][1]["contains_form_datasource"] is False
    assert result["strategy_attempts"][1]["contains_query_object_datasource"] is False
    assert result["strategy_attempts"][1]["request_param_keys"] == ["form_data"]
    assert result["strategy_attempts"][1]["request_payload_keys"] == []
    assert result["strategy_attempts"][1]["success"] is False
    assert "legacy data failed" in result["strategy_attempts"][1]["error"]
    assert result["strategy_attempts"][2] == {
        "endpoint": "/chart/data",
        "endpoint_kind": "v1_chart_data",
        "request_transport": "json_body",
        "contains_root_datasource": True,
        "contains_form_datasource": False,
        "contains_query_object_datasource": False,
        "request_param_keys": [],
        "request_payload_keys": ["datasource", "force", "form_data", "queries", "result_format", "result_type"],
        "success": True,
    }
# [/DEF:test_compile_dataset_preview_falls_back_to_chart_data_after_legacy_failures:Function]
# [DEF:test_build_dataset_preview_query_context_places_recovered_filters_in_chart_style_form_data:Function]
# @PURPOSE: Preview query context should mirror chart-style filter transport so recovered native filters reach Superset compilation.
def test_build_dataset_preview_query_context_places_recovered_filters_in_chart_style_form_data():
    client = SupersetClient(_make_environment())
    query_context = client.build_dataset_preview_query_context(
        dataset_id=7,
        dataset_record={
            "id": 7,
            "schema": "public",
            "datasource": {"id": 7, "type": "table"},
            "default_time_range": "Last year",
        },
        template_params={"country": "DE"},
        effective_filters=[
            # Clause-level payload path: clauses come from normalized_filter_payload.
            {
                "filter_name": "country",
                "display_name": "Country",
                "effective_value": ["DE"],
                "normalized_filter_payload": {
                    "filter_clauses": [{"col": "country_code", "op": "IN", "val": ["DE"]}],
                    "extra_form_data": {"filters": [{"col": "country_code", "op": "IN", "val": ["DE"]}]},
                    "value_origin": "extra_form_data.filters",
                },
            },
            # Fallback path: scalar effective_value becomes an equality clause.
            {"filter_name": "status", "effective_value": "active"},
        ],
    )
    assert query_context["force"] is True
    assert query_context["result_type"] == "query"
    assert query_context["datasource"] == {"id": 7, "type": "table"}
    assert "datasource" not in query_context["queries"][0]
    assert query_context["queries"][0]["result_type"] == "query"
    assert query_context["queries"][0]["filters"] == [
        {"col": "country_code", "op": "IN", "val": ["DE"]},
        {"col": "status", "op": "==", "val": "active"},
    ]
    # Chart-style form_data mirrors the same filters for Superset compilation.
    assert query_context["form_data"]["datasource"] == "7__table"
    assert query_context["form_data"]["datasource_id"] == 7
    assert query_context["form_data"]["datasource_type"] == "table"
    assert query_context["form_data"]["extra_filters"] == [
        {"col": "country_code", "op": "IN", "val": ["DE"]},
        {"col": "status", "op": "==", "val": "active"},
    ]
    assert query_context["form_data"]["extra_form_data"] == {
        "filters": [
            {"col": "country_code", "op": "IN", "val": ["DE"]},
            {"col": "status", "op": "==", "val": "active"},
        ],
        "time_range": "Last year",
    }
    assert query_context["form_data"]["url_params"] == {"country": "DE"}
# [/DEF:test_build_dataset_preview_query_context_places_recovered_filters_in_chart_style_form_data:Function]
# [DEF:test_build_dataset_preview_query_context_merges_dataset_template_params_and_preserves_user_values:Function]
# @PURPOSE: Preview query context should merge dataset template params for parity with real dataset definitions while preserving explicit session overrides.
def test_build_dataset_preview_query_context_merges_dataset_template_params_and_preserves_user_values():
    superset = SupersetClient(_make_environment())
    # Dataset-level defaults arrive JSON-encoded, exactly as Superset stores them.
    dataset_defaults = json.dumps({"region": "EMEA", "country": "FR"})
    context = superset.build_dataset_preview_query_context(
        dataset_id=8,
        dataset_record={
            "id": 8,
            "schema": "public",
            "datasource": {"id": 8, "type": "table"},
            "template_params": dataset_defaults,
        },
        template_params={"country": "DE"},
        effective_filters=[],
    )
    # Session-provided "country" wins; dataset-only "region" is retained.
    merged_params = {"region": "EMEA", "country": "DE"}
    assert context["queries"][0]["url_params"] == merged_params
    assert context["form_data"]["url_params"] == merged_params
# [/DEF:test_build_dataset_preview_query_context_merges_dataset_template_params_and_preserves_user_values:Function]
# [DEF:test_build_dataset_preview_query_context_preserves_time_range_from_native_filter_payload:Function]
# @PURPOSE: Preview query context should preserve time-range native filter extras even when dataset defaults differ.
def test_build_dataset_preview_query_context_preserves_time_range_from_native_filter_payload():
    superset = SupersetClient(_make_environment())
    requested_range = "2020-01-01 : 2020-12-31"
    context = superset.build_dataset_preview_query_context(
        dataset_id=9,
        dataset_record={
            "id": 9,
            "schema": "public",
            "datasource": {"id": 9, "type": "table"},
            # Dataset default intentionally differs from the recovered filter range.
            "default_time_range": "Last year",
        },
        template_params={},
        effective_filters=[
            {
                "filter_name": "Order Date",
                "display_name": "Order Date",
                "effective_value": requested_range,
                "normalized_filter_payload": {
                    "filter_clauses": [],
                    "extra_form_data": {"time_range": requested_range},
                    "value_origin": "extra_form_data.time_range",
                },
            }
        ],
    )
    # The recovered time range overrides the dataset default everywhere.
    assert context["queries"][0]["time_range"] == requested_range
    assert context["form_data"]["extra_form_data"] == {"time_range": requested_range}
    # A time-range-only filter contributes no WHERE clauses.
    assert context["queries"][0]["filters"] == []
# [/DEF:test_build_dataset_preview_query_context_preserves_time_range_from_native_filter_payload:Function]
# [DEF:test_build_dataset_preview_legacy_form_data_preserves_native_filter_clauses:Function]
# @PURPOSE: Legacy preview form_data should preserve recovered native filter clauses in browser-style fields without duplicating datasource for QueryObjectFactory.
def test_build_dataset_preview_legacy_form_data_preserves_native_filter_clauses():
    superset = SupersetClient(_make_environment())
    form_data = superset.build_dataset_preview_legacy_form_data(
        dataset_id=11,
        dataset_record={
            "id": 11,
            "schema": "public",
            "datasource": {"id": 11, "type": "table"},
            "default_time_range": "No filter",
        },
        template_params={"country": "DE"},
        effective_filters=[
            {
                "filter_name": "Country",
                "display_name": "Country",
                "effective_value": ["DE", "FR"],
                "normalized_filter_payload": {
                    "filter_clauses": [{"col": "country_code", "op": "IN", "val": ["DE", "FR"]}],
                    "extra_form_data": {
                        "filters": [{"col": "country_code", "op": "IN", "val": ["DE", "FR"]}],
                        "time_range": "Last quarter",
                    },
                    "value_origin": "extra_form_data.filters",
                },
            }
        ],
    )
    expected_clause = {"col": "country_code", "op": "IN", "val": ["DE", "FR"]}
    # Legacy transport must omit the composite "datasource" string key.
    assert "datasource" not in form_data
    assert form_data["datasource_id"] == 11
    assert form_data["datasource_type"] == "table"
    assert form_data["extra_filters"] == [expected_clause]
    assert form_data["extra_form_data"] == {
        "filters": [expected_clause],
        "time_range": "Last quarter",
    }
    # Filter-supplied time range overrides the dataset default "No filter".
    assert form_data["time_range"] == "Last quarter"
    assert form_data["url_params"] == {"country": "DE"}
    assert form_data["result_type"] == "query"
# [/DEF:test_build_dataset_preview_legacy_form_data_preserves_native_filter_clauses:Function]
# [DEF:test_sync_network_404_mapping_keeps_non_dashboard_endpoints_generic:Function]

View File

@@ -315,6 +315,205 @@ class AsyncSupersetClient(SupersetClient):
"dataset_count": len(datasets),
}
# [/DEF:get_dashboard_detail_async:Function]
# [DEF:get_dashboard_permalink_state_async:Function]
# @COMPLEXITY: 2
# @PURPOSE: Fetch stored dashboard permalink state asynchronously.
# @POST: Returns dashboard permalink state payload from Superset API.
# @DATA_CONTRACT: Input[permalink_key: str] -> Output[Dict]
async def get_dashboard_permalink_state_async(self, permalink_key: str) -> Dict:
    with belief_scope("AsyncSupersetClient.get_dashboard_permalink_state_async", f"key={permalink_key}"):
        # Single GET against the permalink endpoint; no unwrapping here —
        # callers decide how to interpret the envelope.
        payload = await self.network.request(
            method="GET",
            endpoint=f"/dashboard/permalink/{permalink_key}",
        )
        return cast(Dict, payload)
# [/DEF:get_dashboard_permalink_state_async:Function]
# [DEF:get_native_filter_state_async:Function]
# @COMPLEXITY: 2
# @PURPOSE: Fetch stored native filter state asynchronously.
# @POST: Returns native filter state payload from Superset API.
# @DATA_CONTRACT: Input[dashboard_id: int, filter_state_key: str] -> Output[Dict]
async def get_native_filter_state_async(self, dashboard_id: int, filter_state_key: str) -> Dict:
    with belief_scope("AsyncSupersetClient.get_native_filter_state_async", f"dashboard={dashboard_id}, key={filter_state_key}"):
        # NOTE(review): unlike the sync SupersetClient.get_native_filter_state
        # (Union[int, str]), this signature requires a numeric dashboard id;
        # the async URL parser resolves slugs before calling here.
        payload = await self.network.request(
            method="GET",
            endpoint=f"/dashboard/{dashboard_id}/filter_state/{filter_state_key}",
        )
        return cast(Dict, payload)
# [/DEF:get_native_filter_state_async:Function]
# [DEF:extract_native_filters_from_permalink_async:Function]
# @COMPLEXITY: 3
# @PURPOSE: Extract native filters dataMask from a permalink key asynchronously.
# @POST: Returns extracted dataMask with filter states.
# @DATA_CONTRACT: Input[permalink_key: str] -> Output[Dict]
# @RELATION: [CALLS] ->[self.get_dashboard_permalink_state_async]
async def extract_native_filters_from_permalink_async(self, permalink_key: str) -> Dict:
    with belief_scope("AsyncSupersetClient.extract_native_filters_from_permalink_async", f"key={permalink_key}"):
        raw = await self.get_dashboard_permalink_state_async(permalink_key)
        # The payload may arrive wrapped ({"result": {"state": ...}}) or
        # unwrapped; unwrap defensively at both levels.
        envelope = raw.get("result", raw)
        state = envelope.get("state", envelope)
        # Keep only the three per-filter sections we consume; skip any
        # non-dict entries in the dataMask.
        filter_states = {
            filter_id: {
                "extraFormData": entry.get("extraFormData", {}),
                "filterState": entry.get("filterState", {}),
                "ownState": entry.get("ownState", {}),
            }
            for filter_id, entry in state.get("dataMask", {}).items()
            if isinstance(entry, dict)
        }
        return {
            "dataMask": filter_states,
            "activeTabs": state.get("activeTabs", []),
            "anchor": state.get("anchor"),
            "chartStates": state.get("chartStates", {}),
            "permalink_key": permalink_key,
        }
# [/DEF:extract_native_filters_from_permalink_async:Function]
# [DEF:extract_native_filters_from_key_async:Function]
# @COMPLEXITY: 3
# @PURPOSE: Extract native filters from a native_filters_key URL parameter asynchronously.
# @POST: Returns extracted filter state with extraFormData.
# @DATA_CONTRACT: Input[dashboard_id: int, filter_state_key: str] -> Output[Dict]
# @RELATION: [CALLS] ->[self.get_native_filter_state_async]
async def extract_native_filters_from_key_async(self, dashboard_id: int, filter_state_key: str) -> Dict:
    with belief_scope("AsyncSupersetClient.extract_native_filters_from_key_async", f"dashboard={dashboard_id}, key={filter_state_key}"):
        raw = await self.get_native_filter_state_async(dashboard_id, filter_state_key)
        envelope = raw.get("result", raw)
        stored_value = envelope.get("value")
        # The stored value is usually a JSON string, occasionally an
        # already-decoded dict; anything else degrades to an empty state.
        if isinstance(stored_value, dict):
            state = stored_value
        elif isinstance(stored_value, str):
            try:
                state = json.loads(stored_value)
            except json.JSONDecodeError as e:
                app_logger.warning("[extract_native_filters_from_key_async][Warning] Failed to parse filter state JSON: %s", e)
                state = {}
        else:
            state = {}

        def _sections(entry):
            # One normalized per-filter record: the three sections we consume.
            return {
                "extraFormData": entry.get("extraFormData", {}),
                "filterState": entry.get("filterState", {}),
                "ownState": entry.get("ownState", {}),
            }

        if "id" in state and "extraFormData" in state:
            # Single-filter format: {"id": ..., "extraFormData": ..., ...}
            extracted = {state.get("id", filter_state_key): _sections(state)}
        else:
            # Multiple-filters format keyed by filter id; skip non-dict entries.
            extracted = {
                filter_id: _sections(entry)
                for filter_id, entry in state.items()
                if isinstance(entry, dict)
            }
        return {
            "dataMask": extracted,
            "dashboard_id": dashboard_id,
            "filter_state_key": filter_state_key,
        }
# [/DEF:extract_native_filters_from_key_async:Function]
# [DEF:parse_dashboard_url_for_filters_async:Function]
# @COMPLEXITY: 3
# @PURPOSE: Parse a Superset dashboard URL and extract native filter state asynchronously.
# @POST: Returns extracted filter state or empty dict if no filters found.
# @DATA_CONTRACT: Input[url: str] -> Output[Dict]
# @RELATION: [CALLS] ->[self.extract_native_filters_from_permalink_async]
# @RELATION: [CALLS] ->[self.extract_native_filters_from_key_async]
async def parse_dashboard_url_for_filters_async(self, url: str) -> Dict:
    with belief_scope("AsyncSupersetClient.parse_dashboard_url_for_filters_async", f"url={url}"):
        import urllib.parse
        parsed_url = urllib.parse.urlparse(url)
        query_params = urllib.parse.parse_qs(parsed_url.query)
        path_parts = parsed_url.path.rstrip("/").split("/")
        result = {
            "url": url,
            "dashboard_id": None,
            "filter_type": None,
            "filters": {},
        }
        # Strategy 1 — permalink URL: /dashboard/p/{key}/
        if "p" in path_parts:
            try:
                p_index = path_parts.index("p")
                if p_index + 1 < len(path_parts):
                    permalink_key = path_parts[p_index + 1]
                    filter_data = await self.extract_native_filters_from_permalink_async(permalink_key)
                    result["filter_type"] = "permalink"
                    result["filters"] = filter_data
                    return result
            except ValueError:
                pass
        # Strategy 2 — native_filters_key in query params
        native_filters_key = query_params.get("native_filters_key", [None])[0]
        if native_filters_key:
            dashboard_ref = None
            if "dashboard" in path_parts:
                try:
                    dash_index = path_parts.index("dashboard")
                    if dash_index + 1 < len(path_parts):
                        potential_id = path_parts[dash_index + 1]
                        if potential_id not in ("p", "list", "new"):
                            dashboard_ref = potential_id
                except ValueError:
                    pass
            if dashboard_ref:
                # Resolve slug to numeric ID — the filter_state API requires a numeric ID
                resolved_id = None
                try:
                    resolved_id = int(dashboard_ref)
                except (ValueError, TypeError):
                    try:
                        dash_resp = await self.get_dashboard_async(dashboard_ref)
                        dash_data = dash_resp.get("result", dash_resp) if isinstance(dash_resp, dict) else {}
                        raw_id = dash_data.get("id")
                        if raw_id is not None:
                            resolved_id = int(raw_id)
                    except Exception as e:
                        app_logger.warning("[parse_dashboard_url_for_filters_async][Warning] Failed to resolve dashboard slug '%s' to ID: %s", dashboard_ref, e)
                if resolved_id is not None:
                    filter_data = await self.extract_native_filters_from_key_async(resolved_id, native_filters_key)
                    result["filter_type"] = "native_filters_key"
                    result["dashboard_id"] = resolved_id
                    result["filters"] = filter_data
                    return result
                else:
                    app_logger.warning("[parse_dashboard_url_for_filters_async][Warning] Could not resolve dashboard_id from URL for native_filters_key")
            # BUGFIX: previously this branch returned `result` here, which
            # skipped the native_filters query-param strategy whenever a
            # native_filters_key could not be resolved. The synchronous
            # parse_dashboard_url_for_filters falls through instead; do the same.
        # Strategy 3 — native_filters in query params (direct filter values)
        native_filters = query_params.get("native_filters", [None])[0]
        if native_filters:
            try:
                parsed_filters = json.loads(native_filters)
                result["filter_type"] = "native_filters"
                result["filters"] = {"dataMask": parsed_filters}
                return result
            except json.JSONDecodeError as e:
                app_logger.warning("[parse_dashboard_url_for_filters_async][Warning] Failed to parse native_filters JSON: %s", e)
        return result
# [/DEF:parse_dashboard_url_for_filters_async:Function]
# [/DEF:AsyncSupersetClient:Class]
# [/DEF:backend.src.core.async_superset_client:Module]

View File

@@ -369,6 +369,69 @@ def ensure_connection_configs_table(bind_engine):
# [/DEF:ensure_connection_configs_table:Function]
# [DEF:_ensure_filter_source_enum_values:Function]
# @COMPLEXITY: 3
# @PURPOSE: Adds missing FilterSource enum values to the PostgreSQL native filtersource type.
# @PRE: bind_engine points to application database with imported_filters table.
# @POST: New enum values are available without data loss.
# @SIDE_EFFECT: Issues DDL (ALTER TYPE ... ADD VALUE) against the bound database.
# NOTE: Best-effort additive migration — any failure is logged and swallowed so
# application startup is never blocked by a migration error.
def _ensure_filter_source_enum_values(bind_engine):
    with belief_scope("_ensure_filter_source_enum_values"):
        try:
            with bind_engine.connect() as connection:
                # Check if the native enum type exists
                # (querying pg_type/pg_namespace; only the 'public' schema is considered).
                result = connection.execute(
                    text(
                        "SELECT t.typname FROM pg_type t "
                        "JOIN pg_namespace n ON t.typnamespace = n.oid "
                        "WHERE t.typname = 'filtersource' AND n.nspname = 'public'"
                    )
                )
                if result.fetchone() is None:
                    # Nothing to migrate: the type is created later by table creation.
                    logger.reason("filtersource enum type does not exist yet; skipping migration")
                    return
                # Get existing enum values
                result = connection.execute(
                    text(
                        "SELECT e.enumlabel FROM pg_enum e "
                        "JOIN pg_type t ON e.enumtypid = t.oid "
                        "WHERE t.typname = 'filtersource' "
                        "ORDER BY e.enumsortorder"
                    )
                )
                existing_values = {row[0] for row in result.fetchall()}
                required_values = ["SUPERSET_PERMALINK", "SUPERSET_NATIVE_FILTERS_KEY"]
                missing_values = [v for v in required_values if v not in existing_values]
                if not missing_values:
                    # Idempotent fast path: enum already carries both labels.
                    logger.reason(
                        "filtersource enum already up to date",
                        extra={"existing": sorted(existing_values)},
                    )
                    return
                logger.reason(
                    "Adding missing values to filtersource enum",
                    extra={"missing": missing_values},
                )
                for value in missing_values:
                    # f-string interpolation is safe here: values come only from the
                    # hardcoded required_values list above, never from user input.
                    # NOTE(review): "IF NOT EXISTS" on ADD VALUE requires PostgreSQL 12+,
                    # and ALTER TYPE ... ADD VALUE has transaction restrictions on
                    # older servers — confirm against the minimum supported PG version.
                    connection.execute(
                        text(f"ALTER TYPE filtersource ADD VALUE IF NOT EXISTS '{value}'")
                    )
                connection.commit()
                logger.reason(
                    "filtersource enum migration completed",
                    extra={"added": missing_values},
                )
        except Exception as migration_error:
            # Deliberate broad catch: migration failure is logged, not raised.
            logger.warning(
                "[database][EXPLORE] FilterSource enum additive migration failed: %s",
                migration_error,
            )
# [/DEF:_ensure_filter_source_enum_values:Function]
# [DEF:init_db:Function]
# @COMPLEXITY: 3
# @PURPOSE: Initializes the database by creating all tables.
@@ -386,6 +449,7 @@ def init_db():
_ensure_git_server_configs_columns(engine)
_ensure_auth_users_columns(auth_engine)
ensure_connection_configs_table(engine)
_ensure_filter_source_enum_values(engine)
# [/DEF:init_db:Function]
# [DEF:get_db:Function]

View File

@@ -35,6 +35,7 @@ class SupersetClient:
# @PRE: `env` должен быть валидным объектом Environment.
# @POST: Атрибуты `env` и `network` созданы и готовы к работе.
# @DATA_CONTRACT: Input[Environment] -> self.network[APIClient]
# @RELATION: [DEPENDS_ON] ->[Environment]
# @RELATION: [DEPENDS_ON] ->[APIClient]
def __init__(self, env: Environment):
with belief_scope("__init__"):
@@ -311,7 +312,7 @@ class SupersetClient:
})
return total_count, result
# [/DEF:backend.src.core.superset_client.SupersetClient.get_dashboards_summary_page:Function]
# [/DEF:SupersetClient.get_dashboards_summary_page:Function]
# [DEF:SupersetClient._extract_owner_labels:Function]
# @COMPLEXITY: 1
@@ -368,9 +369,9 @@ class SupersetClient:
if email:
return email
return None
# [/DEF:backend.src.core.superset_client.SupersetClient._extract_user_display:Function]
# [/DEF:SupersetClient._extract_user_display:Function]
# [DEF:backend.src.core.superset_client.SupersetClient._sanitize_user_text:Function]
# [DEF:SupersetClient._sanitize_user_text:Function]
# @COMPLEXITY: 1
# @PURPOSE: Convert scalar value to non-empty user-facing text.
# @PRE: value can be any scalar type.
@@ -382,7 +383,7 @@ class SupersetClient:
if not normalized:
return None
return normalized
# [/DEF:backend.src.core.superset_client.SupersetClient._sanitize_user_text:Function]
# [/DEF:SupersetClient._sanitize_user_text:Function]
# [DEF:SupersetClient.get_dashboard:Function]
# @COMPLEXITY: 3
@@ -413,6 +414,206 @@ class SupersetClient:
return cast(Dict, response)
# [/DEF:SupersetClient.get_dashboard_permalink_state:Function]
# [DEF:SupersetClient.get_native_filter_state:Function]
# @COMPLEXITY: 2
# @PURPOSE: Fetches stored native filter state by filter state key.
# @PRE: Client is authenticated and filter_state_key exists.
# @POST: Returns native filter state payload from Superset API.
# @DATA_CONTRACT: Input[dashboard_id: Union[int, str], filter_state_key: str] -> Output[Dict]
# @RELATION: [CALLS] ->[APIClient.request]
def get_native_filter_state(self, dashboard_id: Union[int, str], filter_state_key: str) -> Dict:
    with belief_scope("SupersetClient.get_native_filter_state", f"dashboard={dashboard_id}, key={filter_state_key}"):
        # Plain GET; the caller is responsible for unwrapping the envelope.
        payload = self.network.request(
            method="GET",
            endpoint=f"/dashboard/{dashboard_id}/filter_state/{filter_state_key}",
        )
        return cast(Dict, payload)
# [/DEF:SupersetClient.get_native_filter_state:Function]
# [DEF:SupersetClient.extract_native_filters_from_permalink:Function]
# @COMPLEXITY: 3
# @PURPOSE: Extract native filters dataMask from a permalink key.
# @PRE: Client is authenticated and permalink_key exists.
# @POST: Returns extracted dataMask with filter states.
# @DATA_CONTRACT: Input[permalink_key: str] -> Output[Dict]
# @RELATION: [CALLS] ->[SupersetClient.get_dashboard_permalink_state]
def extract_native_filters_from_permalink(self, permalink_key: str) -> Dict:
    with belief_scope("SupersetClient.extract_native_filters_from_permalink", f"key={permalink_key}"):
        raw = self.get_dashboard_permalink_state(permalink_key)
        # Permalink response structure: { "result": { "state": { "dataMask": {...}, ... } } }
        # or directly: { "state": { "dataMask": {...}, ... } } — unwrap both levels defensively.
        envelope = raw.get("result", raw)
        state = envelope.get("state", envelope)
        # Keep only the three per-filter sections we consume; non-dict entries are skipped.
        filter_states = {
            filter_id: {
                "extraFormData": entry.get("extraFormData", {}),
                "filterState": entry.get("filterState", {}),
                "ownState": entry.get("ownState", {}),
            }
            for filter_id, entry in state.get("dataMask", {}).items()
            if isinstance(entry, dict)
        }
        return {
            "dataMask": filter_states,
            "activeTabs": state.get("activeTabs", []),
            "anchor": state.get("anchor"),
            "chartStates": state.get("chartStates", {}),
            "permalink_key": permalink_key,
        }
# [/DEF:SupersetClient.extract_native_filters_from_permalink:Function]
# [DEF:SupersetClient.extract_native_filters_from_key:Function]
# @COMPLEXITY: 3
# @PURPOSE: Extract native filters from a native_filters_key URL parameter.
# @PRE: Client is authenticated, dashboard_id and filter_state_key exist.
# @POST: Returns extracted filter state with extraFormData.
# @DATA_CONTRACT: Input[dashboard_id: Union[int, str], filter_state_key: str] -> Output[Dict]
# @RELATION: [CALLS] ->[SupersetClient.get_native_filter_state]
def extract_native_filters_from_key(self, dashboard_id: Union[int, str], filter_state_key: str) -> Dict:
    with belief_scope("SupersetClient.extract_native_filters_from_key", f"dashboard={dashboard_id}, key={filter_state_key}"):
        raw = self.get_native_filter_state(dashboard_id, filter_state_key)
        # Filter state response structure: { "result": { "value": "{...json...}" } }
        # or: { "value": "{...json...}" }
        envelope = raw.get("result", raw)
        stored_value = envelope.get("value")
        # The stored value is usually a JSON string, occasionally an
        # already-decoded dict; anything else degrades to an empty state.
        if isinstance(stored_value, dict):
            state = stored_value
        elif isinstance(stored_value, str):
            try:
                state = json.loads(stored_value)
            except json.JSONDecodeError as e:
                app_logger.warning("[extract_native_filters_from_key][Warning] Failed to parse filter state JSON: %s", e)
                state = {}
        else:
            state = {}

        def _sections(entry):
            # One normalized per-filter record: the three sections we consume.
            return {
                "extraFormData": entry.get("extraFormData", {}),
                "filterState": entry.get("filterState", {}),
                "ownState": entry.get("ownState", {}),
            }

        # The parsed value is either a single filter
        # ({"id": ..., "extraFormData": ..., ...}) or a mapping keyed by filter id.
        if "id" in state and "extraFormData" in state:
            extracted = {state.get("id", filter_state_key): _sections(state)}
        else:
            extracted = {
                filter_id: _sections(entry)
                for filter_id, entry in state.items()
                if isinstance(entry, dict)
            }
        return {
            "dataMask": extracted,
            "dashboard_id": dashboard_id,
            "filter_state_key": filter_state_key,
        }
# [/DEF:SupersetClient.extract_native_filters_from_key:Function]
# [DEF:SupersetClient.parse_dashboard_url_for_filters:Function]
# @COMPLEXITY: 3
# @PURPOSE: Parse a Superset dashboard URL and extract native filter state if present.
# @PRE: url must be a valid Superset dashboard URL with optional permalink or native_filters_key.
# @POST: Returns extracted filter state or empty dict if no filters found.
# @DATA_CONTRACT: Input[url: str] -> Output[Dict]
# @RELATION: [CALLS] ->[SupersetClient.extract_native_filters_from_permalink]
# @RELATION: [CALLS] ->[SupersetClient.extract_native_filters_from_key]
# NOTE: Tries three strategies in order — permalink path segment, then the
# native_filters_key query param, then inline native_filters JSON — returning
# on the first that yields filter data; otherwise returns the empty result.
def parse_dashboard_url_for_filters(self, url: str) -> Dict:
    with belief_scope("SupersetClient.parse_dashboard_url_for_filters", f"url={url}"):
        import urllib.parse
        parsed_url = urllib.parse.urlparse(url)
        query_params = urllib.parse.parse_qs(parsed_url.query)
        path_parts = parsed_url.path.rstrip("/").split("/")
        # Default (empty) result returned when no strategy matches.
        result = {
            "url": url,
            "dashboard_id": None,
            "filter_type": None,
            "filters": {},
        }
        # Check for permalink URL: /dashboard/p/{key}/ or /superset/dashboard/p/{key}/
        if "p" in path_parts:
            try:
                p_index = path_parts.index("p")
                if p_index + 1 < len(path_parts):
                    permalink_key = path_parts[p_index + 1]
                    filter_data = self.extract_native_filters_from_permalink(permalink_key)
                    result["filter_type"] = "permalink"
                    result["filters"] = filter_data
                    return result
            except ValueError:
                pass
        # Check for native_filters_key in query params
        native_filters_key = query_params.get("native_filters_key", [None])[0]
        if native_filters_key:
            # Extract dashboard ID or slug from URL path
            dashboard_ref = None
            if "dashboard" in path_parts:
                try:
                    dash_index = path_parts.index("dashboard")
                    if dash_index + 1 < len(path_parts):
                        potential_id = path_parts[dash_index + 1]
                        # Skip if it's a reserved word
                        if potential_id not in ("p", "list", "new"):
                            dashboard_ref = potential_id
                except ValueError:
                    pass
            if dashboard_ref:
                # Resolve slug to numeric ID — the filter_state API requires a numeric ID
                resolved_id = None
                try:
                    resolved_id = int(dashboard_ref)
                except (ValueError, TypeError):
                    # Not numeric: treat as a slug and look up the dashboard to get its id.
                    try:
                        dash_resp = self.get_dashboard(dashboard_ref)
                        dash_data = dash_resp.get("result", dash_resp) if isinstance(dash_resp, dict) else {}
                        raw_id = dash_data.get("id")
                        if raw_id is not None:
                            resolved_id = int(raw_id)
                    except Exception as e:
                        app_logger.warning("[parse_dashboard_url_for_filters][Warning] Failed to resolve dashboard slug '%s' to ID: %s", dashboard_ref, e)
                if resolved_id is not None:
                    filter_data = self.extract_native_filters_from_key(resolved_id, native_filters_key)
                    result["filter_type"] = "native_filters_key"
                    result["dashboard_id"] = resolved_id
                    result["filters"] = filter_data
                    return result
                else:
                    # Deliberately no return here: fall through to the
                    # native_filters query-param strategy below.
                    app_logger.warning("[parse_dashboard_url_for_filters][Warning] Could not resolve dashboard_id from URL for native_filters_key")
        # Check for native_filters in query params (direct filter values)
        native_filters = query_params.get("native_filters", [None])[0]
        if native_filters:
            try:
                parsed_filters = json.loads(native_filters)
                result["filter_type"] = "native_filters"
                result["filters"] = {"dataMask": parsed_filters}
                return result
            except json.JSONDecodeError as e:
                app_logger.warning("[parse_dashboard_url_for_filters][Warning] Failed to parse native_filters JSON: %s", e)
        return result
# [/DEF:SupersetClient.parse_dashboard_url_for_filters:Function]
# [DEF:SupersetClient.get_chart:Function]
# @COMPLEXITY: 3
# @PURPOSE: Fetches a single chart by ID.
@@ -911,13 +1112,13 @@ class SupersetClient:
return result
# [/DEF:backend.src.core.superset_client.SupersetClient.get_dataset_detail:Function]
# [DEF:backend.src.core.superset_client.SupersetClient.get_dataset:Function]
# [DEF:SupersetClient.get_dataset:Function]
# @COMPLEXITY: 3
# @PURPOSE: Получает информацию о конкретном датасете по его ID.
# @PRE: dataset_id must exist.
# @POST: Returns dataset details.
# @DATA_CONTRACT: Input[dataset_id: int] -> Output[Dict]
# @RELATION: [CALLS] ->[backend.src.core.utils.network.APIClient.request]
# @RELATION: [CALLS] ->[APIClient.request]
def get_dataset(self, dataset_id: int) -> Dict:
with belief_scope("SupersetClient.get_dataset", f"id={dataset_id}"):
app_logger.info("[get_dataset][Enter] Fetching dataset %s.", dataset_id)
@@ -925,19 +1126,20 @@ class SupersetClient:
response = cast(Dict, response)
app_logger.info("[get_dataset][Exit] Got dataset %s.", dataset_id)
return response
# [/DEF:backend.src.core.superset_client.SupersetClient.get_dataset:Function]
# [/DEF:SupersetClient.get_dataset:Function]
# [DEF:SupersetClient.compile_dataset_preview:Function]
# @COMPLEXITY: 4
# @PURPOSE: Compile dataset preview SQL through the real Superset chart-data endpoint and return normalized SQL output.
# @PURPOSE: Compile dataset preview SQL through the strongest supported Superset preview endpoint family and return normalized SQL output.
# @PRE: dataset_id must be valid and template_params/effective_filters must represent the current preview session inputs.
# @POST: Returns normalized compiled SQL plus raw upstream response without guessing unsupported endpoints.
# @POST: Returns normalized compiled SQL plus raw upstream response, preferring legacy form_data transport with explicit fallback to chart-data.
# @DATA_CONTRACT: Input[dataset_id:int, template_params:Dict, effective_filters:List[Dict]] -> Output[Dict[str, Any]]
# @RELATION: [CALLS] ->[SupersetClient.get_dataset]
# @RELATION: [CALLS] ->[SupersetClient.build_dataset_preview_query_context]
# @RELATION: [CALLS] ->[SupersetClient.build_dataset_preview_legacy_form_data]
# @RELATION: [CALLS] ->[APIClient.request]
# @RELATION: [CALLS] ->[SupersetClient._extract_compiled_sql_from_chart_data_response]
# @SIDE_EFFECT: Performs upstream dataset lookup and chart-data network I/O against Superset.
# @RELATION: [CALLS] ->[SupersetClient._extract_compiled_sql_from_preview_response]
# @SIDE_EFFECT: Performs upstream dataset lookup and preview network I/O against Superset.
def compile_dataset_preview(
self,
dataset_id: int,
@@ -945,14 +1147,6 @@ class SupersetClient:
effective_filters: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
with belief_scope("SupersetClient.compile_dataset_preview", f"id={dataset_id}"):
app_logger.reason(
"Compiling dataset preview via Superset chart-data endpoint",
extra={
"dataset_id": dataset_id,
"template_param_count": len(template_params or {}),
"filter_count": len(effective_filters or []),
},
)
dataset_response = self.get_dataset(dataset_id)
dataset_record = dataset_response.get("result", dataset_response) if isinstance(dataset_response, dict) else {}
query_context = self.build_dataset_preview_query_context(
@@ -961,31 +1155,197 @@ class SupersetClient:
template_params=template_params or {},
effective_filters=effective_filters or [],
)
response = self.network.request(
method="POST",
endpoint="/chart/data",
data=json.dumps(query_context),
headers={"Content-Type": "application/json"},
legacy_form_data = self.build_dataset_preview_legacy_form_data(
dataset_id=dataset_id,
dataset_record=dataset_record,
template_params=template_params or {},
effective_filters=effective_filters or [],
)
normalized = self._extract_compiled_sql_from_chart_data_response(response)
normalized["query_context"] = query_context
legacy_form_data_payload = json.dumps(legacy_form_data, sort_keys=True, default=str)
request_payload = json.dumps(query_context)
strategy_attempts: List[Dict[str, Any]] = []
strategy_candidates: List[Dict[str, Any]] = [
{
"endpoint_kind": "legacy_explore_form_data",
"endpoint": "/explore_json/form_data",
"request_transport": "query_param_form_data",
"params": {"form_data": legacy_form_data_payload},
},
{
"endpoint_kind": "legacy_data_form_data",
"endpoint": "/data",
"request_transport": "query_param_form_data",
"params": {"form_data": legacy_form_data_payload},
},
{
"endpoint_kind": "v1_chart_data",
"endpoint": "/chart/data",
"request_transport": "json_body",
"data": request_payload,
"headers": {"Content-Type": "application/json"},
},
]
for candidate in strategy_candidates:
endpoint_kind = candidate["endpoint_kind"]
endpoint_path = candidate["endpoint"]
request_transport = candidate["request_transport"]
request_params = deepcopy(candidate.get("params") or {})
request_body = candidate.get("data")
request_headers = deepcopy(candidate.get("headers") or {})
request_param_keys = sorted(request_params.keys())
request_payload_keys: List[str] = []
if isinstance(request_body, str):
try:
decoded_request_body = json.loads(request_body)
if isinstance(decoded_request_body, dict):
request_payload_keys = sorted(decoded_request_body.keys())
except json.JSONDecodeError:
request_payload_keys = []
elif isinstance(request_body, dict):
request_payload_keys = sorted(request_body.keys())
strategy_diagnostics = {
"endpoint": endpoint_path,
"endpoint_kind": endpoint_kind,
"request_transport": request_transport,
"contains_root_datasource": endpoint_kind == "v1_chart_data" and "datasource" in query_context,
"contains_form_datasource": endpoint_kind.startswith("legacy_") and "datasource" in legacy_form_data,
"contains_query_object_datasource": bool(query_context.get("queries")) and isinstance(query_context["queries"][0], dict) and "datasource" in query_context["queries"][0],
"request_param_keys": request_param_keys,
"request_payload_keys": request_payload_keys,
}
app_logger.reason(
"Attempting Superset dataset preview compilation strategy",
extra={
"dataset_id": dataset_id,
**strategy_diagnostics,
"request_params": request_params,
"request_payload": request_body,
"legacy_form_data": legacy_form_data if endpoint_kind.startswith("legacy_") else None,
"query_context": query_context if endpoint_kind == "v1_chart_data" else None,
"template_param_count": len(template_params or {}),
"filter_count": len(effective_filters or []),
},
)
try:
response = self.network.request(
method="POST",
endpoint=endpoint_path,
params=request_params or None,
data=request_body,
headers=request_headers or None,
)
normalized = self._extract_compiled_sql_from_preview_response(response)
normalized["query_context"] = query_context
normalized["legacy_form_data"] = legacy_form_data
normalized["endpoint"] = endpoint_path
normalized["endpoint_kind"] = endpoint_kind
normalized["dataset_id"] = dataset_id
normalized["strategy_attempts"] = strategy_attempts + [
{
**strategy_diagnostics,
"success": True,
}
]
app_logger.reflect(
"Dataset preview compilation returned normalized SQL payload",
extra={
"dataset_id": dataset_id,
**strategy_diagnostics,
"success": True,
"compiled_sql_length": len(str(normalized.get("compiled_sql") or "")),
"response_diagnostics": normalized.get("response_diagnostics"),
},
)
return normalized
except Exception as exc:
failure_diagnostics = {
**strategy_diagnostics,
"success": False,
"error": str(exc),
}
strategy_attempts.append(failure_diagnostics)
app_logger.explore(
"Superset dataset preview compilation strategy failed",
extra={
"dataset_id": dataset_id,
**failure_diagnostics,
"request_params": request_params,
"request_payload": request_body,
},
)
raise SupersetAPIError(
"Superset preview compilation failed for all known strategies "
f"(attempts={strategy_attempts!r})"
)
# [/DEF:SupersetClient.compile_dataset_preview:Function]
# [DEF:SupersetClient.build_dataset_preview_legacy_form_data:Function]
# @COMPLEXITY: 4
# @PURPOSE: Build browser-style legacy form_data payload for Superset preview endpoints inferred from observed deployment traffic.
# @PRE: dataset_record should come from Superset dataset detail when possible.
# @POST: Returns one serialized-ready form_data structure preserving native filter clauses in legacy transport fields.
# @DATA_CONTRACT: Input[dataset_id:int,dataset_record:Dict,template_params:Dict,effective_filters:List[Dict]] -> Output[Dict[str, Any]]
# @RELATION: [CALLS] ->[SupersetClient.build_dataset_preview_query_context]
# @SIDE_EFFECT: Emits reasoning diagnostics describing the inferred legacy payload shape.
def build_dataset_preview_legacy_form_data(
self,
dataset_id: int,
dataset_record: Dict[str, Any],
template_params: Dict[str, Any],
effective_filters: List[Dict[str, Any]],
) -> Dict[str, Any]:
with belief_scope("SupersetClient.build_dataset_preview_legacy_form_data", f"id={dataset_id}"):
query_context = self.build_dataset_preview_query_context(
dataset_id=dataset_id,
dataset_record=dataset_record,
template_params=template_params,
effective_filters=effective_filters,
)
query_object = deepcopy(query_context.get("queries", [{}])[0] if query_context.get("queries") else {})
legacy_form_data = deepcopy(query_context.get("form_data", {}))
legacy_form_data.pop("datasource", None)
legacy_form_data["metrics"] = deepcopy(query_object.get("metrics", ["count"]))
legacy_form_data["columns"] = deepcopy(query_object.get("columns", []))
legacy_form_data["orderby"] = deepcopy(query_object.get("orderby", []))
legacy_form_data["annotation_layers"] = deepcopy(query_object.get("annotation_layers", []))
legacy_form_data["row_limit"] = query_object.get("row_limit", 1000)
legacy_form_data["series_limit"] = query_object.get("series_limit", 0)
legacy_form_data["url_params"] = deepcopy(query_object.get("url_params", template_params))
legacy_form_data["applied_time_extras"] = deepcopy(query_object.get("applied_time_extras", {}))
legacy_form_data["result_format"] = query_context.get("result_format", "json")
legacy_form_data["result_type"] = query_context.get("result_type", "query")
legacy_form_data["force"] = bool(query_context.get("force", True))
extras = query_object.get("extras")
if isinstance(extras, dict):
legacy_form_data["extras"] = deepcopy(extras)
time_range = query_object.get("time_range")
if time_range:
legacy_form_data["time_range"] = time_range
app_logger.reflect(
"Dataset preview compilation returned normalized SQL payload",
"Built Superset legacy preview form_data payload from browser-observed request shape",
extra={
"dataset_id": dataset_id,
"compiled_sql_length": len(str(normalized.get("compiled_sql") or "")),
"legacy_endpoint_inference": "POST /explore_json/form_data?form_data=... primary, POST /data?form_data=... fallback, based on observed browser traffic",
"contains_form_datasource": "datasource" in legacy_form_data,
"legacy_form_data_keys": sorted(legacy_form_data.keys()),
"legacy_extra_filters": legacy_form_data.get("extra_filters", []),
"legacy_extra_form_data": legacy_form_data.get("extra_form_data", {}),
},
)
return normalized
# [/DEF:backend.src.core.superset_client.SupersetClient.compile_dataset_preview:Function]
return legacy_form_data
# [/DEF:SupersetClient.build_dataset_preview_legacy_form_data:Function]
# [DEF:backend.src.core.superset_client.SupersetClient.build_dataset_preview_query_context:Function]
# [DEF:SupersetClient.build_dataset_preview_query_context:Function]
# @COMPLEXITY: 4
# @PURPOSE: Build a reduced-scope chart-data query context for deterministic dataset preview compilation.
# @PRE: dataset_record should come from Superset dataset detail when possible.
# @POST: Returns an explicit chart-data payload based on current session inputs and dataset metadata.
# @DATA_CONTRACT: Input[dataset_id:int,dataset_record:Dict,template_params:Dict,effective_filters:List[Dict]] -> Output[Dict[str, Any]]
# @RELATION: [CALLS] ->[backend.src.core.superset_client.SupersetClient._normalize_effective_filters_for_query_context]
# @RELATION: [CALLS] ->[SupersetClient._normalize_effective_filters_for_query_context]
# @SIDE_EFFECT: Emits reasoning and reflection logs for deterministic preview payload construction.
def build_dataset_preview_query_context(
self,
@@ -996,7 +1356,9 @@ class SupersetClient:
) -> Dict[str, Any]:
with belief_scope("SupersetClient.build_dataset_preview_query_context", f"id={dataset_id}"):
normalized_template_params = deepcopy(template_params or {})
normalized_filters = self._normalize_effective_filters_for_query_context(effective_filters or [])
normalized_filter_payload = self._normalize_effective_filters_for_query_context(effective_filters or [])
normalized_filters = normalized_filter_payload["filters"]
normalized_extra_form_data = normalized_filter_payload["extra_form_data"]
datasource_payload: Dict[str, Any] = {
"id": dataset_id,
@@ -1011,6 +1373,23 @@ class SupersetClient:
if datasource_type:
datasource_payload["type"] = datasource_type
serialized_dataset_template_params = dataset_record.get("template_params")
if isinstance(serialized_dataset_template_params, str) and serialized_dataset_template_params.strip():
try:
parsed_dataset_template_params = json.loads(serialized_dataset_template_params)
if isinstance(parsed_dataset_template_params, dict):
for key, value in parsed_dataset_template_params.items():
normalized_template_params.setdefault(str(key), value)
except json.JSONDecodeError:
app_logger.explore(
"Dataset template_params could not be parsed while building preview query context",
extra={"dataset_id": dataset_id},
)
extra_form_data: Dict[str, Any] = deepcopy(normalized_extra_form_data)
if normalized_filters:
extra_form_data["filters"] = deepcopy(normalized_filters)
query_object: Dict[str, Any] = {
"filters": normalized_filters,
"extras": {"where": ""},
@@ -1021,33 +1400,56 @@ class SupersetClient:
"row_limit": 1000,
"series_limit": 0,
"url_params": normalized_template_params,
"custom_params": normalized_template_params,
"applied_time_extras": {},
"result_type": "query",
}
schema = dataset_record.get("schema")
if schema:
query_object["schema"] = schema
time_range = dataset_record.get("default_time_range")
time_range = extra_form_data.get("time_range") or dataset_record.get("default_time_range")
if time_range:
query_object["time_range"] = time_range
extra_form_data["time_range"] = time_range
result_format = dataset_record.get("result_format") or "json"
result_type = dataset_record.get("result_type") or "full"
result_type = "query"
return {
form_data: Dict[str, Any] = {
"datasource": f"{datasource_payload['id']}__{datasource_payload['type']}",
"datasource_id": datasource_payload["id"],
"datasource_type": datasource_payload["type"],
"viz_type": "table",
"slice_id": None,
"query_mode": "raw",
"url_params": normalized_template_params,
"extra_filters": deepcopy(normalized_filters),
"adhoc_filters": [],
}
if extra_form_data:
form_data["extra_form_data"] = extra_form_data
payload = {
"datasource": datasource_payload,
"queries": [query_object],
"form_data": {
"datasource": f"{datasource_payload['id']}__{datasource_payload['type']}",
"viz_type": "table",
"slice_id": None,
"query_mode": "raw",
"url_params": normalized_template_params,
},
"form_data": form_data,
"result_format": result_format,
"result_type": result_type,
"force": True,
}
app_logger.reflect(
"Built Superset dataset preview query context",
extra={
"dataset_id": dataset_id,
"datasource": datasource_payload,
"normalized_effective_filters": normalized_filters,
"normalized_filter_diagnostics": normalized_filter_payload["diagnostics"],
"result_type": result_type,
"result_format": result_format,
},
)
return payload
# [/DEF:backend.src.core.superset_client.SupersetClient.build_dataset_preview_query_context:Function]
# [DEF:backend.src.core.superset_client.SupersetClient._normalize_effective_filters_for_query_context:Function]
@@ -1058,56 +1460,170 @@ class SupersetClient:
def _normalize_effective_filters_for_query_context(
self,
effective_filters: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
) -> Dict[str, Any]:
with belief_scope("SupersetClient._normalize_effective_filters_for_query_context"):
normalized_filters: List[Dict[str, Any]] = []
merged_extra_form_data: Dict[str, Any] = {}
diagnostics: List[Dict[str, Any]] = []
for item in effective_filters:
if not isinstance(item, dict):
continue
column = str(item.get("variable_name") or item.get("filter_name") or "").strip()
if not column:
continue
value = item.get("effective_value")
if value is None:
continue
operator = "IN" if isinstance(value, list) else "=="
normalized_filters.append(
display_name = str(
item.get("display_name")
or item.get("filter_name")
or item.get("variable_name")
or "unresolved_filter"
).strip()
value = item.get("effective_value")
normalized_payload = item.get("normalized_filter_payload")
preserved_clauses: List[Dict[str, Any]] = []
preserved_extra_form_data: Dict[str, Any] = {}
used_preserved_clauses = False
if isinstance(normalized_payload, dict):
raw_clauses = normalized_payload.get("filter_clauses")
if isinstance(raw_clauses, list):
preserved_clauses = [
deepcopy(clause)
for clause in raw_clauses
if isinstance(clause, dict)
]
raw_extra_form_data = normalized_payload.get("extra_form_data")
if isinstance(raw_extra_form_data, dict):
preserved_extra_form_data = deepcopy(raw_extra_form_data)
if isinstance(preserved_extra_form_data, dict):
for key, extra_value in preserved_extra_form_data.items():
if key == "filters":
continue
merged_extra_form_data[key] = deepcopy(extra_value)
outgoing_clauses: List[Dict[str, Any]] = []
if preserved_clauses:
for clause in preserved_clauses:
clause_copy = deepcopy(clause)
if "val" not in clause_copy and value is not None:
clause_copy["val"] = deepcopy(value)
outgoing_clauses.append(clause_copy)
used_preserved_clauses = True
elif preserved_extra_form_data:
outgoing_clauses = []
else:
column = str(item.get("variable_name") or item.get("filter_name") or "").strip()
if column and value is not None:
operator = "IN" if isinstance(value, list) else "=="
outgoing_clauses.append(
{
"col": column,
"op": operator,
"val": value,
}
)
normalized_filters.extend(outgoing_clauses)
diagnostics.append(
{
"col": column,
"op": operator,
"val": value,
"filter_name": display_name,
"value_origin": (
normalized_payload.get("value_origin")
if isinstance(normalized_payload, dict)
else None
),
"used_preserved_clauses": used_preserved_clauses,
"outgoing_clauses": deepcopy(outgoing_clauses),
}
)
return normalized_filters
app_logger.reason(
"Normalized effective preview filter for Superset query context",
extra={
"filter_name": display_name,
"used_preserved_clauses": used_preserved_clauses,
"outgoing_clauses": outgoing_clauses,
"value_origin": (
normalized_payload.get("value_origin")
if isinstance(normalized_payload, dict)
else "heuristic_reconstruction"
),
},
)
return {
"filters": normalized_filters,
"extra_form_data": merged_extra_form_data,
"diagnostics": diagnostics,
}
# [/DEF:backend.src.core.superset_client.SupersetClient._normalize_effective_filters_for_query_context:Function]
# [DEF:backend.src.core.superset_client.SupersetClient._extract_compiled_sql_from_chart_data_response:Function]
# [DEF:backend.src.core.superset_client.SupersetClient._extract_compiled_sql_from_preview_response:Function]
# @COMPLEXITY: 3
# @PURPOSE: Normalize compiled SQL from a chart-data response by reading result[].query fields first.
# @PRE: response must be the decoded response body from /api/v1/chart/data.
# @PURPOSE: Normalize compiled SQL from either chart-data or legacy form_data preview responses.
# @PRE: response must be the decoded preview response body from a supported Superset endpoint.
# @POST: Returns compiled SQL and raw response or raises SupersetAPIError when the endpoint does not expose query text.
def _extract_compiled_sql_from_chart_data_response(self, response: Any) -> Dict[str, Any]:
with belief_scope("SupersetClient._extract_compiled_sql_from_chart_data_response"):
def _extract_compiled_sql_from_preview_response(self, response: Any) -> Dict[str, Any]:
with belief_scope("SupersetClient._extract_compiled_sql_from_preview_response"):
if not isinstance(response, dict):
raise SupersetAPIError("Superset chart/data response was not a JSON object")
raise SupersetAPIError("Superset preview response was not a JSON object")
response_diagnostics: List[Dict[str, Any]] = []
result_payload = response.get("result")
if not isinstance(result_payload, list):
raise SupersetAPIError("Superset chart/data response did not include a result list")
if isinstance(result_payload, list):
for index, item in enumerate(result_payload):
if not isinstance(item, dict):
continue
compiled_sql = str(item.get("query") or item.get("sql") or item.get("compiled_sql") or "").strip()
response_diagnostics.append(
{
"index": index,
"status": item.get("status"),
"applied_filters": item.get("applied_filters"),
"rejected_filters": item.get("rejected_filters"),
"has_query": bool(compiled_sql),
"source": "result_list",
}
)
if compiled_sql:
return {
"compiled_sql": compiled_sql,
"raw_response": response,
"response_diagnostics": response_diagnostics,
}
for item in result_payload:
if not isinstance(item, dict):
continue
compiled_sql = str(item.get("query") or "").strip()
top_level_candidates: List[Tuple[str, Any]] = [
("query", response.get("query")),
("sql", response.get("sql")),
("compiled_sql", response.get("compiled_sql")),
]
if isinstance(result_payload, dict):
top_level_candidates.extend(
[
("result.query", result_payload.get("query")),
("result.sql", result_payload.get("sql")),
("result.compiled_sql", result_payload.get("compiled_sql")),
]
)
for source, candidate in top_level_candidates:
compiled_sql = str(candidate or "").strip()
response_diagnostics.append(
{
"source": source,
"has_query": bool(compiled_sql),
}
)
if compiled_sql:
return {
"compiled_sql": compiled_sql,
"raw_response": response,
"response_diagnostics": response_diagnostics,
}
raise SupersetAPIError("Superset chart/data response did not expose compiled SQL in result[].query")
# [/DEF:backend.src.core.superset_client.SupersetClient._extract_compiled_sql_from_chart_data_response:Function]
raise SupersetAPIError(
"Superset preview response did not expose compiled SQL "
f"(diagnostics={response_diagnostics!r})"
)
# [/DEF:backend.src.core.superset_client.SupersetClient._extract_compiled_sql_from_preview_response:Function]
# [DEF:SupersetClient.update_dataset:Function]
# @COMPLEXITY: 3

View File

@@ -115,6 +115,7 @@ class AsyncAPIClient:
# @DATA_CONTRACT: None -> Output[Dict[str, str]]
# @RELATION: [CALLS] ->[SupersetAuthCache.get]
# @RELATION: [CALLS] ->[SupersetAuthCache.set]
# @RELATION: [CALLS] ->[AsyncAPIClient._get_auth_lock]
async def authenticate(self) -> Dict[str, str]:
cached_tokens = SupersetAuthCache.get(self._auth_cache_key)
if cached_tokens and cached_tokens.get("access_token") and cached_tokens.get("csrf_token"):
@@ -227,6 +228,12 @@ class AsyncAPIClient:
# @PURPOSE: Translate upstream HTTP errors into stable domain exceptions.
# @POST: Raises domain-specific exception for caller flow control.
# @DATA_CONTRACT: Input[httpx.HTTPStatusError] -> Exception
# @RELATION: [CALLS] ->[AsyncAPIClient._is_dashboard_endpoint]
# @RELATION: [DEPENDS_ON] ->[DashboardNotFoundError]
# @RELATION: [DEPENDS_ON] ->[SupersetAPIError]
# @RELATION: [DEPENDS_ON] ->[PermissionDeniedError]
# @RELATION: [DEPENDS_ON] ->[AuthenticationError]
# @RELATION: [DEPENDS_ON] ->[NetworkError]
def _handle_http_error(self, exc: httpx.HTTPStatusError, endpoint: str) -> None:
with belief_scope("AsyncAPIClient._handle_http_error"):
status_code = exc.response.status_code
@@ -264,13 +271,14 @@ class AsyncAPIClient:
if normalized_endpoint.startswith("/api/v1/"):
normalized_endpoint = normalized_endpoint[len("/api/v1"):]
return normalized_endpoint.startswith("/dashboard/") or normalized_endpoint == "/dashboard"
# [/DEF:backend.src.core.utils.async_network.AsyncAPIClient._is_dashboard_endpoint:Function]
# [/DEF:AsyncAPIClient._is_dashboard_endpoint:Function]
# [DEF:backend.src.core.utils.async_network.AsyncAPIClient._handle_network_error:Function]
# [DEF:AsyncAPIClient._handle_network_error:Function]
# @COMPLEXITY: 3
# @PURPOSE: Translate generic httpx errors into NetworkError.
# @POST: Raises NetworkError with URL context.
# @DATA_CONTRACT: Input[httpx.HTTPError] -> NetworkError
# @RELATION: [DEPENDS_ON] ->[NetworkError]
def _handle_network_error(self, exc: httpx.HTTPError, url: str) -> None:
with belief_scope("AsyncAPIClient._handle_network_error"):
if isinstance(exc, httpx.TimeoutException):
@@ -280,16 +288,17 @@ class AsyncAPIClient:
else:
message = f"Unknown network error: {exc}"
raise NetworkError(message, url=url) from exc
# [/DEF:backend.src.core.utils.async_network.AsyncAPIClient._handle_network_error:Function]
# [/DEF:AsyncAPIClient._handle_network_error:Function]
# [DEF:backend.src.core.utils.async_network.AsyncAPIClient.aclose:Function]
# [DEF:AsyncAPIClient.aclose:Function]
# @COMPLEXITY: 3
# @PURPOSE: Close underlying httpx client.
# @POST: Client resources are released.
# @SIDE_EFFECT: Closes network connections.
# @RELATION: [DEPENDS_ON] ->[AsyncAPIClient.__init__]
async def aclose(self) -> None:
await self._client.aclose()
# [/DEF:backend.src.core.utils.async_network.AsyncAPIClient.aclose:Function]
# [/DEF:backend.src.core.utils.async_network.AsyncAPIClient:Class]
# [/DEF:AsyncAPIClient.aclose:Function]
# [/DEF:AsyncAPIClient:Class]
# [/DEF:backend.src.core.utils.async_network:Module]
# [/DEF:AsyncNetworkModule:Module]

View File

@@ -111,6 +111,7 @@ class SupersetAuthCache:
return (str(base_url or "").strip(), username, bool(verify_ssl))
@classmethod
# [DEF:SupersetAuthCache.get:Function]
def get(cls, key: Tuple[str, str, bool]) -> Optional[Dict[str, str]]:
now = time.time()
with cls._lock:
@@ -129,8 +130,10 @@ class SupersetAuthCache:
"access_token": str(tokens.get("access_token") or ""),
"csrf_token": str(tokens.get("csrf_token") or ""),
}
# [/DEF:SupersetAuthCache.get:Function]
@classmethod
# [DEF:SupersetAuthCache.set:Function]
def set(cls, key: Tuple[str, str, bool], tokens: Dict[str, str], ttl_seconds: Optional[int] = None) -> None:
normalized_ttl = max(int(ttl_seconds or cls.TTL_SECONDS), 1)
with cls._lock:
@@ -141,6 +144,7 @@ class SupersetAuthCache:
},
"expires_at": time.time() + normalized_ttl,
}
# [/DEF:SupersetAuthCache.set:Function]
@classmethod
def invalidate(cls, key: Tuple[str, str, bool]) -> None:
@@ -156,7 +160,7 @@ class SupersetAuthCache:
class APIClient:
DEFAULT_TIMEOUT = 30
# [DEF:__init__:Function]
# [DEF:APIClient.__init__:Function]
# @PURPOSE: Инициализирует API клиент с конфигурацией, сессией и логгером.
# @PARAM: config (Dict[str, Any]) - Конфигурация.
# @PARAM: verify_ssl (bool) - Проверять ли SSL.
@@ -179,7 +183,7 @@ class APIClient:
)
self._authenticated = False
app_logger.info("[APIClient.__init__][Exit] APIClient initialized.")
# [/DEF:__init__:Function]
# [/DEF:APIClient.__init__:Function]
# [DEF:_init_session:Function]
# @PURPOSE: Создает и настраивает `requests.Session` с retry-логикой.
@@ -261,6 +265,8 @@ class APIClient:
# @POST: `self._tokens` заполнен, `self._authenticated` установлен в `True`.
# @RETURN: Dict[str, str] - Словарь с токенами.
# @THROW: AuthenticationError, NetworkError - при ошибках.
# @RELATION: [CALLS] ->[SupersetAuthCache.get]
# @RELATION: [CALLS] ->[SupersetAuthCache.set]
def authenticate(self) -> Dict[str, str]:
with belief_scope("authenticate"):
app_logger.info("[authenticate][Enter] Authenticating to %s", self.base_url)

View File

@@ -224,13 +224,13 @@ class SupersetCompilationAdapter:
# @PURPOSE: Request preview compilation through explicit client support backed by real Superset endpoints only.
# @RELATION: [CALLS] ->[SupersetClient.compile_dataset_preview]
# @PRE: payload contains a valid dataset identifier and deterministic execution inputs for one preview attempt.
# @POST: returns one normalized upstream compilation response without endpoint guessing.
# @SIDE_EFFECT: issues one Superset chart-data request through the client.
# @POST: returns one normalized upstream compilation response including the chosen strategy metadata.
# @SIDE_EFFECT: issues one or more Superset preview requests through the client fallback chain.
# @DATA_CONTRACT: Input[PreviewCompilationPayload] -> Output[Dict[str,Any]]
def _request_superset_preview(self, payload: PreviewCompilationPayload) -> Dict[str, Any]:
try:
logger.reason(
"Attempting deterministic Superset preview compilation via chart/data",
"Attempting deterministic Superset preview compilation through supported endpoint strategies",
extra={
"dataset_id": payload.dataset_id,
"session_id": payload.session_id,
@@ -245,7 +245,7 @@ class SupersetCompilationAdapter:
)
except Exception as exc:
logger.explore(
"Superset preview compilation via chart/data failed",
"Superset preview compilation failed across supported endpoint strategies",
extra={
"dataset_id": payload.dataset_id,
"session_id": payload.session_id,
@@ -256,7 +256,7 @@ class SupersetCompilationAdapter:
normalized = self._normalize_preview_response(response)
if normalized is None:
raise RuntimeError("Superset chart/data compilation response could not be normalized")
raise RuntimeError("Superset preview compilation response could not be normalized")
return normalized
# [/DEF:SupersetCompilationAdapter._request_superset_preview:Function]

View File

@@ -16,6 +16,7 @@ from __future__ import annotations
# [DEF:SupersetContextExtractor.imports:Block]
import json
import re
from copy import deepcopy
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set
from urllib.parse import parse_qs, unquote, urlparse
@@ -128,6 +129,14 @@ class SupersetContextExtractor:
if isinstance(permalink_state, dict):
for key, value in permalink_state.items():
query_state.setdefault(key, value)
# Extract filters from permalink dataMask
data_mask = permalink_state.get("dataMask")
if isinstance(data_mask, dict) and data_mask:
query_state["dataMask"] = data_mask
logger.reason(
"Extracted native filters from permalink dataMask",
extra={"filter_count": len(data_mask)},
)
resolved_dashboard_id = self._extract_dashboard_id_from_state(permalink_state)
resolved_chart_id = self._extract_chart_id_from_state(permalink_state)
if resolved_dashboard_id is not None:
@@ -182,10 +191,44 @@ class SupersetContextExtractor:
"Resolving dashboard-bound dataset from Superset",
extra={"dashboard_ref": resolved_dashboard_ref},
)
# Resolve dashboard detail first — handles both numeric ID and slug,
# ensuring dashboard_id is available for the native_filters_key fetch below.
dashboard_detail = self.client.get_dashboard_detail(resolved_dashboard_ref)
resolved_dashboard_id = dashboard_detail.get("id")
if resolved_dashboard_id is not None:
dashboard_id = int(resolved_dashboard_id)
# Check for native_filters_key in query params and fetch filter state.
# This must run AFTER dashboard_id is resolved from slug above.
native_filters_key = query_params.get("native_filters_key", [None])[0]
if native_filters_key and dashboard_id is not None:
try:
logger.reason(
"Fetching native filter state from Superset",
extra={"dashboard_id": dashboard_id, "filter_key": native_filters_key},
)
extracted = self.client.extract_native_filters_from_key(
dashboard_id, native_filters_key
)
data_mask = extracted.get("dataMask")
if isinstance(data_mask, dict) and data_mask:
query_state["native_filter_state"] = data_mask
logger.reason(
"Extracted native filter state from Superset via native_filters_key",
extra={"filter_count": len(data_mask)},
)
else:
logger.explore(
"Native filter state returned empty dataMask",
extra={"dashboard_id": dashboard_id, "filter_key": native_filters_key},
)
except Exception as exc:
logger.explore(
"Failed to fetch native filter state from Superset",
extra={"dashboard_id": dashboard_id, "filter_key": native_filters_key, "error": str(exc)},
)
datasets = dashboard_detail.get("datasets") or []
if datasets:
first_dataset = datasets[0]
@@ -287,6 +330,114 @@ class SupersetContextExtractor:
with belief_scope("SupersetContextExtractor.recover_imported_filters"):
recovered_filters: List[Dict[str, Any]] = []
seen_filter_keys: Set[str] = set()
metadata_filters: List[Dict[str, Any]] = []
metadata_filters_by_id: Dict[str, Dict[str, Any]] = {}
def merge_recovered_filter(candidate: Dict[str, Any]) -> None:
filter_key = candidate["filter_name"].strip().lower()
existing_index = next(
(
index
for index, existing in enumerate(recovered_filters)
if existing["filter_name"].strip().lower() == filter_key
),
None,
)
if existing_index is None:
seen_filter_keys.add(filter_key)
recovered_filters.append(candidate)
return
existing = recovered_filters[existing_index]
if existing.get("display_name") in {None, "", existing.get("filter_name")} and candidate.get("display_name"):
existing["display_name"] = candidate["display_name"]
if existing.get("raw_value") is None and candidate.get("raw_value") is not None:
existing["raw_value"] = candidate["raw_value"]
existing["confidence_state"] = candidate.get("confidence_state", "imported")
existing["requires_confirmation"] = candidate.get("requires_confirmation", False)
existing["recovery_status"] = candidate.get("recovery_status", "recovered")
existing["source"] = candidate.get("source", existing.get("source"))
if existing.get("normalized_value") is None and candidate.get("normalized_value") is not None:
existing["normalized_value"] = deepcopy(candidate["normalized_value"])
if existing.get("notes") and candidate.get("notes") and candidate["notes"] not in existing["notes"]:
existing["notes"] = f'{existing["notes"]}; {candidate["notes"]}'
if parsed_context.dashboard_id is not None:
try:
dashboard_payload = self.client.get_dashboard(parsed_context.dashboard_id)
dashboard_record = (
dashboard_payload.get("result", dashboard_payload)
if isinstance(dashboard_payload, dict)
else {}
)
json_metadata = dashboard_record.get("json_metadata")
if isinstance(json_metadata, str) and json_metadata.strip():
json_metadata = json.loads(json_metadata)
if not isinstance(json_metadata, dict):
json_metadata = {}
native_filter_configuration = json_metadata.get("native_filter_configuration") or []
default_filters = json_metadata.get("default_filters") or {}
if isinstance(default_filters, str) and default_filters.strip():
try:
default_filters = json.loads(default_filters)
except Exception:
logger.explore(
"Superset default_filters payload was not valid JSON",
extra={"dashboard_id": parsed_context.dashboard_id},
)
default_filters = {}
for item in native_filter_configuration:
if not isinstance(item, dict):
continue
filter_name = str(
item.get("name")
or item.get("filter_name")
or item.get("column")
or ""
).strip()
if not filter_name:
continue
display_name = item.get("label") or item.get("name") or filter_name
filter_id = str(item.get("id") or "").strip()
default_value = None
if isinstance(default_filters, dict):
default_value = default_filters.get(filter_name)
metadata_filter = self._normalize_imported_filter_payload(
{
"filter_name": filter_name,
"display_name": display_name,
"raw_value": default_value,
"source": "superset_native",
"recovery_status": "recovered" if default_value is not None else "partial",
"requires_confirmation": default_value is None,
"notes": "Recovered from Superset dashboard native filter configuration",
},
default_source="superset_native",
default_note="Recovered from Superset dashboard native filter configuration",
)
metadata_filters.append(metadata_filter)
if filter_id:
metadata_filters_by_id[filter_id.lower()] = {
"filter_name": filter_name,
"display_name": display_name,
}
except Exception as exc:
logger.explore(
"Dashboard native filter enrichment failed; preserving partial imported filters",
extra={
"dashboard_id": parsed_context.dashboard_id,
"error": str(exc),
"filter_count": len(recovered_filters),
},
)
metadata_filters = []
metadata_filters_by_id = {}
for item in parsed_context.imported_filters:
normalized = self._normalize_imported_filter_payload(
@@ -294,11 +445,24 @@ class SupersetContextExtractor:
default_source="superset_url",
default_note="Recovered from Superset URL state",
)
filter_key = normalized["filter_name"].strip().lower()
if filter_key in seen_filter_keys:
continue
seen_filter_keys.add(filter_key)
recovered_filters.append(normalized)
metadata_match = metadata_filters_by_id.get(normalized["filter_name"].strip().lower())
if metadata_match is not None:
normalized["filter_name"] = metadata_match["filter_name"]
normalized["display_name"] = metadata_match["display_name"]
normalized["notes"] = (
"Recovered from Superset URL state and reconciled against dashboard native filter metadata"
)
merge_recovered_filter(normalized)
logger.reflect(
"Recovered filter from URL state",
extra={
"filter_name": normalized["filter_name"],
"source": normalized["source"],
"has_value": normalized["raw_value"] is not None,
"canonicalized": metadata_match is not None,
},
)
if parsed_context.dashboard_id is None:
logger.reflect(
@@ -311,108 +475,48 @@ class SupersetContextExtractor:
)
return recovered_filters
try:
dashboard_payload = self.client.get_dashboard(parsed_context.dashboard_id)
dashboard_record = (
dashboard_payload.get("result", dashboard_payload)
if isinstance(dashboard_payload, dict)
else {}
for saved_filter in metadata_filters:
merge_recovered_filter(saved_filter)
logger.reflect(
"Recovered filter from dashboard metadata",
extra={
"filter_name": saved_filter["filter_name"],
"has_value": saved_filter["raw_value"] is not None,
},
)
json_metadata = dashboard_record.get("json_metadata")
if isinstance(json_metadata, str) and json_metadata.strip():
json_metadata = json.loads(json_metadata)
if not isinstance(json_metadata, dict):
json_metadata = {}
native_filter_configuration = json_metadata.get("native_filter_configuration") or []
default_filters = json_metadata.get("default_filters") or {}
if isinstance(default_filters, str) and default_filters.strip():
try:
default_filters = json.loads(default_filters)
except Exception:
logger.explore(
"Superset default_filters payload was not valid JSON",
extra={"dashboard_id": parsed_context.dashboard_id},
)
default_filters = {}
for item in native_filter_configuration:
if not isinstance(item, dict):
continue
filter_name = str(
item.get("name")
or item.get("filter_name")
or item.get("column")
or ""
).strip()
if not filter_name:
continue
filter_key = filter_name.lower()
if filter_key in seen_filter_keys:
continue
default_value = None
if isinstance(default_filters, dict):
default_value = default_filters.get(filter_name)
saved_filter = self._normalize_imported_filter_payload(
if not recovered_filters:
recovered_filters.append(
self._normalize_imported_filter_payload(
{
"filter_name": filter_name,
"display_name": item.get("label") or item.get("name"),
"raw_value": default_value,
"filter_name": f"dashboard_{parsed_context.dashboard_id}_filters",
"display_name": "Dashboard native filters",
"raw_value": None,
"source": "superset_native",
"recovery_status": "recovered" if default_value is not None else "partial",
"requires_confirmation": default_value is None,
"notes": "Recovered from Superset dashboard native filter configuration",
"recovery_status": "partial",
"requires_confirmation": True,
"notes": "Superset dashboard filter configuration could not be recovered fully",
},
default_source="superset_native",
default_note="Recovered from Superset dashboard native filter configuration",
default_note="Superset dashboard filter configuration could not be recovered fully",
)
seen_filter_keys.add(filter_key)
recovered_filters.append(saved_filter)
)
logger.reflect(
"Imported filter recovery completed with dashboard enrichment",
extra={
"dashboard_id": parsed_context.dashboard_id,
"filter_count": len(recovered_filters),
"partial_entries": len(
[
item
for item in recovered_filters
if item["recovery_status"] == "partial"
]
),
},
)
return recovered_filters
except Exception as exc:
logger.explore(
"Dashboard native filter enrichment failed; preserving partial imported filters",
extra={
"dashboard_id": parsed_context.dashboard_id,
"error": str(exc),
"filter_count": len(recovered_filters),
},
)
if not recovered_filters:
recovered_filters.append(
self._normalize_imported_filter_payload(
{
"filter_name": f"dashboard_{parsed_context.dashboard_id}_filters",
"display_name": "Dashboard native filters",
"raw_value": None,
"source": "superset_native",
"recovery_status": "partial",
"requires_confirmation": True,
"notes": "Superset dashboard filter configuration could not be recovered fully",
},
default_source="superset_native",
default_note="Superset dashboard filter configuration could not be recovered fully",
)
)
return recovered_filters
logger.reflect(
"Imported filter recovery completed with dashboard enrichment",
extra={
"dashboard_id": parsed_context.dashboard_id,
"filter_count": len(recovered_filters),
"partial_entries": len(
[
item
for item in recovered_filters
if item["recovery_status"] == "partial"
]
),
},
)
return recovered_filters
# [/DEF:SupersetContextExtractor.recover_imported_filters:Function]
# [DEF:SupersetContextExtractor.discover_template_variables:Function]
@@ -692,11 +796,23 @@ class SupersetContextExtractor:
or item.get("name")
or f"native_filter_{index}"
)
direct_clause = None
if item.get("column") and ("value" in item or "val" in item):
direct_clause = {
"col": item.get("column"),
"op": item.get("op") or ("IN" if isinstance(item.get("value"), list) else "=="),
"val": item.get("val", item.get("value")),
}
imported_filters.append(
{
"filter_name": str(filter_name),
"raw_value": item.get("value"),
"display_name": item.get("label") or item.get("name"),
"normalized_value": {
"filter_clauses": [direct_clause] if isinstance(direct_clause, dict) else [],
"extra_form_data": {},
"value_origin": "native_filters",
},
"source": "superset_url",
"recovery_status": "recovered"
if item.get("value") is not None
@@ -706,6 +822,7 @@ class SupersetContextExtractor:
}
)
# Extract filters from permalink dataMask
dashboard_data_mask = query_state.get("dataMask")
if isinstance(dashboard_data_mask, dict):
for filter_key, item in dashboard_data_mask.items():
@@ -715,20 +832,54 @@ class SupersetContextExtractor:
extra_form_data = item.get("extraFormData")
display_name = None
raw_value = None
normalized_value = {
"filter_clauses": [],
"extra_form_data": deepcopy(extra_form_data) if isinstance(extra_form_data, dict) else {},
"value_origin": "unresolved",
}
# Try to get value from filterState
if isinstance(filter_state, dict):
display_name = filter_state.get("label")
raw_value = filter_state.get("value")
if raw_value is None and isinstance(extra_form_data, dict):
# Superset filterState uses 'value' for single values, 'values' for multi-select
raw_value = filter_state.get("value") or filter_state.get("values")
if raw_value is not None:
normalized_value["value_origin"] = "filter_state"
# Preserve exact Superset clauses from extraFormData.filters
if isinstance(extra_form_data, dict):
extra_filters = extra_form_data.get("filters")
if isinstance(extra_filters, list) and extra_filters:
first_filter = extra_filters[0]
if isinstance(first_filter, dict):
raw_value = first_filter.get("val")
if isinstance(extra_filters, list):
normalized_value["filter_clauses"] = [
deepcopy(extra_filter)
for extra_filter in extra_filters
if isinstance(extra_filter, dict)
]
# If no value found, try extraFormData.filters
if raw_value is None and normalized_value["filter_clauses"]:
first_filter = normalized_value["filter_clauses"][0]
raw_value = first_filter.get("val")
if raw_value is None:
raw_value = first_filter.get("value")
if raw_value is not None:
normalized_value["value_origin"] = "extra_form_data.filters"
# If still no value, try extraFormData directly for time_range, time_grain, etc.
if raw_value is None and isinstance(extra_form_data, dict):
# Common Superset filter fields
for field in ["time_range", "time_grain_sqla", "time_column", "granularity"]:
if field in extra_form_data:
raw_value = extra_form_data[field]
normalized_value["value_origin"] = f"extra_form_data.{field}"
break
imported_filters.append(
{
"filter_name": str(item.get("id") or filter_key),
"raw_value": raw_value,
"display_name": display_name,
"normalized_value": normalized_value,
"source": "superset_permalink",
"recovery_status": "recovered" if raw_value is not None else "partial",
"requires_confirmation": raw_value is None,
@@ -736,6 +887,73 @@ class SupersetContextExtractor:
}
)
# Extract filters from native_filter_state (fetched from Superset via native_filters_key)
native_filter_state = query_state.get("native_filter_state")
if isinstance(native_filter_state, dict):
for filter_key, item in native_filter_state.items():
if not isinstance(item, dict):
continue
# Handle both single filter format and multi-filter format
filter_id = item.get("id") or filter_key
filter_state = item.get("filterState")
extra_form_data = item.get("extraFormData")
display_name = None
raw_value = None
normalized_value = {
"filter_clauses": [],
"extra_form_data": deepcopy(extra_form_data) if isinstance(extra_form_data, dict) else {},
"value_origin": "unresolved",
}
# Try to get value from filterState
if isinstance(filter_state, dict):
display_name = filter_state.get("label")
# Superset filterState uses 'value' for single values, 'values' for multi-select
raw_value = filter_state.get("value") or filter_state.get("values")
if raw_value is not None:
normalized_value["value_origin"] = "filter_state"
# Preserve exact Superset clauses from extraFormData.filters
if isinstance(extra_form_data, dict):
extra_filters = extra_form_data.get("filters")
if isinstance(extra_filters, list):
normalized_value["filter_clauses"] = [
deepcopy(extra_filter)
for extra_filter in extra_filters
if isinstance(extra_filter, dict)
]
# If no value found, try extraFormData.filters
if raw_value is None and normalized_value["filter_clauses"]:
first_filter = normalized_value["filter_clauses"][0]
raw_value = first_filter.get("val")
if raw_value is None:
raw_value = first_filter.get("value")
if raw_value is not None:
normalized_value["value_origin"] = "extra_form_data.filters"
# If still no value, try extraFormData directly for time_range, time_grain, etc.
if raw_value is None and isinstance(extra_form_data, dict):
# Common Superset filter fields
for field in ["time_range", "time_grain_sqla", "time_column", "granularity"]:
if field in extra_form_data:
raw_value = extra_form_data[field]
normalized_value["value_origin"] = f"extra_form_data.{field}"
break
imported_filters.append(
{
"filter_name": str(filter_id),
"raw_value": raw_value,
"display_name": display_name,
"normalized_value": normalized_value,
"source": "superset_native_filters_key",
"recovery_status": "recovered" if raw_value is not None else "partial",
"requires_confirmation": raw_value is None,
"notes": "Recovered from Superset native_filters_key state",
}
)
form_data_payload = query_state.get("form_data")
if isinstance(form_data_payload, dict):
extra_filters = form_data_payload.get("extra_filters") or []
@@ -748,6 +966,11 @@ class SupersetContextExtractor:
"filter_name": str(filter_name),
"raw_value": item.get("val"),
"display_name": item.get("label"),
"normalized_value": {
"filter_clauses": [deepcopy(item)],
"extra_form_data": {},
"value_origin": "form_data.extra_filters",
},
"source": "superset_url",
"recovery_status": "recovered"
if item.get("val") is not None

View File

@@ -357,6 +357,8 @@ class SemanticCandidate(Base):
class FilterSource(str, enum.Enum):
    """Provenance of an imported filter value.

    Inherits from ``str`` so members serialize and compare as their plain
    string values (e.g. in API payloads and DB columns).
    """

    SUPERSET_NATIVE = "superset_native"  # recovered from dashboard native filter configuration
    SUPERSET_URL = "superset_url"  # parsed directly from a Superset URL (form_data / native_filters)
    SUPERSET_PERMALINK = "superset_permalink"  # resolved from a permalink dataMask
    SUPERSET_NATIVE_FILTERS_KEY = "superset_native_filters_key"  # fetched via native_filters_key state
    MANUAL = "manual"  # presumably user-entered — not exercised in this chunk
    INFERRED = "inferred"  # presumably heuristic — not exercised in this chunk
# [/DEF:FilterSource:Class]

View File

@@ -0,0 +1,151 @@
# [DEF:backend.src.models.filter_state:Module]
#
# @COMPLEXITY: 2
# @SEMANTICS: superset, native, filters, pydantic, models, dataclasses
# @PURPOSE: Pydantic models for Superset native filter state extraction and restoration.
# @LAYER: Models
# @RELATION: [DEPENDS_ON] ->[pydantic]
# [SECTION: IMPORTS]
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, ConfigDict, Field
# [/SECTION]
# [DEF:FilterState:Model]
# @COMPLEXITY: 2
# @PURPOSE: Represents the state of a single native filter.
# @DATA_CONTRACT: Input[extraFormData: Dict, filterState: Dict, ownState: Optional[Dict]] -> Model[FilterState]
class FilterState(BaseModel):
    """Single native filter state with extraFormData, filterState, and ownState.

    Mirrors one entry of a Superset dashboard dataMask. ``extra="allow"``
    keeps any additional Superset-provided keys instead of rejecting them,
    so unknown payload fields round-trip intact.
    """
    model_config = ConfigDict(extra="allow")
    # Query-affecting clauses Superset applies to charts (filters, time_range, ...).
    extraFormData: Dict[str, Any] = Field(default_factory=dict, description="Extra form data for the filter")
    # User-facing selection state (typically label/value/values keys).
    filterState: Dict[str, Any] = Field(default_factory=dict, description="Current filter state")
    # Filter-component-internal state; opaque to this service.
    ownState: Dict[str, Any] = Field(default_factory=dict, description="Own state of the filter")
# [/DEF:FilterState:Model]
# [DEF:NativeFilterDataMask:Model]
# @COMPLEXITY: 2
# @PURPOSE: Represents the dataMask containing all native filter states.
# @DATA_CONTRACT: Input[Dict[filter_id, FilterState]] -> Model[NativeFilterDataMask]
class NativeFilterDataMask(BaseModel):
    """Container for all native filter states in a dashboard."""
    model_config = ConfigDict(extra="allow")
    filters: Dict[str, Any] = Field(default_factory=dict, description="Map of filter ID to filter state data")

    def get_filter_ids(self) -> List[str]:
        """Return list of all filter IDs."""
        return list(self.filters.keys())

    def get_extra_form_data(self, filter_id: str) -> Dict[str, Any]:
        """Get extraFormData for a specific filter.

        ``filters`` values are typed ``Any``: raw dataMask entries arrive as
        plain dicts, while pre-parsed entries may be ``FilterState`` models.
        The previous implementation accessed ``.extraFormData`` as an
        attribute, which raised ``AttributeError`` for dict entries; this
        version supports both shapes and returns ``{}`` when the filter is
        unknown or carries no extraFormData.
        """
        filter_state = self.filters.get(filter_id)
        if isinstance(filter_state, dict):
            # Raw Superset dataMask entry: a plain mapping.
            return filter_state.get("extraFormData", {})
        if filter_state:
            # FilterState model (or any object exposing the attribute).
            return getattr(filter_state, "extraFormData", {})
        return {}
# [/DEF:NativeFilterDataMask:Model]
# [DEF:ParsedNativeFilters:Model]
# @COMPLEXITY: 2
# @PURPOSE: Result of parsing native filters from permalink or native_filters_key.
# @DATA_CONTRACT: Input[dataMask: Dict, metadata: Dict] -> Model[ParsedNativeFilters]
class ParsedNativeFilters(BaseModel):
    """Native filter state recovered from a Superset URL.

    Holds the extracted dataMask plus surrounding dashboard context
    (tabs, anchor, chart states) needed to restore the original view.
    """

    # Preserve unknown Superset payload keys rather than rejecting them.
    model_config = ConfigDict(extra="allow")

    dataMask: Dict[str, Any] = Field(
        default_factory=dict, description="Extracted dataMask from filters"
    )
    filter_type: Optional[str] = Field(
        default=None,
        description="Type of filter: permalink, native_filters_key, or native_filters",
    )
    dashboard_id: Optional[str] = Field(
        default=None, description="Dashboard ID if available"
    )
    permalink_key: Optional[str] = Field(
        default=None, description="Permalink key if used"
    )
    filter_state_key: Optional[str] = Field(
        default=None, description="Filter state key if used"
    )
    active_tabs: List[str] = Field(
        default_factory=list, description="Active tabs in dashboard"
    )
    anchor: Optional[str] = Field(
        default=None, description="Anchor position in dashboard"
    )
    chart_states: Dict[str, Any] = Field(
        default_factory=dict, description="Chart states in dashboard"
    )

    def has_filters(self) -> bool:
        """Report whether at least one filter entry was extracted."""
        return self.get_filter_count() > 0

    def get_filter_count(self) -> int:
        """Return how many filter entries the dataMask holds."""
        return len(self.dataMask)
# [/DEF:ParsedNativeFilters:Model]
# [DEF:DashboardURLFilterExtraction:Model]
# @COMPLEXITY: 2
# @PURPOSE: Result of parsing a complete dashboard URL for filter information.
# @DATA_CONTRACT: Input[url: str, dashboard_id: Optional, filter_type: Optional, filters: Dict] -> Model[DashboardURLFilterExtraction]
class DashboardURLFilterExtraction(BaseModel):
    """Result of parsing a Superset dashboard URL to extract filter state.

    ``success``/``error`` report extraction outcome; on failure ``filters``
    stays at its empty default rather than being omitted.
    """
    model_config = ConfigDict(extra="allow")
    # Original URL is retained for diagnostics / re-parsing.
    url: str = Field(..., description="Original dashboard URL")
    dashboard_id: Optional[str] = Field(default=None, description="Extracted dashboard ID")
    filter_type: Optional[str] = Field(default=None, description="Type of filter found")
    # Nested model defaults to an empty ParsedNativeFilters (no filters found).
    filters: ParsedNativeFilters = Field(default_factory=ParsedNativeFilters, description="Extracted filter data")
    success: bool = Field(default=True, description="Whether extraction was successful")
    error: Optional[str] = Field(default=None, description="Error message if extraction failed")
# [/DEF:DashboardURLFilterExtraction:Model]
# [DEF:ExtraFormDataMerge:Model]
# @COMPLEXITY: 2
# @PURPOSE: Configuration for merging extraFormData from different sources.
# @DATA_CONTRACT: Input[append_keys: List[str], override_keys: List[str]] -> Model[ExtraFormDataMerge]
class ExtraFormDataMerge(BaseModel):
    """Merge policy for combining two extraFormData payloads.

    Keys listed in ``append_keys`` are concatenated when both sides hold
    lists; keys in ``override_keys`` — and any unlisted key — take the
    newer value.
    """

    # Array-valued keys whose contents accumulate across sources.
    append_keys: List[str] = Field(
        default_factory=lambda: ["filters", "extras", "columns", "metrics"],
        description="Keys that should be merged by appending"
    )
    # Scalar keys where the newest source wins outright.
    override_keys: List[str] = Field(
        default_factory=lambda: ["time_range", "time_grain_sqla", "time_column", "granularity"],
        description="Keys that should be overridden by new values"
    )

    def merge(self, original: Dict[str, Any], new: Dict[str, Any]) -> Dict[str, Any]:
        """
        Merge two extraFormData dictionaries.
        @param original: Original extraFormData from dashboard metadata
        @param new: New extraFormData from URL/permalink
        @return: Merged extraFormData dictionary
        """
        merged: Dict[str, Any] = dict(original)
        for key, incoming in new.items():
            if key in self.override_keys:
                # Override keys are checked first: the newer value always wins.
                merged[key] = incoming
                continue
            current = merged.get(key)
            if key in self.append_keys and isinstance(current, list) and isinstance(incoming, list):
                # Concatenate list payloads so neither source's entries are lost.
                merged[key] = current + incoming
            else:
                # Unlisted keys (or non-list append candidates) are replaced.
                merged[key] = incoming
        return merged
# [/DEF:ExtraFormDataMerge:Model]
# [/DEF:backend.src.models.filter_state:Module]

View File

@@ -798,6 +798,7 @@ class DatasetReviewOrchestrator:
approved_mapping_ids: List[str] = []
open_warning_refs: List[str] = []
preview_blockers: List[str] = []
mapped_filter_ids: set[str] = set()
for mapping in session.execution_mappings:
imported_filter = filter_lookup.get(mapping.filter_id)
@@ -821,23 +822,46 @@ class DatasetReviewOrchestrator:
preview_blockers.append(f"variable:{template_variable.variable_name}:missing_required_value")
continue
effective_filters.append(
{
"mapping_id": mapping.mapping_id,
"filter_id": imported_filter.filter_id,
"filter_name": imported_filter.filter_name,
"variable_id": template_variable.variable_id,
"variable_name": template_variable.variable_name,
"effective_value": effective_value,
"raw_input_value": mapping.raw_input_value,
}
)
mapped_filter_ids.add(imported_filter.filter_id)
if effective_value is not None:
effective_filters.append(
{
"mapping_id": mapping.mapping_id,
"filter_id": imported_filter.filter_id,
"filter_name": imported_filter.filter_name,
"display_name": imported_filter.display_name,
"variable_id": template_variable.variable_id,
"variable_name": template_variable.variable_name,
"effective_value": effective_value,
"raw_input_value": mapping.raw_input_value,
"normalized_filter_payload": imported_filter.normalized_value,
}
)
template_params[template_variable.variable_name] = effective_value
if mapping.approval_state == ApprovalState.APPROVED:
approved_mapping_ids.append(mapping.mapping_id)
if mapping.requires_explicit_approval and mapping.approval_state != ApprovalState.APPROVED:
open_warning_refs.append(mapping.mapping_id)
for imported_filter in session.imported_filters:
if imported_filter.filter_id in mapped_filter_ids:
continue
effective_value = imported_filter.normalized_value
if effective_value is None:
effective_value = imported_filter.raw_value
if effective_value is None:
continue
effective_filters.append(
{
"filter_id": imported_filter.filter_id,
"filter_name": imported_filter.filter_name,
"display_name": imported_filter.display_name,
"effective_value": effective_value,
"raw_input_value": imported_filter.raw_value,
"normalized_filter_payload": imported_filter.normalized_value,
}
)
mapped_variable_ids = {mapping.variable_id for mapping in session.execution_mappings}
for variable in session.template_variables:
if variable.variable_id in mapped_variable_ids: