# [DEF:backend/src/plugins/llm_analysis/plugin.py:Module]
# @TIER: STANDARD
# @SEMANTICS: plugin, llm, analysis, documentation
# @PURPOSE: Implements DashboardValidationPlugin and DocumentationPlugin.
# @LAYER: Domain
# @RELATION: INHERITS -> backend.src.core.plugin_base.PluginBase
# @RELATION: CALLS -> backend.src.plugins.llm_analysis.service.ScreenshotService
# @RELATION: CALLS -> backend.src.plugins.llm_analysis.service.LLMClient
# @RELATION: CALLS -> backend.src.services.llm_provider.LLMProviderService
# @RELATION: USES -> TaskContext
# @INVARIANT: All LLM interactions must be executed as asynchronous tasks.
from typing import Dict, Any, Optional
import os
import json
from datetime import datetime, timedelta

from ...core.plugin_base import PluginBase
from ...core.logger import belief_scope, logger
from ...core.database import SessionLocal
from ...services.llm_provider import LLMProviderService
from ...core.superset_client import SupersetClient
from .service import ScreenshotService, LLMClient
from .models import LLMProviderType, ValidationStatus, ValidationResult, DetectedIssue
from ...models.llm import ValidationRecord
from ...core.task_manager.context import TaskContext
from ...services.llm_prompt_templates import (
    DEFAULT_LLM_PROMPTS,
    normalize_llm_settings,
    render_prompt,
)


