Files
ss-tools/specs/028-llm-datasource-supeset/data-model.md
2026-05-08 18:01:49 +03:00

16 KiB

Data Model: LLM Table Translation Service

Feature Branch: 028-llm-datasource-supeset
Date: 2026-05-08 (updated post-review)

1. SQLAlchemy ORM Entities (dialect-aware — PostgreSQL/Greenplum, ClickHouse supported)

TranslationJob

# [DEF:TranslationJob:Class]
# @COMPLEXITY 2
# @PURPOSE Persisted configuration for a translation job.
class TranslationJob(Base):
    """Stored definition of a translation job.

    Groups three sets of settings: where the text comes from (datasource,
    column to translate, context and key columns), where the result is
    written (target table/column, upsert strategy), and how the LLM is
    invoked (provider, languages, prompt template, batch size).
    """

    __tablename__ = "translation_jobs"

    id = Column(String, primary_key=True, default=generate_uuid)
    name = Column(String, nullable=False)
    owner_id = Column(String, ForeignKey("users.id"), nullable=False)

    # --- Source configuration ---
    datasource_id = Column(String, nullable=False)
    # Detected from Superset: postgresql, clickhouse, greenplum
    database_dialect = Column(String, nullable=False)
    # Optional metadata; datasource may be virtual
    source_table = Column(String, nullable=True)
    translation_col = Column(String, nullable=False)
    # `list` is passed as a callable so each row gets its own empty list.
    context_cols = Column(JSON, default=list)
    # [col_name, ...]
    source_key_cols = Column(JSON, nullable=False)
    # [col_name, ...] — mapped to source_key_cols
    target_key_cols = Column(JSON, nullable=False)

    # --- Target configuration ---
    target_table = Column(String, nullable=False)
    target_col = Column(String, nullable=False)
    # insert | skip_existing | overwrite
    upsert_strategy = Column(String, default="insert")

    # --- LLM configuration ---
    provider_id = Column(String, ForeignKey("llm_providers.id"), nullable=True)
    # BCP-47 tag, e.g., "ru", "en"
    target_language = Column(String, nullable=False)
    # Optional BCP-47 tag
    source_language = Column(String, nullable=True)
    prompt_template = Column(Text, nullable=True)
    batch_size = Column(Integer, default=50)

    # --- Timestamps (callables, so they are evaluated per insert/update) ---
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
    )
    updated_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    # --- Relationships ---
    owner = relationship("User")
    # Dictionary links and the schedule are owned by the job and removed with it.
    dictionaries = relationship(
        "TranslationJobDictionary",
        back_populates="job",
        cascade="all, delete-orphan",
    )
    schedule = relationship(
        "TranslationSchedule",
        back_populates="job",
        uselist=False,
        cascade="all, delete-orphan",
    )
    # NOTE(review): no delete cascade is configured for runs and preview
    # sessions, while their job_id columns are NOT NULL — confirm how job
    # deletion is meant to behave when history exists.
    runs = relationship("TranslationRun", back_populates="job")
    preview_sessions = relationship("TranslationPreviewSession", back_populates="job")
# [/DEF:TranslationJob:Class]

TranslationJobDictionary (M2M join with priority)

class TranslationJobDictionary(Base):
    """Association row linking a translation job to a terminology dictionary.

    Carries a ``priority`` so that several dictionaries attached to the same
    job can be ordered.
    """

    __tablename__ = "translation_job_dictionaries"

    id = Column(String, primary_key=True, default=generate_uuid)
    # NOTE(review): no uniqueness is declared on (job_id, dictionary_id), so
    # the same dictionary can be attached to a job twice — confirm intended.
    job_id = Column(String, ForeignKey("translation_jobs.id"), nullable=False)
    dictionary_id = Column(
        String,
        ForeignKey("terminology_dictionaries.id"),
        nullable=False,
    )
    # Lower = higher priority
    priority = Column(Integer, default=0)

    job = relationship("TranslationJob", back_populates="dictionaries")
    dictionary = relationship("TerminologyDictionary")

TerminologyDictionary

class TerminologyDictionary(Base):
    """Named, user-owned glossary of term translations for a target language."""

    __tablename__ = "terminology_dictionaries"

    id = Column(String, primary_key=True, default=generate_uuid)
    name = Column(String, nullable=False)
    # BCP-47 tag
    target_language = Column(String, nullable=False)
    source_language = Column(String, nullable=True)
    owner_id = Column(String, ForeignKey("users.id"), nullable=False)

    # Timestamp defaults are callables, so they are evaluated per insert/update.
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
    )
    updated_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    owner = relationship("User")
    # Entries belong to the dictionary and are removed together with it.
    entries = relationship(
        "DictionaryEntry",
        back_populates="dictionary",
        cascade="all, delete-orphan",
    )

DictionaryEntry

class DictionaryEntry(Base):
    """Single source-term -> target-term mapping inside a terminology dictionary.

    ``source_term_normalized`` is unique per dictionary (see
    ``uq_dict_source_term_norm``). The optional ``origin_*`` columns record
    the run, row key and user an entry came from.
    """

    __tablename__ = "dictionary_entries"

    id = Column(String, primary_key=True, default=generate_uuid)
    dictionary_id = Column(
        String,
        ForeignKey("terminology_dictionaries.id"),
        nullable=False,
    )
    source_term = Column(String, nullable=False)
    # Lowercase, NFC normalized
    source_term_normalized = Column(String, nullable=False)
    target_term = Column(String, nullable=False)

    # Provenance (all optional).
    origin_run_id = Column(String, ForeignKey("translation_runs.id"), nullable=True)
    origin_row_key = Column(JSON, nullable=True)
    origin_user_id = Column(String, ForeignKey("users.id"), nullable=True)

    # The default must be a callable: a bare datetime.now(...) call would be
    # evaluated once when the class is defined, stamping every row with the
    # process start time instead of its own insert time.
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
    )

    dictionary = relationship("TerminologyDictionary", back_populates="entries")

    __table_args__ = (
        UniqueConstraint(
            "dictionary_id",
            "source_term_normalized",
            name="uq_dict_source_term_norm",
        ),
    )

TranslationSchedule

class TranslationSchedule(Base):
    """Schedule attached to a translation job (at most one per job).

    ``job_id`` is unique, which enforces the one-schedule-per-job rule.
    ``schedule_type`` selects which of ``cron_expression``,
    ``interval_seconds`` or ``run_at`` applies — presumably one per type;
    verify against the scheduler.
    """

    __tablename__ = "translation_schedules"

    id = Column(String, primary_key=True, default=generate_uuid)
    job_id = Column(
        String,
        ForeignKey("translation_jobs.id"),
        unique=True,
        nullable=False,
    )
    # cron | interval | once
    schedule_type = Column(String, nullable=False)
    cron_expression = Column(String, nullable=True)
    interval_seconds = Column(Integer, nullable=True)
    run_at = Column(DateTime(timezone=True), nullable=True)
    # e.g., "Europe/Moscow"
    # NOTE: from this line on, `timezone` inside the class body refers to this
    # Column. The timestamp defaults below still work because a lambda resolves
    # `timezone` from module scope, not class scope — keep them as lambdas.
    timezone = Column(String, default="UTC")
    is_enabled = Column(Boolean, default=True)
    # skip | queue
    concurrency = Column(String, default="skip")
    next_run_at = Column(DateTime(timezone=True), nullable=True)

    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
    )
    updated_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    job = relationship("TranslationJob", back_populates="schedule")

TranslationRun

class TranslationRun(Base):
    """One execution of a translation job.

    Translation progress and the subsequent insert step are tracked
    separately (``translation_status`` / ``insert_status``). The row also
    stores per-run statistics, configuration and dictionary snapshots with
    their hashes, the generated insert SQL, and the Superset execution
    reference.
    """

    __tablename__ = "translation_runs"

    id = Column(String, primary_key=True, default=generate_uuid)
    job_id = Column(String, ForeignKey("translation_jobs.id"), nullable=False)
    # manual | scheduled
    trigger_type = Column(String, nullable=False)
    # pending|running|completed|partial|failed|cancelled|skipped
    translation_status = Column(String, default="pending")
    # not_started|submitted|running|succeeded|failed|skipped
    insert_status = Column(String, default="not_started")

    # --- Statistics ---
    total_rows = Column(Integer, default=0)
    translated_rows = Column(Integer, default=0)
    failed_rows = Column(Integer, default=0)
    skipped_rows = Column(Integer, default=0)
    token_count = Column(Integer, default=0)
    estimated_cost = Column(Float, default=0.0)

    # --- Hashes for idempotency and audit ---
    config_hash = Column(String, nullable=True)
    dict_snapshot_hash = Column(String, nullable=True)

    # --- Snapshots ---
    config_snapshot = Column(JSON, nullable=False)
    dict_snapshot = Column(JSON, nullable=True)
    prompt_used = Column(Text, nullable=True)

    # --- SQL output ---
    insert_sql = Column(Text, nullable=True)
    sql_hash = Column(String, nullable=True)

    # --- Superset execution reference ---
    superset_query_id = Column(String, nullable=True)
    superset_database_id = Column(String, nullable=True)
    insert_error_type = Column(String, nullable=True)
    insert_error_message = Column(Text, nullable=True)
    rows_affected = Column(Integer, nullable=True)

    # --- Timestamps ---
    started_at = Column(DateTime(timezone=True), nullable=True)
    completed_at = Column(DateTime(timezone=True), nullable=True)
    insert_started_at = Column(DateTime(timezone=True), nullable=True)
    insert_completed_at = Column(DateTime(timezone=True), nullable=True)
    # The default must be a callable: a bare datetime.now(...) call would be
    # evaluated once when the class is defined, giving every run the same
    # (process start) creation time.
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
    )

    # --- Relationships ---
    job = relationship("TranslationJob", back_populates="runs")
    # Records and batches are owned by the run and removed together with it.
    records = relationship(
        "TranslationRecord",
        back_populates="run",
        cascade="all, delete-orphan",
    )
    batches = relationship(
        "TranslationBatch",
        back_populates="run",
        cascade="all, delete-orphan",
    )
    # Events carry no delete cascade; their run_id column is nullable.
    events = relationship("TranslationEvent", back_populates="run")

TranslationBatch

class TranslationBatch(Base):
    """One batch of rows processed within a translation run.

    Holds the batch's position in the run, its status, row/token/cost
    counters, latency and error details.
    """

    __tablename__ = "translation_batches"

    id = Column(String, primary_key=True, default=generate_uuid)
    run_id = Column(String, ForeignKey("translation_runs.id"), nullable=False)
    batch_index = Column(Integer, nullable=False)
    # pending|running|completed|failed
    status = Column(String, default="pending")

    # --- Counters ---
    row_count = Column(Integer, default=0)
    translated_count = Column(Integer, default=0)
    failed_count = Column(Integer, default=0)
    skipped_count = Column(Integer, default=0)
    token_count = Column(Integer, default=0)
    estimated_cost = Column(Float, default=0.0)
    latency_ms = Column(Integer, nullable=True)

    # --- Failure details ---
    error_type = Column(String, nullable=True)
    error_message = Column(Text, nullable=True)

    # --- Timestamps ---
    started_at = Column(DateTime(timezone=True), nullable=True)
    completed_at = Column(DateTime(timezone=True), nullable=True)

    run = relationship("TranslationRun", back_populates="batches")
    records = relationship("TranslationRecord", back_populates="batch")

    # Non-unique composite index over (run_id, batch_index).
    __table_args__ = (
        Index("idx_batch_run_idx", "run_id", "batch_index"),
    )

TranslationRecord

class TranslationRecord(Base):
    """One source row handled by a run, with its translation outcome.

    Stores the source text and context, the row's key values with their
    hash, the LLM output, an optional user edit and the final value.
    """

    __tablename__ = "translation_records"

    id = Column(String, primary_key=True, default=generate_uuid)
    run_id = Column(String, ForeignKey("translation_runs.id"), nullable=False)
    batch_id = Column(String, ForeignKey("translation_batches.id"), nullable=True)

    # --- Source data ---
    source_text = Column(Text, nullable=True)
    context_data = Column(JSON, nullable=True)
    key_values = Column(JSON, nullable=False)
    # hash(canonical_json(key_values))
    key_hash = Column(String, nullable=False)

    # --- Translation result ---
    llm_translation = Column(Text, nullable=True)
    user_edit = Column(Text, nullable=True)
    final_value = Column(Text, nullable=True)
    # pending|translated|approved|edited|rejected|failed|skipped
    status = Column(String, default="pending")
    error_message = Column(Text, nullable=True)

    run = relationship("TranslationRun", back_populates="records")
    batch = relationship("TranslationBatch", back_populates="records")

    # Indexes on key_hash alone and on (run_id, key_hash).
    __table_args__ = (
        Index("idx_record_key_hash", "key_hash"),
        Index("idx_record_run_key", "run_id", "key_hash"),
    )

TranslationPreviewSession

class TranslationPreviewSession(Base):
    """Preview of a job's translation on a sample, started by a user.

    Stores the configuration and dictionary-snapshot hashes the preview was
    produced with, the sample size, the review status, and the
    creation/acceptance/expiry timestamps.
    """

    __tablename__ = "translation_preview_sessions"

    id = Column(String, primary_key=True, default=generate_uuid)
    job_id = Column(String, ForeignKey("translation_jobs.id"), nullable=False)
    user_id = Column(String, ForeignKey("users.id"), nullable=False)
    config_hash = Column(String, nullable=False)
    dict_snapshot_hash = Column(String, nullable=True)
    sample_size = Column(Integer, default=10)
    # pending|accepted|rejected
    status = Column(String, default="pending")

    # The default must be a callable: a bare datetime.now(...) call would be
    # evaluated once when the class is defined, so every session would share
    # the process start time as its creation time.
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
    )
    accepted_at = Column(DateTime(timezone=True), nullable=True)
    expires_at = Column(DateTime(timezone=True), nullable=True)

    job = relationship("TranslationJob", back_populates="preview_sessions")
    # Preview rows are owned by the session and removed together with it.
    rows = relationship(
        "TranslationPreviewRecord",
        back_populates="session",
        cascade="all, delete-orphan",
    )

TranslationPreviewRecord

class TranslationPreviewRecord(Base):
    """One sampled row inside a preview session, with its translation and review state."""

    __tablename__ = "translation_preview_records"

    id = Column(String, primary_key=True, default=generate_uuid)
    session_id = Column(
        String,
        ForeignKey("translation_preview_sessions.id"),
        nullable=False,
    )

    # --- Source data ---
    source_text = Column(Text, nullable=True)
    context_data = Column(JSON, nullable=True)
    key_values = Column(JSON, nullable=False)
    key_hash = Column(String, nullable=False)

    # --- Translation result ---
    llm_translation = Column(Text, nullable=True)
    user_edit = Column(Text, nullable=True)
    final_value = Column(Text, nullable=True)
    # pending|approved|edited|rejected
    status = Column(String, default="pending")

    session = relationship("TranslationPreviewSession", back_populates="rows")

TranslationEvent

class TranslationEvent(Base):
    """Event emitted for a translation job, optionally tied to a run.

    ``job_id`` is always set; ``run_id`` is NULL for events that happen
    before a run exists. Events are indexed by ``(job_id, timestamp)`` and
    by ``event_type``.
    """

    __tablename__ = "translation_events"

    id = Column(String, primary_key=True, default=generate_uuid)
    # NULL for pre-run events
    run_id = Column(String, ForeignKey("translation_runs.id"), nullable=True)
    job_id = Column(String, ForeignKey("translation_jobs.id"), nullable=False)
    # Allowed values:
    # schedule_triggered|schedule_skipped|schedule_failed|
    # run_started|batch_started|batch_completed|batch_failed|
    # run_succeeded|run_partial|run_failed|run_cancelled|run_skipped|run_noop|
    # insert_submitted|insert_succeeded|insert_failed
    event_type = Column(String, nullable=False)
    # The default must be a callable: a bare datetime.now(...) call would be
    # evaluated once when the class is defined, so every event would carry the
    # same timestamp and time-based ordering of events would be meaningless.
    timestamp = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        nullable=False,
    )
    # `dict` is passed as a callable so each row gets its own empty payload.
    payload = Column(JSON, default=dict)

    run = relationship("TranslationRun", back_populates="events")

    __table_args__ = (
        Index("idx_event_job_ts", "job_id", "timestamp"),
        Index("idx_event_type", "event_type"),
    )

MetricSnapshot

class MetricSnapshot(Base):
    """Per-job, per-date metrics snapshot.

    At most one snapshot exists per ``(job_id, snapshot_date)`` (see
    ``uq_metric_snapshot_date``). ``covers_events_before`` records the cutoff
    for event coverage.
    """

    __tablename__ = "translation_metric_snapshots"

    id = Column(String, primary_key=True, default=generate_uuid)
    job_id = Column(String, ForeignKey("translation_jobs.id"), nullable=False)
    snapshot_date = Column(Date, nullable=False)
    cumulative_tokens = Column(Integer, default=0)
    cumulative_cost = Column(Float, default=0.0)
    # Cutoff for event coverage
    covers_events_before = Column(DateTime(timezone=True), nullable=False)

    # --- Run counters ---
    total_runs = Column(Integer, default=0)
    success_runs = Column(Integer, default=0)
    failed_runs = Column(Integer, default=0)
    partial_runs = Column(Integer, default=0)
    avg_batch_latency_ms = Column(Integer, nullable=True)

    # The default must be a callable: a bare datetime.now(...) call would be
    # evaluated once when the class is defined, so every snapshot would be
    # stamped with the process start time.
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
    )

    __table_args__ = (
        UniqueConstraint("job_id", "snapshot_date", name="uq_metric_snapshot_date"),
    )

2. Pydantic Schemas (API DTOs)

Key changes from original:

  • TranslateJobCreate/Update: added source_key_cols, target_key_cols (explicit mapping), source_language
  • TermCorrectionSubmit: added source_term, incorrect_target_term, corrected_target_term
  • ScheduleConfig: added timezone
  • TranslationRunResponse: split into translation_status + insert_status, added Superset execution fields
  • Added: TranslationBatchResponse, PreviewSessionResponse, MetricsSnapshotResponse

3. Entity Relationship Summary

TranslationJob (1) ──< (N) TranslationJobDictionary >── (1) TerminologyDictionary
TranslationJob (1) ──< (N) TranslationRun
TranslationJob (1) ──< (N) TranslationPreviewSession
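TranslationJob (1) ──── (0..1) TranslationSchedule
TranslationJob (1) ──< (N) TranslationEvent (job_id is always set)
TranslationJob (1) ──< (N) MetricSnapshot (one per job per snapshot_date)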
TranslationRun (1) ──< (N) TranslationBatch
TranslationRun (1) ──< (N) TranslationRecord
TranslationRun (1) ──< (N) TranslationEvent (nullable run_id for pre-run events)
TranslationBatch (1) ──< (N) TranslationRecord
TranslationPreviewSession (1) ──< (N) TranslationPreviewRecord
TerminologyDictionary (1) ──< (N) DictionaryEntry

All entities use string primary keys populated with UUIDs (via `generate_uuid`), timezone-aware UTC timestamps, JSON columns for dynamic data, and hash columns for efficient comparison.