# [DEF:backend.src.api.routes.datasets:Module] # # @COMPLEXITY: 3 # @SEMANTICS: api, datasets, resources, hub # @PURPOSE: API endpoints for the Dataset Hub - listing datasets with mapping progress # @LAYER: API # @RELATION: DEPENDS_ON ->[AppDependencies] # @RELATION: DEPENDS_ON ->[backend.src.services.resource_service.ResourceService] # @RELATION: DEPENDS_ON ->[backend.src.core.superset_client.SupersetClient] # # @INVARIANT: All dataset responses include last_task metadata # [SECTION: IMPORTS] from fastapi import APIRouter, Depends, HTTPException, Query from typing import List, Optional from pydantic import BaseModel, Field from ...dependencies import get_config_manager, get_task_manager, get_resource_service, has_permission from ...core.logger import logger, belief_scope from ...core.superset_client import SupersetClient # [/SECTION] router = APIRouter(prefix="/api/datasets", tags=["Datasets"]) # [DEF:MappedFields:DataClass] # @COMPLEXITY: 1 # @PURPOSE: DTO for dataset mapping progress statistics class MappedFields(BaseModel): total: int mapped: int # [/DEF:MappedFields:DataClass] # [DEF:LastTask:DataClass] # @COMPLEXITY: 1 # @PURPOSE: DTO for the most recent task associated with a dataset class LastTask(BaseModel): task_id: Optional[str] = None status: Optional[str] = Field(None, pattern="^RUNNING|SUCCESS|ERROR|WAITING_INPUT$") # [/DEF:LastTask:DataClass] # [DEF:DatasetItem:DataClass] # @COMPLEXITY: 1 # @PURPOSE: Summary DTO for a dataset in the hub listing class DatasetItem(BaseModel): id: int table_name: str schema_name: str = Field(..., alias="schema") database: str mapped_fields: Optional[MappedFields] = None last_task: Optional[LastTask] = None class Config: allow_population_by_field_name = True # [/DEF:DatasetItem:DataClass] # [DEF:LinkedDashboard:DataClass] # @COMPLEXITY: 1 # @PURPOSE: DTO for a dashboard linked to a dataset class LinkedDashboard(BaseModel): id: int title: str slug: Optional[str] = None # [/DEF:LinkedDashboard:DataClass] # [DEF:DatasetColumn:DataClass] # @COMPLEXITY: 1 # @PURPOSE: DTO for a single dataset column's metadata class DatasetColumn(BaseModel): id: int name: str type: Optional[str] = None is_dttm: bool = False is_active: bool = True description: Optional[str] = None # [/DEF:DatasetColumn:DataClass] # [DEF:DatasetDetailResponse:DataClass] # @COMPLEXITY: 1 # @PURPOSE: Detailed DTO for a dataset including columns and links class DatasetDetailResponse(BaseModel): id: int table_name: Optional[str] = None schema_name: Optional[str] = Field(None, alias="schema") database: str description: Optional[str] = None columns: List[DatasetColumn] column_count: int sql: Optional[str] = None linked_dashboards: List[LinkedDashboard] linked_dashboard_count: int is_sqllab_view: bool = False created_on: Optional[str] = None changed_on: Optional[str] = None class Config: allow_population_by_field_name = True # [/DEF:DatasetDetailResponse:DataClass] # [DEF:DatasetsResponse:DataClass] # @COMPLEXITY: 1 # @PURPOSE: Paginated response DTO for dataset listings class DatasetsResponse(BaseModel): datasets: List[DatasetItem] total: int page: int page_size: int total_pages: int # [/DEF:DatasetsResponse:DataClass] # [DEF:TaskResponse:DataClass] # @COMPLEXITY: 1 # @PURPOSE: Response DTO containing a task ID for tracking class TaskResponse(BaseModel): task_id: str # [/DEF:TaskResponse:DataClass] # [DEF:get_dataset_ids:Function] # @COMPLEXITY: 3 # @PURPOSE: Fetch list of all dataset IDs from a specific environment (without pagination) # @PRE: env_id must be a valid environment ID # @POST: Returns a list of all dataset IDs # @PARAM: env_id (str) - The environment ID to fetch datasets from # @PARAM: search (Optional[str]) - Filter by table name # @RETURN: List[int] - List of dataset IDs # @RELATION: CALLS ->[get_datasets_with_status] @router.get("/ids") async def get_dataset_ids( env_id: str, search: Optional[str] = None, config_manager=Depends(get_config_manager), task_manager=Depends(get_task_manager), resource_service=Depends(get_resource_service), _ = Depends(has_permission("plugin:migration", "READ")) ): with belief_scope("get_dataset_ids", f"env_id={env_id}, search={search}"): # Validate environment exists environments = config_manager.get_environments() env = next((e for e in environments if e.id == env_id), None) if not env: logger.error(f"[get_dataset_ids][Coherence:Failed] Environment not found: {env_id}") raise HTTPException(status_code=404, detail="Environment not found") try: # Get all tasks for status lookup all_tasks = task_manager.get_all_tasks() # Fetch datasets with status using ResourceService datasets = await resource_service.get_datasets_with_status(env, all_tasks) # Apply search filter if provided if search: search_lower = search.lower() datasets = [ d for d in datasets if search_lower in d.get('table_name', '').lower() ] # Extract and return just the IDs dataset_ids = [d['id'] for d in datasets] logger.info(f"[get_dataset_ids][Coherence:OK] Returning {len(dataset_ids)} dataset IDs") return {"dataset_ids": dataset_ids} except Exception as e: logger.error(f"[get_dataset_ids][Coherence:Failed] Failed to fetch dataset IDs: {e}") raise HTTPException(status_code=503, detail=f"Failed to fetch dataset IDs: {str(e)}") # [/DEF:get_dataset_ids:Function] # [DEF:get_datasets:Function] # @COMPLEXITY: 3 # @PURPOSE: Fetch list of datasets from a specific environment with mapping progress # @PRE: env_id must be a valid environment ID # @PRE: page must be >= 1 if provided # @PRE: page_size must be between 1 and 100 if provided # @POST: Returns a list of datasets with enhanced metadata and pagination info # @POST: Response includes pagination metadata (page, page_size, total, total_pages) # @PARAM: env_id (str) - The environment ID to fetch datasets from # @PARAM: search (Optional[str]) - Filter by table name # @PARAM: page (Optional[int]) - Page number (default: 1) # @PARAM: page_size (Optional[int]) - Items per page (default: 10, max: 100) # @RETURN: DatasetsResponse - List of datasets with status metadata # @RELATION: CALLS ->[backend.src.services.resource_service.ResourceService.get_datasets_with_status] @router.get("", response_model=DatasetsResponse) async def get_datasets( env_id: str, search: Optional[str] = None, page: int = 1, page_size: int = 10, config_manager=Depends(get_config_manager), task_manager=Depends(get_task_manager), resource_service=Depends(get_resource_service), _ = Depends(has_permission("plugin:migration", "READ")) ): with belief_scope("get_datasets", f"env_id={env_id}, search={search}, page={page}, page_size={page_size}"): # Validate pagination parameters if page < 1: logger.error(f"[get_datasets][Coherence:Failed] Invalid page: {page}") raise HTTPException(status_code=400, detail="Page must be >= 1") if page_size < 1 or page_size > 100: logger.error(f"[get_datasets][Coherence:Failed] Invalid page_size: {page_size}") raise HTTPException(status_code=400, detail="Page size must be between 1 and 100") # Validate environment exists environments = config_manager.get_environments() env = next((e for e in environments if e.id == env_id), None) if not env: logger.error(f"[get_datasets][Coherence:Failed] Environment not found: {env_id}") raise HTTPException(status_code=404, detail="Environment not found") try: # Get all tasks for status lookup all_tasks = task_manager.get_all_tasks() # Fetch datasets with status using ResourceService datasets = await resource_service.get_datasets_with_status(env, all_tasks) # Apply search filter if provided if search: search_lower = search.lower() datasets = [ d for d in datasets if search_lower in d.get('table_name', '').lower() ] # Calculate pagination total = len(datasets) total_pages = (total + page_size - 1) // page_size if total > 0 else 1 start_idx = (page - 1) * page_size end_idx = start_idx + page_size # Slice datasets for current page paginated_datasets = datasets[start_idx:end_idx] logger.info(f"[get_datasets][Coherence:OK] Returning {len(paginated_datasets)} datasets (page {page}/{total_pages}, total: {total})") return DatasetsResponse( datasets=paginated_datasets, total=total, page=page, page_size=page_size, total_pages=total_pages ) except Exception as e: logger.error(f"[get_datasets][Coherence:Failed] Failed to fetch datasets: {e}") raise HTTPException(status_code=503, detail=f"Failed to fetch datasets: {str(e)}") # [/DEF:get_datasets:Function] # [DEF:MapColumnsRequest:DataClass] # @COMPLEXITY: 1 # @PURPOSE: Request DTO for initiating column mapping class MapColumnsRequest(BaseModel): env_id: str = Field(..., description="Environment ID") dataset_ids: List[int] = Field(..., description="List of dataset IDs to map") source_type: str = Field(..., description="Source type: 'postgresql' or 'xlsx'") connection_id: Optional[str] = Field(None, description="Connection ID for PostgreSQL source") file_data: Optional[str] = Field(None, description="File path or data for XLSX source") # [/DEF:MapColumnsRequest:DataClass] # [DEF:map_columns:Function] # @COMPLEXITY: 3 # @PURPOSE: Trigger bulk column mapping for datasets # @PRE: User has permission plugin:mapper:execute # @PRE: env_id is a valid environment ID # @PRE: dataset_ids is a non-empty list # @POST: Returns task_id for tracking mapping progress # @POST: Task is created and queued for execution # @PARAM: request (MapColumnsRequest) - Mapping request with environment and dataset IDs # @RETURN: TaskResponse - Task ID for tracking # @RELATION: DISPATCHES ->[backend.src.plugins.mapper.MapperPlugin] # @RELATION: CALLS ->[backend.src.core.task_manager.manager.TaskManager:create_task] @router.post("/map-columns", response_model=TaskResponse) async def map_columns( request: MapColumnsRequest, config_manager=Depends(get_config_manager), task_manager=Depends(get_task_manager), _ = Depends(has_permission("plugin:mapper", "EXECUTE")) ): with belief_scope("map_columns", f"env={request.env_id}, count={len(request.dataset_ids)}, source={request.source_type}"): # Validate request if not request.dataset_ids: logger.error("[map_columns][Coherence:Failed] No dataset IDs provided") raise HTTPException(status_code=400, detail="At least one dataset ID must be provided") # Validate source type if request.source_type not in ['postgresql', 'xlsx']: logger.error(f"[map_columns][Coherence:Failed] Invalid source type: {request.source_type}") raise HTTPException(status_code=400, detail="Source type must be 'postgresql' or 'xlsx'") # Validate environment exists environments = config_manager.get_environments() env = next((e for e in environments if e.id == request.env_id), None) if not env: logger.error(f"[map_columns][Coherence:Failed] Environment not found: {request.env_id}") raise HTTPException(status_code=404, detail="Environment not found") try: # Create mapping task task_params = { 'env': request.env_id, 'dataset_id': request.dataset_ids[0] if request.dataset_ids else None, 'source': request.source_type, 'connection_id': request.connection_id, 'file_data': request.file_data } task_obj = await task_manager.create_task( plugin_id='dataset-mapper', params=task_params ) logger.info(f"[map_columns][Coherence:OK] Mapping task created: {task_obj.id} for {len(request.dataset_ids)} datasets") return TaskResponse(task_id=str(task_obj.id)) except Exception as e: logger.error(f"[map_columns][Coherence:Failed] Failed to create mapping task: {e}") raise HTTPException(status_code=503, detail=f"Failed to create mapping task: {str(e)}") # [/DEF:map_columns:Function] # [DEF:GenerateDocsRequest:DataClass] # @COMPLEXITY: 1 # @PURPOSE: Request DTO for initiating documentation generation class GenerateDocsRequest(BaseModel): env_id: str = Field(..., description="Environment ID") dataset_ids: List[int] = Field(..., description="List of dataset IDs to generate docs for") llm_provider: str = Field(..., description="LLM provider to use") options: Optional[dict] = Field(None, description="Additional options for documentation generation") # [/DEF:GenerateDocsRequest:DataClass] # [DEF:generate_docs:Function] # @COMPLEXITY: 3 # @PURPOSE: Trigger bulk documentation generation for datasets # @PRE: User has permission plugin:llm_analysis:execute # @PRE: env_id is a valid environment ID # @PRE: dataset_ids is a non-empty list # @POST: Returns task_id for tracking documentation generation progress # @POST: Task is created and queued for execution # @PARAM: request (GenerateDocsRequest) - Documentation generation request # @RETURN: TaskResponse - Task ID for tracking # @RELATION: DISPATCHES ->[backend.src.plugins.llm_analysis.plugin.DocumentationPlugin] # @RELATION: CALLS ->[backend.src.core.task_manager.manager.TaskManager:create_task] @router.post("/generate-docs", response_model=TaskResponse) async def generate_docs( request: GenerateDocsRequest, config_manager=Depends(get_config_manager), task_manager=Depends(get_task_manager), _ = Depends(has_permission("plugin:llm_analysis", "EXECUTE")) ): with belief_scope("generate_docs", f"env={request.env_id}, count={len(request.dataset_ids)}, provider={request.llm_provider}"): # Validate request if not request.dataset_ids: logger.error("[generate_docs][Coherence:Failed] No dataset IDs provided") raise HTTPException(status_code=400, detail="At least one dataset ID must be provided") # Validate environment exists environments = config_manager.get_environments() env = next((e for e in environments if e.id == request.env_id), None) if not env: logger.error(f"[generate_docs][Coherence:Failed] Environment not found: {request.env_id}") raise HTTPException(status_code=404, detail="Environment not found") try: # Create documentation generation task task_params = { 'environment_id': request.env_id, 'dataset_id': str(request.dataset_ids[0]) if request.dataset_ids else None, 'provider_id': request.llm_provider, 'options': request.options or {} } task_obj = await task_manager.create_task( plugin_id='llm_documentation', params=task_params ) logger.info(f"[generate_docs][Coherence:OK] Documentation generation task created: {task_obj.id} for {len(request.dataset_ids)} datasets") return TaskResponse(task_id=str(task_obj.id)) except Exception as e: logger.error(f"[generate_docs][Coherence:Failed] Failed to create documentation generation task: {e}") raise HTTPException(status_code=503, detail=f"Failed to create documentation generation task: {str(e)}") # [/DEF:generate_docs:Function] # [DEF:get_dataset_detail:Function] # @COMPLEXITY: 3 # @PURPOSE: Get detailed dataset information including columns and linked dashboards # @PRE: env_id is a valid environment ID # @PRE: dataset_id is a valid dataset ID # @POST: Returns detailed dataset info with columns and linked dashboards # @PARAM: env_id (str) - The environment ID # @PARAM: dataset_id (int) - The dataset ID # @RETURN: DatasetDetailResponse - Detailed dataset information # @RELATION: CALLS ->[backend.src.core.superset_client.SupersetClient:get_dataset_detail] @router.get("/{dataset_id}", response_model=DatasetDetailResponse) async def get_dataset_detail( env_id: str, dataset_id: int, config_manager=Depends(get_config_manager), _ = Depends(has_permission("plugin:migration", "READ")) ): with belief_scope("get_dataset_detail", f"env_id={env_id}, dataset_id={dataset_id}"): # Validate environment exists environments = config_manager.get_environments() env = next((e for e in environments if e.id == env_id), None) if not env: logger.error(f"[get_dataset_detail][Coherence:Failed] Environment not found: {env_id}") raise HTTPException(status_code=404, detail="Environment not found") try: # Fetch detailed dataset info using SupersetClient client = SupersetClient(env) dataset_detail = client.get_dataset_detail(dataset_id) logger.info(f"[get_dataset_detail][Coherence:OK] Retrieved dataset {dataset_id} with {dataset_detail['column_count']} columns and {dataset_detail['linked_dashboard_count']} linked dashboards") return DatasetDetailResponse(**dataset_detail) except Exception as e: logger.error(f"[get_dataset_detail][Coherence:Failed] Failed to fetch dataset detail: {e}") raise HTTPException(status_code=503, detail=f"Failed to fetch dataset detail: {str(e)}") # [/DEF:get_dataset_detail:Function] # [/DEF:backend.src.api.routes.datasets:Module]