mindnet/app/services/edge_registry.py

227 lines
10 KiB
Python

"""
FILE: app/services/edge_registry.py
DESCRIPTION: Single Source of Truth für Kanten-Typen, Symmetrien und Graph-Topologie.
WP-24c: Implementierung der dualen Registry (Vocabulary & Schema).
Unterstützt dynamisches Laden von Inversen und kontextuellen Vorschlägen.
VERSION: 1.0.1 (WP-24c: Verified Atomic Topology)
STATUS: Active
"""
import re
import os
import json
import logging
import time
from typing import Dict, Optional, Set, Tuple, List
from app.config import get_settings
logger = logging.getLogger(__name__)
class EdgeRegistry:
"""
Zentraler Verwalter für das Kanten-Vokabular und das Graph-Schema.
Singleton-Pattern zur Sicherstellung konsistenter Validierung.
"""
_instance = None
# SYSTEM-SCHUTZ: Diese Kanten sind für die strukturelle Integrität reserviert (v0.8.0 Erhalt)
FORBIDDEN_SYSTEM_EDGES = {"next", "prev", "belongs_to"}
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(EdgeRegistry, cls).__new__(cls)
cls._instance.initialized = False
return cls._instance
def __init__(self):
if self.initialized:
return
settings = get_settings()
# --- Pfad-Konfiguration (WP-24c: Variable Pfade für Vault-Spiegelung) ---
# Das Vokabular (Semantik)
self.full_vocab_path = os.path.abspath(settings.MINDNET_VOCAB_PATH)
# Das Schema (Topologie) - Konfigurierbar via ENV: MINDNET_SCHEMA_PATH
schema_env = getattr(settings, "MINDNET_SCHEMA_PATH", None)
if schema_env:
self.full_schema_path = os.path.abspath(schema_env)
else:
# Fallback: Liegt im selben Verzeichnis wie das Vokabular
self.full_schema_path = os.path.join(os.path.dirname(self.full_vocab_path), "graph_schema.md")
self.unknown_log_path = "data/logs/unknown_edges.jsonl"
# --- Interne Datenspeicher ---
self.canonical_map: Dict[str, str] = {}
self.inverse_map: Dict[str, str] = {}
self.valid_types: Set[str] = set()
# Topologie: source_type -> { target_type -> {"typical": set, "prohibited": set} }
self.topology: Dict[str, Dict[str, Dict[str, Set[str]]]] = {}
self._last_vocab_mtime = 0.0
self._last_schema_mtime = 0.0
logger.info(f">>> [EDGE-REGISTRY] Initializing WP-24c Dual-Engine")
logger.info(f" - Vocab-Path: {self.full_vocab_path}")
logger.info(f" - Schema-Path: {self.full_schema_path}")
self.ensure_latest()
self.initialized = True
def ensure_latest(self):
"""Prüft Zeitstempel beider Dateien und führt bei Änderung Hot-Reload durch."""
try:
# Vokabular-Reload bei Änderung
if os.path.exists(self.full_vocab_path):
v_mtime = os.path.getmtime(self.full_vocab_path)
if v_mtime > self._last_vocab_mtime:
self._load_vocabulary()
self._last_vocab_mtime = v_mtime
# Schema-Reload bei Änderung
if os.path.exists(self.full_schema_path):
s_mtime = os.path.getmtime(self.full_schema_path)
if s_mtime > self._last_schema_mtime:
self._load_schema()
self._last_schema_mtime = s_mtime
except Exception as e:
logger.error(f"!!! [EDGE-REGISTRY] Sync failure: {e}")
def _load_vocabulary(self):
"""Parst edge_vocabulary.md: | Canonical | Inverse | Aliases | Description |"""
self.canonical_map.clear()
self.inverse_map.clear()
self.valid_types.clear()
# Regex für die 4-Spalten Struktur (WP-24c konform)
# Erwartet: | **`type`** | `inverse` | alias1, alias2 | ... |
pattern = re.compile(r"\|\s*\*\*`?([a-zA-Z0-9_-]+)`?\*\*\s*\|\s*`?([a-zA-Z0-9_-]+)`?\s*\|\s*([^|]+)\|")
try:
with open(self.full_vocab_path, "r", encoding="utf-8") as f:
c_count = 0
for line in f:
match = pattern.search(line)
if match:
canonical = match.group(1).strip().lower()
inverse = match.group(2).strip().lower()
aliases_raw = match.group(3).strip()
self.valid_types.add(canonical)
self.canonical_map[canonical] = canonical
if inverse:
self.inverse_map[canonical] = inverse
# Aliase verarbeiten (Normalisierung auf snake_case)
if aliases_raw and "Kein Alias" not in aliases_raw:
aliases = [a.strip() for a in aliases_raw.split(",") if a.strip()]
for alias in aliases:
clean_alias = alias.replace("`", "").lower().strip().replace(" ", "_")
if clean_alias:
self.canonical_map[clean_alias] = canonical
c_count += 1
logger.info(f"✅ [VOCAB] Loaded {c_count} edge definitions and their inverses.")
except Exception as e:
logger.error(f"❌ [VOCAB ERROR] {e}")
def _load_schema(self):
"""Parst graph_schema.md: ## Source: `type` | Target | Typical | Prohibited |"""
self.topology.clear()
current_source = None
try:
with open(self.full_schema_path, "r", encoding="utf-8") as f:
for line in f:
# Header erkennen (Atomare Sektionen)
src_match = re.search(r"## Source:\s*`?([a-zA-Z0-9_-]+)`?", line)
if src_match:
current_source = src_match.group(1).strip().lower()
if current_source not in self.topology:
self.topology[current_source] = {}
continue
# Tabellenzeilen parsen
if current_source and "|" in line and not line.startswith("|-") and "Target" not in line:
cols = [c.strip().replace("`", "").lower() for c in line.split("|")]
if len(cols) >= 4:
target_type = cols[1]
typical_edges = [e.strip() for e in cols[2].split(",") if e.strip() and e != "-"]
prohibited_edges = [e.strip() for e in cols[3].split(",") if e.strip() and e != "-"]
if target_type not in self.topology[current_source]:
self.topology[current_source][target_type] = {"typical": set(), "prohibited": set()}
self.topology[current_source][target_type]["typical"].update(typical_edges)
self.topology[current_source][target_type]["prohibited"].update(prohibited_edges)
logger.info(f"✅ [SCHEMA] Topology matrix built for {len(self.topology)} source types.")
except Exception as e:
logger.error(f"❌ [SCHEMA ERROR] {e}")
def resolve(self, edge_type: str, provenance: str = "explicit", context: dict = None) -> str:
"""
Löst Aliasse auf kanonische Namen auf und schützt System-Kanten.
Erhalt der v0.8.0 Schutz-Logik.
"""
self.ensure_latest()
if not edge_type:
return "related_to"
clean_type = edge_type.lower().strip().replace(" ", "_").replace("-", "_")
ctx = context or {}
# Sicherheits-Gate: Schutz vor unerlaubter Nutzung von System-Kanten
restricted_provenance = ["explicit", "semantic_ai", "inherited", "global_pool", "rule"]
if provenance in restricted_provenance and clean_type in self.FORBIDDEN_SYSTEM_EDGES:
self._log_issue(clean_type, f"forbidden_system_edge_manipulation_by_{provenance}", ctx)
return "related_to"
# System-Kanten sind NUR bei struktureller Provenienz (Code-generiert) erlaubt
if provenance == "structure" and clean_type in self.FORBIDDEN_SYSTEM_EDGES:
return clean_type
# Alias-Auflösung
return self.canonical_map.get(clean_type, clean_type)
def get_inverse(self, edge_type: str) -> str:
"""WP-24c: Gibt das symmetrische Gegenstück zurück."""
canonical = self.resolve(edge_type)
return self.inverse_map.get(canonical, "related_to")
def get_topology_info(self, source_type: str, target_type: str) -> Dict[str, List[str]]:
"""
WP-24c: Liefert kontextuelle Kanten-Empfehlungen für Obsidian und das Backend.
"""
self.ensure_latest()
# Hierarchische Suche: Spezifisch -> 'any' -> Empty
src_cfg = self.topology.get(source_type, self.topology.get("any", {}))
tgt_cfg = src_cfg.get(target_type, src_cfg.get("any", {"typical": set(), "prohibited": set()}))
return {
"typical": sorted(list(tgt_cfg["typical"])),
"prohibited": sorted(list(tgt_cfg["prohibited"]))
}
def _log_issue(self, edge_type: str, error_kind: str, ctx: dict):
"""JSONL-Logging für unbekannte/verbotene Kanten (Erhalt v0.8.0)."""
try:
os.makedirs(os.path.dirname(self.unknown_log_path), exist_ok=True)
entry = {
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"edge_type": edge_type,
"error": error_kind,
"note_id": ctx.get("note_id", "unknown"),
"provenance": ctx.get("provenance", "unknown")
}
with open(self.unknown_log_path, "a", encoding="utf-8") as f:
f.write(json.dumps(entry) + "\n")
except Exception: pass
# Singleton Export
registry = EdgeRegistry()