Files
ss-tools/backend/src/services/dataset_review/orchestrator.py
2026-04-24 17:10:02 +03:00

613 lines
32 KiB
Python

# [DEF:DatasetReviewOrchestrator:Module]
# @COMPLEXITY: 5
# @SEMANTICS: dataset_review, orchestration, session_lifecycle, intake, recovery
# @PURPOSE: Coordinate dataset review session startup and lifecycle-safe intake recovery for one authenticated user.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> [DatasetReviewSessionRepository]
# @RELATION: DEPENDS_ON -> [SemanticSourceResolver]
# @RELATION: DEPENDS_ON -> [SupersetContextExtractor]
# @RELATION: DEPENDS_ON -> [SupersetCompilationAdapter]
# @RELATION: DEPENDS_ON -> [TaskManager]
# @RELATION: DISPATCHES -> [OrchestratorHelpers:Module]
# @RELATION: DISPATCHES -> [OrchestratorCommands:Module]
# @PRE: session mutations must execute inside a persisted session boundary scoped to one authenticated user.
# @POST: state transitions are persisted atomically and emit observable progress for long-running steps.
# @SIDE_EFFECT: creates task records, updates session aggregates, triggers upstream Superset calls, persists audit artifacts.
# @DATA_CONTRACT: Input[SessionCommand] -> Output[DatasetReviewSession | CompiledPreview | DatasetRunContext]
# @INVARIANT: Launch is blocked unless a current session has no open blocking findings, all launch-sensitive mappings are approved, and a non-stale Superset-generated compiled preview matches the current input fingerprint.
# @RATIONALE: Original 1198-line monolith violated INV_7 (400-line module limit). Decomposed into commands and helpers sub-modules while preserving the orchestrator class as the single entry point.
# @REJECTED: Keeping all orchestration logic in one file because it exceeded the fractal limit by 3x.
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional, cast
from src.core.config_manager import ConfigManager
from src.core.logger import belief_scope, logger
from src.core.task_manager import TaskManager
from src.core.utils.superset_compilation_adapter import (
PreviewCompilationPayload,
SqlLabLaunchPayload,
SupersetCompilationAdapter,
)
from src.core.utils.superset_context_extractor import (
SupersetContextExtractor,
SupersetParsedContext,
)
from src.models.auth import User
from src.models.dataset_review import (
ApprovalState,
BusinessSummarySource,
CompiledPreview,
ConfidenceState,
DatasetProfile,
DatasetReviewSession,
DatasetRunContext,
ExecutionMapping,
FilterConfidenceState,
FilterRecoveryStatus,
FilterSource,
FindingArea,
FindingSeverity,
ImportedFilter,
LaunchStatus,
MappingMethod,
MappingStatus,
PreviewStatus,
RecommendedAction,
ReadinessState,
ResolutionState,
SessionPhase,
SessionStatus,
TemplateVariable,
ValidationFinding,
VariableKind,
)
from src.services.dataset_review.repositories.session_repository import (
DatasetReviewSessionRepository,
)
from src.services.dataset_review.semantic_resolver import SemanticSourceResolver
from src.services.dataset_review.event_logger import SessionEventPayload
from src.services.dataset_review.orchestrator_pkg._commands import (
StartSessionCommand,
StartSessionResult,
PreparePreviewCommand,
PreparePreviewResult,
LaunchDatasetCommand,
LaunchDatasetResult,
)
from src.services.dataset_review.orchestrator_pkg._helpers import (
parse_dataset_selection,
build_initial_profile,
build_partial_recovery_findings,
build_execution_snapshot,
build_launch_blockers,
get_latest_preview,
compute_preview_fingerprint,
extract_effective_filter_value,
)
logger = cast(Any, logger)
# [DEF:DatasetReviewOrchestrator:Class]
# @COMPLEXITY: 5
# @PURPOSE: Coordinate safe session startup while preserving cross-user isolation and explicit partial recovery.
# @RELATION: DEPENDS_ON -> [DatasetReviewSessionRepository]
# @RELATION: DEPENDS_ON -> [SupersetContextExtractor]
# @RELATION: DEPENDS_ON -> [TaskManager]
# @RELATION: DEPENDS_ON -> [ConfigManager]
# @RELATION: DEPENDS_ON -> [SemanticSourceResolver]
# @RELATION: CALLS -> [OrchestratorHelpers:Module]
# @PRE: constructor dependencies are valid and tied to the current request/task scope.
# @POST: orchestrator instance can execute session-scoped mutations for one authenticated user.
# @SIDE_EFFECT: downstream operations may persist session/profile/finding state and enqueue background tasks.
# @DATA_CONTRACT: Input[StartSessionCommand] -> Output[StartSessionResult]
# @INVARIANT: session ownership is preserved on every mutation and recovery remains explicit when partial.
class DatasetReviewOrchestrator:
# [DEF:DatasetReviewOrchestrator_init:Function]
# @COMPLEXITY: 3
# @PURPOSE: Bind repository, config, and task dependencies required by the orchestration boundary.
# @PRE: repository/config_manager are valid collaborators for the current request scope.
# @POST: Instance holds collaborator references used by start/preview/launch orchestration methods.
def __init__(
self,
repository: DatasetReviewSessionRepository,
config_manager: ConfigManager,
task_manager: Optional[TaskManager] = None,
semantic_resolver: Optional[SemanticSourceResolver] = None,
) -> None:
self.repository = repository
self.config_manager = config_manager
self.task_manager = task_manager
self.semantic_resolver = semantic_resolver or SemanticSourceResolver()
# [/DEF:DatasetReviewOrchestrator_init:Function]
# [DEF:start_session:Function]
# @COMPLEXITY: 5
# @PURPOSE: Initialize a new session from a Superset link or dataset selection and trigger context recovery.
# @RELATION: CALLS -> [SupersetContextExtractor.parse_superset_link]
# @RELATION: CALLS -> [TaskManager.create_task]
# @PRE: source input is non-empty and environment is accessible.
# @POST: session exists in persisted storage with intake/recovery state and task linkage when async work is required.
# @SIDE_EFFECT: persists session and may enqueue recovery task.
# @DATA_CONTRACT: Input[StartSessionCommand] -> Output[StartSessionResult]
# @INVARIANT: no cross-user session leakage occurs; session and follow-up task remain owned by the authenticated user.
def start_session(self, command: StartSessionCommand) -> StartSessionResult:
with belief_scope("DatasetReviewOrchestrator.start_session"):
normalized_source_kind = str(command.source_kind or "").strip()
normalized_source_input = str(command.source_input or "").strip()
normalized_environment_id = str(command.environment_id or "").strip()
if not normalized_source_input:
logger.explore("Blocked dataset review session start due to empty source input")
raise ValueError("source_input must be non-empty")
if normalized_source_kind not in {"superset_link", "dataset_selection"}:
logger.explore("Blocked dataset review session start due to unsupported source kind", extra={"source_kind": normalized_source_kind})
raise ValueError("source_kind must be 'superset_link' or 'dataset_selection'")
environment = self.config_manager.get_environment(normalized_environment_id)
if environment is None:
logger.explore("Blocked dataset review session start because environment was not found", extra={"environment_id": normalized_environment_id})
raise ValueError("Environment not found")
logger.reason("Starting dataset review session", extra={"user_id": command.user.id, "environment_id": normalized_environment_id, "source_kind": normalized_source_kind})
parsed_context: Optional[SupersetParsedContext] = None
findings: List[ValidationFinding] = []
dataset_ref = normalized_source_input
dataset_id: Optional[int] = None
dashboard_id: Optional[int] = None
readiness_state = ReadinessState.IMPORTING
recommended_action = RecommendedAction.REVIEW_DOCUMENTATION
current_phase = SessionPhase.RECOVERY
if normalized_source_kind == "superset_link":
extractor = SupersetContextExtractor(environment)
parsed_context = extractor.parse_superset_link(normalized_source_input)
dataset_ref = parsed_context.dataset_ref
dataset_id = parsed_context.dataset_id
dashboard_id = parsed_context.dashboard_id
if parsed_context.partial_recovery:
readiness_state = ReadinessState.RECOVERY_REQUIRED
recommended_action = RecommendedAction.REVIEW_DOCUMENTATION
findings.extend(build_partial_recovery_findings(parsed_context))
else:
readiness_state = ReadinessState.REVIEW_READY
else:
dataset_ref, dataset_id = parse_dataset_selection(normalized_source_input)
readiness_state = ReadinessState.REVIEW_READY
current_phase = SessionPhase.REVIEW
session = DatasetReviewSession(
user_id=command.user.id,
environment_id=normalized_environment_id,
source_kind=normalized_source_kind,
source_input=normalized_source_input,
dataset_ref=dataset_ref,
dataset_id=dataset_id,
dashboard_id=dashboard_id,
readiness_state=readiness_state,
recommended_action=recommended_action,
status=SessionStatus.ACTIVE,
current_phase=current_phase,
)
persisted_session = cast(Any, self.repository.create_session(session))
recovered_filters: List[ImportedFilter] = []
template_variables: List[TemplateVariable] = []
execution_mappings: List[ExecutionMapping] = []
if normalized_source_kind == "superset_link" and parsed_context is not None:
recovered_filters, template_variables, execution_mappings, findings = (
self._build_recovery_bootstrap(
environment=environment,
session=persisted_session,
parsed_context=parsed_context,
findings=findings,
)
)
profile = build_initial_profile(
session_id=persisted_session.session_id,
parsed_context=parsed_context,
dataset_ref=dataset_ref,
)
self.repository.event_logger.log_event(
SessionEventPayload(
session_id=persisted_session.session_id,
actor_user_id=command.user.id,
event_type="session_started",
event_summary="Dataset review session shell created",
current_phase=persisted_session.current_phase.value,
readiness_state=persisted_session.readiness_state.value,
event_details={
"source_kind": persisted_session.source_kind,
"dataset_ref": persisted_session.dataset_ref,
"dataset_id": persisted_session.dataset_id,
"dashboard_id": persisted_session.dashboard_id,
"partial_recovery": bool(parsed_context and parsed_context.partial_recovery),
},
)
)
persisted_session = self.repository.save_profile_and_findings(
persisted_session.session_id,
command.user.id,
profile,
findings,
)
if recovered_filters or template_variables or execution_mappings:
persisted_session = self.repository.save_recovery_state(
persisted_session.session_id,
command.user.id,
recovered_filters,
template_variables,
execution_mappings,
)
active_task_id = self._enqueue_recovery_task(
command=command,
session=persisted_session,
parsed_context=parsed_context,
)
if active_task_id:
persisted_session.active_task_id = active_task_id
self.repository.bump_session_version(persisted_session)
self.repository.db.commit()
self.repository.db.refresh(persisted_session)
self.repository.event_logger.log_event(
SessionEventPayload(
session_id=persisted_session.session_id,
actor_user_id=command.user.id,
event_type="recovery_task_linked",
event_summary="Recovery task linked to dataset review session",
current_phase=persisted_session.current_phase.value,
readiness_state=persisted_session.readiness_state.value,
event_details={"task_id": active_task_id},
)
)
logger.reason("Linked recovery task to started dataset review session", extra={"session_id": persisted_session.session_id, "task_id": active_task_id})
logger.reflect("Dataset review session start completed", extra={"session_id": persisted_session.session_id, "dataset_ref": persisted_session.dataset_ref, "readiness_state": persisted_session.readiness_state.value, "active_task_id": persisted_session.active_task_id, "finding_count": len(findings)})
return StartSessionResult(
session=persisted_session,
parsed_context=parsed_context,
findings=findings,
)
# [/DEF:start_session:Function]
# [DEF:prepare_launch_preview:Function]
# @COMPLEXITY: 4
# @PURPOSE: Assemble effective execution inputs and trigger Superset-side preview compilation.
# @RELATION: CALLS -> [SupersetCompilationAdapter.compile_preview]
# @PRE: all required variables have candidate values or explicitly accepted defaults.
# @POST: returns preview artifact in pending, ready, failed, or stale state.
# @SIDE_EFFECT: persists preview attempt and upstream compilation diagnostics.
# @DATA_CONTRACT: Input[PreparePreviewCommand] -> Output[PreparePreviewResult]
def prepare_launch_preview(self, command: PreparePreviewCommand) -> PreparePreviewResult:
with belief_scope("DatasetReviewOrchestrator.prepare_launch_preview"):
session = self.repository.load_session_detail(command.session_id, command.user.id)
if session is None or session.user_id != command.user.id:
logger.explore("Preview preparation rejected because owned session was not found", extra={"session_id": command.session_id, "user_id": command.user.id})
raise ValueError("Session not found")
if command.expected_version is not None:
self.repository.require_session_version(session, command.expected_version)
if session.dataset_id is None:
raise ValueError("Preview requires a resolved dataset_id")
environment = self.config_manager.get_environment(session.environment_id)
if environment is None:
raise ValueError("Environment not found")
execution_snapshot = build_execution_snapshot(session)
preview_blockers = execution_snapshot["preview_blockers"]
if preview_blockers:
logger.explore("Preview preparation blocked by incomplete execution context", extra={"session_id": session.session_id, "blocked_reasons": preview_blockers})
raise ValueError("Preview blocked: " + "; ".join(preview_blockers))
adapter = SupersetCompilationAdapter(environment)
preview = adapter.compile_preview(
PreviewCompilationPayload(
session_id=session.session_id,
dataset_id=session.dataset_id,
preview_fingerprint=execution_snapshot["preview_fingerprint"],
template_params=execution_snapshot["template_params"],
effective_filters=execution_snapshot["effective_filters"],
)
)
persisted_preview = self.repository.save_preview(
session.session_id,
command.user.id,
preview,
expected_version=command.expected_version,
)
session.current_phase = SessionPhase.PREVIEW
session.last_activity_at = datetime.utcnow()
if persisted_preview.preview_status == PreviewStatus.READY:
launch_blockers = build_launch_blockers(session=session, execution_snapshot=execution_snapshot, preview=persisted_preview)
if launch_blockers:
session.readiness_state = ReadinessState.COMPILED_PREVIEW_READY
session.recommended_action = RecommendedAction.APPROVE_MAPPING
else:
session.readiness_state = ReadinessState.RUN_READY
session.recommended_action = RecommendedAction.LAUNCH_DATASET
else:
session.readiness_state = ReadinessState.PARTIALLY_READY
session.recommended_action = RecommendedAction.GENERATE_SQL_PREVIEW
self.repository.db.commit()
self.repository.db.refresh(session)
self.repository.event_logger.log_event(
SessionEventPayload(
session_id=session.session_id,
actor_user_id=command.user.id,
event_type="preview_generated",
event_summary="Superset preview generation persisted",
current_phase=session.current_phase.value,
readiness_state=session.readiness_state.value,
event_details={"preview_id": persisted_preview.preview_id, "preview_status": persisted_preview.preview_status.value, "preview_fingerprint": persisted_preview.preview_fingerprint},
)
)
logger.reflect("Superset preview preparation completed", extra={"session_id": session.session_id, "preview_id": persisted_preview.preview_id, "preview_status": persisted_preview.preview_status.value})
return PreparePreviewResult(session=session, preview=persisted_preview, blocked_reasons=[])
# [/DEF:prepare_launch_preview:Function]
# [DEF:launch_dataset:Function]
# @COMPLEXITY: 5
# @PURPOSE: Start the approved dataset execution through SQL Lab and persist run context for audit/replay.
# @RELATION: CALLS -> [SupersetCompilationAdapter.create_sql_lab_session]
# @PRE: session is run-ready and compiled preview is current.
# @POST: returns persisted run context with SQL Lab session reference and launch outcome.
# @SIDE_EFFECT: creates SQL Lab execution session and audit snapshot.
# @DATA_CONTRACT: Input[LaunchDatasetCommand] -> Output[LaunchDatasetResult]
# @INVARIANT: launch remains blocked unless blocking findings are closed, approvals are satisfied, and the latest preview fingerprint matches current execution inputs.
def launch_dataset(self, command: LaunchDatasetCommand) -> LaunchDatasetResult:
with belief_scope("DatasetReviewOrchestrator.launch_dataset"):
session = self.repository.load_session_detail(command.session_id, command.user.id)
if session is None or session.user_id != command.user.id:
logger.explore("Launch rejected because owned session was not found", extra={"session_id": command.session_id, "user_id": command.user.id})
raise ValueError("Session not found")
if command.expected_version is not None:
self.repository.require_session_version(session, command.expected_version)
if session.dataset_id is None:
raise ValueError("Launch requires a resolved dataset_id")
environment = self.config_manager.get_environment(session.environment_id)
if environment is None:
raise ValueError("Environment not found")
execution_snapshot = build_execution_snapshot(session)
current_preview = get_latest_preview(session)
launch_blockers_list = build_launch_blockers(session=session, execution_snapshot=execution_snapshot, preview=current_preview)
if launch_blockers_list:
logger.explore("Launch gate blocked dataset execution", extra={"session_id": session.session_id, "blocked_reasons": launch_blockers_list})
raise ValueError("Launch blocked: " + "; ".join(launch_blockers_list))
adapter = SupersetCompilationAdapter(environment)
try:
sql_lab_session_ref = adapter.create_sql_lab_session(
SqlLabLaunchPayload(
session_id=session.session_id,
dataset_id=session.dataset_id,
preview_id=current_preview.preview_id,
compiled_sql=str(current_preview.compiled_sql or ""),
template_params=execution_snapshot["template_params"],
)
)
launch_status = LaunchStatus.STARTED
launch_error = None
except Exception as exc:
logger.explore("SQL Lab launch failed after passing gates", extra={"session_id": session.session_id, "error": str(exc)})
sql_lab_session_ref = "unavailable"
launch_status = LaunchStatus.FAILED
launch_error = str(exc)
run_context = DatasetRunContext(
session_id=session.session_id,
dataset_ref=session.dataset_ref,
environment_id=session.environment_id,
preview_id=current_preview.preview_id,
sql_lab_session_ref=sql_lab_session_ref,
effective_filters=execution_snapshot["effective_filters"],
template_params=execution_snapshot["template_params"],
approved_mapping_ids=execution_snapshot["approved_mapping_ids"],
semantic_decision_refs=execution_snapshot["semantic_decision_refs"],
open_warning_refs=execution_snapshot["open_warning_refs"],
launch_status=launch_status,
launch_error=launch_error,
)
persisted_run_context = self.repository.save_run_context(
session.session_id,
command.user.id,
run_context,
expected_version=command.expected_version,
)
session.current_phase = SessionPhase.LAUNCH
session.last_activity_at = datetime.utcnow()
if launch_status == LaunchStatus.FAILED:
session.readiness_state = ReadinessState.COMPILED_PREVIEW_READY
session.recommended_action = RecommendedAction.LAUNCH_DATASET
else:
session.readiness_state = ReadinessState.RUN_IN_PROGRESS
session.recommended_action = RecommendedAction.EXPORT_OUTPUTS
self.repository.db.commit()
self.repository.db.refresh(session)
self.repository.event_logger.log_event(
SessionEventPayload(
session_id=session.session_id,
actor_user_id=command.user.id,
event_type="dataset_launch_requested",
event_summary="Dataset launch handoff persisted",
current_phase=session.current_phase.value,
readiness_state=session.readiness_state.value,
event_details={"run_context_id": persisted_run_context.run_context_id, "launch_status": persisted_run_context.launch_status.value, "preview_id": persisted_run_context.preview_id, "sql_lab_session_ref": persisted_run_context.sql_lab_session_ref},
)
)
logger.reflect("Dataset launch orchestration completed with audited run context", extra={"session_id": session.session_id, "run_context_id": persisted_run_context.run_context_id, "launch_status": persisted_run_context.launch_status.value})
return LaunchDatasetResult(session=session, run_context=persisted_run_context, blocked_reasons=[])
# [/DEF:launch_dataset:Function]
# [DEF:_build_recovery_bootstrap:Function]
# @COMPLEXITY: 4
# @PURPOSE: Recover and materialize initial imported filters, template variables, and draft execution mappings after session creation.
# @PRE: session belongs to the just-created review aggregate and parsed_context was produced for the same environment scope.
# @POST: Returns bootstrap imported filters, template variables, execution mappings, and updated findings without persisting them directly.
# @SIDE_EFFECT: Performs Superset reads through the extractor and may append warning findings for incomplete recovery.
def _build_recovery_bootstrap(
self,
environment,
session: DatasetReviewSession,
parsed_context: SupersetParsedContext,
findings: List[ValidationFinding],
) -> tuple[List[ImportedFilter], List[TemplateVariable], List[ExecutionMapping], List[ValidationFinding]]:
session_record = cast(Any, session)
extractor = SupersetContextExtractor(environment)
imported_filters_payload = extractor.recover_imported_filters(parsed_context)
if imported_filters_payload is None:
imported_filters_payload = []
imported_filters = [
ImportedFilter(
session_id=session_record.session_id,
filter_name=str(item.get("filter_name") or f"imported_filter_{index}"),
display_name=item.get("display_name"),
raw_value=item.get("raw_value"),
raw_value_masked=bool(item.get("raw_value_masked", False)),
normalized_value=item.get("normalized_value"),
source=FilterSource(str(item.get("source") or FilterSource.SUPERSET_URL.value)),
confidence_state=FilterConfidenceState(str(item.get("confidence_state") or FilterConfidenceState.UNRESOLVED.value)),
requires_confirmation=bool(item.get("requires_confirmation", False)),
recovery_status=FilterRecoveryStatus(str(item.get("recovery_status") or FilterRecoveryStatus.PARTIAL.value)),
notes=item.get("notes"),
)
for index, item in enumerate(imported_filters_payload)
]
template_variables: List[TemplateVariable] = []
execution_mappings: List[ExecutionMapping] = []
if session.dataset_id is not None:
try:
dataset_payload = parsed_context.dataset_payload
if not isinstance(dataset_payload, dict):
dataset_payload = extractor.client.get_dataset_detail(session_record.dataset_id)
discovered_variables = extractor.discover_template_variables(dataset_payload)
template_variables = [
TemplateVariable(
session_id=session_record.session_id,
variable_name=str(item.get("variable_name") or f"variable_{index}"),
expression_source=str(item.get("expression_source") or ""),
variable_kind=VariableKind(str(item.get("variable_kind") or VariableKind.UNKNOWN.value)),
is_required=bool(item.get("is_required", True)),
default_value=item.get("default_value"),
mapping_status=MappingStatus(str(item.get("mapping_status") or MappingStatus.UNMAPPED.value)),
)
for index, item in enumerate(discovered_variables)
]
except Exception as exc:
if "dataset_template_variable_discovery_failed" not in parsed_context.unresolved_references:
parsed_context.unresolved_references.append("dataset_template_variable_discovery_failed")
if not any(f.caused_by_ref == "dataset_template_variable_discovery_failed" for f in findings):
findings.append(
ValidationFinding(
area=FindingArea.TEMPLATE_MAPPING,
severity=FindingSeverity.WARNING,
code="TEMPLATE_VARIABLE_DISCOVERY_FAILED",
title="Template variables could not be discovered",
message="Session remains usable, but dataset template variables still need review.",
resolution_state=ResolutionState.OPEN,
caused_by_ref="dataset_template_variable_discovery_failed",
)
)
logger.explore("Template variable discovery failed during session bootstrap", extra={"session_id": session_record.session_id, "dataset_id": session_record.dataset_id, "error": str(exc)})
filter_lookup = {str(f.filter_name or "").strip().lower(): f for f in imported_filters if str(f.filter_name or "").strip()}
for tv in template_variables:
matched_filter = filter_lookup.get(str(tv.variable_name or "").strip().lower())
if matched_filter is None:
continue
requires_explicit_approval = bool(matched_filter.requires_confirmation or matched_filter.recovery_status != FilterRecoveryStatus.RECOVERED)
execution_mappings.append(
ExecutionMapping(
session_id=session_record.session_id,
filter_id=matched_filter.filter_id,
variable_id=tv.variable_id,
mapping_method=MappingMethod.DIRECT_MATCH,
raw_input_value=matched_filter.raw_value,
effective_value=matched_filter.normalized_value if matched_filter.normalized_value is not None else matched_filter.raw_value,
transformation_note="Bootstrapped from Superset recovery context",
warning_level=None,
requires_explicit_approval=requires_explicit_approval,
approval_state=ApprovalState.PENDING if requires_explicit_approval else ApprovalState.NOT_REQUIRED,
approved_by_user_id=None,
approved_at=None,
)
)
return imported_filters, template_variables, execution_mappings, findings
# [/DEF:_build_recovery_bootstrap:Function]
# [DEF:_enqueue_recovery_task:Function]
# @COMPLEXITY: 3
# @PURPOSE: Link session start to observable async recovery when task infrastructure is available.
# @PRE: session is already persisted.
# @POST: returns task identifier when a task could be enqueued, otherwise None.
# @SIDE_EFFECT: may create one background task for progressive recovery.
def _enqueue_recovery_task(
self,
command: StartSessionCommand,
session: DatasetReviewSession,
parsed_context: Optional[SupersetParsedContext],
) -> Optional[str]:
session_record = cast(Any, session)
if self.task_manager is None:
logger.reason("Dataset review session started without task manager; continuing synchronously", extra={"session_id": session_record.session_id})
return None
task_params: Dict[str, Any] = {
"session_id": session_record.session_id,
"user_id": command.user.id,
"environment_id": session_record.environment_id,
"source_kind": session_record.source_kind,
"source_input": session_record.source_input,
"dataset_ref": session_record.dataset_ref,
"dataset_id": session_record.dataset_id,
"dashboard_id": session_record.dashboard_id,
"partial_recovery": bool(parsed_context and parsed_context.partial_recovery),
}
create_task = getattr(self.task_manager, "create_task", None)
if create_task is None:
logger.explore("Task manager has no create_task method; skipping recovery enqueue")
return None
try:
task_object = create_task(plugin_id="dataset-review-recovery", params=task_params)
except TypeError:
logger.explore("Recovery task enqueue skipped because task manager create_task contract is incompatible", extra={"session_id": session_record.session_id})
return None
task_id = getattr(task_object, "id", None)
return str(task_id) if task_id else None
# [/DEF:_enqueue_recovery_task:Function]
# [/DEF:DatasetReviewOrchestrator:Class]
# [/DEF:DatasetReviewOrchestrator:Module]