new version

This commit is contained in:
Lars 2025-12-18 13:56:45 +01:00
parent 48729e6f5d
commit 2c3ee8efd6
6 changed files with 133 additions and 130 deletions

View File

@@ -4,7 +4,7 @@ DESCRIPTION: Main ingestion logic.
FIX: Correct prioritization of frontmatter for chunk_profile and retriever_weight.
Load the chunk config based on the effective profile, not just the note type.
WP-22: Integration of Content Lifecycle (status) and Edge Registry.
-VERSION: 2.8.0 (WP-22 Lifecycle & Registry)
+VERSION: 2.8.1 (WP-22 Lifecycle & Registry)
STATUS: Active
DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.core.derive_edges, app.core.qdrant*, app.services.embeddings_client, app.services.edge_registry
EXTERNAL_CONFIG: config/types.yaml
@@ -159,7 +159,7 @@ class IngestionService:
logger.error(f"Validation failed for {file_path}: {e}")
return {**result, "error": f"Validation failed: {str(e)}"}
-# --- WP-22: Content Lifecycle Gate ---
+# --- WP-22: Content Lifecycle Gate (Part A) ---
status = fm.get("status", "draft").lower().strip()
# Hard skip for system files
@@ -265,7 +265,7 @@ class IngestionService:
except TypeError:
raw_edges = build_edges_for_note(note_id, chunk_pls)
-# --- WP-22: Edge Registry Validation ---
+# --- WP-22: Edge Registry Validation (Part B) ---
edges = []
if raw_edges:
for edge in raw_edges:
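The hunk breaks off at the validation loop. As a hedged sketch (the edge dict shape and the surrounding flow are assumptions; only `registry.resolve()` is shown elsewhere in this commit), the loop plausibly continues like this:

```python
from app.services.edge_registry import registry

# Hypothetical continuation of the truncated loop: normalize each edge
# kind through the registry before keeping the edge (Part B gate).
edges = []
for edge in raw_edges:
    raw_kind = edge.get("kind", "related_to")  # assumed dict shape
    # resolve() maps aliases to canonical types and logs unknown ones
    edge["kind"] = registry.resolve(raw_kind)
    edges.append(edge)
```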

View File

@@ -2,7 +2,7 @@
FILE: app/core/retriever.py
DESCRIPTION: Implements the hybrid search (vector + graph expansion) and the scoring model (explainability).
WP-22 Update: Dynamic Edge Boosting & Lifecycle Scoring.
-VERSION: 0.6.0 (WP-22 Dynamic Scoring)
+VERSION: 0.6.1 (WP-22 Dynamic Scoring)
STATUS: Active
DEPENDENCIES: app.config, app.models.dto, app.core.qdrant*, app.services.embeddings_client, app.core.graph_adapter
LAST_ANALYSIS: 2025-12-18
@@ -98,16 +98,15 @@ def _semantic_hits(
results.append((str(pid), float(score), dict(payload or {})))
return results
-# --- WP-22 Helper: Lifecycle Multipliers ---
+# --- WP-22 Helper: Lifecycle Multipliers (Part A) ---
def _get_status_multiplier(payload: Dict[str, Any]) -> float:
"""
WP-22: Drafts werden bestraft, Stable Notes belohnt.
WP-22: stable (1.2), active (1.0), draft (0.5).
"""
status = str(payload.get("status", "draft")).lower()
status = str(payload.get("status", "active")).lower()
if status == "stable": return 1.2
if status == "active": return 1.0
if status == "draft": return 0.8 # Malus für Entwürfe
# Fallback für andere oder leere Status
if status == "draft": return 0.5
return 1.0
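A quick sanity check of the post-commit multipliers as plain asserts (illustrative only, mirroring the function above):

```python
from app.core.retriever import _get_status_multiplier

assert _get_status_multiplier({"status": "stable"}) == 1.2
assert _get_status_multiplier({"status": "draft"}) == 0.5     # was 0.8
assert _get_status_multiplier({}) == 1.0                      # default is now "active"
assert _get_status_multiplier({"status": "archived"}) == 1.0  # unknown -> fallback
```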
def _compute_total_score(
@@ -118,8 +117,7 @@ def _compute_total_score(
dynamic_edge_boosts: Dict[str, float] = None
) -> Tuple[float, float, float]:
"""
Berechnet total_score.
WP-22 Update: Integration von Status-Bonus und Dynamic Edge Boosts.
Berechnet total_score nach WP-22 Scoring Formel.
"""
raw_weight = payload.get("retriever_weight", 1.0)
try:
@@ -132,13 +130,13 @@ def _compute_total_score(
sem_w, edge_w, cent_w = _get_scoring_weights()
status_mult = _get_status_multiplier(payload)
-# Dynamic Edge Boosting
-# If dynamic boosts are active, we increase the influence of the graph.
-# This is a simplification, since the real boost should already have happened in the subgraph.
+# Dynamic Edge Boosting (Part C)
+# Global bonus if edge-specific boosts are active (e.g. a WHY question).
+# The edge-specific weighting already happens in the subgraph in hybrid_retrieve.
final_edge_score = edge_w * edge_bonus
if dynamic_edge_boosts and edge_bonus > 0:
-# Global boost for graph signals for specific intents
-final_edge_score *= 1.2
+# Global amplifier for graph signals for specific intents
+final_edge_score *= 1.5
total = (sem_w * float(semantic_score) * weight * status_mult) + final_edge_score + (cent_w * cent_bonus)
return float(total), float(edge_bonus), float(cent_bonus)
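For concreteness, a worked example of the total-score line above. All numbers are invented placeholders, not values from the real config:

```python
# Hypothetical inputs for the WP-22 scoring formula
sem_w, edge_w, cent_w = 1.0, 0.5, 0.2     # assumed _get_scoring_weights() output
semantic_score, weight = 0.80, 1.0        # weight = retriever_weight from payload
status_mult = 1.2                         # "stable" note
edge_bonus, cent_bonus = 0.4, 0.1
dynamic_edge_boosts = {"caused_by": 3.0}  # active -> global 1.5x amplifier

final_edge_score = edge_w * edge_bonus        # 0.20
if dynamic_edge_boosts and edge_bonus > 0:
    final_edge_score *= 1.5                   # 0.30
total = (sem_w * semantic_score * weight * status_mult) + final_edge_score + (cent_w * cent_bonus)
print(round(total, 2))  # 0.96 + 0.30 + 0.02 = 1.28
```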
@@ -154,7 +152,7 @@ def _build_explanation(
subgraph: Optional[ga.Subgraph],
node_key: Optional[str]
) -> Explanation:
"""Erstellt ein Explanation-Objekt."""
"""Erstellt ein Explanation-Objekt (WP-04b)."""
sem_w, _edge_w, _cent_w = _get_scoring_weights()
# Reload the scoring weights for the reason details
_, edge_w_cfg, cent_w_cfg = _get_scoring_weights()
@@ -189,9 +187,10 @@ def _build_explanation(
msg = "Bevorzugt" if type_weight > 1.0 else "Leicht abgewertet"
reasons.append(Reason(kind="type", message=f"{msg} aufgrund des Typs '{note_type}'.", score_impact=(sem_w * semantic_score * (type_weight - 1.0))))
+# WP-22: add status reason
if status_mult != 1.0:
msg = "Status-Bonus" if status_mult > 1.0 else "Status-Malus"
-reasons.append(Reason(kind="lifecycle", message=f"{msg} ({payload.get('status')}).", score_impact=0.0))
+reasons.append(Reason(kind="lifecycle", message=f"{msg} ({payload.get('status', 'unknown')}).", score_impact=0.0))
if subgraph and node_key and edge_bonus > 0:
if hasattr(subgraph, "get_outgoing_edges"):
@@ -226,7 +225,7 @@ def _build_explanation(
def _extract_expand_options(req: QueryRequest) -> Tuple[int, List[str] | None]:
"""Extrahiert depth und edge_types."""
"""Extrahiert depth und edge_types für die Graph-Expansion."""
expand = getattr(req, "expand", None)
if not expand:
return 0, None
@@ -259,7 +258,7 @@ def _build_hits_from_semantic(
explain: bool = False,
dynamic_edge_boosts: Dict[str, float] = None
) -> QueryResponse:
"""Baut strukturierte QueryHits."""
"""Baut strukturierte QueryHits basierend auf den berechneten Scores."""
t0 = time.time()
enriched: List[Tuple[str, float, Dict[str, Any], float, float, float]] = []
@@ -307,7 +306,7 @@ def _build_hits_from_semantic(
results.append(QueryHit(
node_id=str(pid),
-note_id=payload.get("note_id"),
+note_id=payload.get("note_id", "unknown"),
semantic_score=float(semantic_score),
edge_bonus=edge_bonus,
centrality_bonus=cent_bonus,
@@ -327,7 +326,7 @@ def _build_hits_from_semantic(
def semantic_retrieve(req: QueryRequest) -> QueryResponse:
"""Reiner semantischer Retriever."""
"""Reiner semantischer Retriever (WP-02)."""
client, prefix = _get_client_and_prefix()
vector = _get_query_vector(req)
top_k = req.top_k or get_settings().RETRIEVER_TOP_K
@@ -337,7 +336,7 @@ def semantic_retrieve(req: QueryRequest) -> QueryResponse:
def hybrid_retrieve(req: QueryRequest) -> QueryResponse:
"""Hybrid-Retriever: semantische Suche + optionale Edge-Expansion."""
"""Hybrid-Retriever: semantische Suche + optionale Edge-Expansion (WP-04a)."""
client, prefix = _get_client_and_prefix()
if req.query_vector:
vector = list(req.query_vector)
@@ -349,27 +348,28 @@ def hybrid_retrieve(req: QueryRequest) -> QueryResponse:
depth, edge_types = _extract_expand_options(req)
-# WP-22: dynamic boosts from the request (set by the router)
+# WP-22: dynamic boosts from the request (set by the router) (Part C)
boost_edges = getattr(req, "boost_edges", {})
subgraph: ga.Subgraph | None = None
if depth and depth > 0:
seed_ids: List[str] = []
for _pid, _score, payload in hits:
key = payload.get("chunk_id") or payload.get("note_id")
key = payload.get("note_id")
if key and key not in seed_ids:
seed_ids.append(key)
if seed_ids:
try:
-# We could also pass boost_edges to expand here if ga.expand supported it
+# Load the subgraph
subgraph = ga.expand(client, prefix, seed_ids, depth=depth, edge_types=edge_types)
-# Manually boost the edge weights in the graph if active
+# --- WP-22: Apply edge boosts in the in-memory graph ---
+# This adjusts the weights in the graph before the 'edge_bonus' is computed.
if boost_edges and subgraph and hasattr(subgraph, "graph"):
for u, v, data in subgraph.graph.edges(data=True):
k = data.get("kind")
if k in boost_edges:
-# Increase the weight for this query context
+# Multiply the weight (e.g. caused_by * 3.0)
data["weight"] = data.get("weight", 1.0) * boost_edges[k]
except Exception:
@@ -387,7 +387,7 @@ def hybrid_retrieve(req: QueryRequest) -> QueryResponse:
class Retriever:
"""
Wrapper-Klasse für WP-05 (Chat).
Wrapper-Klasse für Suchoperationen.
"""
def __init__(self):
pass
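To make the boost loop in hybrid_retrieve concrete: a self-contained sketch, assuming subgraph.graph is a networkx graph whose edges carry 'kind' and 'weight' attributes, as the loop implies:

```python
import networkx as nx

# Toy subgraph with the edge attributes the boost loop expects
g = nx.DiGraph()
g.add_edge("n1", "n2", kind="caused_by", weight=1.0)
g.add_edge("n2", "n3", kind="references", weight=1.0)

boost_edges = {"caused_by": 3.0}  # e.g. from the CAUSAL strategy

# Same logic as the loop above: multiply matching edge weights in place
for u, v, data in g.edges(data=True):
    if data.get("kind") in boost_edges:
        data["weight"] = data.get("weight", 1.0) * boost_edges[data["kind"]]

print(g["n1"]["n2"]["weight"])  # 3.0 (boosted)
print(g["n2"]["n3"]["weight"])  # 1.0 (unchanged)
```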

View File

@@ -1,9 +1,10 @@
"""
FILE: app/models/dto.py
DESCRIPTION: Pydantic models (DTOs) for request/response bodies. Defines the API schema.
-VERSION: 2.6.0 (WP-22 Semantic Graph Routing & Lifecycle)
+VERSION: 0.6.3 (WP-22 Semantic Graph Routing & Lifecycle)
STATUS: Active
DEPENDENCIES: pydantic, typing, uuid
LAST_ANALYSIS: 2025-12-15
"""
from __future__ import annotations
@@ -11,7 +12,6 @@ from pydantic import BaseModel, Field
from typing import List, Literal, Optional, Dict, Any
import uuid
-# WP-22: extended edge kinds in EdgeKind
EdgeKind = Literal["references", "references_at", "backlink", "next", "prev", "belongs_to", "depends_on", "related_to", "similar_to", "caused_by", "derived_from", "based_on", "solves", "blocks", "uses", "guides"]
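Because EdgeKind is a Literal, pydantic rejects non-canonical kinds at validation time; aliases therefore have to go through EdgeRegistry.resolve() first. A minimal sketch (the Edge model is hypothetical and the kind list abridged):

```python
from typing import Literal
from pydantic import BaseModel, ValidationError

EdgeKindDemo = Literal["references", "belongs_to", "caused_by"]  # abridged

class EdgeDemo(BaseModel):  # hypothetical model, for illustration only
    source: str
    target: str
    kind: EdgeKindDemo

EdgeDemo(source="n1", target="n2", kind="caused_by")  # ok
try:
    EdgeDemo(source="n1", target="n2", kind="ursache_ist")  # alias, not canonical
except ValidationError:
    print("alias rejected -> resolve() must normalize it first")
```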
@@ -68,7 +68,10 @@ class FeedbackRequest(BaseModel):
User feedback on a specific hit or on the overall answer.
"""
query_id: str = Field(..., description="ID der ursprünglichen Suche")
+# node_id is optional in spirit: if empty or "generated_answer", it refers to the answer.
+# If it is a real chunk ID, it refers to the source.
node_id: str = Field(..., description="ID des bewerteten Treffers oder 'generated_answer'")
+# Update: range widened to 1-5 for more fine-grained tuning
score: int = Field(..., ge=1, le=5, description="1 (Irrelevant/Falsch) bis 5 (Perfekt)")
comment: Optional[str] = None
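A quick usage sketch of the widened 1-5 range (values invented):

```python
from pydantic import ValidationError
from app.models.dto import FeedbackRequest

# Feedback on the generated answer itself, using the new 1-5 scale
fb = FeedbackRequest(query_id="q-123", node_id="generated_answer", score=4)

try:
    FeedbackRequest(query_id="q-123", node_id="c1", score=6)  # out of range
except ValidationError:
    print("score must be between 1 and 5")
```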
@@ -79,6 +82,7 @@ class ChatRequest(BaseModel):
"""
message: str = Field(..., description="Die Nachricht des Users")
conversation_id: Optional[str] = Field(None, description="Optional: ID für Chat-Verlauf (noch nicht implementiert)")
+# RAG parameters (override defaults)
top_k: int = 5
explain: bool = False

View File

@@ -1,7 +1,7 @@
"""
FILE: app/routers/chat.py
DESCRIPTION: Main chat interface (RAG & interview). Contains the intent router (keywords/LLM) and prompt construction.
-VERSION: 2.6.0 (WP-22 Semantic Graph Routing)
+VERSION: 2.7.0 (WP-22 Semantic Graph Routing)
STATUS: Active
DEPENDENCIES: app.config, app.models.dto, app.services.llm_service, app.core.retriever, app.services.feedback_service
EXTERNAL_CONFIG: config/decision_engine.yaml, config/types.yaml
@@ -187,9 +187,6 @@ async def _classify_intent(query: str, llm: LLMService) -> tuple[str, str]:
return intent_name, "Keyword (Strategy)"
# 2. FAST PATH B: type keywords (e.g. "Projekt", "Werte") -> INTERVIEW
-# FIX: We check whether the query is a question. Questions about types should go to RAG (FACT/DECISION),
-# not to interviews. We then leave those to the LLM router (slow path).
if not _is_question(query_lower):
types_cfg = get_types_config()
types_def = types_cfg.get("types", {})
@@ -202,7 +199,6 @@ async def _classify_intent(query: str, llm: LLMService) -> tuple[str, str]:
# 3. SLOW PATH: LLM Router
if settings.get("llm_fallback_enabled", False):
-# Use the prompts from prompts.yaml (via the LLM service)
router_prompt_template = llm.prompts.get("router_prompt", "")
if router_prompt_template:
@@ -210,11 +206,9 @@ async def _classify_intent(query: str, llm: LLMService) -> tuple[str, str]:
logger.info("Keywords failed (or Question detected). Asking LLM for Intent...")
try:
-# Use priority="realtime" for the router so it does not wait
raw_response = await llm.generate_raw_response(prompt, priority="realtime")
llm_output_upper = raw_response.upper()
-# Check INTERVIEW first
if "INTERVIEW" in llm_output_upper or "CREATE" in llm_output_upper:
return "INTERVIEW", "LLM Router"
@@ -281,7 +275,7 @@ async def chat_endpoint(
inject_types = strategy.get("inject_types", [])
prepend_instr = strategy.get("prepend_instruction", "")
-# --- WP-22: Semantic Graph Routing ---
+# --- WP-22: Semantic Graph Routing (Part C) ---
# We load the configured edge boosts for this intent
edge_boosts = strategy.get("edge_boosts", {})
if edge_boosts:
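The boosts come from the strategy block in config/decision_engine.yaml. The integration test below builds exactly this shape, so here is a Python mirror of that fixture (values from the test, not from the production config):

```python
# Mirrors the decision_config written by tests/test_WP22_intelligence.py
decision_config = {
    "strategies": {
        "FACT":   {"trigger_keywords": ["was"],   "edge_boosts": {"part_of": 2.0}},
        "CAUSAL": {"trigger_keywords": ["warum"], "edge_boosts": {"caused_by": 3.0}},
    }
}

strategy = decision_config["strategies"]["CAUSAL"]
edge_boosts = strategy.get("edge_boosts", {})  # {"caused_by": 3.0}
# These boosts travel on the request (boost_edges) into hybrid_retrieve (Part C)
```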

View File

@@ -2,7 +2,7 @@
FILE: app/services/edge_registry.py
DESCRIPTION: Single source of truth for edge types. Parses '01_User_Manual/01_edge_vocabulary.md'.
WP-22 Part B: Registry & Validation.
-FIX: Dynamic path resolution based on MINDNET_VAULT_ROOT.
+FIX: Respects the dynamic vault root from ENV or parameter.
"""
import re
import os
@@ -10,6 +10,8 @@ import json
import logging
from typing import Dict, Optional, Set
+from app.config import get_settings
logger = logging.getLogger(__name__)
class EdgeRegistry:
@@ -25,8 +27,9 @@ class EdgeRegistry:
if self.initialized:
return
-# Priority: 1. parameter (test) -> 2. ENV (dotenv) -> 3. default
-self.vault_root = vault_root or os.getenv("MINDNET_VAULT_ROOT", "./vault")
+settings = get_settings()
+# Priority: 1. parameter (test) -> 2. config (.env) -> 3. default
+self.vault_root = vault_root or getattr(settings, "MINDNET_VAULT_ROOT", "./vault")
self.vocab_rel_path = os.path.join("01_User_Manual", "01_edge_vocabulary.md")
self.unknown_log_path = "data/logs/unknown_edges.jsonl"
@@ -38,11 +41,9 @@ class EdgeRegistry:
def _load_vocabulary(self):
"""Parst die Markdown-Tabelle im Vault."""
# Absoluten Pfad auflösen, um Verwechslungen im venv zu vermeiden
full_path = os.path.abspath(os.path.join(self.vault_root, self.vocab_rel_path))
if not os.path.exists(full_path):
# Debug-Info: Zeige wo genau gesucht wurde
logger.warning(f"Edge Vocabulary NOT found at: {full_path}. Registry is empty.")
return
@@ -66,31 +67,35 @@ class EdgeRegistry:
clean_alias = alias.replace("`", "").lower().strip()
self.canonical_map[clean_alias] = canonical
logger.info(f"EdgeRegistry loaded from {full_path}: {len(self.valid_types)} types.")
logger.info(f"EdgeRegistry loaded from {full_path}: {len(self.valid_types)} canonical types.")
except Exception as e:
logger.error(f"Failed to parse Edge Vocabulary at {full_path}: {e}")
def resolve(self, edge_type: str) -> str:
"""Normalisiert Kanten-Typen via Registry oder loggt Unbekannte."""
"""Normalisiert Kanten-Typen via Registry oder loggt Unbekannte für Review."""
if not edge_type: return "related_to"
# Normalization (lowercase, underscores)
clean_type = edge_type.lower().strip().replace(" ", "_")
# 1. Lookup in the map (canonical or alias)
if clean_type in self.canonical_map:
return self.canonical_map[clean_type]
# 2. Unknown handling (log but do not discard - learning system)
self._log_unknown(clean_type)
return clean_type
def _log_unknown(self, edge_type: str):
"""Schreibt unbekannte Typen für Review in ein Log."""
"""Schreibt unbekannte Typen für späteres Review in ein Log-File."""
try:
os.makedirs(os.path.dirname(self.unknown_log_path), exist_ok=True)
entry = {"unknown_type": edge_type, "status": "new"}
with open(self.unknown_log_path, "a", encoding="utf-8") as f:
f.write(json.dumps(entry) + "\n")
except Exception:
pass
pass
-# Singleton instance
+# Global singleton instance
registry = EdgeRegistry()
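End-to-end, the resolve() contract looks like this (assuming a vocabulary that maps the alias ursache_ist to caused_by, as in the test fixture below):

```python
from app.services.edge_registry import registry

registry.resolve("ursache_ist")    # -> "caused_by" (alias from the vocabulary)
registry.resolve("CAUSED BY")      # -> "caused_by" (lowercased, spaces -> underscores)
registry.resolve("")               # -> "related_to" (empty fallback)
registry.resolve("invented_kind")  # -> "invented_kind", and the unknown type is
                                   #    appended to data/logs/unknown_edges.jsonl
```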

View File

@@ -1,97 +1,97 @@
"""
-FILE: app/services/edge_registry.py
-DESCRIPTION: Single source of truth for edge types. Parses '01_User_Manual/01_edge_vocabulary.md'.
-WP-22 Part B: Registry & Validation.
-FIX: Correctly respects MINDNET_VAULT_ROOT from .env.
+FILE: tests/test_WP22_intelligence.py
+DESCRIPTION: Integration test for WP-22.
+FIX: Enforces path synchronization for registry & router.
"""
-import re
+import unittest
import os
-import json
-import logging
-from typing import Dict, Optional, Set
+import shutil
+import yaml
+import asyncio
+from unittest.mock import MagicMock, patch, AsyncMock
-logger = logging.getLogger(__name__)
+import app.routers.chat
+from app.models.dto import ChatRequest, QueryHit, QueryRequest
+from app.services.edge_registry import EdgeRegistry
+from app.core.retriever import _compute_total_score, _get_status_multiplier
+from app.routers.chat import _classify_intent, chat_endpoint
-class EdgeRegistry:
-_instance = None
+class TestWP22Integration(unittest.IsolatedAsyncioTestCase):
-def __new__(cls, vault_root: Optional[str] = None):
-if cls._instance is None:
-cls._instance = super(EdgeRegistry, cls).__new__(cls)
-cls._instance.initialized = False
-return cls._instance
-def __init__(self, vault_root: Optional[str] = None):
-if self.initialized:
-return
-# Priority 1: passed parameter (e.g. for tests)
-# Priority 2: environment variable (e.g. production ./vault_master)
-# Priority 3: default fallback (./vault)
-self.vault_root = vault_root or os.getenv("MINDNET_VAULT_ROOT", "./vault")
+async def asyncSetUp(self):
+"""Prepares an isolated test environment."""
+self.test_root = os.path.abspath("tests/temp_wp22")
+self.test_vault = os.path.join(self.test_root, "vault_master")
+self.test_config_dir = os.path.join(self.test_root, "config")
-# The relative path is fixed by the specification
-self.vocab_rel_path = os.path.join("01_User_Manual", "01_edge_vocabulary.md")
+os.makedirs(os.path.join(self.test_vault, "01_User_Manual"), exist_ok=True)
+os.makedirs(self.test_config_dir, exist_ok=True)
+os.makedirs(os.path.join(self.test_root, "data/logs"), exist_ok=True)
+# 2. Write config files
+self.decision_path = os.path.join(self.test_config_dir, "decision_engine.yaml")
+self.decision_config = {
+"strategies": {
+"FACT": {"trigger_keywords": ["was"], "edge_boosts": {"part_of": 2.0}},
+"CAUSAL": {"trigger_keywords": ["warum"], "edge_boosts": {"caused_by": 3.0}}
+}
+}
+with open(self.decision_path, "w") as f: yaml.dump(self.decision_config, f)
+# 3. Vocabulary file in the RIGHT place
+self.vocab_path = os.path.join(self.test_vault, "01_User_Manual/01_edge_vocabulary.md")
+with open(self.vocab_path, "w") as f:
+f.write("| System-Typ | Aliases |\n| :--- | :--- |\n| **caused_by** | ursache_ist |\n| **part_of** | teil_von |")
+# 4. MOCKING / RESETTING GLOBAL STATE
+self.mock_settings = MagicMock()
+self.mock_settings.DECISION_CONFIG_PATH = self.decision_path
+self.mock_settings.MINDNET_VAULT_ROOT = self.test_vault
+self.mock_settings.RETRIEVER_TOP_K = 5
+self.mock_settings.MODEL_NAME = "test-model"
-self.unknown_log_path = "data/logs/unknown_edges.jsonl"
-self.canonical_map: Dict[str, str] = {}
-self.valid_types: Set[str] = set()
+self.patch_settings_chat = patch('app.routers.chat.get_settings', return_value=self.mock_settings)
+self.patch_settings_registry = patch('app.services.edge_registry.get_settings', return_value=self.mock_settings)
-self._load_vocabulary()
-self.initialized = True
-def _load_vocabulary(self):
-"""Parses the markdown table in the vault."""
-# Resolve the absolute path to avoid confusion with the cwd
-full_path = os.path.abspath(os.path.join(self.vault_root, self.vocab_rel_path))
+self.patch_settings_chat.start()
+self.patch_settings_registry.start()
-if not os.path.exists(full_path):
-logger.warning(f"Edge Vocabulary NOT found at: {full_path}. Registry is empty.")
-return
+app.routers.chat._DECISION_CONFIG_CACHE = None
+EdgeRegistry._instance = None
+self.registry = EdgeRegistry(vault_root=self.test_vault)
-# Regex: | **canonical** | alias, alias |
-pattern = re.compile(r"\|\s*\*\*([a-z_]+)\*\*\s*\|\s*([^|]+)\|")
+async def asyncTearDown(self):
+self.patch_settings_chat.stop()
+self.patch_settings_registry.stop()
+if os.path.exists(self.test_root): shutil.rmtree(self.test_root)
+EdgeRegistry._instance = None
+app.routers.chat._DECISION_CONFIG_CACHE = None
-try:
-with open(full_path, "r", encoding="utf-8") as f:
-for line in f:
-match = pattern.search(line)
-if match:
-canonical = match.group(1).strip()
-aliases_str = match.group(2).strip()
-self.valid_types.add(canonical)
-self.canonical_map[canonical] = canonical
-if aliases_str and "Kein Alias" not in aliases_str:
-aliases = [a.strip() for a in aliases_str.split(",") if a.strip()]
-for alias in aliases:
-clean_alias = alias.replace("`", "")
-self.canonical_map[clean_alias] = canonical
-logger.info(f"EdgeRegistry loaded from {full_path}: {len(self.valid_types)} types.")
+def test_registry_resolution(self):
+print("\n🔵 TEST 1: Registry Resolution")
+self.assertTrue(len(self.registry.valid_types) > 0)
+self.assertEqual(self.registry.resolve("ursache_ist"), "caused_by")
+print("✅ Registry OK.")
-except Exception as e:
-logger.error(f"Failed to parse Edge Vocabulary at {full_path}: {e}")
+def test_scoring_math(self):
+print("\n🔵 TEST 2: Scoring Math (Lifecycle)")
+with patch("app.core.retriever._get_scoring_weights", return_value=(1.0, 1.0, 0.0)):
+self.assertEqual(_get_status_multiplier({"status": "stable"}), 1.2)
+self.assertEqual(_get_status_multiplier({"status": "draft"}), 0.5)
+print("✅ Scoring OK.")
-def resolve(self, edge_type: str) -> str:
-if not edge_type: return "related_to"
-clean_type = edge_type.lower().strip().replace(" ", "_")
+async def test_full_flow(self):
+print("\n🔵 TEST 3: Pipeline flow")
+mock_llm = AsyncMock(); mock_llm.prompts = {}; mock_llm.generate_raw_response.return_value = "Ok"
+mock_ret = AsyncMock()
+mock_hit = QueryHit(node_id="c1", note_id="n1", semantic_score=0.8, edge_bonus=0.0, centrality_bonus=0.0, total_score=0.8, source={"text": "t"}, payload={"status": "active"})
+mock_ret.search.return_value.results = [mock_hit]
-if clean_type in self.canonical_map:
-return self.canonical_map[clean_type]
-self._log_unknown(clean_type)
-return clean_type
+resp = await chat_endpoint(ChatRequest(message="Warum?"), llm=mock_llm, retriever=mock_ret)
+self.assertEqual(resp.intent, "CAUSAL")
+called_req = mock_ret.search.call_args[0][0]
+self.assertEqual(called_req.boost_edges.get("caused_by"), 3.0)
+print("✅ Full Flow OK.")
-def _log_unknown(self, edge_type: str):
-try:
-os.makedirs(os.path.dirname(self.unknown_log_path), exist_ok=True)
-entry = {"unknown_type": edge_type, "status": "new"}
-with open(self.unknown_log_path, "a", encoding="utf-8") as f:
-f.write(json.dumps(entry) + "\n")
-except Exception:
-pass
-# Default instance
-registry = EdgeRegistry()
+if __name__ == '__main__':
+unittest.main()
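For reference, the table-parsing regex from the removed registry copy run against the fixture row the test writes (both appear verbatim above):

```python
import re

# Pattern from _load_vocabulary: | **canonical** | alias, alias |
pattern = re.compile(r"\|\s*\*\*([a-z_]+)\*\*\s*\|\s*([^|]+)\|")

line = "| **caused_by** | ursache_ist |"
match = pattern.search(line)
print(match.group(1))          # caused_by
print(match.group(2).strip())  # ursache_ist
```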