"""
FILE: app/services/edge_registry.py
DESCRIPTION: Single source of truth for edge types, symmetries and graph topology.
             WP-24c: implementation of the dual registry (vocabulary & schema).
             Supports dynamic loading of inverses and contextual suggestions.
VERSION: 1.0.1 (WP-24c: Verified Atomic Topology)
STATUS: Active
"""
import re
import os
import json
import logging
import time
from typing import Dict, Optional, Set, Tuple, List

from app.config import get_settings

logger = logging.getLogger(__name__)


class EdgeRegistry:
    """
    Central manager for the edge vocabulary and the graph schema.

    Implemented as a singleton so that every caller validates against the
    same vocabulary and topology. Both backing files are re-read whenever
    their modification time changes (hot reload).
    """

    _instance = None

    # SYSTEM PROTECTION: these edges are reserved for structural integrity
    # (behaviour retained from v0.8.0).
    FORBIDDEN_SYSTEM_EDGES = {"next", "prev", "belongs_to"}

    # Provenance values that must never create one of the system edges above.
    _RESTRICTED_PROVENANCE = frozenset(
        {"explicit", "semantic_ai", "inherited", "global_pool", "rule"}
    )

    # Vocabulary table row (4-column layout, WP-24c):
    #   | **`type`** | `inverse` | alias1, alias2 | ... |
    _VOCAB_ROW = re.compile(
        r"\|\s*\*\*`?([a-zA-Z0-9_-]+)`?\*\*\s*\|\s*`?([a-zA-Z0-9_-]+)`?\s*\|\s*([^|]+)\|"
    )

    # Schema section header: "## Source: `type`"
    _SCHEMA_SOURCE = re.compile(r"## Source:\s*`?([a-zA-Z0-9_-]+)`?")

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # __init__ runs on every EdgeRegistry() call; this flag lets it
            # skip re-initialisation after the first one.
            cls._instance.initialized = False
        return cls._instance

    def __init__(self):
        """Resolve file paths, create the empty stores and perform the initial load."""
        if self.initialized:
            return

        settings = get_settings()

        # --- Path configuration (WP-24c: variable paths for vault mirroring) ---
        # The vocabulary (semantics)
        self.full_vocab_path = os.path.abspath(settings.MINDNET_VOCAB_PATH)

        # The schema (topology) - configurable via ENV: MINDNET_SCHEMA_PATH
        schema_env = getattr(settings, "MINDNET_SCHEMA_PATH", None)
        if schema_env:
            self.full_schema_path = os.path.abspath(schema_env)
        else:
            # Fallback: located in the same directory as the vocabulary
            self.full_schema_path = os.path.join(
                os.path.dirname(self.full_vocab_path), "graph_schema.md"
            )

        self.unknown_log_path = "data/logs/unknown_edges.jsonl"

        # --- Internal data stores ---
        self.canonical_map: Dict[str, str] = {}
        self.inverse_map: Dict[str, str] = {}
        self.valid_types: Set[str] = set()

        # Topology: source_type -> { target_type -> {"typical": set, "prohibited": set} }
        self.topology: Dict[str, Dict[str, Dict[str, Set[str]]]] = {}

        self._last_vocab_mtime = 0.0
        self._last_schema_mtime = 0.0

        logger.info(">>> [EDGE-REGISTRY] Initializing WP-24c Dual-Engine")
        logger.info(f" - Vocab-Path: {self.full_vocab_path}")
        logger.info(f" - Schema-Path: {self.full_schema_path}")

        self.ensure_latest()
        self.initialized = True

    @staticmethod
    def _normalize(name: str) -> str:
        """Normalise an edge name to lower snake_case (spaces and hyphens become '_')."""
        return name.lower().strip().replace(" ", "_").replace("-", "_")

    @staticmethod
    def _is_separator_row(line: str) -> bool:
        """Return True for markdown table separator rows such as '| --- | :---: |'."""
        stripped = line.strip()
        return (
            stripped.startswith("|")
            and "-" in stripped
            and set(stripped) <= set("|-: \t")
        )

    @staticmethod
    def _split_edges(cell: str) -> List[str]:
        """Split a comma-separated table cell into edge names, dropping blanks and '-' placeholders."""
        tokens = (part.strip() for part in cell.split(","))
        return [token for token in tokens if token and token != "-"]

    def ensure_latest(self):
        """
        Check the timestamps of both files and hot-reload whichever changed.

        Never raises: any failure is logged so that callers such as
        ``resolve`` keep working with the data that is already loaded.
        """
        try:
            # Vocabulary reload on change. '!=' rather than '>' so that a file
            # replaced by one with an OLDER mtime (restored backup, checkout)
            # is picked up as well.
            if os.path.exists(self.full_vocab_path):
                v_mtime = os.path.getmtime(self.full_vocab_path)
                if v_mtime != self._last_vocab_mtime:
                    self._load_vocabulary()
                    self._last_vocab_mtime = v_mtime

            # Schema reload on change
            if os.path.exists(self.full_schema_path):
                s_mtime = os.path.getmtime(self.full_schema_path)
                if s_mtime != self._last_schema_mtime:
                    self._load_schema()
                    self._last_schema_mtime = s_mtime
        except Exception as e:
            logger.error(f"!!! [EDGE-REGISTRY] Sync failure: {e}")

    def _load_vocabulary(self):
        """
        Parse edge_vocabulary.md: | Canonical | Inverse | Aliases | Description |

        The file is parsed into local containers first and only swapped into
        the live stores after a complete, successful read, so a failed reload
        keeps the previously loaded vocabulary instead of leaving it empty.
        """
        canonical_map: Dict[str, str] = {}
        inverse_map: Dict[str, str] = {}
        valid_types: Set[str] = set()
        c_count = 0

        try:
            with open(self.full_vocab_path, "r", encoding="utf-8") as f:
                for line in f:
                    match = self._VOCAB_ROW.search(line)
                    if not match:
                        continue

                    canonical = match.group(1).strip().lower()
                    inverse = match.group(2).strip().lower()
                    aliases_raw = match.group(3).strip()

                    valid_types.add(canonical)
                    canonical_map[canonical] = canonical
                    # resolve() looks names up in normalised form; register
                    # that form too so hyphenated canonical names are found.
                    # setdefault: never shadow an entry that already exists.
                    canonical_map.setdefault(self._normalize(canonical), canonical)
                    if inverse:
                        inverse_map[canonical] = inverse

                    # Process aliases, normalised exactly like resolve() input
                    if aliases_raw and "Kein Alias" not in aliases_raw:
                        for alias in aliases_raw.split(","):
                            clean_alias = self._normalize(alias.replace("`", ""))
                            if clean_alias:
                                canonical_map[clean_alias] = canonical
                    c_count += 1
        except (OSError, UnicodeError) as e:
            logger.error(f"❌ [VOCAB ERROR] {e}")
            return

        # Update in place so existing references to the stores stay valid.
        self.canonical_map.clear()
        self.canonical_map.update(canonical_map)
        self.inverse_map.clear()
        self.inverse_map.update(inverse_map)
        self.valid_types.clear()
        self.valid_types.update(valid_types)

        logger.info(f"✅ [VOCAB] Loaded {c_count} edge definitions and their inverses.")

    def _load_schema(self):
        """
        Parse graph_schema.md: ## Source: `type` | Target | Typical | Prohibited |

        As with the vocabulary, the topology is built locally and only
        committed after the whole file was read successfully.
        """
        topology: Dict[str, Dict[str, Dict[str, Set[str]]]] = {}
        current_source = None

        try:
            with open(self.full_schema_path, "r", encoding="utf-8") as f:
                for line in f:
                    # Detect section header (atomic sections)
                    src_match = self._SCHEMA_SOURCE.search(line)
                    if src_match:
                        current_source = src_match.group(1).strip().lower()
                        topology.setdefault(current_source, {})
                        continue

                    # Only table rows inside a section are of interest
                    if not current_source or "|" not in line:
                        continue
                    # Skip the header row and separator rows ('|---|' as well
                    # as '| --- |' / '|:---|', which would otherwise be
                    # parsed as a target type named '---').
                    if (
                        line.startswith("|-")
                        or self._is_separator_row(line)
                        or "Target" in line
                    ):
                        continue

                    cols = [c.strip().replace("`", "").lower() for c in line.split("|")]
                    if len(cols) < 4:
                        continue

                    target_type = cols[1]
                    if not target_type:
                        # A row without a target cannot be looked up later
                        continue

                    bucket = topology[current_source].setdefault(
                        target_type, {"typical": set(), "prohibited": set()}
                    )
                    bucket["typical"].update(self._split_edges(cols[2]))
                    bucket["prohibited"].update(self._split_edges(cols[3]))
        except (OSError, UnicodeError) as e:
            logger.error(f"❌ [SCHEMA ERROR] {e}")
            return

        # Update in place so existing references to the store stay valid.
        self.topology.clear()
        self.topology.update(topology)

        logger.info(f"✅ [SCHEMA] Topology matrix built for {len(self.topology)} source types.")

    def resolve(self, edge_type: str, provenance: str = "explicit", context: Optional[dict] = None) -> str:
        """
        Resolve aliases to canonical names and protect system edges.

        Retains the v0.8.0 protection logic.

        Args:
            edge_type: Raw edge name; case, spaces and hyphens are normalised.
            provenance: Origin of the edge. System edges are only passed
                through for ``"structure"`` (code-generated) provenance.
            context: Optional metadata (``note_id``, ``provenance``) written
                to the issue log when a forbidden edge is rejected.

        Returns:
            The canonical edge name, the normalised input if it is unknown,
            or ``"related_to"`` for empty input and rejected system edges.
        """
        self.ensure_latest()
        if not edge_type:
            return "related_to"

        clean_type = self._normalize(edge_type)
        ctx = context or {}

        # Security gate: protection against unauthorised use of system edges
        if provenance in self._RESTRICTED_PROVENANCE and clean_type in self.FORBIDDEN_SYSTEM_EDGES:
            self._log_issue(clean_type, f"forbidden_system_edge_manipulation_by_{provenance}", ctx)
            return "related_to"

        # System edges are ONLY allowed with structural (code-generated) provenance
        if provenance == "structure" and clean_type in self.FORBIDDEN_SYSTEM_EDGES:
            return clean_type

        # Alias resolution
        return self.canonical_map.get(clean_type, clean_type)

    def get_inverse(self, edge_type: str) -> str:
        """WP-24c: Return the symmetric counterpart, or ``"related_to"`` if none is registered."""
        canonical = self.resolve(edge_type)
        return self.inverse_map.get(canonical, "related_to")

    def get_topology_info(self, source_type: str, target_type: str) -> Dict[str, List[str]]:
        """
        WP-24c: Provide contextual edge recommendations for Obsidian and the backend.

        Lookup is hierarchical: specific type -> ``"any"`` -> empty. Type
        names are matched case-insensitively because the schema loader
        stores every key in lower case.

        Returns:
            A dict with the sorted lists ``"typical"`` and ``"prohibited"``.
        """
        self.ensure_latest()

        src_key = (source_type or "").strip().lower()
        tgt_key = (target_type or "").strip().lower()

        # Hierarchical search: specific -> 'any' -> empty
        src_cfg = self.topology.get(src_key, self.topology.get("any", {}))
        tgt_cfg = src_cfg.get(
            tgt_key, src_cfg.get("any", {"typical": set(), "prohibited": set()})
        )

        return {
            "typical": sorted(tgt_cfg["typical"]),
            "prohibited": sorted(tgt_cfg["prohibited"]),
        }

    def _log_issue(self, edge_type: str, error_kind: str, ctx: dict):
        """
        Append a JSONL record for unknown/forbidden edges (retained from v0.8.0).

        Logging is best effort: a failure to write is reported through the
        module logger and never propagates to the caller.
        """
        try:
            log_dir = os.path.dirname(self.unknown_log_path)
            if log_dir:
                # os.makedirs("") raises, so only create a real directory part
                os.makedirs(log_dir, exist_ok=True)
            entry = {
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "edge_type": edge_type,
                "error": error_kind,
                "note_id": ctx.get("note_id", "unknown"),
                "provenance": ctx.get("provenance", "unknown"),
            }
            with open(self.unknown_log_path, "a", encoding="utf-8") as f:
                f.write(json.dumps(entry) + "\n")
        except (OSError, TypeError, ValueError) as e:
            # TypeError/ValueError: ctx carried a value json cannot serialise
            logger.warning("[EDGE-REGISTRY] Could not write edge issue log: %s", e)


# Singleton export
registry = EdgeRegistry()