2026-02-15 11:11:30 +03:00
parent 7790a2dc51
commit 6062712a92
19 changed files with 60656 additions and 58958 deletions


@@ -16,6 +16,7 @@ from typing import List, Optional
from pydantic import BaseModel, Field
from ...dependencies import get_config_manager, get_task_manager, get_resource_service, has_permission
from ...core.logger import logger, belief_scope
from ...core.superset_client import SupersetClient
# [/SECTION]
router = APIRouter()
@@ -42,22 +43,64 @@ class DatasetItem(BaseModel):
    last_task: Optional[LastTask] = None
# [/DEF:DatasetItem:DataClass]
# [DEF:LinkedDashboard:DataClass]
class LinkedDashboard(BaseModel):
    id: int
    title: str
    slug: Optional[str] = None
# [/DEF:LinkedDashboard:DataClass]
# [DEF:DatasetColumn:DataClass]
class DatasetColumn(BaseModel):
    id: int
    name: str
    type: Optional[str] = None
    is_dttm: bool = False
    is_active: bool = True
    description: Optional[str] = None
# [/DEF:DatasetColumn:DataClass]
# [DEF:DatasetDetailResponse:DataClass]
class DatasetDetailResponse(BaseModel):
    id: int
    table_name: str
    schema: str
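    # NOTE: a field named "schema" shadows BaseModel.schema(); Pydantic v1 rejects this
    # at class creation and v2 only warns, so schema_: str = Field(..., alias="schema")
    # is the usual workaround if that error appears here.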
    database: str
    description: Optional[str] = None
    columns: List[DatasetColumn]
    column_count: int
    sql: Optional[str] = None
    linked_dashboards: List[LinkedDashboard]
    linked_dashboard_count: int
    is_sqllab_view: bool = False
    created_on: Optional[str] = None
    changed_on: Optional[str] = None
# [/DEF:DatasetDetailResponse:DataClass]
# [DEF:DatasetsResponse:DataClass]
class DatasetsResponse(BaseModel):
    datasets: List[DatasetItem]
    total: int
    page: int
    page_size: int
    total_pages: int
# [/DEF:DatasetsResponse:DataClass]
# [DEF:TaskResponse:DataClass]
class TaskResponse(BaseModel):
    task_id: str
# [/DEF:TaskResponse:DataClass]
# [DEF:get_dataset_ids:Function]
# @PURPOSE: Fetch list of all dataset IDs from a specific environment (without pagination)
# @PRE: env_id must be a valid environment ID
# @POST: Returns a list of all dataset IDs
# @PARAM: env_id (str) - The environment ID to fetch datasets from
# @PARAM: search (Optional[str]) - Filter by table name
# @RETURN: List[int] - List of dataset IDs
# @RELATION: CALLS -> ResourceService.get_datasets_with_status
@router.get("/api/datasets", response_model=DatasetsResponse)
async def get_datasets(
@router.get("/api/datasets/ids")
async def get_dataset_ids(
    env_id: str,
    search: Optional[str] = None,
    config_manager=Depends(get_config_manager),
@@ -65,7 +108,73 @@ async def get_datasets(
    resource_service=Depends(get_resource_service),
    _ = Depends(has_permission("plugin:migration", "READ"))
):
with belief_scope("get_datasets", f"env_id={env_id}, search={search}"):
with belief_scope("get_dataset_ids", f"env_id={env_id}, search={search}"):
# Validate environment exists
environments = config_manager.get_environments()
env = next((e for e in environments if e.id == env_id), None)
if not env:
logger.error(f"[get_dataset_ids][Coherence:Failed] Environment not found: {env_id}")
raise HTTPException(status_code=404, detail="Environment not found")
try:
# Get all tasks for status lookup
all_tasks = task_manager.get_all_tasks()
# Fetch datasets with status using ResourceService
datasets = await resource_service.get_datasets_with_status(env, all_tasks)
# Apply search filter if provided
if search:
search_lower = search.lower()
datasets = [
d for d in datasets
if search_lower in d.get('table_name', '').lower()
]
# Extract and return just the IDs
dataset_ids = [d['id'] for d in datasets]
logger.info(f"[get_dataset_ids][Coherence:OK] Returning {len(dataset_ids)} dataset IDs")
return {"dataset_ids": dataset_ids}
except Exception as e:
logger.error(f"[get_dataset_ids][Coherence:Failed] Failed to fetch dataset IDs: {e}")
raise HTTPException(status_code=503, detail=f"Failed to fetch dataset IDs: {str(e)}")
# [/DEF:get_dataset_ids:Function]
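# Usage sketch (illustrative, not part of this commit): calling the new
# /api/datasets/ids endpoint from a client. httpx is assumed to be available;
# base_url and the bearer-token auth are hypothetical placeholders.
import httpx

def fetch_all_dataset_ids(base_url: str, env_id: str, token: str) -> List[int]:
    # GET /api/datasets/ids returns every ID for the environment, unpaginated
    resp = httpx.get(
        f"{base_url}/api/datasets/ids",
        params={"env_id": env_id},
        headers={"Authorization": f"Bearer {token}"},
    )
    resp.raise_for_status()
    # The handler above responds with {"dataset_ids": [...]}
    return resp.json()["dataset_ids"]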
# [DEF:get_datasets:Function]
# @PURPOSE: Fetch list of datasets from a specific environment with mapping progress
# @PRE: env_id must be a valid environment ID
# @PRE: page must be >= 1 if provided
# @PRE: page_size must be between 1 and 100 if provided
# @POST: Returns a list of datasets with enhanced metadata and pagination info
# @POST: Response includes pagination metadata (page, page_size, total, total_pages)
# @PARAM: env_id (str) - The environment ID to fetch datasets from
# @PARAM: search (Optional[str]) - Filter by table name
# @PARAM: page (Optional[int]) - Page number (default: 1)
# @PARAM: page_size (Optional[int]) - Items per page (default: 10, max: 100)
# @RETURN: DatasetsResponse - List of datasets with status metadata
# @RELATION: CALLS -> ResourceService.get_datasets_with_status
@router.get("/api/datasets", response_model=DatasetsResponse)
async def get_datasets(
    env_id: str,
    search: Optional[str] = None,
    page: int = 1,
    page_size: int = 10,
    config_manager=Depends(get_config_manager),
    task_manager=Depends(get_task_manager),
    resource_service=Depends(get_resource_service),
    _ = Depends(has_permission("plugin:migration", "READ"))
):
with belief_scope("get_datasets", f"env_id={env_id}, search={search}, page={page}, page_size={page_size}"):
# Validate pagination parameters
if page < 1:
logger.error(f"[get_datasets][Coherence:Failed] Invalid page: {page}")
raise HTTPException(status_code=400, detail="Page must be >= 1")
if page_size < 1 or page_size > 100:
logger.error(f"[get_datasets][Coherence:Failed] Invalid page_size: {page_size}")
raise HTTPException(status_code=400, detail="Page size must be between 1 and 100")
# Validate environment exists
environments = config_manager.get_environments()
env = next((e for e in environments if e.id == env_id), None)
@@ -88,11 +197,23 @@ async def get_datasets(
                    if search_lower in d.get('table_name', '').lower()
                ]
logger.info(f"[get_datasets][Coherence:OK] Returning {len(datasets)} datasets")
            # Calculate pagination
            total = len(datasets)
            total_pages = (total + page_size - 1) // page_size if total > 0 else 1
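            # e.g. total=25, page_size=10 -> total_pages=3; an empty list still reports one page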
            start_idx = (page - 1) * page_size
            end_idx = start_idx + page_size
            # Slice datasets for current page
            paginated_datasets = datasets[start_idx:end_idx]
            logger.info(f"[get_datasets][Coherence:OK] Returning {len(paginated_datasets)} datasets (page {page}/{total_pages}, total: {total})")
            return DatasetsResponse(
                datasets=paginated_datasets,
                total=total,
                page=page,
                page_size=page_size,
                total_pages=total_pages
            )
        except Exception as e:
@@ -100,4 +221,175 @@ async def get_datasets(
raise HTTPException(status_code=503, detail=f"Failed to fetch datasets: {str(e)}")
# [/DEF:get_datasets:Function]
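# Usage sketch (illustrative, not part of this commit): walking the paginated
# /api/datasets endpoint page by page, relying only on the DatasetsResponse
# fields defined above (httpx imported in the earlier sketch).
def iter_datasets(base_url: str, env_id: str, page_size: int = 100):
    # Yield dataset items across all pages, stopping at total_pages
    page = 1
    while True:
        resp = httpx.get(
            f"{base_url}/api/datasets",
            params={"env_id": env_id, "page": page, "page_size": page_size},
        )
        resp.raise_for_status()
        body = resp.json()
        yield from body["datasets"]
        if page >= body["total_pages"]:
            break
        page += 1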
# [DEF:MapColumnsRequest:DataClass]
class MapColumnsRequest(BaseModel):
    env_id: str = Field(..., description="Environment ID")
    dataset_ids: List[int] = Field(..., description="List of dataset IDs to map")
    source_type: str = Field(..., description="Source type: 'postgresql' or 'xlsx'")
    connection_id: Optional[str] = Field(None, description="Connection ID for PostgreSQL source")
    file_data: Optional[str] = Field(None, description="File path or data for XLSX source")
# [/DEF:MapColumnsRequest:DataClass]
# [DEF:map_columns:Function]
# @PURPOSE: Trigger bulk column mapping for datasets
# @PRE: User has permission plugin:mapper:execute
# @PRE: env_id is a valid environment ID
# @PRE: dataset_ids is a non-empty list
# @POST: Returns task_id for tracking mapping progress
# @POST: Task is created and queued for execution
# @PARAM: request (MapColumnsRequest) - Mapping request with environment and dataset IDs
# @RETURN: TaskResponse - Task ID for tracking
# @RELATION: DISPATCHES -> MapperPlugin
# @RELATION: CALLS -> task_manager.create_task
@router.post("/api/datasets/map-columns", response_model=TaskResponse)
async def map_columns(
    request: MapColumnsRequest,
    config_manager=Depends(get_config_manager),
    task_manager=Depends(get_task_manager),
    _ = Depends(has_permission("plugin:mapper", "EXECUTE"))
):
with belief_scope("map_columns", f"env={request.env_id}, count={len(request.dataset_ids)}, source={request.source_type}"):
# Validate request
if not request.dataset_ids:
logger.error("[map_columns][Coherence:Failed] No dataset IDs provided")
raise HTTPException(status_code=400, detail="At least one dataset ID must be provided")
# Validate source type
if request.source_type not in ['postgresql', 'xlsx']:
logger.error(f"[map_columns][Coherence:Failed] Invalid source type: {request.source_type}")
raise HTTPException(status_code=400, detail="Source type must be 'postgresql' or 'xlsx'")
# Validate environment exists
environments = config_manager.get_environments()
env = next((e for e in environments if e.id == request.env_id), None)
if not env:
logger.error(f"[map_columns][Coherence:Failed] Environment not found: {request.env_id}")
raise HTTPException(status_code=404, detail="Environment not found")
try:
# Create mapping task
task_params = {
'env_id': request.env_id,
'datasets': request.dataset_ids,
'source_type': request.source_type,
'connection_id': request.connection_id,
'file_data': request.file_data
}
task_id = await task_manager.create_task(
plugin_id='dataset-mapper',
params=task_params
)
logger.info(f"[map_columns][Coherence:OK] Mapping task created: {task_id} for {len(request.dataset_ids)} datasets")
return TaskResponse(task_id=str(task_id))
except Exception as e:
logger.error(f"[map_columns][Coherence:Failed] Failed to create mapping task: {e}")
raise HTTPException(status_code=503, detail=f"Failed to create mapping task: {str(e)}")
# [/DEF:map_columns:Function]
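# Usage sketch (illustrative, not part of this commit): dispatching a bulk
# column-mapping task and keeping the returned task_id for progress tracking.
# The payload mirrors MapColumnsRequest; connection_id only applies to the
# 'postgresql' source type, file_data to 'xlsx'.
def start_column_mapping(base_url: str, env_id: str, dataset_ids: List[int], connection_id: str) -> str:
    resp = httpx.post(
        f"{base_url}/api/datasets/map-columns",
        json={
            "env_id": env_id,
            "dataset_ids": dataset_ids,
            "source_type": "postgresql",
            "connection_id": connection_id,
        },
    )
    resp.raise_for_status()
    # TaskResponse carries a single task_id field
    return resp.json()["task_id"]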
# [DEF:GenerateDocsRequest:DataClass]
class GenerateDocsRequest(BaseModel):
    env_id: str = Field(..., description="Environment ID")
    dataset_ids: List[int] = Field(..., description="List of dataset IDs to generate docs for")
    llm_provider: str = Field(..., description="LLM provider to use")
    options: Optional[dict] = Field(None, description="Additional options for documentation generation")
# [/DEF:GenerateDocsRequest:DataClass]
# [DEF:generate_docs:Function]
# @PURPOSE: Trigger bulk documentation generation for datasets
# @PRE: User has permission plugin:llm_analysis:execute
# @PRE: env_id is a valid environment ID
# @PRE: dataset_ids is a non-empty list
# @POST: Returns task_id for tracking documentation generation progress
# @POST: Task is created and queued for execution
# @PARAM: request (GenerateDocsRequest) - Documentation generation request
# @RETURN: TaskResponse - Task ID for tracking
# @RELATION: DISPATCHES -> LLMAnalysisPlugin
# @RELATION: CALLS -> task_manager.create_task
@router.post("/api/datasets/generate-docs", response_model=TaskResponse)
async def generate_docs(
    request: GenerateDocsRequest,
    config_manager=Depends(get_config_manager),
    task_manager=Depends(get_task_manager),
    _ = Depends(has_permission("plugin:llm_analysis", "EXECUTE"))
):
with belief_scope("generate_docs", f"env={request.env_id}, count={len(request.dataset_ids)}, provider={request.llm_provider}"):
# Validate request
if not request.dataset_ids:
logger.error("[generate_docs][Coherence:Failed] No dataset IDs provided")
raise HTTPException(status_code=400, detail="At least one dataset ID must be provided")
# Validate environment exists
environments = config_manager.get_environments()
env = next((e for e in environments if e.id == request.env_id), None)
if not env:
logger.error(f"[generate_docs][Coherence:Failed] Environment not found: {request.env_id}")
raise HTTPException(status_code=404, detail="Environment not found")
try:
# Create documentation generation task
task_params = {
'env_id': request.env_id,
'datasets': request.dataset_ids,
'llm_provider': request.llm_provider,
'options': request.options or {}
}
task_id = await task_manager.create_task(
plugin_id='llm_documentation',
params=task_params
)
logger.info(f"[generate_docs][Coherence:OK] Documentation generation task created: {task_id} for {len(request.dataset_ids)} datasets")
return TaskResponse(task_id=str(task_id))
except Exception as e:
logger.error(f"[generate_docs][Coherence:Failed] Failed to create documentation generation task: {e}")
raise HTTPException(status_code=503, detail=f"Failed to create documentation generation task: {str(e)}")
# [/DEF:generate_docs:Function]
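# Validation sketch (illustrative, not part of this commit): GenerateDocsRequest
# can be instantiated directly to see what payloads the endpoint accepts. All
# values here are hypothetical. Note the handler additionally rejects an empty
# dataset_ids list with a 400 even though the model itself would allow it.
_example_request = GenerateDocsRequest(
    env_id="prod",                  # hypothetical environment ID
    dataset_ids=[101, 102],         # hypothetical dataset IDs
    llm_provider="openai",          # hypothetical provider name
    options={"language": "en"},     # optional free-form generation options
)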
# [DEF:get_dataset_detail:Function]
# @PURPOSE: Get detailed dataset information including columns and linked dashboards
# @PRE: env_id is a valid environment ID
# @PRE: dataset_id is a valid dataset ID
# @POST: Returns detailed dataset info with columns and linked dashboards
# @PARAM: env_id (str) - The environment ID
# @PARAM: dataset_id (int) - The dataset ID
# @RETURN: DatasetDetailResponse - Detailed dataset information
# @RELATION: CALLS -> SupersetClient.get_dataset_detail
@router.get("/api/datasets/{dataset_id}", response_model=DatasetDetailResponse)
async def get_dataset_detail(
    env_id: str,
    dataset_id: int,
    config_manager=Depends(get_config_manager),
    _ = Depends(has_permission("plugin:migration", "READ"))
):
with belief_scope("get_dataset_detail", f"env_id={env_id}, dataset_id={dataset_id}"):
# Validate environment exists
environments = config_manager.get_environments()
env = next((e for e in environments if e.id == env_id), None)
if not env:
logger.error(f"[get_dataset_detail][Coherence:Failed] Environment not found: {env_id}")
raise HTTPException(status_code=404, detail="Environment not found")
try:
# Fetch detailed dataset info using SupersetClient
client = SupersetClient(env)
dataset_detail = client.get_dataset_detail(dataset_id)
logger.info(f"[get_dataset_detail][Coherence:OK] Retrieved dataset {dataset_id} with {dataset_detail['column_count']} columns and {dataset_detail['linked_dashboard_count']} linked dashboards")
return DatasetDetailResponse(**dataset_detail)
except Exception as e:
logger.error(f"[get_dataset_detail][Coherence:Failed] Failed to fetch dataset detail: {e}")
raise HTTPException(status_code=503, detail=f"Failed to fetch dataset detail: {str(e)}")
# [/DEF:get_dataset_detail:Function]
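# Usage sketch (illustrative, not part of this commit): fetching dataset detail.
# Note the mixed parameter style matching the route above: dataset_id travels in
# the path while env_id stays in the query string.
def fetch_dataset_detail(base_url: str, env_id: str, dataset_id: int) -> dict:
    resp = httpx.get(
        f"{base_url}/api/datasets/{dataset_id}",
        params={"env_id": env_id},
    )
    resp.raise_for_status()
    return resp.json()  # shaped like DatasetDetailResponse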
# [/DEF:backend.src.api.routes.datasets:Module]