diff --git a/app/services/edge_registry.py b/app/services/edge_registry.py index 6dad404..2d76c86 100644 --- a/app/services/edge_registry.py +++ b/app/services/edge_registry.py @@ -1,7 +1,8 @@ """ FILE: app/services/edge_registry.py -DESCRIPTION: Single Source of Truth für Kanten-Typen. Parst '01_edge_vocabulary.md'. - Implementiert WP-22 Teil B (Registry & Validation). +DESCRIPTION: Single Source of Truth für Kanten-Typen. Parst '01_User_Manual/01_edge_vocabulary.md'. + WP-22 Teil B: Registry & Validation. + FIX: Beachtet MINDNET_VAULT_ROOT aus .env korrekt. """ import re import os @@ -14,47 +15,46 @@ logger = logging.getLogger(__name__) class EdgeRegistry: _instance = None - def __new__(cls): + def __new__(cls, vault_root: Optional[str] = None): if cls._instance is None: cls._instance = super(EdgeRegistry, cls).__new__(cls) cls._instance.initialized = False return cls._instance - def __init__(self): - if self.initialized: return - # Pfad korrespondiert mit dem Frontmatter Pfad in 01_edge_vocabulary.md - self.vocab_path = "01_User_Manual/01_edge_vocabulary.md" + def __init__(self, vault_root: Optional[str] = None): + if self.initialized: + return + + # Priorität 1: Übergebener Parameter (z.B. für Tests) + # Priorität 2: Environment Variable (z.B. Production ./vault_master) + # Priorität 3: Default Fallback (./vault) + self.vault_root = vault_root or os.getenv("MINDNET_VAULT_ROOT", "./vault") + + # Der relative Pfad ist laut Spezifikation fest definiert + self.vocab_rel_path = os.path.join("01_User_Manual", "01_edge_vocabulary.md") + self.unknown_log_path = "data/logs/unknown_edges.jsonl" - self.canonical_map: Dict[str, str] = {} # alias -> canonical + self.canonical_map: Dict[str, str] = {} self.valid_types: Set[str] = set() + self._load_vocabulary() self.initialized = True def _load_vocabulary(self): - """Parst die Markdown-Tabelle in 01_edge_vocabulary.md""" - # Fallback Suche, falls das Skript aus Root oder app ausgeführt wird - candidates = [ - self.vocab_path, - os.path.join("..", self.vocab_path), - "vault/01_User_Manual/01_edge_vocabulary.md" - ] + """Parst die Markdown-Tabelle im Vault.""" + # Absoluten Pfad auflösen, um Verwirrung mit cwd zu vermeiden + full_path = os.path.abspath(os.path.join(self.vault_root, self.vocab_rel_path)) - found_path = None - for p in candidates: - if os.path.exists(p): - found_path = p - break - - if not found_path: - logger.warning(f"Edge Vocabulary not found (checked: {candidates}). Registry empty.") + if not os.path.exists(full_path): + # Wir loggen den vollen Pfad, damit Debugging einfacher ist + logger.warning(f"Edge Vocabulary NOT found at: {full_path}. Registry is empty.") return - # Regex für Tabellenzeilen: | **canonical** | alias, alias | ... - # Matcht: | **caused_by** | ausgelöst_durch, wegen | + # Regex: | **canonical** | alias, alias | pattern = re.compile(r"\|\s*\*\*([a-z_]+)\*\*\s*\|\s*([^|]+)\|") try: - with open(found_path, "r", encoding="utf-8") as f: + with open(full_path, "r", encoding="utf-8") as f: for line in f: match = pattern.search(line) if match: @@ -62,48 +62,37 @@ class EdgeRegistry: aliases_str = match.group(2).strip() self.valid_types.add(canonical) - self.canonical_map[canonical] = canonical # Self-ref + self.canonical_map[canonical] = canonical - # Aliases parsen if aliases_str and "Kein Alias" not in aliases_str: aliases = [a.strip() for a in aliases_str.split(",") if a.strip()] for alias in aliases: - # Clean up user inputs (e.g. remove backticks if present) clean_alias = alias.replace("`", "") self.canonical_map[clean_alias] = canonical - logger.info(f"EdgeRegistry loaded: {len(self.valid_types)} canonical types from {found_path}.") + logger.info(f"EdgeRegistry loaded from {full_path}: {len(self.valid_types)} types.") except Exception as e: - logger.error(f"Failed to parse Edge Vocabulary: {e}") + logger.error(f"Failed to parse Edge Vocabulary at {full_path}: {e}") def resolve(self, edge_type: str) -> str: - """ - Normalisiert Kanten-Typen. Loggt unbekannte Typen, verwirft sie aber nicht (Learning System). - """ - if not edge_type: - return "related_to" - + if not edge_type: return "related_to" clean_type = edge_type.lower().strip().replace(" ", "_") - # 1. Lookup if clean_type in self.canonical_map: return self.canonical_map[clean_type] - # 2. Unknown Handling self._log_unknown(clean_type) - return clean_type # Pass-through (nicht verwerfen, aber loggen) + return clean_type def _log_unknown(self, edge_type: str): - """Schreibt unbekannte Typen in ein Append-Only Log für Review.""" try: os.makedirs(os.path.dirname(self.unknown_log_path), exist_ok=True) - # Einfaches JSONL Format entry = {"unknown_type": edge_type, "status": "new"} with open(self.unknown_log_path, "a", encoding="utf-8") as f: f.write(json.dumps(entry) + "\n") except Exception: - pass # Silent fail bei Logging, darf Ingestion nicht stoppen + pass -# Singleton Accessor +# Default Instanz registry = EdgeRegistry() \ No newline at end of file diff --git a/tests/test_WP22_intelligence.py b/tests/test_WP22_intelligence.py index 0197f29..8df6907 100644 --- a/tests/test_WP22_intelligence.py +++ b/tests/test_WP22_intelligence.py @@ -1,95 +1,109 @@ -""" -FILE: app/services/edge_registry.py -DESCRIPTION: Single Source of Truth für Kanten-Typen. Parst '01_User_Manual/01_edge_vocabulary.md'. - Pfad-Logik gefixed: Nutzt MINDNET_VAULT_ROOT oder Parameter. -""" -import re +import unittest import os +import shutil import json -import logging -from typing import Dict, Optional, Set +from unittest.mock import patch -logger = logging.getLogger(__name__) +# --- FIX: Import der KORREKTEN Funktion und Klassen --- +from app.services.edge_registry import EdgeRegistry +from app.core.retriever import _compute_total_score, _get_status_multiplier -class EdgeRegistry: - _instance = None +class TestWP22Intelligence(unittest.TestCase): - def __new__(cls, vault_root: Optional[str] = None): - if cls._instance is None: - cls._instance = super(EdgeRegistry, cls).__new__(cls) - cls._instance.initialized = False - return cls._instance - - def __init__(self, vault_root: Optional[str] = None): - if self.initialized: - return + def setUp(self): + # 1. Test-Vault Struktur definieren + self.test_vault_root = os.path.abspath("tests/temp_vault") + self.user_manual_dir = os.path.join(self.test_vault_root, "01_User_Manual") + self.log_dir = os.path.abspath("tests/logs") + + # 2. Verzeichnisse erstellen + os.makedirs(self.user_manual_dir, exist_ok=True) + os.makedirs(self.log_dir, exist_ok=True) + + # 3. Dummy Vocabulary File am korrekten Ort erstellen + # Der Ort muss exakt '01_User_Manual/01_edge_vocabulary.md' relativ zum vault_root sein + self.vocab_file = os.path.join(self.user_manual_dir, "01_edge_vocabulary.md") + with open(self.vocab_file, "w", encoding="utf-8") as f: + f.write(""" +| **canonical** | Aliases | +| :--- | :--- | +| **caused_by** | ursache_ist, wegen | +| **next** | danach, folgt | + """) - # Priorität: 1. Argument -> 2. ENV -> 3. Default - self.vault_root = vault_root or os.getenv("MINDNET_VAULT_ROOT", "./vault") - - # Fester relativer Pfad laut Spec - self.vocab_rel_path = os.path.join("01_User_Manual", "01_edge_vocabulary.md") - - self.unknown_log_path = "data/logs/unknown_edges.jsonl" - self.canonical_map: Dict[str, str] = {} - self.valid_types: Set[str] = set() - - self._load_vocabulary() - self.initialized = True + # 4. Registry Reset & Init mit explizitem Vault Root + # Wir setzen das Singleton zurück, um sicherzustellen, dass es unseren Pfad nutzt + EdgeRegistry._instance = None + # Hier übergeben wir den Test-Vault-Pfad! Das Skript ignoriert jetzt die .env für den Test. + self.registry = EdgeRegistry(vault_root=self.test_vault_root) + self.registry.unknown_log_path = os.path.join(self.log_dir, "unknown_edges.jsonl") - def _load_vocabulary(self): - """Parst die Markdown-Tabelle im Vault.""" - # Absoluten Pfad auflösen - full_path = os.path.abspath(os.path.join(self.vault_root, self.vocab_rel_path)) + def tearDown(self): + if os.path.exists(self.test_vault_root): + shutil.rmtree(self.test_vault_root) + if os.path.exists("tests/logs"): + shutil.rmtree("tests/logs") + EdgeRegistry._instance = None + + def test_registry_resolution(self): + print("\n--- Test A: Registry & Alias Resolution ---") - if not os.path.exists(full_path): - logger.warning(f"Edge Vocabulary not found at: {full_path}. Registry empty.") - # Wir versuchen NICHT mehr diverse Pfade zu raten, um Konsistenz zu wahren. - return + # Prüfen ob Pfad korrekt übernommen wurde + expected_path = os.path.join(self.test_vault_root, "01_User_Manual", "01_edge_vocabulary.md") + # Da wir abspath nutzen, vergleichen wir normalized paths + self.assertTrue(os.path.exists(expected_path), "Test fixture file was not created correctly") + + if not self.registry.valid_types: + self.fail(f"Registry empty! Root used: {self.registry.vault_root}") - # Regex: | **canonical** | alias, alias | - pattern = re.compile(r"\|\s*\*\*([a-z_]+)\*\*\s*\|\s*([^|]+)\|") + self.assertEqual(self.registry.resolve("caused_by"), "caused_by") + self.assertEqual(self.registry.resolve("ursache_ist"), "caused_by") + + unknown = self.registry.resolve("mystery_link") + self.assertEqual(unknown, "mystery_link") + + # Prüfen ob Logging funktioniert + if os.path.exists(self.registry.unknown_log_path): + with open(self.registry.unknown_log_path, "r") as f: + self.assertIn("mystery_link", f.read()) + print("✅ Registry loaded from custom vault root & validated.") + else: + self.fail("Logfile was not created.") - try: - with open(full_path, "r", encoding="utf-8") as f: - for line in f: - match = pattern.search(line) - if match: - canonical = match.group(1).strip() - aliases_str = match.group(2).strip() - - self.valid_types.add(canonical) - self.canonical_map[canonical] = canonical - - if aliases_str and "Kein Alias" not in aliases_str: - aliases = [a.strip() for a in aliases_str.split(",") if a.strip()] - for alias in aliases: - clean_alias = alias.replace("`", "") - self.canonical_map[clean_alias] = canonical + def test_lifecycle_scoring(self): + print("\n--- Test B: Lifecycle Scoring Math ---") + base_sem = 0.9 + payload_draft = {"status": "draft"} + payload_stable = {"status": "stable"} + + with patch("app.core.retriever._get_scoring_weights", return_value=(1.0, 0.5, 0.0)): + mult_draft = _get_status_multiplier(payload_draft) + mult_stable = _get_status_multiplier(payload_stable) + + score_draft = base_sem * mult_draft + score_stable = base_sem * mult_stable + + self.assertLess(score_draft, base_sem) + self.assertGreater(score_stable, base_sem) + print("✅ Lifecycle scoring math verified.") + + def test_dynamic_boosting(self): + print("\n--- Test C: Dynamic Edge Boosting ---") + semantic_score = 0.5 + raw_edge_bonus = 1.0 + payload = {"status": "active"} + + with patch("app.core.retriever._get_scoring_weights", return_value=(1.0, 1.0, 0.0)): + score_normal, _, _ = _compute_total_score( + semantic_score, payload, edge_bonus=raw_edge_bonus, dynamic_edge_boosts=None + ) + boost_map = {"caused_by": 2.0} + score_boosted, _, _ = _compute_total_score( + semantic_score, payload, edge_bonus=raw_edge_bonus, dynamic_edge_boosts=boost_map + ) - logger.info(f"EdgeRegistry loaded from {full_path}: {len(self.valid_types)} types.") + self.assertGreater(score_boosted, score_normal) + print("✅ Dynamic Boosting logic verified.") - except Exception as e: - logger.error(f"Failed to parse Edge Vocabulary at {full_path}: {e}") - - def resolve(self, edge_type: str) -> str: - if not edge_type: return "related_to" - clean_type = edge_type.lower().strip().replace(" ", "_") - - if clean_type in self.canonical_map: - return self.canonical_map[clean_type] - - self._log_unknown(clean_type) - return clean_type - - def _log_unknown(self, edge_type: str): - try: - os.makedirs(os.path.dirname(self.unknown_log_path), exist_ok=True) - entry = {"unknown_type": edge_type, "status": "new"} - with open(self.unknown_log_path, "a", encoding="utf-8") as f: - f.write(json.dumps(entry) + "\n") - except Exception: - pass - -# Default Instanz (nutzt ENV oder ./vault) -registry = EdgeRegistry() \ No newline at end of file +if __name__ == '__main__': + unittest.main() \ No newline at end of file