227 lines
10 KiB
Python
227 lines
10 KiB
Python
"""
|
|
FILE: app/services/edge_registry.py
|
|
DESCRIPTION: Single Source of Truth für Kanten-Typen, Symmetrien und Graph-Topologie.
|
|
WP-24c: Implementierung der dualen Registry (Vocabulary & Schema).
|
|
Unterstützt dynamisches Laden von Inversen und kontextuellen Vorschlägen.
|
|
VERSION: 1.0.1 (WP-24c: Verified Atomic Topology)
|
|
STATUS: Active
|
|
"""
|
|
import re
|
|
import os
|
|
import json
|
|
import logging
|
|
import time
|
|
from typing import Dict, Optional, Set, Tuple, List
|
|
|
|
from app.config import get_settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class EdgeRegistry:
|
|
"""
|
|
Zentraler Verwalter für das Kanten-Vokabular und das Graph-Schema.
|
|
Singleton-Pattern zur Sicherstellung konsistenter Validierung.
|
|
"""
|
|
_instance = None
|
|
|
|
# SYSTEM-SCHUTZ: Diese Kanten sind für die strukturelle Integrität reserviert (v0.8.0 Erhalt)
|
|
FORBIDDEN_SYSTEM_EDGES = {"next", "prev", "belongs_to"}
|
|
|
|
def __new__(cls, *args, **kwargs):
|
|
if cls._instance is None:
|
|
cls._instance = super(EdgeRegistry, cls).__new__(cls)
|
|
cls._instance.initialized = False
|
|
return cls._instance
|
|
|
|
def __init__(self):
|
|
if self.initialized:
|
|
return
|
|
|
|
settings = get_settings()
|
|
|
|
# --- Pfad-Konfiguration (WP-24c: Variable Pfade für Vault-Spiegelung) ---
|
|
# Das Vokabular (Semantik)
|
|
self.full_vocab_path = os.path.abspath(settings.MINDNET_VOCAB_PATH)
|
|
|
|
# Das Schema (Topologie) - Konfigurierbar via ENV: MINDNET_SCHEMA_PATH
|
|
schema_env = getattr(settings, "MINDNET_SCHEMA_PATH", None)
|
|
if schema_env:
|
|
self.full_schema_path = os.path.abspath(schema_env)
|
|
else:
|
|
# Fallback: Liegt im selben Verzeichnis wie das Vokabular
|
|
self.full_schema_path = os.path.join(os.path.dirname(self.full_vocab_path), "graph_schema.md")
|
|
|
|
self.unknown_log_path = "data/logs/unknown_edges.jsonl"
|
|
|
|
# --- Interne Datenspeicher ---
|
|
self.canonical_map: Dict[str, str] = {}
|
|
self.inverse_map: Dict[str, str] = {}
|
|
self.valid_types: Set[str] = set()
|
|
|
|
# Topologie: source_type -> { target_type -> {"typical": set, "prohibited": set} }
|
|
self.topology: Dict[str, Dict[str, Dict[str, Set[str]]]] = {}
|
|
|
|
self._last_vocab_mtime = 0.0
|
|
self._last_schema_mtime = 0.0
|
|
|
|
logger.info(f">>> [EDGE-REGISTRY] Initializing WP-24c Dual-Engine")
|
|
logger.info(f" - Vocab-Path: {self.full_vocab_path}")
|
|
logger.info(f" - Schema-Path: {self.full_schema_path}")
|
|
|
|
self.ensure_latest()
|
|
self.initialized = True
|
|
|
|
def ensure_latest(self):
|
|
"""Prüft Zeitstempel beider Dateien und führt bei Änderung Hot-Reload durch."""
|
|
try:
|
|
# Vokabular-Reload bei Änderung
|
|
if os.path.exists(self.full_vocab_path):
|
|
v_mtime = os.path.getmtime(self.full_vocab_path)
|
|
if v_mtime > self._last_vocab_mtime:
|
|
self._load_vocabulary()
|
|
self._last_vocab_mtime = v_mtime
|
|
|
|
# Schema-Reload bei Änderung
|
|
if os.path.exists(self.full_schema_path):
|
|
s_mtime = os.path.getmtime(self.full_schema_path)
|
|
if s_mtime > self._last_schema_mtime:
|
|
self._load_schema()
|
|
self._last_schema_mtime = s_mtime
|
|
|
|
except Exception as e:
|
|
logger.error(f"!!! [EDGE-REGISTRY] Sync failure: {e}")
|
|
|
|
def _load_vocabulary(self):
|
|
"""Parst edge_vocabulary.md: | Canonical | Inverse | Aliases | Description |"""
|
|
self.canonical_map.clear()
|
|
self.inverse_map.clear()
|
|
self.valid_types.clear()
|
|
|
|
# Regex für die 4-Spalten Struktur (WP-24c konform)
|
|
# Erwartet: | **`type`** | `inverse` | alias1, alias2 | ... |
|
|
pattern = re.compile(r"\|\s*\*\*`?([a-zA-Z0-9_-]+)`?\*\*\s*\|\s*`?([a-zA-Z0-9_-]+)`?\s*\|\s*([^|]+)\|")
|
|
|
|
try:
|
|
with open(self.full_vocab_path, "r", encoding="utf-8") as f:
|
|
c_count = 0
|
|
for line in f:
|
|
match = pattern.search(line)
|
|
if match:
|
|
canonical = match.group(1).strip().lower()
|
|
inverse = match.group(2).strip().lower()
|
|
aliases_raw = match.group(3).strip()
|
|
|
|
self.valid_types.add(canonical)
|
|
self.canonical_map[canonical] = canonical
|
|
if inverse:
|
|
self.inverse_map[canonical] = inverse
|
|
|
|
# Aliase verarbeiten (Normalisierung auf snake_case)
|
|
if aliases_raw and "Kein Alias" not in aliases_raw:
|
|
aliases = [a.strip() for a in aliases_raw.split(",") if a.strip()]
|
|
for alias in aliases:
|
|
clean_alias = alias.replace("`", "").lower().strip().replace(" ", "_")
|
|
if clean_alias:
|
|
self.canonical_map[clean_alias] = canonical
|
|
c_count += 1
|
|
|
|
logger.info(f"✅ [VOCAB] Loaded {c_count} edge definitions and their inverses.")
|
|
except Exception as e:
|
|
logger.error(f"❌ [VOCAB ERROR] {e}")
|
|
|
|
def _load_schema(self):
|
|
"""Parst graph_schema.md: ## Source: `type` | Target | Typical | Prohibited |"""
|
|
self.topology.clear()
|
|
current_source = None
|
|
|
|
try:
|
|
with open(self.full_schema_path, "r", encoding="utf-8") as f:
|
|
for line in f:
|
|
# Header erkennen (Atomare Sektionen)
|
|
src_match = re.search(r"## Source:\s*`?([a-zA-Z0-9_-]+)`?", line)
|
|
if src_match:
|
|
current_source = src_match.group(1).strip().lower()
|
|
if current_source not in self.topology:
|
|
self.topology[current_source] = {}
|
|
continue
|
|
|
|
# Tabellenzeilen parsen
|
|
if current_source and "|" in line and not line.startswith("|-") and "Target" not in line:
|
|
cols = [c.strip().replace("`", "").lower() for c in line.split("|")]
|
|
if len(cols) >= 4:
|
|
target_type = cols[1]
|
|
typical_edges = [e.strip() for e in cols[2].split(",") if e.strip() and e != "-"]
|
|
prohibited_edges = [e.strip() for e in cols[3].split(",") if e.strip() and e != "-"]
|
|
|
|
if target_type not in self.topology[current_source]:
|
|
self.topology[current_source][target_type] = {"typical": set(), "prohibited": set()}
|
|
|
|
self.topology[current_source][target_type]["typical"].update(typical_edges)
|
|
self.topology[current_source][target_type]["prohibited"].update(prohibited_edges)
|
|
|
|
logger.info(f"✅ [SCHEMA] Topology matrix built for {len(self.topology)} source types.")
|
|
except Exception as e:
|
|
logger.error(f"❌ [SCHEMA ERROR] {e}")
|
|
|
|
def resolve(self, edge_type: str, provenance: str = "explicit", context: dict = None) -> str:
|
|
"""
|
|
Löst Aliasse auf kanonische Namen auf und schützt System-Kanten.
|
|
Erhalt der v0.8.0 Schutz-Logik.
|
|
"""
|
|
self.ensure_latest()
|
|
if not edge_type:
|
|
return "related_to"
|
|
|
|
clean_type = edge_type.lower().strip().replace(" ", "_").replace("-", "_")
|
|
ctx = context or {}
|
|
|
|
# Sicherheits-Gate: Schutz vor unerlaubter Nutzung von System-Kanten
|
|
restricted_provenance = ["explicit", "semantic_ai", "inherited", "global_pool", "rule"]
|
|
if provenance in restricted_provenance and clean_type in self.FORBIDDEN_SYSTEM_EDGES:
|
|
self._log_issue(clean_type, f"forbidden_system_edge_manipulation_by_{provenance}", ctx)
|
|
return "related_to"
|
|
|
|
# System-Kanten sind NUR bei struktureller Provenienz (Code-generiert) erlaubt
|
|
if provenance == "structure" and clean_type in self.FORBIDDEN_SYSTEM_EDGES:
|
|
return clean_type
|
|
|
|
# Alias-Auflösung
|
|
return self.canonical_map.get(clean_type, clean_type)
|
|
|
|
def get_inverse(self, edge_type: str) -> str:
|
|
"""WP-24c: Gibt das symmetrische Gegenstück zurück."""
|
|
canonical = self.resolve(edge_type)
|
|
return self.inverse_map.get(canonical, "related_to")
|
|
|
|
def get_topology_info(self, source_type: str, target_type: str) -> Dict[str, List[str]]:
|
|
"""
|
|
WP-24c: Liefert kontextuelle Kanten-Empfehlungen für Obsidian und das Backend.
|
|
"""
|
|
self.ensure_latest()
|
|
|
|
# Hierarchische Suche: Spezifisch -> 'any' -> Empty
|
|
src_cfg = self.topology.get(source_type, self.topology.get("any", {}))
|
|
tgt_cfg = src_cfg.get(target_type, src_cfg.get("any", {"typical": set(), "prohibited": set()}))
|
|
|
|
return {
|
|
"typical": sorted(list(tgt_cfg["typical"])),
|
|
"prohibited": sorted(list(tgt_cfg["prohibited"]))
|
|
}
|
|
|
|
def _log_issue(self, edge_type: str, error_kind: str, ctx: dict):
|
|
"""JSONL-Logging für unbekannte/verbotene Kanten (Erhalt v0.8.0)."""
|
|
try:
|
|
os.makedirs(os.path.dirname(self.unknown_log_path), exist_ok=True)
|
|
entry = {
|
|
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
|
|
"edge_type": edge_type,
|
|
"error": error_kind,
|
|
"note_id": ctx.get("note_id", "unknown"),
|
|
"provenance": ctx.get("provenance", "unknown")
|
|
}
|
|
with open(self.unknown_log_path, "a", encoding="utf-8") as f:
|
|
f.write(json.dumps(entry) + "\n")
|
|
except Exception: pass
|
|
|
|
# Singleton Export
|
|
registry = EdgeRegistry() |