# mindnet/app/services/edge_registry.py
# (file-viewer header: 126 lines, 4.8 KiB, Python)

"""
FILE: app/services/edge_registry.py
DESCRIPTION: Single Source of Truth für Kanten-Typen. Parst '01_edge_vocabulary.md'.
WP-22 Teil B: Registry & Validation.
FIX: Beachtet MINDNET_VOCAB_PATH und MINDNET_VAULT_ROOT aus .env sicher.
VERSION: 0.6.5 (Path Stability Update)
"""
import re
import os
import json
import logging
from typing import Dict, Optional, Set
from app.config import get_settings
logger = logging.getLogger(__name__)
class EdgeRegistry:
    """Process-wide registry of edge types parsed from the vocabulary file.

    The class is a singleton: every call to ``EdgeRegistry(...)`` returns the
    same object, and only the first call runs the initialisation (later
    ``vault_root`` arguments are ignored).

    Attributes:
        vault_root: Directory used as the vault root for path resolution.
        full_vocab_path: Absolute path of the vocabulary Markdown file.
        unknown_log_path: JSONL file that collects unknown edge types.
        canonical_map: Maps every canonical name and alias to its canonical name.
        valid_types: Set of canonical edge type names.
    """

    _instance: Optional["EdgeRegistry"] = None

    # Value returned by ``resolve`` when no usable edge type was supplied.
    _DEFAULT_EDGE_TYPE = "related_to"

    # Markdown table row: | **canonical** | Aliases | ...
    _ROW_PATTERN = re.compile(r"\|\s*\*\*([a-z_]+)\*\*\s*\|\s*([^|]+)\|")

    def __new__(cls, vault_root: Optional[str] = None):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            instance = super().__new__(cls)
            instance.initialized = False
            cls._instance = instance
        return cls._instance

    def __init__(self, vault_root: Optional[str] = None):
        """Resolve the vocabulary location and load it (first call only).

        Args:
            vault_root: Optional explicit vault root; takes precedence over
                the environment and settings when choosing the root directory.
        """
        if self.initialized:
            return

        settings = get_settings()

        # os.getenv is consulted directly in case the settings object does
        # not define a MINDNET_VAULT_ROOT field.
        self.vault_root, self.full_vocab_path = self._resolve_paths(
            vault_root,
            os.getenv("MINDNET_VOCAB_PATH"),
            os.getenv("MINDNET_VAULT_ROOT"),
            getattr(settings, "MINDNET_VAULT_ROOT", None),
        )

        self.unknown_log_path = "data/logs/unknown_edges.jsonl"
        self.canonical_map: Dict[str, str] = {}
        self.valid_types: Set[str] = set()

        self._load_vocabulary()
        self.initialized = True

    @staticmethod
    def _resolve_paths(vault_root, vocab_env, env_vault_root, settings_vault_root):
        """Pick the vault root and the absolute vocabulary file path.

        Args:
            vault_root: Explicit root passed to the constructor (or None).
            vocab_env: Value of MINDNET_VOCAB_PATH (or None). A value ending
                in ``.md`` is used as the vocabulary file itself; any other
                value is treated as a root directory.
            env_vault_root: Value of MINDNET_VAULT_ROOT (or None).
            settings_vault_root: MINDNET_VAULT_ROOT from settings (or None).

        Returns:
            Tuple ``(vault_root, full_vocab_path)``.
        """
        vocab_is_file = bool(vocab_env) and vocab_env.lower().endswith(".md")

        # A vocabulary *file* must never be used as the vault root directory.
        root = (
            vault_root
            or (None if vocab_is_file else vocab_env)
            or env_vault_root
            or settings_vault_root
            or "./vault"
        )

        if vocab_is_file:
            vocab_path = os.path.abspath(vocab_env)
        else:
            vocab_path = os.path.abspath(
                os.path.join(root, "01_User_Manual", "01_edge_vocabulary.md")
            )
        return root, vocab_path

    @staticmethod
    def _normalize(raw: str) -> str:
        """Lower-case, trim, and replace spaces with underscores."""
        return raw.lower().strip().replace(" ", "_")

    def _load_vocabulary(self):
        """Parse the Markdown table in the vault into the lookup structures.

        Each matching row contributes its canonical name to ``valid_types``
        and ``canonical_map``; comma-separated aliases are mapped to the
        canonical name unless the cell contains "Kein Alias". A missing or
        unreadable file is logged and leaves the registry as it was.
        """
        if not os.path.exists(self.full_vocab_path):
            logger.warning(f"Edge Vocabulary NOT found at: {self.full_vocab_path}. Registry is empty.")
            return

        try:
            with open(self.full_vocab_path, "r", encoding="utf-8") as f:
                count_types = 0
                count_aliases = 0

                for line in f:
                    match = self._ROW_PATTERN.search(line)
                    if not match:
                        continue

                    canonical = match.group(1).strip()
                    aliases_str = match.group(2).strip()

                    self.valid_types.add(canonical)
                    self.canonical_map[canonical] = canonical
                    count_types += 1

                    if not aliases_str or "Kein Alias" in aliases_str:
                        continue

                    for alias in aliases_str.split(","):
                        # Normalise exactly like resolve() does, otherwise
                        # aliases containing spaces could never be matched.
                        clean_alias = self._normalize(alias.replace("`", ""))
                        if not clean_alias:
                            continue
                        self.canonical_map[clean_alias] = canonical
                        count_aliases += 1

            logger.info(f"EdgeRegistry: Loaded {count_types} types and {count_aliases} aliases from {self.full_vocab_path}.")

        except (OSError, UnicodeError) as e:
            logger.error(f"Failed to parse Edge Vocabulary at {self.full_vocab_path}: {e}")

    def resolve(self, edge_type: str) -> str:
        """Normalise an edge type via the registry, logging unknown ones.

        Args:
            edge_type: Raw edge type; may be a canonical name or an alias.

        Returns:
            The canonical name when the normalised value is known,
            ``"related_to"`` when the input is empty or whitespace-only,
            otherwise the normalised input itself (after recording it in the
            unknown-types log).
        """
        if not edge_type:
            return self._DEFAULT_EDGE_TYPE

        clean_type = self._normalize(edge_type)
        if not clean_type:
            # Whitespace-only input carries no type information either.
            return self._DEFAULT_EDGE_TYPE

        # 1. Direct match (canonical name or alias)
        canonical = self.canonical_map.get(clean_type)
        if canonical is not None:
            return canonical

        # 2. Unknown -> log for review and fall back to the cleaned value
        self._log_unknown(clean_type)
        return clean_type

    def _log_unknown(self, edge_type: str):
        """Append an unknown edge type to the review log (best effort).

        Failures to write are reported through the module logger and never
        propagate to the caller.
        """
        entry = {"unknown_type": edge_type, "status": "new"}
        try:
            log_dir = os.path.dirname(self.unknown_log_path)
            if log_dir:
                # os.makedirs("") raises, so only create a real directory.
                os.makedirs(log_dir, exist_ok=True)
            with open(self.unknown_log_path, "a", encoding="utf-8") as f:
                f.write(json.dumps(entry) + "\n")
        except (OSError, ValueError) as exc:
            logger.warning(f"Could not record unknown edge type '{edge_type}' in {self.unknown_log_path}: {exc}")
# Singleton instance, created when this module is imported; constructing it
# runs EdgeRegistry.__init__, which attempts to load the vocabulary file.
registry = EdgeRegistry()