semantic update
This commit is contained in:
@@ -3,6 +3,7 @@
|
||||
# @PURPOSE: Implements a plugin for searching text patterns across all datasets in a specific Superset environment.
|
||||
# @LAYER: Plugins
|
||||
# @RELATION: Inherits from PluginBase. Uses SupersetClient from core.
|
||||
# @RELATION: USES -> TaskContext
|
||||
# @CONSTRAINT: Must use belief_scope for logging.
|
||||
|
||||
# [SECTION: IMPORTS]
|
||||
@@ -11,6 +12,7 @@ from typing import Dict, Any, List, Optional
|
||||
from ..core.plugin_base import PluginBase
|
||||
from ..core.superset_client import SupersetClient
|
||||
from ..core.logger import logger, belief_scope
|
||||
from ..core.task_manager.context import TaskContext
|
||||
# [/SECTION]
|
||||
|
||||
# [DEF:SearchPlugin:Class]
|
||||
@@ -99,18 +101,26 @@ class SearchPlugin(PluginBase):
|
||||
# [/DEF:get_schema:Function]
|
||||
|
||||
# [DEF:execute:Function]
|
||||
# @PURPOSE: Executes the dataset search logic.
|
||||
# @PURPOSE: Executes the dataset search logic with TaskContext support.
|
||||
# @PARAM: params (Dict[str, Any]) - Search parameters.
|
||||
# @PARAM: context (Optional[TaskContext]) - Task context for logging with source attribution.
|
||||
# @PRE: Params contain valid 'env' and 'query'.
|
||||
# @POST: Returns a dictionary with count and results list.
|
||||
# @RETURN: Dict[str, Any] - Search results.
|
||||
async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
|
||||
async def execute(self, params: Dict[str, Any], context: Optional[TaskContext] = None) -> Dict[str, Any]:
|
||||
with belief_scope("SearchPlugin.execute", f"params={params}"):
|
||||
env_name = params.get("env")
|
||||
search_query = params.get("query")
|
||||
|
||||
# Use TaskContext logger if available, otherwise fall back to app logger
|
||||
log = context.logger if context else logger
|
||||
|
||||
# Create sub-loggers for different components
|
||||
superset_log = log.with_source("superset_api") if context else log
|
||||
search_log = log.with_source("search") if context else log
|
||||
|
||||
if not env_name or not search_query:
|
||||
logger.error("[SearchPlugin.execute][State] Missing required parameters.")
|
||||
log.error("Missing required parameters: env, query")
|
||||
raise ValueError("Missing required parameters: env, query")
|
||||
|
||||
# Get config and initialize client
|
||||
@@ -118,20 +128,20 @@ class SearchPlugin(PluginBase):
|
||||
config_manager = get_config_manager()
|
||||
env_config = config_manager.get_environment(env_name)
|
||||
if not env_config:
|
||||
logger.error(f"[SearchPlugin.execute][State] Environment '{env_name}' not found.")
|
||||
log.error(f"Environment '{env_name}' not found in configuration.")
|
||||
raise ValueError(f"Environment '{env_name}' not found in configuration.")
|
||||
|
||||
client = SupersetClient(env_config)
|
||||
client.authenticate()
|
||||
|
||||
logger.info(f"[SearchPlugin.execute][Action] Searching for pattern: '{search_query}' in environment: {env_name}")
|
||||
log.info(f"Searching for pattern: '{search_query}' in environment: {env_name}")
|
||||
|
||||
try:
|
||||
# Ported logic from search_script.py
|
||||
_, datasets = client.get_datasets(query={"columns": ["id", "table_name", "sql", "database", "columns"]})
|
||||
|
||||
if not datasets:
|
||||
logger.warning("[SearchPlugin.execute][State] No datasets found.")
|
||||
search_log.warning("No datasets found.")
|
||||
return {"count": 0, "results": []}
|
||||
|
||||
pattern = re.compile(search_query, re.IGNORECASE)
|
||||
@@ -155,17 +165,17 @@ class SearchPlugin(PluginBase):
|
||||
"full_value": value_str
|
||||
})
|
||||
|
||||
logger.info(f"[SearchPlugin.execute][Success] Found matches in {len(results)} locations.")
|
||||
search_log.info(f"Found matches in {len(results)} locations.")
|
||||
return {
|
||||
"count": len(results),
|
||||
"results": results
|
||||
}
|
||||
|
||||
except re.error as e:
|
||||
logger.error(f"[SearchPlugin.execute][Failure] Invalid regex pattern: {e}")
|
||||
search_log.error(f"Invalid regex pattern: {e}")
|
||||
raise ValueError(f"Invalid regex pattern: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"[SearchPlugin.execute][Failure] Error during search: {e}")
|
||||
log.error(f"Error during search: {e}")
|
||||
raise
|
||||
# [/DEF:execute:Function]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user