edge_registry hergestellt

This commit is contained in:
Lars 2025-12-23 16:05:40 +01:00
parent 18780e5330
commit 49b454d2ec

View File

@ -1,16 +1,15 @@
""" """
FILE: app/services/edge_registry.py FILE: app/services/edge_registry.py
DESCRIPTION: Single Source of Truth für Kanten-Typen. DESCRIPTION: Single Source of Truth für Kanten-Typen mit dynamischem Reload.
FIX: Regex angepasst auf Format **`canonical`** (Bold + Backticks). WP-22: Transparente Status-Meldungen für Dev-Umgebungen.
VERSION: 0.6.10 (Regex Precision Update) VERSION: 0.7.2 (Fix: Restore Console Visibility & Entry Counts)
""" """
import re import re
import os import os
import json import json
import logging import logging
from typing import Dict, Optional, Set import time
from typing import Dict, Optional, Set, Tuple
print(">>> MODULE_LOAD: edge_registry.py initialized <<<", flush=True)
from app.config import get_settings from app.config import get_settings
@ -18,6 +17,8 @@ logger = logging.getLogger(__name__)
class EdgeRegistry: class EdgeRegistry:
_instance = None _instance = None
# System-Kanten, die NIEMALS manuell im Markdown stehen dürfen
FORBIDDEN_SYSTEM_EDGES = {"next", "prev", "belongs_to"}
def __new__(cls, *args, **kwargs): def __new__(cls, *args, **kwargs):
if cls._instance is None: if cls._instance is None:
@ -33,31 +34,50 @@ class EdgeRegistry:
env_vocab_path = os.getenv("MINDNET_VOCAB_PATH") env_vocab_path = os.getenv("MINDNET_VOCAB_PATH")
env_vault_root = os.getenv("MINDNET_VAULT_ROOT") or getattr(settings, "MINDNET_VAULT_ROOT", "./vault") env_vault_root = os.getenv("MINDNET_VAULT_ROOT") or getattr(settings, "MINDNET_VAULT_ROOT", "./vault")
# Pfad-Priorität: 1. ENV -> 2. _system/dictionary -> 3. 01_User_Manual
if env_vocab_path: if env_vocab_path:
self.full_vocab_path = os.path.abspath(env_vocab_path) self.full_vocab_path = os.path.abspath(env_vocab_path)
else: else:
self.full_vocab_path = os.path.abspath( possible_paths = [
os.path.join(env_vault_root, "_system", "dictionary", "edge_vocabulary.md"),
os.path.join(env_vault_root, "01_User_Manual", "01_edge_vocabulary.md") os.path.join(env_vault_root, "01_User_Manual", "01_edge_vocabulary.md")
) ]
self.full_vocab_path = None
for p in possible_paths:
if os.path.exists(p):
self.full_vocab_path = os.path.abspath(p)
break
if not self.full_vocab_path:
self.full_vocab_path = os.path.abspath(possible_paths[0])
self.unknown_log_path = "data/logs/unknown_edges.jsonl" self.unknown_log_path = "data/logs/unknown_edges.jsonl"
self.canonical_map: Dict[str, str] = {} self.canonical_map: Dict[str, str] = {}
self.valid_types: Set[str] = set() self.valid_types: Set[str] = set()
self._last_mtime = 0.0
self._load_vocabulary() # Initialer Lade-Versuch mit Konsolen-Feedback
print(f"\n>>> [EDGE-REGISTRY] Initializing with Path: {self.full_vocab_path}", flush=True)
self.ensure_latest()
self.initialized = True self.initialized = True
def ensure_latest(self):
    """Reload the vocabulary if the file on disk changed since the last check.

    Reads the modification time of ``self.full_vocab_path`` and compares it
    with ``self._last_mtime``. When the two differ, ``_load_vocabulary()`` is
    called and the new timestamp is remembered. When the file cannot be
    stat'ed (missing, removed concurrently, not accessible), an error line is
    printed and the method returns without touching the loaded data.
    """
    try:
        # EAFP: a separate exists() check followed by getmtime() can still
        # raise if the file disappears in between, so stat it directly.
        current_mtime = os.path.getmtime(self.full_vocab_path)
    except OSError:
        print(f"!!! [EDGE-REGISTRY ERROR] File not found: {self.full_vocab_path} !!!", flush=True)
        return

    # Compare with "!=" instead of ">": a file that was replaced by a version
    # carrying an older timestamp (checkout, restore, sync) is a change too.
    if current_mtime != self._last_mtime:
        self._load_vocabulary()
        self._last_mtime = current_mtime
