148 lines
5.8 KiB
Python
148 lines
5.8 KiB
Python
"""
|
|
FILE: app/services/edge_registry.py
|
|
DESCRIPTION: Single Source of Truth für Kanten-Typen mit dynamischem Reload.
|
|
WP-22: Transparente Status-Meldungen & Robuste Pfad-Auflösung.
|
|
VERSION: 0.7.3 (Fix: Path Normalization & Env Priority)
|
|
"""
|
|
import re
|
|
import os
|
|
import json
|
|
import logging
|
|
import time
|
|
from typing import Dict, Optional, Set, Tuple
|
|
|
|
from app.config import get_settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class EdgeRegistry:
    """Process-wide singleton that maps edge-type names and aliases to canonical types.

    The vocabulary is parsed from a markdown table file. Every call to
    :meth:`resolve` checks the file's modification time and reloads the
    vocabulary when the file has changed, so edits become visible without a
    restart.
    """

    _instance = None

    # Edge types reserved for structural (system-generated) edges.
    FORBIDDEN_SYSTEM_EDGES = {"next", "prev", "belongs_to"}

    # Matches a markdown table row: | **`canonical`** | alias, alias | ...
    # Compiled once instead of on every reload.
    _VOCAB_ROW_PATTERN = re.compile(
        r"\|\s*\*\*`?([a-zA-Z0-9_-]+)`?\*\*\s*\|\s*([^|]+)\|"
    )

    # Used when neither the environment nor the settings provide a vault root.
    _DEFAULT_VAULT_ROOT = "./vault"

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.initialized = False
        return cls._instance

    def __init__(self, vault_root: Optional[str] = None):
        """Resolve the vocabulary path and load the vocabulary once.

        Path priority:
          1. ``MINDNET_VOCAB_PATH`` (direct path to the vocabulary file).
          2. Known locations below the vault root, taken from
             ``MINDNET_VAULT_ROOT`` (environment, then settings, then
             ``./vault``).

        Args:
            vault_root: Accepted for interface compatibility; it is currently
                not used for path resolution.
        """
        if self.initialized:
            return

        settings = get_settings()

        # Environment values may arrive wrapped in quotes (e.g. from .env files).
        env_vocab_path = self._strip_quotes(os.getenv("MINDNET_VOCAB_PATH"))
        # Fall back to the default when the configured value is missing or
        # empty; os.path.join() would otherwise raise on None.
        env_vault_root = (
            self._strip_quotes(
                os.getenv("MINDNET_VAULT_ROOT")
                or getattr(settings, "MINDNET_VAULT_ROOT", None)
            )
            or self._DEFAULT_VAULT_ROOT
        )

        if env_vocab_path:
            self.full_vocab_path = os.path.abspath(env_vocab_path)
        else:
            candidates = [
                os.path.join(env_vault_root, "_system", "dictionary", "edge_vocabulary.md"),
                os.path.join(env_vault_root, "01_User_Manual", "01_edge_vocabulary.md"),
            ]
            # First existing candidate wins; otherwise keep the primary
            # location so the "file not found" message names it.
            chosen = next((p for p in candidates if os.path.exists(p)), candidates[0])
            self.full_vocab_path = os.path.abspath(chosen)

        self.unknown_log_path = "data/logs/unknown_edges.jsonl"
        self.canonical_map: Dict[str, str] = {}
        self.valid_types: Set[str] = set()
        self._last_mtime = 0.0

        print(f"\n>>> [EDGE-REGISTRY] Initializing with Path: {self.full_vocab_path}", flush=True)
        self.ensure_latest()
        self.initialized = True

    @staticmethod
    def _strip_quotes(value) -> Optional[str]:
        """Return *value* as a string without surrounding quotes, or None if empty."""
        if not value:
            return None
        return str(value).strip('"').strip("'")

    @staticmethod
    def _normalize(name: str) -> str:
        """Normalize a name for lookup: lower-case, spaces and hyphens become underscores."""
        return name.lower().strip().replace(" ", "_").replace("-", "_")

    def ensure_latest(self):
        """Reload the vocabulary when the file's modification time has changed."""
        try:
            # A single stat call avoids the race between exists() and getmtime().
            current_mtime = os.path.getmtime(self.full_vocab_path)
        except OSError:
            print(f"!!! [EDGE-REGISTRY ERROR] File not found: {self.full_vocab_path} !!!", flush=True)
            return

        # "!=" rather than ">": a file replaced by an older copy must reload too.
        if current_mtime != self._last_mtime:
            self._load_vocabulary()
            # Recorded even after a failed load so a broken file is not
            # re-read (and re-reported) on every call until it changes again.
            self._last_mtime = current_mtime

    def _load_vocabulary(self):
        """Parse the vocabulary file and replace the in-memory maps on success.

        Parsing happens into temporary containers; the live maps are only
        updated after the file was read completely, so a read error keeps the
        previously loaded vocabulary intact.
        """
        new_map: Dict[str, str] = {}
        new_types: Set[str] = set()
        c_types, c_aliases = 0, 0

        def register(key: str, canonical: str) -> None:
            # Keep the exact key and also make the hyphen-free form resolvable,
            # because resolve() converts hyphens to underscores before lookup.
            # setdefault() never overrides an exact entry with a derived one.
            new_map[key] = canonical
            new_map.setdefault(key.replace("-", "_"), canonical)

        try:
            with open(self.full_vocab_path, "r", encoding="utf-8") as f:
                for line in f:
                    match = self._VOCAB_ROW_PATTERN.search(line)
                    if not match:
                        continue

                    canonical = match.group(1).strip().lower()
                    aliases_str = match.group(2).strip()

                    new_types.add(canonical)
                    register(canonical, canonical)
                    c_types += 1

                    # "Kein Alias" is the vocabulary's marker for "no alias".
                    if not aliases_str or "Kein Alias" in aliases_str:
                        continue

                    for alias in aliases_str.split(","):
                        clean_alias = alias.replace("`", "").lower().strip().replace(" ", "_")
                        if not clean_alias:
                            continue
                        register(clean_alias, canonical)
                        c_aliases += 1
        except (OSError, UnicodeDecodeError) as e:
            print(f"!!! [EDGE-REGISTRY FATAL] Error reading file: {e} !!!", flush=True)
            return

        # Update in place so references to the containers held elsewhere stay valid.
        self.canonical_map.clear()
        self.canonical_map.update(new_map)
        self.valid_types.clear()
        self.valid_types.update(new_types)

        print(f"=== [EDGE-REGISTRY SUCCESS] Loaded {c_types} Canonical Types and {c_aliases} Aliases ===", flush=True)

    def resolve(self, edge_type: str, provenance: str = "explicit", context: Optional[dict] = None) -> str:
        """Map an edge type or alias to its canonical type.

        Args:
            edge_type: Raw edge type; matched case-insensitively with spaces
                and hyphens treated as underscores.
            provenance: ``"explicit"`` rejects system edge types,
                ``"structure"`` passes them through unchanged.
            context: Optional location info (``file``, ``line``, ``note_id``)
                written to the issue log.

        Returns:
            The canonical type; ``"related_to"`` for empty input or a system
            edge used explicitly; the normalized input when the type is
            unknown (which is also logged).
        """
        self.ensure_latest()
        if not edge_type:
            return "related_to"

        clean_type = self._normalize(edge_type)
        ctx = context or {}

        if clean_type in self.FORBIDDEN_SYSTEM_EDGES:
            if provenance == "explicit":
                self._log_issue(clean_type, "forbidden_system_usage", ctx)
                return "related_to"
            if provenance == "structure":
                return clean_type

        canonical = self.canonical_map.get(clean_type)
        if canonical is not None:
            return canonical

        self._log_issue(clean_type, "unknown_type", ctx)
        return clean_type

    def _log_issue(self, edge_type: str, error_kind: str, ctx: dict):
        """Append one JSON line describing a rejected or unknown edge type.

        Logging is best-effort: a failure is reported through the module
        logger and never propagates to the caller.
        """
        try:
            log_dir = os.path.dirname(self.unknown_log_path)
            if log_dir:  # makedirs("") raises for a path without a directory part
                os.makedirs(log_dir, exist_ok=True)
            entry = {
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "edge_type": edge_type,
                "error": error_kind,
                "file": ctx.get("file", "unknown"),
                "line": ctx.get("line", "unknown"),
                "note_id": ctx.get("note_id", "unknown"),
            }
            with open(self.unknown_log_path, "a", encoding="utf-8") as f:
                f.write(json.dumps(entry) + "\n")
        except Exception as exc:
            # Deliberately broad: issue logging must never break resolve().
            logging.getLogger(__name__).warning(
                "Could not write edge issue log %s: %s", self.unknown_log_path, exc
            )
|
|
|
|
# Module-level singleton; constructing it loads the vocabulary at import time.
registry = EdgeRegistry()