mindnet/app/services/edge_registry.py
2025-12-23 18:11:05 +01:00

155 lines
6.2 KiB
Python

"""
FILE: app/services/edge_registry.py
DESCRIPTION: Single source of truth for edge types, with dynamic reload.
WP-22: Fix for absolute paths outside the vault (production dictionary).
VERSION: 0.7.4 (Fix: Absolute Path Escaping & Quote Stripping)
"""
import re
import os
import json
import logging
import time
from typing import Dict, Optional, Set, Tuple
from app.config import get_settings
logger = logging.getLogger(__name__)
class EdgeRegistry:
    """Singleton registry that maps edge-type names and aliases to canonical types.

    The vocabulary is parsed from a markdown table whose rows look like
    ``| **`canonical`** | alias one, alias-two | ...``. The file's modification
    time is checked on every :meth:`resolve` call and the table is re-parsed
    when it changes.

    Attributes:
        full_vocab_path: Path of the vocabulary file that is read.
        unknown_log_path: JSONL file that receives unknown/forbidden usages.
        canonical_map: Normalized name or alias -> canonical type.
        valid_types: Set of canonical types as written in the vocabulary.
    """

    _instance = None
    FORBIDDEN_SYSTEM_EDGES = {"next", "prev", "belongs_to"}

    # One vocabulary table row: bold (optionally back-ticked) canonical name in
    # the first cell, comma-separated aliases in the second cell.
    _ROW_PATTERN = re.compile(r"\|\s*\*\*`?([a-zA-Z0-9_-]+)`?\*\*\s*\|\s*([^|]+)\|")

    # True while the vocabulary file is known to be unreadable; prevents the
    # "file not found" message from being printed on every resolve() call.
    _missing_reported = False

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.initialized = False
        return cls._instance

    def __init__(self, vault_root: Optional[str] = None):
        """Determine the vocabulary path and perform the initial load.

        Only the first construction does any work; later calls return at once
        because the instance is shared.

        Args:
            vault_root: Optional vault directory. When given it takes
                precedence over ``MINDNET_VAULT_ROOT`` from the environment
                and from the settings object.
        """
        if self.initialized:
            return
        settings = get_settings()

        # 1. Read configured values and strip surrounding quotes.
        env_vocab_path = self._strip_quotes(os.getenv("MINDNET_VOCAB_PATH"))
        resolved_vault_root = self._strip_quotes(
            vault_root
            or os.getenv("MINDNET_VAULT_ROOT")
            or getattr(settings, "MINDNET_VAULT_ROOT", "./vault")
        ) or "./vault"  # settings may hold None/"" -> never join on a non-string

        # 2. Path priority: an explicit vocabulary path wins; absolute paths
        #    are used verbatim, relative ones are anchored at the current dir.
        if env_vocab_path:
            if os.path.isabs(env_vocab_path):
                self.full_vocab_path = env_vocab_path
            else:
                self.full_vocab_path = os.path.abspath(env_vocab_path)
        else:
            # Fallback: look inside the vault; default to the first candidate
            # when none exists so the error message names a concrete path.
            candidates = [
                os.path.join(resolved_vault_root, "_system", "dictionary", "edge_vocabulary.md"),
                os.path.join(resolved_vault_root, "01_User_Manual", "01_edge_vocabulary.md"),
            ]
            chosen = next((p for p in candidates if os.path.exists(p)), candidates[0])
            self.full_vocab_path = os.path.abspath(chosen)

        self.unknown_log_path = "data/logs/unknown_edges.jsonl"
        self.canonical_map: Dict[str, str] = {}
        self.valid_types: Set[str] = set()
        self._last_mtime = 0.0
        print(f"\n>>> [EDGE-REGISTRY] Initializing with Path: {self.full_vocab_path}", flush=True)
        self.ensure_latest()
        self.initialized = True

    @staticmethod
    def _strip_quotes(value: Optional[str]) -> Optional[str]:
        """Remove surrounding double and single quotes; pass falsy values through."""
        if not value:
            return value
        return value.strip('"').strip("'")

    @staticmethod
    def _normalize(name: str) -> str:
        """Return the lookup key for *name*: lower-case, spaces and hyphens as ``_``."""
        return name.lower().strip().replace(" ", "_").replace("-", "_")

    def ensure_latest(self):
        """Reload the vocabulary when the file's modification time has changed."""
        try:
            # Single stat call instead of exists() + getmtime(): no window in
            # which the file can vanish between the two.
            current_mtime = os.path.getmtime(self.full_vocab_path)
        except OSError:
            if not self._missing_reported:
                print(f"!!! [EDGE-REGISTRY ERROR] File not found: {self.full_vocab_path} !!!", flush=True)
                self._missing_reported = True
            return
        self._missing_reported = False
        # "!=" rather than ">" so a file restored with an older timestamp is
        # picked up as well.
        if current_mtime != self._last_mtime:
            self._load_vocabulary()
            # Recorded even when parsing failed, so a broken file is retried
            # only after it changes again.
            self._last_mtime = current_mtime

    def _load_vocabulary(self):
        """Parse the vocabulary file into ``canonical_map`` and ``valid_types``.

        The new content is built in temporary containers and copied into the
        live ones only after the whole file was read, so a read error leaves
        the previously loaded vocabulary intact.

        Returns:
            True when the file was parsed, False when reading it failed.
        """
        new_map: Dict[str, str] = {}
        new_types: Set[str] = set()
        c_types, c_aliases = 0, 0
        try:
            with open(self.full_vocab_path, "r", encoding="utf-8") as f:
                for line in f:
                    match = self._ROW_PATTERN.search(line)
                    if not match:
                        continue
                    canonical = match.group(1).strip().lower()
                    aliases_str = match.group(2).strip()
                    new_types.add(canonical)
                    # Keep the literal key and add the normalized one that
                    # resolve() actually looks up (differs for "a-b" names).
                    new_map[canonical] = canonical
                    new_map[self._normalize(canonical)] = canonical
                    c_types += 1
                    if aliases_str and "Kein Alias" not in aliases_str:
                        for alias in (a.strip() for a in aliases_str.split(",")):
                            if not alias:
                                continue
                            new_map[self._normalize(alias.replace("`", ""))] = canonical
                            c_aliases += 1
        except (OSError, UnicodeError) as e:
            print(f"!!! [EDGE-REGISTRY FATAL] Error reading file: {e} !!!", flush=True)
            return False

        # Update in place so external references to these containers stay valid.
        self.canonical_map.clear()
        self.canonical_map.update(new_map)
        self.valid_types.clear()
        self.valid_types.update(new_types)
        print(f"=== [EDGE-REGISTRY SUCCESS] Loaded {c_types} Canonical Types and {c_aliases} Aliases ===", flush=True)
        return True

    def resolve(self, edge_type: str, provenance: str = "explicit", context: Optional[dict] = None) -> str:
        """Map *edge_type* to its canonical type.

        Args:
            edge_type: Raw edge name as written by the author.
            provenance: ``"explicit"`` for user-written edges, ``"structure"``
                for system-generated ones.
            context: Optional dict with ``file``, ``line`` and ``note_id`` keys
                that is written to the issue log.

        Returns:
            ``"related_to"`` for empty input and for explicitly used system
            edges; the system edge itself for structural provenance; the
            canonical type when the name or alias is known; otherwise the
            normalized input (which is also logged as unknown).
        """
        self.ensure_latest()
        if not edge_type:
            return "related_to"
        clean_type = self._normalize(edge_type)
        if not clean_type:
            # Whitespace-only input carries no type information either.
            return "related_to"
        ctx = context or {}

        if clean_type in self.FORBIDDEN_SYSTEM_EDGES:
            if provenance == "explicit":
                self._log_issue(clean_type, "forbidden_system_usage", ctx)
                return "related_to"
            if provenance == "structure":
                return clean_type

        canonical = self.canonical_map.get(clean_type)
        if canonical is not None:
            return canonical

        self._log_issue(clean_type, "unknown_type", ctx)
        return clean_type

    def _log_issue(self, edge_type: str, error_kind: str, ctx: dict):
        """Append one JSON line describing a problematic edge to the issue log.

        Logging is best-effort: a failure is reported through the module
        logger and never propagates to the caller.
        """
        try:
            log_dir = os.path.dirname(self.unknown_log_path)
            if log_dir:  # makedirs("") raises for a bare file name
                os.makedirs(log_dir, exist_ok=True)
            entry = {
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "edge_type": edge_type,
                "error": error_kind,
                "file": ctx.get("file", "unknown"),
                "line": ctx.get("line", "unknown"),
                "note_id": ctx.get("note_id", "unknown"),
            }
            with open(self.unknown_log_path, "a", encoding="utf-8") as f:
                f.write(json.dumps(entry) + "\n")
        except Exception as exc:  # best-effort boundary: report, never raise
            logging.getLogger(__name__).warning(
                "Could not write edge issue log %s: %s", self.unknown_log_path, exc
            )
# Shared module-level instance. Constructing it here runs EdgeRegistry.__init__
# at import time.
registry = EdgeRegistry()