# [DEF:DashboardValidationPlugin:Class]
# @PURPOSE: Plugin for automated dashboard health analysis using LLMs.
# @RELATION: IMPLEMENTS -> backend.src.core.plugin_base.PluginBase
class DashboardValidationPlugin(PluginBase):
    """Plugin that validates a dashboard's health via a multimodal LLM.

    The execute() flow: capture a screenshot of the dashboard, pull recent
    activity logs from the target Superset environment, send both to the
    configured LLM provider for analysis, and persist the result.
    """

    @property
    def id(self) -> str:
        return "llm_dashboard_validation"

    @property
    def name(self) -> str:
        return "Dashboard LLM Validation"

    @property
    def description(self) -> str:
        return "Automated dashboard health analysis using multimodal LLMs."

    @property
    def version(self) -> str:
        return "1.0.0"

    def get_schema(self) -> Dict[str, Any]:
        """Return the JSON schema describing the parameters execute() accepts."""
        return {
            "type": "object",
            "properties": {
                "dashboard_id": {"type": "string", "title": "Dashboard ID"},
                "environment_id": {"type": "string", "title": "Environment ID"},
                "provider_id": {"type": "string", "title": "LLM Provider ID"}
            },
            "required": ["dashboard_id", "environment_id", "provider_id"]
        }

    # [DEF:DashboardValidationPlugin.execute:Function]
    # @PURPOSE: Executes the dashboard validation task with TaskContext support.
    # @PARAM: params (Dict[str, Any]) - Validation parameters.
    # @PARAM: context (Optional[TaskContext]) - Task context for logging with source attribution.
    # @PRE: params contains dashboard_id, environment_id, and provider_id.
    # @POST: Returns a dictionary with validation results and persists them to the database.
    # @SIDE_EFFECT: Captures a screenshot, calls LLM API, and writes to the database.
    async def execute(self, params: Dict[str, Any], context: Optional[TaskContext] = None):
        """Run the full validation pipeline for one dashboard.

        Raises:
            ValueError: if a required parameter is missing, the environment or
                provider cannot be resolved, or the API key cannot be decrypted.
        """
        with belief_scope("execute", f"plugin_id={self.id}"):
            # Use TaskContext logger if available, otherwise fall back to app logger
            log = context.logger if context else logger
            # Create sub-loggers for different components so task logs carry
            # a source attribution (llm / screenshot / superset_api).
            llm_log = log.with_source("llm") if context else log
            screenshot_log = log.with_source("screenshot") if context else log
            superset_log = log.with_source("superset_api") if context else log

            log.info(f"Executing {self.name} with params: {params}")

            dashboard_id_raw = params.get("dashboard_id")
            dashboard_id = str(dashboard_id_raw) if dashboard_id_raw is not None else None
            env_id = params.get("environment_id")
            provider_id = params.get("provider_id")

            # Fail fast on a missing dashboard_id; otherwise the literal string
            # "None" would leak into the screenshot filename and log filters.
            if dashboard_id is None:
                log.error("dashboard_id is required")
                raise ValueError("dashboard_id is required")

            db = SessionLocal()
            try:
                # 1. Get Environment
                # Imported lazily to avoid a circular import at module load time.
                from ...dependencies import get_config_manager
                config_mgr = get_config_manager()
                env = config_mgr.get_environment(env_id)
                if not env:
                    log.error(f"Environment {env_id} not found")
                    raise ValueError(f"Environment {env_id} not found")

                # 2. Get LLM Provider
                llm_service = LLMProviderService(db)
                db_provider = llm_service.get_provider(provider_id)
                if not db_provider:
                    log.error(f"LLM Provider {provider_id} not found")
                    raise ValueError(f"LLM Provider {provider_id} not found")

                llm_log.debug("Retrieved provider config:")
                llm_log.debug(f" Provider ID: {db_provider.id}")
                llm_log.debug(f" Provider Name: {db_provider.name}")
                llm_log.debug(f" Provider Type: {db_provider.provider_type}")
                llm_log.debug(f" Base URL: {db_provider.base_url}")
                llm_log.debug(f" Default Model: {db_provider.default_model}")
                llm_log.debug(f" Is Active: {db_provider.is_active}")

                api_key = llm_service.get_decrypted_api_key(provider_id)
                # NOTE(review): this logs the first 8 chars of a secret at debug
                # level — consider removing in hardened deployments.
                llm_log.debug(f"API Key decrypted (first 8 chars): {api_key[:8] if api_key and len(api_key) > 8 else 'EMPTY_OR_NONE'}...")

                # Check if API key was successfully decrypted
                if not api_key:
                    raise ValueError(
                        f"Failed to decrypt API key for provider {provider_id}. "
                        f"The provider may have been encrypted with a different encryption key. "
                        f"Please update the provider with a new API key through the UI."
                    )

                # 3. Capture Screenshot
                screenshot_service = ScreenshotService(env)
                storage_root = config_mgr.get_config().settings.storage.root_path
                screenshots_dir = os.path.join(storage_root, "screenshots")
                os.makedirs(screenshots_dir, exist_ok=True)
                filename = f"{dashboard_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
                screenshot_path = os.path.join(screenshots_dir, filename)

                screenshot_log.info(f"Capturing screenshot for dashboard {dashboard_id}")
                await screenshot_service.capture_dashboard(dashboard_id, screenshot_path)
                screenshot_log.debug(f"Screenshot saved to: {screenshot_path}")

                # 4. Fetch Logs (from Environment /api/v1/log/)
                logs = []
                try:
                    client = SupersetClient(env)
                    # Calculate time window (last 24 hours)
                    start_time = (datetime.now() - timedelta(hours=24)).isoformat()
                    # Construct filter for logs
                    # Note: We filter by dashboard_id matching the object
                    query_params = {
                        "filters": [
                            {"col": "dashboard_id", "opr": "eq", "value": dashboard_id},
                            {"col": "dttm", "opr": "gt", "value": start_time}
                        ],
                        "order_column": "dttm",
                        "order_direction": "desc",
                        "page": 0,
                        "page_size": 100
                    }

                    superset_log.debug(f"Fetching logs for dashboard {dashboard_id}")
                    response = client.network.request(
                        method="GET",
                        endpoint="/log/",
                        params={"q": json.dumps(query_params)}
                    )

                    if isinstance(response, dict) and "result" in response:
                        for item in response["result"]:
                            action = item.get("action", "unknown")
                            dttm = item.get("dttm", "")
                            details = item.get("json", "")
                            logs.append(f"[{dttm}] {action}: {details}")

                    if not logs:
                        logs = ["No recent logs found for this dashboard."]
                        superset_log.debug("No recent logs found for this dashboard")
                except Exception as e:
                    # Best-effort: log fetching must never abort the validation;
                    # the error text itself is fed to the LLM as context.
                    superset_log.warning(f"Failed to fetch logs from environment: {e}")
                    logs = [f"Error fetching remote logs: {str(e)}"]

                # 5. Analyze with LLM
                llm_client = LLMClient(
                    provider_type=LLMProviderType(db_provider.provider_type),
                    api_key=api_key,
                    base_url=db_provider.base_url,
                    default_model=db_provider.default_model
                )
                llm_log.info(f"Analyzing dashboard {dashboard_id} with LLM")

                llm_settings = normalize_llm_settings(config_mgr.get_config().settings.llm)
                dashboard_prompt = llm_settings["prompts"].get(
                    "dashboard_validation_prompt",
                    DEFAULT_LLM_PROMPTS["dashboard_validation_prompt"],
                )
                analysis = await llm_client.analyze_dashboard(
                    screenshot_path,
                    logs,
                    prompt_template=dashboard_prompt,
                )

                # Log analysis summary to task logs for better visibility
                llm_log.info(f"[ANALYSIS_SUMMARY] Status: {analysis['status']}")
                llm_log.info(f"[ANALYSIS_SUMMARY] Summary: {analysis['summary']}")
                if analysis.get("issues"):
                    for i, issue in enumerate(analysis["issues"]):
                        llm_log.info(f"[ANALYSIS_ISSUE][{i+1}] {issue.get('severity')}: {issue.get('message')} (Location: {issue.get('location', 'N/A')})")

                # 6. Persist Result
                validation_result = ValidationResult(
                    dashboard_id=dashboard_id,
                    status=ValidationStatus(analysis["status"]),
                    summary=analysis["summary"],
                    issues=[DetectedIssue(**issue) for issue in analysis["issues"]],
                    screenshot_path=screenshot_path,
                    raw_response=str(analysis)
                )
                db_record = ValidationRecord(
                    dashboard_id=validation_result.dashboard_id,
                    status=validation_result.status.value,
                    summary=validation_result.summary,
                    issues=[issue.dict() for issue in validation_result.issues],
                    screenshot_path=validation_result.screenshot_path,
                    raw_response=validation_result.raw_response
                )
                db.add(db_record)
                db.commit()

                # 7. Notification on failure (US1 / FR-015)
                if validation_result.status == ValidationStatus.FAIL:
                    log.warning(f"Dashboard {dashboard_id} validation FAILED. Summary: {validation_result.summary}")
                    # Placeholder for Email/Pulse notification dispatch
                    # In a real implementation, we would call a NotificationService here
                    # with a payload containing the summary and a link to the report.

                # Final log to ensure all analysis is visible in task logs
                log.info(f"Validation completed for dashboard {dashboard_id}. Status: {validation_result.status.value}")
                return validation_result.dict()
            finally:
                db.close()
    # [/DEF:DashboardValidationPlugin.execute:Function]
