# [DEF:generate_semantic_map:Module]
# @PURPOSE: Scans the codebase to generate a Semantic Map, Module Map, and Compliance Report based on the System Standard.
# @PRE: Valid directory containing code to scan.
# @POST: Files semantics/semantic_map.json, .ai/structure/PROJECT_MAP.md, .ai/structure/MODULE_MAP.md, and compliance reports are generated.
# @TIER: STANDARD
# @SEMANTICS: semantic_analysis, parser, map_generator, compliance_checker, tier_validation, svelte_props, data_flow, module_map
# @LAYER: DevOps/Tooling
# @INVARIANT: All DEF anchors must have matching closing anchors; TIER determines validation strictness.
# @RELATION: READS -> FileSystem
# @RELATION: PRODUCES -> semantics/semantic_map.json
# @RELATION: PRODUCES -> .ai/structure/PROJECT_MAP.md
# @RELATION: PRODUCES -> .ai/structure/MODULE_MAP.md
# @RELATION: PRODUCES -> semantics/reports/semantic_report_*.md

# [SECTION: IMPORTS]
import os
import re
import json
import datetime
import fnmatch
import argparse
from enum import Enum, IntEnum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Pattern, Tuple, Set


# Mock belief_scope for the script itself to avoid import issues
class belief_scope:
    # [DEF:__init__:Function]
    # @TIER: TRIVIAL
    # @PURPOSE: Mock init for self-containment.
    # @PRE: name is a string.
    # @POST: Instance initialized.
    def __init__(self, name):
        self.name = name
    # [/DEF:__init__:Function]

    # [DEF:__enter__:Function]
    # @TIER: TRIVIAL
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Mock enter.
    # @PRE: Instance initialized.
    # @POST: Returns self.
    def __enter__(self):
        return self
    # [/DEF:__enter__:Function]

    # [DEF:__exit__:Function]
    # @TIER: TRIVIAL
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Mock exit.
    # @PRE: Context entered.
    # @POST: Context exited.
    def __exit__(self, *args):
        pass
    # [/DEF:__exit__:Function]
# [/SECTION]

# [SECTION: CONFIGURATION]


class Tier(Enum):
    # [DEF:Tier:Class]
    # @TIER: TRIVIAL
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Legacy tier buckets retained for backward-compatible reporting.
    CRITICAL = "CRITICAL"
    STANDARD = "STANDARD"
    TRIVIAL = "TRIVIAL"
    # [/DEF:Tier:Class]


class Complexity(IntEnum):
    # [DEF:Complexity:Class]
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Adaptive semantic complexity scale used for validation strictness.
    ONE = 1
    TWO = 2
    THREE = 3
    FOUR = 4
    FIVE = 5
    # [/DEF:Complexity:Class]


class Severity(Enum):
    # [DEF:Severity:Class]
    # @TIER: TRIVIAL
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Severity levels for compliance issues.
    ERROR = "ERROR"
    WARNING = "WARNING"
    INFO = "INFO"
    # [/DEF:Severity:Class]


PROJECT_ROOT = "."
IGNORE_DIRS = {
    ".git", "__pycache__", "node_modules", "venv", ".pytest_cache",
    ".kilocode", "backups", "logs", "semantics", "specs", ".venv"
}
IGNORE_FILES = {
    "package-lock.json", "poetry.lock", "yarn.lock"
}
IGNORE_PATH_PREFIXES = {
    ".ai/shots/"
}
IGNORE_EXACT_PATHS = {
    ".ai/shots"
}
OUTPUT_JSON = "semantics/semantic_map.json"
OUTPUT_COMPRESSED_MD = ".ai/structure/PROJECT_MAP.md"
OUTPUT_MODULE_MAP_MD = ".ai/structure/MODULE_MAP.md"
REPORTS_DIR = "semantics/reports"

# Complexity-based mandatory tags aligned with .ai/standards/semantics.md
LEGACY_TIER_TO_COMPLEXITY = {
    "TRIVIAL": Complexity.ONE,
    "STANDARD": Complexity.THREE,
    "CRITICAL": Complexity.FIVE,
}

COMPLEXITY_MANDATORY_TAGS = {
    Complexity.ONE: {
        "Module": [],
        "Component": [],
        "Function": [],
        "Class": [],
        "Store": [],
        "Block": []
    },
    Complexity.TWO: {
        "Module": ["PURPOSE"],
        "Component": ["PURPOSE"],
        "Function": ["PURPOSE"],
        "Class": ["PURPOSE"],
        "Store": ["PURPOSE"],
        "Block": ["PURPOSE"]
    },
    Complexity.THREE: {
        "Module": ["PURPOSE", "LAYER", "SEMANTICS", "RELATION"],
        "Component": ["PURPOSE", "RELATION", "UX_STATE"],
        "Function": ["PURPOSE", "RELATION"],
        "Class": ["PURPOSE", "RELATION"],
        "Store": ["PURPOSE", "RELATION"],
        "Block": ["PURPOSE"]
    },
    Complexity.FOUR: {
        "Module": ["PURPOSE", "LAYER", "SEMANTICS", "RELATION", "PRE", "POST", "SIDE_EFFECT"],
        "Component": ["PURPOSE", "RELATION", "UX_STATE", "UX_REACTIVITY", "SIDE_EFFECT"],
        "Function": ["PURPOSE", "RELATION", "PRE", "POST", "SIDE_EFFECT"],
        "Class": ["PURPOSE", "RELATION", "PRE", "POST", "SIDE_EFFECT"],
        "Store": ["PURPOSE", "RELATION", "PRE", "POST", "SIDE_EFFECT"],
        "Block": ["PURPOSE", "SIDE_EFFECT"]
    },
    Complexity.FIVE: {
        "Module": ["PURPOSE", "LAYER", "SEMANTICS", "RELATION", "INVARIANT", "PRE", "POST", "SIDE_EFFECT", "DATA_CONTRACT"],
        "Component": ["PURPOSE", "RELATION", "INVARIANT", "UX_STATE", "UX_FEEDBACK", "UX_RECOVERY", "UX_REACTIVITY", "SIDE_EFFECT", "DATA_CONTRACT"],
        "Function": ["PURPOSE", "RELATION", "PRE", "POST", "SIDE_EFFECT", "DATA_CONTRACT"],
        "Class": ["PURPOSE", "RELATION", "INVARIANT", "PRE", "POST", "SIDE_EFFECT", "DATA_CONTRACT"],
        "Store": ["PURPOSE", "RELATION", "INVARIANT", "PRE", "POST", "SIDE_EFFECT", "DATA_CONTRACT"],
        "Block": ["PURPOSE", "SIDE_EFFECT"]
    }
}

ALLOWED_RELATION_PREDICATES = {
    "DEPENDS_ON", "CALLS", "INHERITS", "IMPLEMENTS", "DISPATCHES", "BINDS_TO"
}

# Complexity-based belief state requirements
COMPLEXITY_BELIEF_REQUIRED = {
    Complexity.ONE: False,
    Complexity.TWO: False,
    Complexity.THREE: False,
    Complexity.FOUR: True,
    Complexity.FIVE: True,
}

# [/SECTION]
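
# Quick sanity check (illustrative): the tags required for a given entity can
# be looked up directly from the tables above, e.g.
#   COMPLEXITY_MANDATORY_TAGS[Complexity.THREE]["Function"]  -> ["PURPOSE", "RELATION"]
#   COMPLEXITY_BELIEF_REQUIRED[Complexity.FOUR]              -> True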


# [DEF:ComplianceIssue:Class]
# @TIER: TRIVIAL
# @RELATION: [DEPENDS_ON] builtin
# @PURPOSE: Represents a single compliance issue with severity.
@dataclass
class ComplianceIssue:
    message: str
    severity: Severity
    line_number: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        return {
            "message": self.message,
            "severity": self.severity.value,
            "line_number": self.line_number
        }
# [/DEF:ComplianceIssue:Class]


# [DEF:SemanticEntity:Class]
# @TIER: CRITICAL
# @RELATION: [DEPENDS_ON] builtin
# @PURPOSE: Represents a code entity (Module, Function, Component) found during parsing.
# @INVARIANT: start_line is always set; end_line is set upon closure; complexity defaults to 1 unless explicitly raised.
class SemanticEntity:
    # [DEF:__init__:Function]
    # @TIER: STANDARD
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Initializes a new SemanticEntity instance.
    # @PRE: name, type_, start_line, file_path are provided.
    # @POST: Instance is initialized with default values.
    def __init__(self, name: str, type_: str, start_line: int, file_path: str):
        with belief_scope("__init__"):
            self.name = name
            self.type = type_
            self.start_line = start_line
            self.end_line: Optional[int] = None
            self.file_path = file_path
            self.tags: Dict[str, str] = {}
            self.relations: List[Dict[str, str]] = []
            self.children: List['SemanticEntity'] = []
            self.parent: Optional['SemanticEntity'] = None
            self.compliance_issues: List[ComplianceIssue] = []
            self.has_belief_scope: bool = False
            self.has_console_log: bool = False
            # New fields for enhanced Svelte analysis
            self.props: List[Dict[str, Any]] = []
            self.events: List[str] = []
            self.data_flow: List[Dict[str, str]] = []
            self.has_export_let: bool = False
            self.has_reactive_label: bool = False
            self.has_runes: bool = False
    # [/DEF:__init__:Function]

    # [DEF:has_explicit_complexity:Function]
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Returns whether the entity explicitly declares complexity metadata.
    def has_explicit_complexity(self) -> bool:
        return any(tag.upper() in {"COMPLEXITY", "C", "TIER"} for tag in self.tags)
    # [/DEF:has_explicit_complexity:Function]

    # [DEF:get_complexity:Function]
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Returns effective complexity with backward compatibility for legacy tiers.
    # @PRE: tags dictionary is accessible.
    # @POST: Returns Complexity enum value.
    def get_complexity(self) -> Complexity:
        with belief_scope("get_complexity"):
            complexity_value = self.tags.get("COMPLEXITY") or self.tags.get("C")
            if complexity_value is not None:
                try:
                    base_complexity = Complexity(int(str(complexity_value).strip()))
                except (ValueError, TypeError):
                    base_complexity = Complexity.ONE
            else:
                legacy_tier = str(self.tags.get("TIER", "")).upper().strip()
                base_complexity = LEGACY_TIER_TO_COMPLEXITY.get(legacy_tier, Complexity.ONE)

            file_path_lower = self.file_path.lower()
            is_test_entity = (
                "test" in file_path_lower
                or "/__tests__/" in self.file_path
                or self.name.startswith("test_")
            )

            if is_test_entity and base_complexity > Complexity.THREE:
                base_complexity = Complexity.THREE

            if self.file_path.endswith(".svelte"):
                is_route_level_svelte = any(
                    marker in self.name for marker in ["+page", "+layout", "Page", "Layout"]
                )
                if not is_route_level_svelte and base_complexity > Complexity.THREE:
                    base_complexity = Complexity.THREE

            if ("scripts/" in self.file_path or "_tui.py" in self.file_path) and base_complexity > Complexity.THREE:
                base_complexity = Complexity.THREE

            critical_keywords = ["auth", "security", "jwt", "database", "migration", "config", "session"]
            module_like_types = {"Module", "Class", "Store"}
            if (
                not self.has_explicit_complexity()
                and self.type in module_like_types
                and any(keyword in file_path_lower for keyword in critical_keywords)
                and not is_test_entity
            ):
                return Complexity.FIVE

            return base_complexity
    # [/DEF:get_complexity:Function]

    # [DEF:get_tier:Function]
    # @TIER: STANDARD
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Returns legacy tier bucket derived from effective complexity.
    # @PRE: tags dictionary is accessible.
    # @POST: Returns Tier enum value.
    def get_tier(self) -> Tier:
        with belief_scope("get_tier"):
            complexity = self.get_complexity()
            if complexity >= Complexity.FIVE:
                return Tier.CRITICAL
            if complexity >= Complexity.THREE:
                return Tier.STANDARD
            return Tier.TRIVIAL
    # [/DEF:get_tier:Function]

    # [DEF:to_dict:Function]
    # @TIER: STANDARD
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Serializes the entity to a dictionary for JSON output.
    # @PRE: Entity is fully populated.
    # @POST: Returns a dictionary representation.
    def to_dict(self) -> Dict[str, Any]:
        with belief_scope("to_dict"):
            result = {
                "name": self.name,
                "type": self.type,
                "complexity": int(self.get_complexity()),
                "tier": self.get_tier().value,
                "start_line": self.start_line,
                "end_line": self.end_line,
                "tags": self.tags,
                "relations": self.relations,
                "children": [c.to_dict() for c in self.children],
                "compliance": {
                    "valid": len([i for i in self.compliance_issues if i.severity == Severity.ERROR]) == 0,
                    "issues": [i.to_dict() for i in self.compliance_issues],
                    "score": self.get_score()
                }
            }
            if self.props:
                result["props"] = self.props
            if self.events:
                result["events"] = self.events
            if self.data_flow:
                result["data_flow"] = self.data_flow
            return result
    # [/DEF:to_dict:Function]

    # [DEF:validate:Function]
    # @TIER: CRITICAL
    # @PURPOSE: Checks for semantic compliance based on complexity requirements.
    # @PRE: Entity structure is complete; complexity is determined.
    # @POST: Populates self.compliance_issues with severity levels.
    # @SIDE_EFFECT: Modifies self.compliance_issues list.
    def validate(self):
        with belief_scope("validate"):
            complexity = self.get_complexity()

            # 1. Check Closure (required for ALL complexity levels)
            if self.end_line is None:
                self.compliance_issues.append(ComplianceIssue(
                    f"Unclosed Anchor: [DEF:{self.name}:{self.type}] started at line {self.start_line}",
                    Severity.ERROR,
                    self.start_line
                ))

            # 2. Check Mandatory Tags based on complexity
            required = COMPLEXITY_MANDATORY_TAGS.get(complexity, {}).get(self.type, [])
            for req_tag in required:
                found = False
                if req_tag == "RELATION" and len(self.relations) > 0:
                    found = True
                else:
                    for existing_tag in self.tags:
                        if existing_tag.upper() == req_tag:
                            found = True
                            break
                if not found:
                    severity = Severity.ERROR if complexity >= Complexity.FOUR else Severity.WARNING
                    self.compliance_issues.append(ComplianceIssue(
                        f"Missing Mandatory Tag: @{req_tag} (required for complexity {int(complexity)})",
                        severity,
                        self.start_line
                    ))

            # 3. Validate relation predicates against GRACE-Poly allowlist
            for rel in self.relations:
                rel_type = rel.get("type", "").upper()
                if rel_type and rel_type not in ALLOWED_RELATION_PREDICATES:
                    self.compliance_issues.append(ComplianceIssue(
                        f"Invalid @RELATION predicate: {rel_type}. Allowed: {', '.join(sorted(ALLOWED_RELATION_PREDICATES))}",
                        Severity.ERROR if complexity >= Complexity.FOUR else Severity.WARNING,
                        self.start_line
                    ))

            # 4. Check for Belief State Logging based on complexity
            if self.type == "Function":
                belief_required = COMPLEXITY_BELIEF_REQUIRED.get(complexity, False)
                if belief_required:
                    is_python = self.file_path.endswith(".py")
                    has_belief = self.has_belief_scope if is_python else self.has_console_log

                    if not has_belief:
                        if "logger.py" not in self.file_path and "__" not in self.name:
                            severity = Severity.ERROR if complexity >= Complexity.FOUR else Severity.WARNING
                            log_type = "belief_scope / molecular methods" if is_python else "console.log with [ID][STATE]"
                            self.compliance_issues.append(ComplianceIssue(
                                f"Missing Belief State Logging: Function should use {log_type} (required for complexity {int(complexity)})",
                                severity,
                                self.start_line
                            ))

            # 5. Check for @INVARIANT at maximum complexity
            if complexity == Complexity.FIVE and self.type in ["Module", "Component", "Class"]:
                if "INVARIANT" not in [k.upper() for k in self.tags.keys()]:
                    self.compliance_issues.append(ComplianceIssue(
                        "Missing @INVARIANT tag (required for complexity 5)",
                        Severity.ERROR,
                        self.start_line
                    ))

            # 6. Validate modern Svelte reactivity protocol
            if self.type == "Component" and self.file_path.endswith(".svelte"):
                strict_severity = Severity.ERROR if complexity >= Complexity.FOUR else Severity.WARNING

                if self.has_export_let:
                    self.compliance_issues.append(ComplianceIssue(
                        "Svelte protocol violation: `export let` is forbidden; use `$props()`",
                        strict_severity,
                        self.start_line
                    ))

                if self.has_reactive_label:
                    self.compliance_issues.append(ComplianceIssue(
                        "Svelte protocol violation: `$:` reactive label is forbidden; use runes `$state/$derived/$effect`",
                        strict_severity,
                        self.start_line
                    ))

            # 7. Validate module length limit
            if self.type == "Module" and self.end_line is not None:
                module_length = self.end_line - self.start_line + 1
                if module_length >= 300:
                    self.compliance_issues.append(ComplianceIssue(
                        f"Fractal limit warning: Module length is {module_length} lines (must be < 300)",
                        Severity.WARNING,
                        self.start_line
                    ))

            # Recursive validation
            for child in self.children:
                child.validate()
    # [/DEF:validate:Function]

    # [DEF:get_score:Function]
    # @TIER: STANDARD
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Calculates a compliance score (0.0 to 1.0) based on complexity requirements.
    # @PRE: validate() has been called.
    # @POST: Returns a float score.
    def get_score(self) -> float:
        with belief_scope("get_score"):
            if self.end_line is None:
                return 0.0

            complexity = self.get_complexity()
            score = 1.0

            # Dynamic penalties based on complexity
            error_penalty = 0.5 if complexity >= Complexity.FOUR else 0.3
            warning_penalty = 0.15

            # Count issues by severity
            errors = len([i for i in self.compliance_issues if i.severity == Severity.ERROR])
            warnings = len([i for i in self.compliance_issues if i.severity == Severity.WARNING])

            # Penalties
            score -= errors * error_penalty
            score -= warnings * warning_penalty

            # Check mandatory tags
            required = COMPLEXITY_MANDATORY_TAGS.get(complexity, {}).get(self.type, [])
            if required:
                found_count = 0
                for req_tag in required:
                    if req_tag == "RELATION" and len(self.relations) > 0:
                        # A populated relations list satisfies the RELATION requirement.
                        found_count += 1
                    else:
                        for existing_tag in self.tags:
                            if existing_tag.upper() == req_tag:
                                found_count += 1
                                break
                if found_count < len(required):
                    missing_ratio = 1 - (found_count / len(required))
                    score -= 0.3 * missing_ratio

            return max(0.0, score)
    # [/DEF:get_score:Function]
# [/DEF:SemanticEntity:Class]
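
# Usage sketch (illustrative): the keyword heuristic above escalates untagged
# module-like entities on sensitive paths, e.g.
#   e = SemanticEntity("repository", "Module", 1, "backend/src/core/auth/repository.py")
#   e.get_complexity()  -> Complexity.FIVE  (path contains "auth", no explicit tag)
#   e.get_tier()        -> Tier.CRITICAL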


# [DEF:get_patterns:Function]
# @TIER: STANDARD
# @PURPOSE: Returns regex patterns for a specific language.
# @PRE: lang is either 'python' or 'svelte_js'.
# @POST: Returns a dictionary of compiled regex patterns.
# @PARAM: lang (str) - 'python' or 'svelte_js'
def get_patterns(lang: str) -> Dict[str, Pattern]:
    with belief_scope("get_patterns"):
        if lang == "python":
            return {
                "anchor_start": re.compile(r"#\s*\[DEF:(?P<name>[-\w\.]+):(?P<type>\w+)\]"),
                "anchor_end": re.compile(r"#\s*\[/DEF:(?P<name>[-\w\.]+):(?P<type>\w+)\]"),
                "tag": re.compile(r"#\s*@(?P<tag>[A-Z_]+):\s*(?P<value>.*)"),
                "relation": re.compile(r"#\s*@RELATION:\s*\[?(?P<type>\w+)\]?\s*->\s*\[?(?P<target>[^\]]+)\]?"),
                "func_def": re.compile(r"^\s*(async\s+)?def\s+(?P<name>\w+)"),
                "belief_scope": re.compile(r"with\s+(\w+\.)?belief_scope\(|@believed\("),
                "molecular_log": re.compile(r"logger\.(explore|reason|reflect)\("),
            }
        else:
            return {
                "html_anchor_start": re.compile(r"<!--\s*\[DEF:(?P<name>[-\w\.]+):(?P<type>\w+)\]\s*-->"),
                "html_anchor_end": re.compile(r"<!--\s*\[/DEF:(?P<name>[-\w\.]+):(?P<type>\w+)\]\s*-->"),
                "js_anchor_start": re.compile(r"//\s*\[DEF:(?P<name>[-\w\.]+):(?P<type>\w+)\]"),
                "js_anchor_end": re.compile(r"//\s*\[/DEF:(?P<name>[-\w\.]+):(?P<type>\w+)\]"),
                "html_tag": re.compile(r"@(?P<tag>[A-Z_]+):\s*(?P<value>.*)"),
                "jsdoc_tag": re.compile(r"\*\s*@(?P<tag>[A-Za-z_]+)\s*:?\s*(?P<value>.*)"),
                "relation": re.compile(r"(?:<!--\s*|//\s*|\*\s*|\s*)@RELATION:\s*\[?(?P<type>\w+)\]?\s*->\s*\[?(?P<target>[^\]\n]+)\]?"),
                "func_def": re.compile(r"^\s*(export\s+)?(async\s+)?function\s+(?P<name>\w+)"),
                "console_log": re.compile(r"console\.(info|warn|debug)\s*\(\s*['\"`]\[[\w\.-]+\]\[(EXPLORE|REASON|REFLECT|[A-Za-z0-9_:]+)\]"),
                # Svelte-specific patterns
                "export_let": re.compile(r"export\s+let\s+(?P<name>\w+)(?:\s*:\s*(?P<type>[\w\[\]|<>]+))?(?:\s*=\s*(?P<default>[^;]+))?"),
                "reactive_label": re.compile(r"^\s*\$:\s*"),
                "runes_usage": re.compile(r"\$(state|derived|effect|props)\s*\("),
                "create_event_dispatcher": re.compile(r"createEventDispatcher\s*<\s*\{\s*(?P<events>[^}]+)\s*\}\s*>"),
                "dispatch_call": re.compile(r"dispatch\s*\(\s*['\"](?P<event>\w+)['\"]"),
                "store_subscription": re.compile(r"\$(?P<store>\w+)"),
                "store_import": re.compile(r"import\s*\{[^}]*\b(?P<store>\w+Store|store)\b[^}]*\}\s*from\s*['\"][^'\"]*stores?[^'\"]*['\"]"),
            }
# [/DEF:get_patterns:Function]
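
# Illustrative: get_patterns("python")["anchor_start"] matches comment lines of
# the form "# [DEF:<name>:<type>]" (the "<...>" placeholders keep this example
# from matching itself), and ["tag"] captures tag/value pairs from lines of the
# form "# @<TAG>: <value>".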


# [DEF:extract_svelte_props:Function]
# @TIER: STANDARD
# @RELATION: [DEPENDS_ON] builtin
# @PURPOSE: Extracts props from Svelte component script section.
# @PRE: lines is a list of file lines, start_idx is the starting line index.
# @POST: Returns list of prop definitions.
def extract_svelte_props(lines: List[str], start_idx: int) -> List[Dict[str, Any]]:
    with belief_scope("extract_svelte_props"):
        props = []
        pattern = re.compile(r"export\s+let\s+(?P<name>\w+)(?:\s*:\s*(?P<type>[\w\[\]|<>\s]+))?(?:\s*=\s*(?P<default>[^;]+))?;")

        for i in range(start_idx, min(start_idx + 100, len(lines))):  # Look ahead 100 lines
            line = lines[i].strip()
            # Stop at script end or function definitions
            if line == "</script>" or line.startswith("function ") or line.startswith("const "):
                break

            match = pattern.search(line)
            if match:
                props.append({
                    "name": match.group("name"),
                    # The type group can capture trailing whitespace; normalize it.
                    "type": match.group("type").strip() if match.group("type") else "any",
                    "default": match.group("default").strip() if match.group("default") else None
                })

        return props
# [/DEF:extract_svelte_props:Function]
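
# Illustrative: for a script line "export let title: string = 'Hello';",
# extract_svelte_props(lines, 0) yields
#   [{"name": "title", "type": "string", "default": "'Hello'"}]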


# [DEF:extract_svelte_events:Function]
# @TIER: STANDARD
# @RELATION: [DEPENDS_ON] builtin
# @PURPOSE: Extracts dispatched events from Svelte component.
# @PRE: lines is a list of file lines.
# @POST: Returns list of event names.
def extract_svelte_events(lines: List[str]) -> List[str]:
    with belief_scope("extract_svelte_events"):
        events = set()

        # Pattern 1: createEventDispatcher with type definition
        dispatcher_pattern = re.compile(r"createEventDispatcher\s*<\s*\{\s*([^}]+)\s*\}\s*>")
        # Pattern 2: dispatch('eventName')
        dispatch_pattern = re.compile(r"dispatch\s*\(\s*['\"](\w+)['\"]")

        for line in lines:
            line = line.strip()

            # Check for typed dispatcher
            match = dispatcher_pattern.search(line)
            if match:
                events_str = match.group(1)
                # Extract event names from a type definition like: submit: Type; cancel: Type
                for event_def in events_str.split(";"):
                    if ":" in event_def:
                        event_name = event_def.split(":")[0].strip()
                        if event_name:
                            events.add(event_name)

            # Check for dispatch calls
            match = dispatch_pattern.search(line)
            if match:
                events.add(match.group(1))

        return sorted(events)
# [/DEF:extract_svelte_events:Function]
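
# Illustrative: a component containing dispatch("save") plus a typed
# createEventDispatcher<{ submit: T; cancel: T }> yields
#   extract_svelte_events(lines)  -> ["cancel", "save", "submit"]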


# [DEF:extract_data_flow:Function]
# @TIER: STANDARD
# @RELATION: [DEPENDS_ON] builtin
# @PURPOSE: Extracts store subscriptions and data flow from Svelte component.
# @PRE: lines is a list of file lines.
# @POST: Returns list of data flow descriptors.
def extract_data_flow(lines: List[str]) -> List[Dict[str, str]]:
    with belief_scope("extract_data_flow"):
        data_flow = []

        # Pattern for store subscriptions: $storeName
        subscription_pattern = re.compile(r"\$(?P<store>\w+)")
        # Pattern for store imports
        import_pattern = re.compile(r"import\s*\{[^}]*\}\s*from\s*['\"][^'\"]*stores?[^'\"]*['\"]")

        store_names = set()

        # First pass: find store imports
        for line in lines:
            if import_pattern.search(line):
                # Extract imported names
                match = re.search(r"import\s*\{([^}]+)\}", line)
                if match:
                    imports = match.group(1).split(",")
                    for imp in imports:
                        store_names.add(imp.strip().split()[0])

        # Second pass: find subscriptions
        for i, line in enumerate(lines):
            line_stripped = line.strip()

            # Skip comments
            if line_stripped.startswith("//") or line_stripped.startswith("*"):
                continue

            # Find store subscriptions
            for match in subscription_pattern.finditer(line):
                store_name = match.group("store")
                if store_name not in ["if", "while", "for", "switch"]:
                    flow_type = "READS_FROM"
                    # An assignment writes to the store when the $store
                    # reference sits on the left-hand side of "=".
                    if "=" in line and line.index("$") < line.index("="):
                        flow_type = "WRITES_TO"

                    data_flow.append({
                        "store": store_name,
                        "type": flow_type,
                        "line": i + 1
                    })

        return data_flow
# [/DEF:extract_data_flow:Function]
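
# Illustrative classification: "$count = 5" is recorded as WRITES_TO (the
# store reference sits left of "="), while "let doubled = $count * 2" is
# READS_FROM.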


# [DEF:parse_file:Function]
# @TIER: CRITICAL
# @RELATION: [DEPENDS_ON] builtin
# @PURPOSE: Parses a single file to extract semantic entities with tier awareness and enhanced Svelte analysis.
# @PRE: full_path, rel_path, lang are valid strings.
# @POST: Returns extracted entities and list of issues.
# @INVARIANT: Every opened anchor must have a matching closing anchor for valid compliance.
# @PARAM: full_path - Absolute path to file.
# @PARAM: rel_path - Relative path from project root.
# @PARAM: lang - Language identifier.
def parse_file(full_path: str, rel_path: str, lang: str) -> Tuple[List[SemanticEntity], List[ComplianceIssue]]:
    with belief_scope("parse_file"):
        issues: List[ComplianceIssue] = []
        try:
            with open(full_path, 'r', encoding='utf-8') as f:
                lines = f.readlines()
        except Exception as e:
            return [], [ComplianceIssue(f"Could not read file {rel_path}: {e}", Severity.ERROR)]

        stack: List[SemanticEntity] = []
        file_entities: List[SemanticEntity] = []
        orphan_functions: List[SemanticEntity] = []
        patterns = get_patterns(lang)

        # Track current module for grouping orphans
        current_module: Optional[SemanticEntity] = None

        for i, line in enumerate(lines):
            lineno = i + 1
            line_stripped = line.strip()

            # 1. Check for Anchor Start
            if lang == "python":
                match_start = patterns["anchor_start"].search(line_stripped)
            else:
                match_start = patterns["html_anchor_start"].search(line_stripped) or patterns["js_anchor_start"].search(line_stripped)

            if match_start:
                name = match_start.group("name")
                type_ = match_start.group("type")
                entity = SemanticEntity(name, type_, lineno, rel_path)

                # Track module-level entities
                if type_ == "Module" and not stack:
                    current_module = entity

                if stack:
                    parent = stack[-1]
                    parent.children.append(entity)
                    entity.parent = parent
                else:
                    file_entities.append(entity)

                stack.append(entity)
                continue

            # 2. Check for Anchor End
            if lang == "python":
                match_end = patterns["anchor_end"].search(line_stripped)
            else:
                match_end = patterns["html_anchor_end"].search(line_stripped) or patterns["js_anchor_end"].search(line_stripped)

            if match_end:
                name = match_end.group("name")
                type_ = match_end.group("type")

                if not stack:
                    issues.append(ComplianceIssue(
                        f"{rel_path}:{lineno} Found closing anchor [/DEF:{name}:{type_}] without opening anchor.",
                        Severity.ERROR,
                        lineno
                    ))
                    continue

                top = stack[-1]
                if top.name == name and top.type == type_:
                    top.end_line = lineno
                    stack.pop()
                else:
                    issues.append(ComplianceIssue(
                        f"{rel_path}:{lineno} Mismatched closing anchor. Expected [/DEF:{top.name}:{top.type}], found [/DEF:{name}:{type_}].",
                        Severity.ERROR,
                        lineno
                    ))
                continue

            # 3. Check for Naked Functions (Missing Contracts) - track as orphans
            if "func_def" in patterns:
                match_func = patterns["func_def"].search(line_stripped)
                if match_func:
                    func_name = match_func.group("name")
                    is_covered = False
                    if stack:
                        current = stack[-1]
                        if current.type == "Function" and current.name == func_name:
                            is_covered = True

                    if not is_covered:
                        # Create orphan function entity
                        orphan = SemanticEntity(func_name, "Function", lineno, rel_path)
                        orphan.tags["PURPOSE"] = "Auto-detected function (orphan)"
                        orphan.tags["COMPLEXITY"] = "1"
                        orphan.end_line = lineno  # Mark as closed immediately
                        orphan_functions.append(orphan)

            # 4. Check for Tags/Relations
            if stack:
                current = stack[-1]

                match_rel = patterns["relation"].search(line_stripped)
                if match_rel:
                    current.relations.append({
                        "type": match_rel.group("type"),
                        "target": match_rel.group("target")
                    })
                    continue

                match_tag = None
                if lang == "python":
                    match_tag = patterns["tag"].search(line_stripped)
                elif lang == "svelte_js":
                    match_tag = patterns["html_tag"].search(line_stripped)
                    if not match_tag and ("/*" in line_stripped or "*" in line_stripped or "//" in line_stripped):
                        match_tag = patterns["jsdoc_tag"].search(line_stripped)

                if match_tag:
                    tag_name = match_tag.group("tag").upper()
                    tag_value = match_tag.group("value").strip()
                    current.tags[tag_name] = tag_value

                # Check for belief scope in implementation
                if lang == "python":
                    if "belief_scope" in patterns and patterns["belief_scope"].search(line):
                        current.has_belief_scope = True
                    elif "molecular_log" in patterns and patterns["molecular_log"].search(line):
                        current.has_belief_scope = True

                # Check for console logging + forbidden/required Svelte reactivity signals
                if lang == "svelte_js":
                    if "console_log" in patterns and patterns["console_log"].search(line):
                        current.has_console_log = True
                    if "export_let" in patterns and patterns["export_let"].search(line):
                        current.has_export_let = True
                    if "reactive_label" in patterns and patterns["reactive_label"].search(line):
                        current.has_reactive_label = True
                    if "runes_usage" in patterns and patterns["runes_usage"].search(line):
                        current.has_runes = True

        # End of file check
        if stack:
            for unclosed in stack:
                issues.append(ComplianceIssue(
                    f"{rel_path}: Unclosed Anchor [DEF:{unclosed.name}:{unclosed.type}] at end of file (started line {unclosed.start_line})",
                    Severity.ERROR,
                    unclosed.start_line
                ))
                if unclosed.parent is None and unclosed not in file_entities:
                    file_entities.append(unclosed)

        # Post-processing for Svelte files
        if lang == "svelte_js":
            for entity in file_entities:
                if entity.type == "Component":
                    # Extract props, events, and data flow
                    entity.props = extract_svelte_props(lines, entity.start_line)
                    entity.events = extract_svelte_events(lines)
                    entity.data_flow = extract_data_flow(lines)

        # Group orphan functions under their module
        if orphan_functions:
            if current_module:
                # Add orphans as children of the module
                for orphan in orphan_functions:
                    orphan.parent = current_module
                    current_module.children.append(orphan)
            else:
                # Create a synthetic module for orphans
                synthetic_module = SemanticEntity(
                    os.path.splitext(os.path.basename(rel_path))[0],
                    "Module",
                    1,
                    rel_path
                )
                synthetic_module.tags["PURPOSE"] = f"Auto-generated module for {rel_path}"
                synthetic_module.tags["COMPLEXITY"] = "1"
                synthetic_module.tags["LAYER"] = "Unknown"
                synthetic_module.end_line = len(lines)

                for orphan in orphan_functions:
                    orphan.parent = synthetic_module
                    synthetic_module.children.append(orphan)

                file_entities.append(synthetic_module)

        return file_entities, issues
# [/DEF:parse_file:Function]
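
# Usage sketch (illustrative, hypothetical path): a single file can be parsed
# in isolation:
#   entities, issues = parse_file("backend/src/app.py", "backend/src/app.py", "python")
#   errors = [i for i in issues if i.severity == Severity.ERROR]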


# [DEF:SemanticMapGenerator:Class]
# @TIER: CRITICAL
# @RELATION: [DEPENDS_ON] builtin
# @PURPOSE: Orchestrates the mapping process with tier-based validation.
# @INVARIANT: All entities are validated according to their TIER requirements.
class SemanticMapGenerator:
    # [DEF:__init__:Function]
    # @TIER: STANDARD
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Initializes the generator with a root directory.
    # @PRE: root_dir is a valid path string.
    # @POST: Generator instance is ready.
    def __init__(self, root_dir: str):
        with belief_scope("__init__"):
            self.root_dir = root_dir
            self.entities: List[SemanticEntity] = []
            # Maps rel_path -> {"score": float, "tier": Tier}
            self.file_scores: Dict[str, Dict[str, Any]] = {}
            self.global_issues: List[ComplianceIssue] = []
            self.ignored_patterns = self._load_gitignore()
    # [/DEF:__init__:Function]

    # [DEF:_load_gitignore:Function]
    # @TIER: STANDARD
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Loads patterns from .gitignore file.
    # @PRE: .gitignore exists in root_dir.
    # @POST: Returns set of ignore patterns.
    def _load_gitignore(self) -> Set[str]:
        with belief_scope("_load_gitignore"):
            patterns = set()
            ignore_file = os.path.join(self.root_dir, ".gitignore")
            if os.path.exists(ignore_file):
                with open(ignore_file, 'r') as f:
                    for line in f:
                        line = line.strip()
                        if line and not line.startswith("#"):
                            patterns.add(line)
            return patterns
    # [/DEF:_load_gitignore:Function]

    # [DEF:_is_ignored:Function]
    # @TIER: STANDARD
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Checks if a path should be ignored based on .gitignore or hardcoded defaults.
    # @PRE: rel_path is a valid relative path string.
    # @POST: Returns True if the path should be ignored.
    def _is_ignored(self, rel_path: str) -> bool:
        with belief_scope("_is_ignored"):
            rel_path = rel_path.replace(os.sep, '/')

            if rel_path in IGNORE_EXACT_PATHS:
                return True

            for prefix in IGNORE_PATH_PREFIXES:
                if rel_path.startswith(prefix):
                    return True

            parts = rel_path.split('/')
            for part in parts:
                if part in IGNORE_DIRS:
                    return True

            if os.path.basename(rel_path) in IGNORE_FILES:
                return True

            for pattern in self.ignored_patterns:
                if pattern.endswith('/'):
                    dir_pattern = pattern.rstrip('/')
                    if rel_path == dir_pattern or rel_path.startswith(pattern):
                        return True

                if rel_path.startswith("frontend/") and fnmatch.fnmatch(rel_path[9:], pattern):
                    return True
                if rel_path.startswith("backend/") and fnmatch.fnmatch(rel_path[8:], pattern):
                    return True

                if fnmatch.fnmatch(rel_path, pattern) or \
                   fnmatch.fnmatch(os.path.basename(rel_path), pattern) or \
                   any(fnmatch.fnmatch(part, pattern) for part in parts):
                    return True

            return False
    # [/DEF:_is_ignored:Function]
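
    # Illustrative: with the defaults above (gen being any initialized instance),
    #   gen._is_ignored("node_modules/pkg/index.js")  -> True  (IGNORE_DIRS)
    #   gen._is_ignored(".ai/shots/demo.png")         -> True  (prefix match)
    #   gen._is_ignored("backend/src/main.py")        -> False (unless .gitignore matches)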

    # [DEF:run:Function]
    # @TIER: CRITICAL
    # @PURPOSE: Main execution flow.
    # @PRE: Generator is initialized.
    # @POST: Semantic map and reports are generated.
    # @RELATION: CALLS -> _walk_and_parse
    # @RELATION: CALLS -> _generate_artifacts
    def run(self):
        with belief_scope("run"):
            print(f"Starting Semantic Map Generation in {self.root_dir}...")
            self._walk_and_parse()
            self._generate_artifacts()
            print("Done.")
    # [/DEF:run:Function]

    # [DEF:_walk_and_parse:Function]
    # @TIER: CRITICAL
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Recursively walks directories and triggers parsing.
    # @PRE: root_dir exists.
    # @POST: All files are scanned and entities extracted.
    def _walk_and_parse(self):
        with belief_scope("_walk_and_parse"):
            for root, dirs, files in os.walk(self.root_dir):
                dirs[:] = [d for d in dirs if not self._is_ignored(os.path.relpath(os.path.join(root, d), self.root_dir) + "/")]

                for file in files:
                    file_path = os.path.join(root, file)
                    rel_path = os.path.relpath(file_path, self.root_dir)

                    if self._is_ignored(rel_path):
                        continue

                    lang = None
                    if file.endswith(".py"):
                        lang = "python"
                    elif file.endswith((".svelte", ".js", ".ts")):
                        lang = "svelte_js"

                    if lang:
                        entities, issues = parse_file(file_path, rel_path, lang)
                        self.global_issues.extend(issues)

                        if entities:
                            self._process_file_results(rel_path, entities)
    # [/DEF:_walk_and_parse:Function]

    # [DEF:_process_file_results:Function]
    # @TIER: STANDARD
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Validates entities and calculates file scores with tier awareness.
    # @PRE: Entities have been parsed from the file.
    # @POST: File score is calculated and issues collected.
    def _process_file_results(self, rel_path: str, entities: List[SemanticEntity]):
        with belief_scope("_process_file_results"):
            total_score = 0.0
            count = 0
            module_max_tier = Tier.TRIVIAL

            # [DEF:validate_recursive:Function]
            # @TIER: STANDARD
            # @RELATION: [DEPENDS_ON] builtin
            # @PURPOSE: Calculate score and determine module's max tier for weighted global score.
            # @PRE: Entities exist.
            # @POST: Entities are validated.
            def validate_recursive(ent_list):
                with belief_scope("validate_recursive"):
                    nonlocal total_score, count, module_max_tier
                    for e in ent_list:
                        e.validate()
                        total_score += e.get_score()
                        count += 1

                        # Determine dominant tier for file
                        e_tier = e.get_tier()
                        if e_tier == Tier.CRITICAL:
                            module_max_tier = Tier.CRITICAL
                        elif e_tier == Tier.STANDARD and module_max_tier != Tier.CRITICAL:
                            module_max_tier = Tier.STANDARD

                        validate_recursive(e.children)
            # [/DEF:validate_recursive:Function]

            validate_recursive(entities)

            self.entities.extend(entities)

            # Store both the score and the dominating tier for weighted global calculation
            file_score = (total_score / count) if count > 0 else 0.0
            self.file_scores[rel_path] = {"score": file_score, "tier": module_max_tier}
    # [/DEF:_process_file_results:Function]

    # [DEF:_generate_artifacts:Function]
    # @TIER: CRITICAL
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Writes output files with tier-based compliance data.
    # @PRE: Parsing and validation are complete.
    # @POST: JSON and Markdown artifacts are written to disk.
    def _generate_artifacts(self):
        with belief_scope("_generate_artifacts"):
            full_map = {
                "project_root": self.root_dir,
                "generated_at": datetime.datetime.now().isoformat(),
                "modules": [e.to_dict() for e in self.entities]
            }

            os.makedirs(os.path.dirname(OUTPUT_JSON), exist_ok=True)
            with open(OUTPUT_JSON, 'w', encoding='utf-8') as f:
                json.dump(full_map, f, indent=2)
            print(f"Generated {OUTPUT_JSON}")

            self._generate_report()
            self._generate_compressed_map()
            self._generate_module_map()
    # [/DEF:_generate_artifacts:Function]
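
    # Illustrative shape of the generated semantics/semantic_map.json:
    #   {"project_root": ".", "generated_at": "<ISO timestamp>", "modules": [<entity dicts>]}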

    # [DEF:_print_agent_report:Function]
    # @TIER: STANDARD
    # @PURPOSE: Prints a JSON report optimized for AI agent orchestration and control.
    # @PRE: Validation and artifact generation are complete.
    # @POST: JSON report printed to stdout.
    def _print_agent_report(self):
        with belief_scope("_print_agent_report"):
            # Calculate global score (re-using logic from _generate_report)
            total_weighted_score = 0
            total_weight = 0
            for file_path, data in self.file_scores.items():
                tier = data["tier"]
                score = data["score"]
                weight = 3 if tier == Tier.CRITICAL else (2 if tier == Tier.STANDARD else 1)
                total_weighted_score += score * weight
                total_weight += weight
            gs = total_weighted_score / total_weight if total_weight > 0 else 0

            # Flatten entities to get per-file issues
            file_data = {}

            def collect_recursive(entities):
                for e in entities:
                    path = e.file_path
                    if path not in file_data:
                        file_data[path] = {"issues": [], "tier": e.get_tier().value, "score": self.file_scores.get(path, {}).get("score", 0)}
                    file_data[path]["issues"].extend([i.to_dict() for i in e.compliance_issues])
                    collect_recursive(e.children)

            collect_recursive(self.entities)

            # Critical parsing errors
            cpe = []
            for path, data in file_data.items():
                for i in data["issues"]:
                    msg = i.get("message", "").lower()
                    sev = i.get("severity", "").lower()
                    if "parsing" in msg and (sev == "error" or "critical" in msg):
                        cpe.append({"file": path, "severity": i.get("severity"), "message": i.get("message")})

            # Files scoring below 0.7, bucketed by tier
            lt = {"CRITICAL": 0, "STANDARD": 0, "TRIVIAL": 0, "UNKNOWN": 0}
            for path, data in file_data.items():
                if data["score"] < 0.7:
                    tier = data["tier"]
                    lt[tier if tier in lt else "UNKNOWN"] += 1

            # Priority counts
            p2 = 0
            p3 = 0
            for path, data in file_data.items():
                tier = data["tier"]
                issues = data["issues"]
                if tier == "CRITICAL" and any("Missing Mandatory Tag" in i.get("message", "") for i in issues):
                    p2 += 1
                if tier == "STANDARD" and any("@RELATION" in i.get("message", "") and "Missing Mandatory Tag" in i.get("message", "") for i in issues):
                    p3 += 1

            # Target files status
            targets = [
                'frontend/src/routes/migration/+page.svelte',
                'frontend/src/routes/migration/mappings/+page.svelte',
                'frontend/src/components/auth/ProtectedRoute.svelte',
                'backend/src/core/auth/repository.py',
                'backend/src/core/migration/risk_assessor.py',
                'backend/src/api/routes/migration.py',
                'backend/src/models/config.py',
                'backend/src/services/auth_service.py',
                'backend/src/core/config_manager.py',
                'backend/src/core/migration_engine.py'
            ]
            status = []
            for t in targets:
                f = file_data.get(t)
                if not f:
                    status.append({"path": t, "found": False})
                    continue
                sc = f["score"]
                status.append({
                    "path": t,
                    "found": True,
                    "score": sc,
                    "tier": f["tier"],
                    "under_0_7": sc < 0.7,
                    "violations": len(f["issues"]) > 0,
                    "issues_count": len(f["issues"])
                })

            out = {
                "global_score": gs,
                "critical_parsing_errors_count": len(cpe),
                "critical_parsing_errors": cpe[:50],
                "lt_0_7_by_tier": lt,
                "priority_1_blockers": len(cpe),
                "priority_2_tier1_critical_missing_mandatory_tags_files": p2,
                "priority_3_tier2_standard_missing_relation_files": p3,
                "targets": status,
                "total_files": len(file_data)
            }
            print(json.dumps(out, ensure_ascii=False))
    # [/DEF:_print_agent_report:Function]
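
    # Weighting sketch: tiers weigh CRITICAL=3, STANDARD=2, TRIVIAL=1, so one
    # CRITICAL file at 0.5 and one TRIVIAL file at 1.0 give a global score of
    # (0.5 * 3 + 1.0 * 1) / (3 + 1) = 0.625.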

    # [DEF:_generate_report:Function]
    # @TIER: CRITICAL
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Generates the Markdown compliance report with severity levels.
    # @PRE: File scores and issues are available.
    # @POST: Markdown report is created in reports directory.
    def _generate_report(self):
        with belief_scope("_generate_report"):
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            report_path = os.path.join(REPORTS_DIR, f"semantic_report_{timestamp}.md")
            os.makedirs(REPORTS_DIR, exist_ok=True)

            total_files = len(self.file_scores)

            total_weighted_score = 0
            total_weight = 0

            for file_path, data in self.file_scores.items():
                tier = data["tier"]
                score = data["score"]
                weight = 3 if tier == Tier.CRITICAL else (2 if tier == Tier.STANDARD else 1)

                total_weighted_score += score * weight
                total_weight += weight

            avg_score = total_weighted_score / total_weight if total_weight > 0 else 0

            # Count issues by severity
            error_count = len([i for i in self.global_issues if i.severity == Severity.ERROR])
            warning_count = len([i for i in self.global_issues if i.severity == Severity.WARNING])

            with open(report_path, 'w', encoding='utf-8') as f:
                f.write("# Semantic Compliance Report\n\n")
                f.write(f"**Generated At:** {datetime.datetime.now().isoformat()}\n")
                f.write(f"**Global Compliance Score:** {avg_score:.1%}\n")
                f.write(f"**Scanned Files:** {total_files}\n")
                f.write(f"**Global Errors:** {error_count} | **Warnings:** {warning_count}\n\n")

                if self.global_issues:
                    f.write("## Critical Parsing Errors\n")
                    for issue in self.global_issues:
                        icon = "🔴" if issue.severity == Severity.ERROR else "🟡" if issue.severity == Severity.WARNING else "ℹ️"
                        f.write(f"- {icon} {issue.message}\n")
                    f.write("\n")

                f.write("## File Compliance Status\n")
                f.write("| File | Score | Tier | Issues |\n")
                f.write("|------|-------|------|--------|\n")

                # Sort logically: Critical first, then by score
                sorted_files = sorted(self.file_scores.items(), key=lambda x: (
                    0 if x[1]["tier"] == Tier.CRITICAL else (1 if x[1]["tier"] == Tier.STANDARD else 2),
                    x[1]["score"]
                ))

                for file_path, data in sorted_files:
                    score = data["score"]
                    issues = []
                    self._collect_issues(self.entities, file_path, issues)
                    # Display the dominant tier we computed during processing
                    tier = data["tier"].value

                    status_icon = "🟢" if score == 1.0 else "🟡" if score > 0.5 else "🔴"
                    issue_text = "<br>".join([f"{'🔴' if i.severity == Severity.ERROR else '🟡'} {i.message}" for i in issues[:3]])
                    if len(issues) > 3:
                        issue_text += f"<br>... and {len(issues) - 3} more"
                    if not issues:
                        issue_text = "OK"

                    f.write(f"| {file_path} | {status_icon} {score:.0%} | {tier} | {issue_text} |\n")

            print(f"Generated {report_path}")
    # [/DEF:_generate_report:Function]

    # [DEF:_collect_issues:Function]
    # @TIER: STANDARD
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Helper to collect issues for a specific file from the entity tree.
    # @PRE: entities list and file_path are valid.
    # @POST: issues list is populated with compliance issues.
    def _collect_issues(self, entities: List[SemanticEntity], file_path: str, issues: List[ComplianceIssue]):
        with belief_scope("_collect_issues"):
            for e in entities:
                if e.file_path == file_path:
                    issues.extend(e.compliance_issues)
                self._collect_issues(e.children, file_path, issues)
    # [/DEF:_collect_issues:Function]

    # [DEF:_generate_compressed_map:Function]
    # @TIER: CRITICAL
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Generates the token-optimized project map with enhanced Svelte details.
    # @PRE: Entities have been processed.
    # @POST: Markdown project map is written.
    def _generate_compressed_map(self):
        with belief_scope("_generate_compressed_map"):
            os.makedirs(os.path.dirname(OUTPUT_COMPRESSED_MD), exist_ok=True)

            with open(OUTPUT_COMPRESSED_MD, 'w', encoding='utf-8') as f:
                f.write("# Project Semantic Map\n\n")
                f.write("> Compressed view for AI Context. Generated automatically.\n\n")

                for entity in self.entities:
                    self._write_entity_md(f, entity, level=0)

            print(f"Generated {OUTPUT_COMPRESSED_MD}")
    # [/DEF:_generate_compressed_map:Function]

    # [DEF:_write_entity_md:Function]
    # @TIER: CRITICAL
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Recursive helper to write entity tree to Markdown with tier badges and enhanced details.
    # @PRE: f is an open file handle, entity is valid.
    # @POST: Entity details are written to the file.
    def _write_entity_md(self, f, entity: SemanticEntity, level: int):
        with belief_scope("_write_entity_md"):
            indent = "  " * level

            icon = "📦"
            if entity.type == "Component":
                icon = "🧩"
            elif entity.type == "Function":
                icon = "ƒ"
            elif entity.type == "Class":
                icon = "ℂ"
            elif entity.type == "Store":
                icon = "🗄️"
            elif entity.type == "Block":
                icon = "▦"

            tier_badge = ""
            tier = entity.get_tier()
            if tier == Tier.CRITICAL:
                tier_badge = " `[CRITICAL]`"
            elif tier == Tier.TRIVIAL:
                tier_badge = " `[TRIVIAL]`"

            f.write(f"{indent}- {icon} **{entity.name}** (`{entity.type}`){tier_badge}\n")

            purpose = entity.tags.get("PURPOSE") or entity.tags.get("purpose")
            layer = entity.tags.get("LAYER") or entity.tags.get("layer")
            invariant = entity.tags.get("INVARIANT")

            if purpose:
                f.write(f"{indent}  - 📝 {purpose}\n")
            if layer:
                f.write(f"{indent}  - 🏗️ Layer: {layer}\n")
            if invariant:
                f.write(f"{indent}  - 🔒 Invariant: {invariant}\n")

            # Write Props for Components
            if entity.props:
                props_str = ", ".join([f"{p['name']}: {p['type']}" for p in entity.props[:5]])
                if len(entity.props) > 5:
                    props_str += f"... (+{len(entity.props) - 5})"
                f.write(f"{indent}  - 📥 Props: {props_str}\n")

            # Write Events for Components
            if entity.events:
                events_str = ", ".join(entity.events[:5])
                if len(entity.events) > 5:
                    events_str += f"... (+{len(entity.events) - 5})"
                f.write(f"{indent}  - ⚡ Events: {events_str}\n")

            # Write Data Flow
            if entity.data_flow:
                unique_flows = {}
                for flow in entity.data_flow:
                    key = f"{flow['type']} -> {flow['store']}"
                    unique_flows[key] = flow

                for flow_key, flow in list(unique_flows.items())[:3]:
                    arrow = "⬅️" if flow['type'] == "READS_FROM" else "➡️"
                    f.write(f"{indent}  - {arrow} {flow['type']} `{flow['store']}`\n")

            # Write Relations (allowlisted predicates only)
            for rel in entity.relations:
                if rel['type'] in ALLOWED_RELATION_PREDICATES:
                    f.write(f"{indent}  - 🔗 {rel['type']} -> `{rel['target']}`\n")

            if level < 3:
                for child in entity.children:
                    self._write_entity_md(f, child, level + 1)
    # [/DEF:_write_entity_md:Function]
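
    # Illustrative PROJECT_MAP.md output for a (hypothetical) critical component:
    #   - 🧩 **LoginForm** (`Component`) `[CRITICAL]`
    #     - 📝 Renders the login form.
    #     - ⚡ Events: cancel, submit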

    # [DEF:_generate_module_map:Function]
    # @TIER: CRITICAL
    # @RELATION: [DEPENDS_ON] builtin
    # @PURPOSE: Generates a module-centric map grouping entities by directory structure.
    # @PRE: Entities have been processed.
    # @POST: Markdown module map is written to .ai/structure/MODULE_MAP.md.
    def _generate_module_map(self):
        with belief_scope("_generate_module_map"):
            os.makedirs(os.path.dirname(OUTPUT_MODULE_MAP_MD), exist_ok=True)

            # Group entities by directory/module
            modules: Dict[str, Dict[str, Any]] = {}

            # [DEF:_get_module_path:Function]
            # @TIER: STANDARD
            # @RELATION: [DEPENDS_ON] builtin
            # @PURPOSE: Extracts the module path from a file path.
            # @PRE: file_path is a valid relative path.
            # @POST: Returns a module path string.
            def _get_module_path(file_path: str) -> str:
                with belief_scope("_get_module_path"):
                    # Convert file path to module-like path
                    parts = file_path.replace(os.sep, '/').split('/')
                    # Remove filename
                    if len(parts) > 1:
                        return '/'.join(parts[:-1])
                    return 'root'
            # [/DEF:_get_module_path:Function]

            # [DEF:_collect_all_entities:Function]
            # @TIER: STANDARD
            # @RELATION: [DEPENDS_ON] builtin
            # @PURPOSE: Flattens entity tree for easier grouping.
            # @PRE: entity list is valid.
            # @POST: result contains a flat list of (module_path, entity) pairs.
            def _collect_all_entities(entities: List[SemanticEntity], result: List[Tuple[str, SemanticEntity]]):
                with belief_scope("_collect_all_entities"):
                    for e in entities:
                        result.append((_get_module_path(e.file_path), e))
                        _collect_all_entities(e.children, result)
            # [/DEF:_collect_all_entities:Function]

            # Collect all entities
            all_entities: List[Tuple[str, SemanticEntity]] = []
            _collect_all_entities(self.entities, all_entities)

            # Group by module path
            for module_path, entity in all_entities:
                if module_path not in modules:
                    modules[module_path] = {
                        'entities': [],
                        'files': set(),
                        'layers': set(),
                        'tiers': {'CRITICAL': 0, 'STANDARD': 0, 'TRIVIAL': 0},
                        'relations': []
                    }
                modules[module_path]['entities'].append(entity)
                modules[module_path]['files'].add(entity.file_path)
                if entity.tags.get('LAYER'):
                    modules[module_path]['layers'].add(entity.tags.get('LAYER'))
                tier = entity.get_tier().value
                modules[module_path]['tiers'][tier] = modules[module_path]['tiers'].get(tier, 0) + 1
                for rel in entity.relations:
                    modules[module_path]['relations'].append(rel)

            # Write module map
            with open(OUTPUT_MODULE_MAP_MD, 'w', encoding='utf-8') as f:
                f.write("# Module Map\n\n")
                f.write("> High-level module structure for AI Context. Generated automatically.\n\n")
                f.write(f"**Generated:** {datetime.datetime.now().isoformat()}\n\n")

                # Summary statistics
                total_modules = len(modules)
                total_entities = len(all_entities)
                f.write("## Summary\n\n")
                f.write(f"- **Total Modules:** {total_modules}\n")
                f.write(f"- **Total Entities:** {total_entities}\n\n")

                # Module hierarchy
                f.write("## Module Hierarchy\n\n")

                # Sort modules by path for consistent output
                sorted_modules = sorted(modules.items(), key=lambda x: x[0])

                for module_path, data in sorted_modules:
                    # Calculate module depth for indentation
                    depth = module_path.count('/')
                    indent = "  " * depth

                    # Module header
                    module_name = module_path.split('/')[-1] if module_path != 'root' else 'root'
                    f.write(f"{indent}### 📁 `{module_name}/`\n\n")

                    # Module metadata
                    if data['layers']:
                        layers_str = ", ".join(sorted(data['layers']))
                        f.write(f"{indent}- 🏗️ **Layers:** {layers_str}\n")

                    tiers_summary = []
                    for tier_name, count in data['tiers'].items():
                        if count > 0:
                            tiers_summary.append(f"{tier_name}: {count}")
                    if tiers_summary:
                        f.write(f"{indent}- 📊 **Tiers:** {', '.join(tiers_summary)}\n")

                    f.write(f"{indent}- 📄 **Files:** {len(data['files'])}\n")
                    f.write(f"{indent}- 📦 **Entities:** {len(data['entities'])}\n")

                    # List key entities (Modules, Classes, Components, Stores only)
                    key_entities = [e for e in data['entities'] if e.type in ['Module', 'Class', 'Component', 'Store']]
                    if key_entities:
                        f.write(f"\n{indent}**Key Entities:**\n\n")
                        for entity in sorted(key_entities, key=lambda x: (x.type, x.name))[:10]:
                            icon = "📦" if entity.type == "Module" else "ℂ" if entity.type == "Class" else "🧩" if entity.type == "Component" else "🗄️"
                            tier_badge = ""
                            if entity.get_tier() == Tier.CRITICAL:
                                tier_badge = " `[CRITICAL]`"
                            elif entity.get_tier() == Tier.TRIVIAL:
                                tier_badge = " `[TRIVIAL]`"
                            purpose = entity.tags.get('PURPOSE', '')
                            if len(purpose) > 60:
                                purpose = purpose[:60] + "..."
                            f.write(f"{indent}  - {icon} **{entity.name}** ({entity.type}){tier_badge}\n")
                            if purpose:
                                f.write(f"{indent}    - {purpose}\n")

                    # External relations
                    external_relations = [r for r in data['relations'] if r['type'] in ['DEPENDS_ON', 'IMPLEMENTS', 'INHERITS']]
                    if external_relations:
                        unique_deps = {}
                        for rel in external_relations:
                            key = f"{rel['type']} -> {rel['target']}"
                            unique_deps[key] = rel
                        f.write(f"\n{indent}**Dependencies:**\n\n")
                        for rel_str in sorted(unique_deps.keys())[:5]:
                            f.write(f"{indent}  - 🔗 {rel_str}\n")

                    f.write("\n")

                # Cross-module dependency graph
                f.write("## Cross-Module Dependencies\n\n")
                f.write("```mermaid\n")
                f.write("graph TD\n")

                # Find inter-module dependencies
                for module_path, data in sorted_modules:
                    module_name = module_path.split('/')[-1] if module_path != 'root' else 'root'
                    safe_name = module_name.replace('-', '_').replace('.', '_')

                    for rel in data['relations']:
                        target = rel.get('target', '')
                        # Check if the target references another module
                        for other_module in modules:
                            if other_module != module_path and other_module in target:
                                other_name = other_module.split('/')[-1]
                                safe_other = other_name.replace('-', '_').replace('.', '_')
                                f.write(f"  {safe_name}-->|{rel['type']}|{safe_other}\n")
                                break

                f.write("```\n")

            print(f"Generated {OUTPUT_MODULE_MAP_MD}")
    # [/DEF:_generate_module_map:Function]

# [/DEF:SemanticMapGenerator:Class]


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Generate Semantic Map and Compliance Reports")
    parser.add_argument("--agent-report", action="store_true", help="Output JSON report for AI agents")
    args = parser.parse_args()

    generator = SemanticMapGenerator(PROJECT_ROOT)
    generator.run()

    if args.agent_report:
        generator._print_agent_report()

# [/DEF:generate_semantic_map:Module]
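
# Usage (assuming the script lives under scripts/, as the path heuristics above suggest):
#   python scripts/generate_semantic_map.py
#   python scripts/generate_semantic_map.py --agent-report > agent_report.json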