mindnet/app/services/edge_registry.py
2025-12-23 07:31:33 +01:00

127 lines
5.0 KiB
Python

import re
import os
import json
import logging
import time
from typing import Dict, Optional, Set, Tuple
from app.config import get_settings
logger = logging.getLogger(__name__)
class EdgeRegistry:
_instance = None
# System-Kanten, die NIEMALS manuell im Markdown stehen dürfen
FORBIDDEN_SYSTEM_EDGES = {"next", "prev", "belongs_to"}
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(EdgeRegistry, cls).__new__(cls)
cls._instance.initialized = False
return cls._instance
def __init__(self, vault_root: Optional[str] = None):
if self.initialized:
return
settings = get_settings()
env_vocab_path = os.getenv("MINDNET_VOCAB_PATH")
env_vault_root = os.getenv("MINDNET_VAULT_ROOT") or getattr(settings, "MINDNET_VAULT_ROOT", "./vault")
if env_vocab_path:
self.full_vocab_path = os.path.abspath(env_vocab_path)
else:
self.full_vocab_path = os.path.abspath(
os.path.join(env_vault_root, "01_User_Manual", "01_edge_vocabulary.md")
)
self.unknown_log_path = "data/logs/unknown_edges.jsonl"
self.canonical_map: Dict[str, str] = {}
self.valid_types: Set[str] = set()
self._last_mtime = 0.0 # Für dynamisches Neuladen
self.ensure_latest()
self.initialized = True
def ensure_latest(self):
"""Prüft den Zeitstempel der Datei und lädt ggf. neu."""
if not os.path.exists(self.full_vocab_path):
return
current_mtime = os.path.getmtime(self.full_vocab_path)
if current_mtime > self._last_mtime:
self._load_vocabulary()
self._last_mtime = current_mtime
def _load_vocabulary(self):
"""Parst das Wörterbuch und baut die Canonical-Map auf."""
self.canonical_map.clear()
self.valid_types.clear()
pattern = re.compile(r"\|\s*\*\*`?([a-zA-Z0-9_-]+)`?\*\*\s*\|\s*([^|]+)\|")
try:
with open(self.full_vocab_path, "r", encoding="utf-8") as f:
for line in f:
match = pattern.search(line)
if match:
canonical = match.group(1).strip().lower()
aliases_str = match.group(2).strip()
self.valid_types.add(canonical)
self.canonical_map[canonical] = canonical
if aliases_str and "Kein Alias" not in aliases_str:
aliases = [a.strip() for a in aliases_str.split(",") if a.strip()]
for alias in aliases:
clean_alias = alias.replace("`", "").lower().strip().replace(" ", "_")
self.canonical_map[clean_alias] = canonical
logger.info(f"EdgeRegistry reloaded: {len(self.valid_types)} types.")
except Exception as e:
logger.error(f"Error reading vocabulary: {e}")
def resolve(self, edge_type: str, provenance: str = "explicit", context: dict = None) -> str:
"""
Validiert Kanten gegen System-Regeln und Wörterbuch.
- provenance: 'explicit' (User/Markdown) oder 'structure' (System-Pipeline)
- context: {'file': str, 'line': int}
"""
self.ensure_latest()
if not edge_type: return "related_to"
clean_type = edge_type.lower().strip().replace(" ", "_").replace("-", "_")
ctx = context or {}
# 1. Check auf "verbotene" manuelle Nutzung von Systemkanten
if provenance == "explicit" and clean_type in self.FORBIDDEN_SYSTEM_EDGES:
self._log_issue(clean_type, "forbidden_system_usage", ctx)
return "related_to" # Fallback, um System-Integrität zu schützen
# 2. Stillschweigende Akzeptanz für echte System-Struktur
if provenance == "structure" and clean_type in self.FORBIDDEN_SYSTEM_EDGES:
return clean_type
# 3. Mapping gegen das Wörterbuch
if clean_type in self.canonical_map:
return self.canonical_map[clean_type]
# 4. Unbekannte Kante loggen
self._log_issue(clean_type, "unknown_type", ctx)
return clean_type
def _log_issue(self, edge_type: str, error_kind: str, ctx: dict):
"""Schreibt detaillierte Fehler mit Fundort."""
try:
os.makedirs(os.path.dirname(self.unknown_log_path), exist_ok=True)
entry = {
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"edge_type": edge_type,
"error": error_kind,
"file": ctx.get("file", "unknown"),
"line": ctx.get("line", "unknown")
}
with open(self.unknown_log_path, "a", encoding="utf-8") as f:
f.write(json.dumps(entry) + "\n")
except Exception: pass
registry = EdgeRegistry()