# [/DEF:DashboardValidationPlugin:Class]


# [DEF:DocumentationPlugin:Class]
# @PURPOSE: Plugin for automated dataset documentation using LLMs.
# @RELATION: IMPLEMENTS -> backend.src.core.plugin_base.PluginBase
class DocumentationPlugin(PluginBase):
    """Plugin that generates dataset and column documentation via an LLM."""

    @property
    def id(self) -> str:
        return "llm_documentation"

    @property
    def name(self) -> str:
        return "Dataset LLM Documentation"

    @property
    def description(self) -> str:
        return "Automated dataset and column documentation using LLMs."

    @property
    def version(self) -> str:
        return "1.0.0"

    def get_schema(self) -> Dict[str, Any]:
        """Return the JSON schema describing the parameters execute() accepts."""
        return {
            "type": "object",
            "properties": {
                "dataset_id": {"type": "string", "title": "Dataset ID"},
                "environment_id": {"type": "string", "title": "Environment ID"},
                "provider_id": {"type": "string", "title": "LLM Provider ID"}
            },
            "required": ["dataset_id", "environment_id", "provider_id"]
        }

    # [DEF:DocumentationPlugin.execute:Function]
    # @PURPOSE: Executes the dataset documentation task with TaskContext support.
    # @PARAM: params (Dict[str, Any]) - Documentation parameters.
    # @PARAM: context (Optional[TaskContext]) - Task context for logging with source attribution.
    # @PRE: params contains dataset_id, environment_id, and provider_id.
    # @POST: Returns generated documentation and updates the dataset in Superset.
    # @SIDE_EFFECT: Calls LLM API and updates dataset metadata in Superset.