def _load_vocabulary(self):
"""Parst das Wörterbuch und meldet die Anzahl der gelesenen Einträge."""
self.canonical_map.clear()
self.valid_types.clear()
# Regex deckt | **canonical** | Aliase | ab
pattern = re.compile(r"\|\s*\*\*`?([a-zA-Z0-9_-]+)`?\*\*\s*\|\s*([^|]+)\|") pattern = re.compile(r"\|\s*\*\*`?([a-zA-Z0-9_-]+)`?\*\*\s*\|\s*([^|]+)\|")
try: try:
@ -74,36 +94,58 @@ class EdgeRegistry:
c_types += 1 c_types += 1
if aliases_str and "Kein Alias" not in aliases_str: if aliases_str and "Kein Alias" not in aliases_str:
# Aliase säubern (entfernt Backticks auch hier)
aliases = [a.strip() for a in aliases_str.split(",") if a.strip()] aliases = [a.strip() for a in aliases_str.split(",") if a.strip()]
for alias in aliases: for alias in aliases:
# Normalisierung: Kleinschreibung und Unterstriche
clean_alias = alias.replace("`", "").lower().strip().replace(" ", "_") clean_alias = alias.replace("`", "").lower().strip().replace(" ", "_")
self.canonical_map[clean_alias] = canonical self.canonical_map[clean_alias] = canonical
c_aliases += 1 c_aliases += 1
if c_types == 0: # Erfolgskontrolle für das Dev-Terminal
print("!!! [DICT-WARN] Pattern mismatch! Ensure types are **`canonical`** or **canonical**. !!!", flush=True) print(f"=== [EDGE-REGISTRY SUCCESS] Loaded {c_types} Canonical Types and {c_aliases} Aliases ===", flush=True)
else: logger.info(f"Registry reloaded from {self.full_vocab_path}")
print(f"=== [DICT-SUCCESS] Registered {c_types} Canonical Types and {c_aliases} Aliases ===", flush=True)
except Exception as e: except Exception as e:
print(f"!!! [DICT-FATAL] Error reading file: {e} !!!", flush=True) print(f"!!! [EDGE-REGISTRY FATAL] Error reading file: {e} !!!", flush=True)
logger.error(f"Error reading vocabulary: {e}")
def resolve(self, edge_type: str, provenance: str = "explicit", context: Optional[dict] = None) -> str:
    """Normalise an edge type and map it to its canonical name.

    Args:
        edge_type: Raw edge type as found in the note.
        provenance: ``"explicit"`` for edges written by hand, ``"structure"``
            for edges generated internally. Only these two values are
            treated specially here.
        context: Optional location info (``file``, ``line``, ``note_id``)
            that is passed on to ``_log_issue`` when a problem is recorded.

    Returns:
        ``"related_to"`` for empty or whitespace-only input and for system
        edges used with ``provenance="explicit"``; the system edge itself
        for ``provenance="structure"``; the mapped value when the normalised
        type is a key of ``canonical_map``; otherwise the normalised type
        unchanged (after logging it as ``unknown_type``).
    """
    self.ensure_latest()

    # Whitespace-only input would normalise to "" and be logged and returned
    # as an "unknown" empty type; treat it like a missing value instead.
    if not edge_type or not edge_type.strip():
        return "related_to"

    clean_type = edge_type.lower().strip().replace(" ", "_").replace("-", "_")
    ctx = context or {}

    if clean_type in self.FORBIDDEN_SYSTEM_EDGES:
        # 1. System edges must not be written manually: log and downgrade.
        if provenance == "explicit":
            self._log_issue(clean_type, "forbidden_system_usage", ctx)
            return "related_to"
        # 2. Internally generated structural edges are accepted as they are.
        if provenance == "structure":
            return clean_type

    # 3. Dictionary lookup.
    if clean_type in self.canonical_map:
        return self.canonical_map[clean_type]

    # 4. Unknown edge: record it and pass the normalised name through.
    self._log_issue(clean_type, "unknown_type", ctx)
    return clean_type
def _log_unknown(self, edge_type: str): def _log_issue(self, edge_type: str, error_kind: str, ctx: dict):
"""Detailliertes JSONL-Logging für Debugging."""
try: try:
os.makedirs(os.path.dirname(self.unknown_log_path), exist_ok=True) os.makedirs(os.path.dirname(self.unknown_log_path), exist_ok=True)
entry = {"unknown_type": edge_type, "status": "new"} entry = {
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"edge_type": edge_type,
"error": error_kind,
"file": ctx.get("file", "unknown"),
"line": ctx.get("line", "unknown"),
"note_id": ctx.get("note_id", "unknown")
}
with open(self.unknown_log_path, "a", encoding="utf-8") as f: with open(self.unknown_log_path, "a", encoding="utf-8") as f:
f.write(json.dumps(entry) + "\n") f.write(json.dumps(entry) + "\n")
except Exception: pass except Exception: pass