async def execute(self, params: Dict[str, Any], context: Optional[TaskContext] = None): with belief_scope("execute", f"plugin_id={self.id}"): # Use TaskContext logger if available, otherwise fall back to app logger log = context.logger if context else logger # Create sub-loggers for different components llm_log = log.with_source("llm") if context else log superset_log = log.with_source("superset_api") if context else log log.info(f"Executing {self.name} with params: {params}") dataset_id = params.get("dataset_id") env_id = params.get("environment_id") provider_id = params.get("provider_id") db = SessionLocal() try: # 1. Get Environment from ...dependencies import get_config_manager config_mgr = get_config_manager() env = config_mgr.get_environment(env_id) if not env: log.error(f"Environment {env_id} not found") raise ValueError(f"Environment {env_id} not found") # 2. Get LLM Provider llm_service = LLMProviderService(db) db_provider = llm_service.get_provider(provider_id) if not db_provider: log.error(f"LLM Provider {provider_id} not found") raise ValueError(f"LLM Provider {provider_id} not found") llm_log.debug("Retrieved provider config:") llm_log.debug(f" Provider ID: {db_provider.id}") llm_log.debug(f" Provider Name: {db_provider.name}") llm_log.debug(f" Provider Type: {db_provider.provider_type}") llm_log.debug(f" Base URL: {db_provider.base_url}") llm_log.debug(f" Default Model: {db_provider.default_model}") api_key = llm_service.get_decrypted_api_key(provider_id) llm_log.debug(f"API Key decrypted (first 8 chars): {api_key[:8] if api_key and len(api_key) > 8 else 'EMPTY_OR_NONE'}...") # Check if API key was successfully decrypted if not api_key: raise ValueError( f"Failed to decrypt API key for provider {provider_id}. " f"The provider may have been encrypted with a different encryption key. " f"Please update the provider with a new API key through the UI." ) # 3. 
Fetch Metadata (US2 / T024) from ...core.superset_client import SupersetClient client = SupersetClient(env) superset_log.debug(f"Fetching dataset {dataset_id}") dataset = client.get_dataset(int(dataset_id)) # Extract columns and existing descriptions columns_data = [] for col in dataset.get("columns", []): columns_data.append({ "name": col.get("column_name"), "type": col.get("type"), "description": col.get("description") }) superset_log.debug(f"Extracted {len(columns_data)} columns from dataset") # 4. Construct Prompt & Analyze (US2 / T025) llm_client = LLMClient( provider_type=LLMProviderType(db_provider.provider_type), api_key=api_key, base_url=db_provider.base_url, default_model=db_provider.default_model ) llm_settings = normalize_llm_settings(config_mgr.get_config().settings.llm) documentation_prompt = llm_settings["prompts"].get( "documentation_prompt", DEFAULT_LLM_PROMPTS["documentation_prompt"], ) prompt = render_prompt( documentation_prompt, { "dataset_name": dataset.get("table_name") or "", "columns_json": json.dumps(columns_data, ensure_ascii=False), }, ) # Using a generic chat completion for text-only US2 llm_log.info(f"Generating documentation for dataset {dataset_id}") doc_result = await llm_client.get_json_completion([{"role": "user", "content": prompt}]) # 5. 
Update Metadata (US2 / T026) update_payload = { "description": doc_result["dataset_description"], "columns": [] } # Map generated descriptions back to column IDs for col_doc in doc_result["column_descriptions"]: for col in dataset.get("columns", []): if col.get("column_name") == col_doc["name"]: update_payload["columns"].append({ "id": col.get("id"), "description": col_doc["description"] }) superset_log.info(f"Updating dataset {dataset_id} with generated documentation") client.update_dataset(int(dataset_id), update_payload) log.info(f"Documentation completed for dataset {dataset_id}") return doc_result finally: db.close() # [/DEF:DocumentationPlugin.execute:Function] # [/DEF:DocumentationPlugin:Class] # [/DEF:backend/src/plugins/llm_analysis/plugin.py:Module]