diff --git a/app/core/ingestion.py b/app/core/ingestion.py index 6b3f232..5834f3d 100644 --- a/app/core/ingestion.py +++ b/app/core/ingestion.py @@ -4,7 +4,7 @@ DESCRIPTION: Haupt-Ingestion-Logik. FIX: Korrekte Priorisierung von Frontmatter für chunk_profile und retriever_weight. Lade Chunk-Config basierend auf dem effektiven Profil, nicht nur dem Notiz-Typ. WP-22: Integration von Content Lifecycle (Status) und Edge Registry. -VERSION: 2.8.0 (WP-22 Lifecycle & Registry) +VERSION: 2.8.1 (WP-22 Lifecycle & Registry) STATUS: Active DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.core.derive_edges, app.core.qdrant*, app.services.embeddings_client, app.services.edge_registry EXTERNAL_CONFIG: config/types.yaml @@ -159,7 +159,7 @@ class IngestionService: logger.error(f"Validation failed for {file_path}: {e}") return {**result, "error": f"Validation failed: {str(e)}"} - # --- WP-22: Content Lifecycle Gate --- + # --- WP-22: Content Lifecycle Gate (Teil A) --- status = fm.get("status", "draft").lower().strip() # Hard Skip für System-Dateien @@ -265,7 +265,7 @@ class IngestionService: except TypeError: raw_edges = build_edges_for_note(note_id, chunk_pls) - # --- WP-22: Edge Registry Validation --- + # --- WP-22: Edge Registry Validation (Teil B) --- edges = [] if raw_edges: for edge in raw_edges: diff --git a/app/core/retriever.py b/app/core/retriever.py index 05fc309..45f28a7 100644 --- a/app/core/retriever.py +++ b/app/core/retriever.py @@ -2,7 +2,7 @@ FILE: app/core/retriever.py DESCRIPTION: Implementiert die Hybrid-Suche (Vektor + Graph-Expansion) und das Scoring-Modell (Explainability). WP-22 Update: Dynamic Edge Boosting & Lifecycle Scoring. -VERSION: 0.6.0 (WP-22 Dynamic Scoring) +VERSION: 0.6.1 (WP-22 Dynamic Scoring) STATUS: Active DEPENDENCIES: app.config, app.models.dto, app.core.qdrant*, app.services.embeddings_client, app.core.graph_adapter LAST_ANALYSIS: 2025-12-18 @@ -98,16 +98,15 @@ def _semantic_hits( results.append((str(pid), float(score), dict(payload or {}))) return results -# --- WP-22 Helper: Lifecycle Multipliers --- +# --- WP-22 Helper: Lifecycle Multipliers (Teil A) --- def _get_status_multiplier(payload: Dict[str, Any]) -> float: """ - WP-22: Drafts werden bestraft, Stable Notes belohnt. + WP-22: stable (1.2), active (1.0), draft (0.5). """ - status = str(payload.get("status", "draft")).lower() + status = str(payload.get("status", "active")).lower() if status == "stable": return 1.2 if status == "active": return 1.0 - if status == "draft": return 0.8 # Malus für Entwürfe - # Fallback für andere oder leere Status + if status == "draft": return 0.5 return 1.0 def _compute_total_score( @@ -118,8 +117,7 @@ def _compute_total_score( dynamic_edge_boosts: Dict[str, float] = None ) -> Tuple[float, float, float]: """ - Berechnet total_score. - WP-22 Update: Integration von Status-Bonus und Dynamic Edge Boosts. + Berechnet total_score nach WP-22 Scoring Formel. """ raw_weight = payload.get("retriever_weight", 1.0) try: @@ -132,13 +130,13 @@ def _compute_total_score( sem_w, edge_w, cent_w = _get_scoring_weights() status_mult = _get_status_multiplier(payload) - # Dynamic Edge Boosting - # Wenn dynamische Boosts aktiv sind, erhöhen wir den Einfluss des Graphen - # Dies ist eine Vereinfachung, da der echte Boost im Subgraph passiert sein sollte. + # Dynamic Edge Boosting (Teil C) + # Globaler Bonus falls Kanten-spezifische Boosts aktiv sind (z.B. WHY Frage) + # Die kanten-spezifische Gewichtung passiert bereits im Subgraph in hybrid_retrieve. final_edge_score = edge_w * edge_bonus if dynamic_edge_boosts and edge_bonus > 0: - # Globaler Boost für Graph-Signale bei spezifischen Intents - final_edge_score *= 1.2 + # Globaler Verstärker für Graph-Signale bei spezifischen Intents + final_edge_score *= 1.5 total = (sem_w * float(semantic_score) * weight * status_mult) + final_edge_score + (cent_w * cent_bonus) return float(total), float(edge_bonus), float(cent_bonus) @@ -154,7 +152,7 @@ def _build_explanation( subgraph: Optional[ga.Subgraph], node_key: Optional[str] ) -> Explanation: - """Erstellt ein Explanation-Objekt.""" + """Erstellt ein Explanation-Objekt (WP-04b).""" sem_w, _edge_w, _cent_w = _get_scoring_weights() # Scoring weights erneut laden für Reason-Details _, edge_w_cfg, cent_w_cfg = _get_scoring_weights() @@ -189,9 +187,10 @@ def _build_explanation( msg = "Bevorzugt" if type_weight > 1.0 else "Leicht abgewertet" reasons.append(Reason(kind="type", message=f"{msg} aufgrund des Typs '{note_type}'.", score_impact=(sem_w * semantic_score * (type_weight - 1.0)))) + # WP-22: Status Grund hinzufügen if status_mult != 1.0: msg = "Status-Bonus" if status_mult > 1.0 else "Status-Malus" - reasons.append(Reason(kind="lifecycle", message=f"{msg} ({payload.get('status')}).", score_impact=0.0)) + reasons.append(Reason(kind="lifecycle", message=f"{msg} ({payload.get('status', 'unknown')}).", score_impact=0.0)) if subgraph and node_key and edge_bonus > 0: if hasattr(subgraph, "get_outgoing_edges"): @@ -226,7 +225,7 @@ def _build_explanation( def _extract_expand_options(req: QueryRequest) -> Tuple[int, List[str] | None]: - """Extrahiert depth und edge_types.""" + """Extrahiert depth und edge_types für die Graph-Expansion.""" expand = getattr(req, "expand", None) if not expand: return 0, None @@ -259,7 +258,7 @@ def _build_hits_from_semantic( explain: bool = False, dynamic_edge_boosts: Dict[str, float] = None ) -> QueryResponse: - """Baut strukturierte QueryHits.""" + """Baut strukturierte QueryHits basierend auf den berechneten Scores.""" t0 = time.time() enriched: List[Tuple[str, float, Dict[str, Any], float, float, float]] = [] @@ -307,7 +306,7 @@ def _build_hits_from_semantic( results.append(QueryHit( node_id=str(pid), - note_id=payload.get("note_id"), + note_id=payload.get("note_id", "unknown"), semantic_score=float(semantic_score), edge_bonus=edge_bonus, centrality_bonus=cent_bonus, @@ -327,7 +326,7 @@ def _build_hits_from_semantic( def semantic_retrieve(req: QueryRequest) -> QueryResponse: - """Reiner semantischer Retriever.""" + """Reiner semantischer Retriever (WP-02).""" client, prefix = _get_client_and_prefix() vector = _get_query_vector(req) top_k = req.top_k or get_settings().RETRIEVER_TOP_K @@ -337,7 +336,7 @@ def semantic_retrieve(req: QueryRequest) -> QueryResponse: def hybrid_retrieve(req: QueryRequest) -> QueryResponse: - """Hybrid-Retriever: semantische Suche + optionale Edge-Expansion.""" + """Hybrid-Retriever: semantische Suche + optionale Edge-Expansion (WP-04a).""" client, prefix = _get_client_and_prefix() if req.query_vector: vector = list(req.query_vector) @@ -349,27 +348,28 @@ def hybrid_retrieve(req: QueryRequest) -> QueryResponse: depth, edge_types = _extract_expand_options(req) - # WP-22: Dynamic Boosts aus dem Request (vom Router) + # WP-22: Dynamic Boosts aus dem Request (vom Router) (Teil C) boost_edges = getattr(req, "boost_edges", {}) subgraph: ga.Subgraph | None = None if depth and depth > 0: seed_ids: List[str] = [] for _pid, _score, payload in hits: - key = payload.get("chunk_id") or payload.get("note_id") + key = payload.get("note_id") if key and key not in seed_ids: seed_ids.append(key) if seed_ids: try: - # Hier könnten wir boost_edges auch an expand übergeben, wenn ga.expand es unterstützt + # Subgraph laden subgraph = ga.expand(client, prefix, seed_ids, depth=depth, edge_types=edge_types) - # Manuelles Boosten der Kantengewichte im Graphen falls aktiv + # --- WP-22: Kanten-Boosts im RAM-Graphen anwenden --- + # Dies manipuliert die Gewichte im Graphen, bevor der 'edge_bonus' berechnet wird. if boost_edges and subgraph and hasattr(subgraph, "graph"): for u, v, data in subgraph.graph.edges(data=True): k = data.get("kind") if k in boost_edges: - # Gewicht erhöhen für diesen Query-Kontext + # Gewicht multiplizieren (z.B. caused_by * 3.0) data["weight"] = data.get("weight", 1.0) * boost_edges[k] except Exception: @@ -387,7 +387,7 @@ def hybrid_retrieve(req: QueryRequest) -> QueryResponse: class Retriever: """ - Wrapper-Klasse für WP-05 (Chat). + Wrapper-Klasse für Suchoperationen. """ def __init__(self): pass diff --git a/app/models/dto.py b/app/models/dto.py index e5cca55..4267028 100644 --- a/app/models/dto.py +++ b/app/models/dto.py @@ -1,9 +1,10 @@ """ FILE: app/models/dto.py DESCRIPTION: Pydantic-Modelle (DTOs) für Request/Response Bodies. Definiert das API-Schema. -VERSION: 2.6.0 (WP-22 Semantic Graph Routing & Lifecycle) +VERSION: 0.6.3 (WP-22 Semantic Graph Routing & Lifecycle) STATUS: Active DEPENDENCIES: pydantic, typing, uuid +LAST_ANALYSIS: 2025-12-15 """ from __future__ import annotations @@ -11,7 +12,6 @@ from pydantic import BaseModel, Field from typing import List, Literal, Optional, Dict, Any import uuid -# WP-22: Erweiterte Kanten-Typen in EdgeKind EdgeKind = Literal["references", "references_at", "backlink", "next", "prev", "belongs_to", "depends_on", "related_to", "similar_to", "caused_by", "derived_from", "based_on", "solves", "blocks", "uses", "guides"] @@ -68,7 +68,10 @@ class FeedbackRequest(BaseModel): User-Feedback zu einem spezifischen Treffer oder der Gesamtantwort. """ query_id: str = Field(..., description="ID der ursprünglichen Suche") + # node_id ist optional: Wenn leer oder "generated_answer", gilt es für die Antwort. + # Wenn eine echte Chunk-ID, gilt es für die Quelle. node_id: str = Field(..., description="ID des bewerteten Treffers oder 'generated_answer'") + # Update: Range auf 1-5 erweitert für differenziertes Tuning score: int = Field(..., ge=1, le=5, description="1 (Irrelevant/Falsch) bis 5 (Perfekt)") comment: Optional[str] = None @@ -79,6 +82,7 @@ class ChatRequest(BaseModel): """ message: str = Field(..., description="Die Nachricht des Users") conversation_id: Optional[str] = Field(None, description="Optional: ID für Chat-Verlauf (noch nicht implementiert)") + # RAG Parameter (Override defaults) top_k: int = 5 explain: bool = False diff --git a/app/routers/chat.py b/app/routers/chat.py index d03fc43..3e5678c 100644 --- a/app/routers/chat.py +++ b/app/routers/chat.py @@ -1,7 +1,7 @@ """ FILE: app/routers/chat.py DESCRIPTION: Haupt-Chat-Interface (RAG & Interview). Enthält Intent-Router (Keywords/LLM) und Prompt-Construction. -VERSION: 2.6.0 (WP-22 Semantic Graph Routing) +VERSION: 2.7.0 (WP-22 Semantic Graph Routing) STATUS: Active DEPENDENCIES: app.config, app.models.dto, app.services.llm_service, app.core.retriever, app.services.feedback_service EXTERNAL_CONFIG: config/decision_engine.yaml, config/types.yaml @@ -187,9 +187,6 @@ async def _classify_intent(query: str, llm: LLMService) -> tuple[str, str]: return intent_name, "Keyword (Strategy)" # 2. FAST PATH B: Type Keywords (z.B. "Projekt", "Werte") -> INTERVIEW - # FIX: Wir prüfen, ob es eine Frage ist. Fragen zu Typen sollen RAG (FACT/DECISION) sein, - # keine Interviews. Wir überlassen das dann dem LLM Router (Slow Path). - if not _is_question(query_lower): types_cfg = get_types_config() types_def = types_cfg.get("types", {}) @@ -202,7 +199,6 @@ async def _classify_intent(query: str, llm: LLMService) -> tuple[str, str]: # 3. SLOW PATH: LLM Router if settings.get("llm_fallback_enabled", False): - # Nutze Prompts aus prompts.yaml (via LLM Service) router_prompt_template = llm.prompts.get("router_prompt", "") if router_prompt_template: @@ -210,11 +206,9 @@ async def _classify_intent(query: str, llm: LLMService) -> tuple[str, str]: logger.info("Keywords failed (or Question detected). Asking LLM for Intent...") try: - # Nutze priority="realtime" für den Router, damit er nicht wartet raw_response = await llm.generate_raw_response(prompt, priority="realtime") llm_output_upper = raw_response.upper() - # Zuerst INTERVIEW prüfen if "INTERVIEW" in llm_output_upper or "CREATE" in llm_output_upper: return "INTERVIEW", "LLM Router" @@ -281,7 +275,7 @@ async def chat_endpoint( inject_types = strategy.get("inject_types", []) prepend_instr = strategy.get("prepend_instruction", "") - # --- WP-22: Semantic Graph Routing --- + # --- WP-22: Semantic Graph Routing (Teil C) --- # Wir laden die konfigurierten Edge-Boosts für diesen Intent edge_boosts = strategy.get("edge_boosts", {}) if edge_boosts: diff --git a/app/services/edge_registry.py b/app/services/edge_registry.py index be63332..5c102b1 100644 --- a/app/services/edge_registry.py +++ b/app/services/edge_registry.py @@ -2,7 +2,7 @@ FILE: app/services/edge_registry.py DESCRIPTION: Single Source of Truth für Kanten-Typen. Parst '01_User_Manual/01_edge_vocabulary.md'. WP-22 Teil B: Registry & Validation. - FIX: Dynamische Pfad-Auflösung basierend auf MINDNET_VAULT_ROOT. + Beachtet den dynamischen Vault-Root aus ENV oder Parameter. """ import re import os @@ -10,6 +10,8 @@ import json import logging from typing import Dict, Optional, Set +from app.config import get_settings + logger = logging.getLogger(__name__) class EdgeRegistry: @@ -25,8 +27,9 @@ class EdgeRegistry: if self.initialized: return - # Priorität: 1. Parameter (Test) -> 2. ENV (dotenv) -> 3. Default - self.vault_root = vault_root or os.getenv("MINDNET_VAULT_ROOT", "./vault") + settings = get_settings() + # Priorität: 1. Parameter (Test) -> 2. Config (.env) -> 3. Default + self.vault_root = vault_root or getattr(settings, "MINDNET_VAULT_ROOT", "./vault") self.vocab_rel_path = os.path.join("01_User_Manual", "01_edge_vocabulary.md") self.unknown_log_path = "data/logs/unknown_edges.jsonl" @@ -38,11 +41,9 @@ class EdgeRegistry: def _load_vocabulary(self): """Parst die Markdown-Tabelle im Vault.""" - # Absoluten Pfad auflösen, um Verwechslungen im venv zu vermeiden full_path = os.path.abspath(os.path.join(self.vault_root, self.vocab_rel_path)) if not os.path.exists(full_path): - # Debug-Info: Zeige wo genau gesucht wurde logger.warning(f"Edge Vocabulary NOT found at: {full_path}. Registry is empty.") return @@ -66,31 +67,35 @@ class EdgeRegistry: clean_alias = alias.replace("`", "").lower().strip() self.canonical_map[clean_alias] = canonical - logger.info(f"EdgeRegistry loaded from {full_path}: {len(self.valid_types)} types.") + logger.info(f"EdgeRegistry loaded from {full_path}: {len(self.valid_types)} canonical types.") except Exception as e: logger.error(f"Failed to parse Edge Vocabulary at {full_path}: {e}") def resolve(self, edge_type: str) -> str: - """Normalisiert Kanten-Typen via Registry oder loggt Unbekannte.""" + """Normalisiert Kanten-Typen via Registry oder loggt Unbekannte für Review.""" if not edge_type: return "related_to" + + # Normalisierung (Kleinschreibung, Unterstriche) clean_type = edge_type.lower().strip().replace(" ", "_") + # 1. Lookup in Map (Canonical oder Alias) if clean_type in self.canonical_map: return self.canonical_map[clean_type] + # 2. Unknown Handling (Loggen aber nicht verwerfen - Learning System) self._log_unknown(clean_type) return clean_type def _log_unknown(self, edge_type: str): - """Schreibt unbekannte Typen für Review in ein Log.""" + """Schreibt unbekannte Typen für späteres Review in ein Log-File.""" try: os.makedirs(os.path.dirname(self.unknown_log_path), exist_ok=True) entry = {"unknown_type": edge_type, "status": "new"} with open(self.unknown_log_path, "a", encoding="utf-8") as f: f.write(json.dumps(entry) + "\n") except Exception: - pass + pass -# Singleton Instanz +# Globale Singleton Instanz registry = EdgeRegistry() \ No newline at end of file diff --git a/tests/test_WP22_intelligence.py b/tests/test_WP22_intelligence.py index 5c75497..1809c13 100644 --- a/tests/test_WP22_intelligence.py +++ b/tests/test_WP22_intelligence.py @@ -1,97 +1,97 @@ """ -FILE: app/services/edge_registry.py -DESCRIPTION: Single Source of Truth für Kanten-Typen. Parst '01_User_Manual/01_edge_vocabulary.md'. - WP-22 Teil B: Registry & Validation. - FIX: Beachtet MINDNET_VAULT_ROOT aus .env korrekt. +FILE: tests/test_WP22_intelligence.py +DESCRIPTION: Integrationstest für WP-22. + FIX: Erzwingt Pfad-Synchronisation für Registry & Router. """ -import re +import unittest import os -import json -import logging -from typing import Dict, Optional, Set +import shutil +import yaml +import asyncio +from unittest.mock import MagicMock, patch, AsyncMock -logger = logging.getLogger(__name__) +import app.routers.chat +from app.models.dto import ChatRequest, QueryHit, QueryRequest +from app.services.edge_registry import EdgeRegistry +from app.core.retriever import _compute_total_score, _get_status_multiplier +from app.routers.chat import _classify_intent, chat_endpoint -class EdgeRegistry: - _instance = None +class TestWP22Integration(unittest.IsolatedAsyncioTestCase): - def __new__(cls, vault_root: Optional[str] = None): - if cls._instance is None: - cls._instance = super(EdgeRegistry, cls).__new__(cls) - cls._instance.initialized = False - return cls._instance - - def __init__(self, vault_root: Optional[str] = None): - if self.initialized: - return - - # Priorität 1: Übergebener Parameter (z.B. für Tests) - # Priorität 2: Environment Variable (z.B. Production ./vault_master) - # Priorität 3: Default Fallback (./vault) - self.vault_root = vault_root or os.getenv("MINDNET_VAULT_ROOT", "./vault") + async def asyncSetUp(self): + """Bereitet eine isolierte Test-Umgebung vor.""" + self.test_root = os.path.abspath("tests/temp_wp22") + self.test_vault = os.path.join(self.test_root, "vault_master") + self.test_config_dir = os.path.join(self.test_root, "config") - # Der relative Pfad ist laut Spezifikation fest definiert - self.vocab_rel_path = os.path.join("01_User_Manual", "01_edge_vocabulary.md") + os.makedirs(os.path.join(self.test_vault, "01_User_Manual"), exist_ok=True) + os.makedirs(self.test_config_dir, exist_ok=True) + os.makedirs(os.path.join(self.test_root, "data/logs"), exist_ok=True) + + # 2. Config Files schreiben + self.decision_path = os.path.join(self.test_config_dir, "decision_engine.yaml") + self.decision_config = { + "strategies": { + "FACT": {"trigger_keywords": ["was"], "edge_boosts": {"part_of": 2.0}}, + "CAUSAL": {"trigger_keywords": ["warum"], "edge_boosts": {"caused_by": 3.0}} + } + } + with open(self.decision_path, "w") as f: yaml.dump(self.decision_config, f) + + # 3. Vocabulary File am RICHTIGEN Ort + self.vocab_path = os.path.join(self.test_vault, "01_User_Manual/01_edge_vocabulary.md") + with open(self.vocab_path, "w") as f: + f.write("| System-Typ | Aliases |\n| :--- | :--- |\n| **caused_by** | ursache_ist |\n| **part_of** | teil_von |") + + # 4. MOCKING / RESETTING GLOBAL STATE + self.mock_settings = MagicMock() + self.mock_settings.DECISION_CONFIG_PATH = self.decision_path + self.mock_settings.MINDNET_VAULT_ROOT = self.test_vault + self.mock_settings.RETRIEVER_TOP_K = 5 + self.mock_settings.MODEL_NAME = "test-model" - self.unknown_log_path = "data/logs/unknown_edges.jsonl" - self.canonical_map: Dict[str, str] = {} - self.valid_types: Set[str] = set() + self.patch_settings_chat = patch('app.routers.chat.get_settings', return_value=self.mock_settings) + self.patch_settings_registry = patch('app.services.edge_registry.get_settings', return_value=self.mock_settings) - self._load_vocabulary() - self.initialized = True - - def _load_vocabulary(self): - """Parst die Markdown-Tabelle im Vault.""" - # Absoluten Pfad auflösen, um Verwirrung mit cwd zu vermeiden - full_path = os.path.abspath(os.path.join(self.vault_root, self.vocab_rel_path)) + self.patch_settings_chat.start() + self.patch_settings_registry.start() - if not os.path.exists(full_path): - logger.warning(f"Edge Vocabulary NOT found at: {full_path}. Registry is empty.") - return + app.routers.chat._DECISION_CONFIG_CACHE = None + EdgeRegistry._instance = None + self.registry = EdgeRegistry(vault_root=self.test_vault) - # Regex: | **canonical** | alias, alias | - pattern = re.compile(r"\|\s*\*\*([a-z_]+)\*\*\s*\|\s*([^|]+)\|") + async def asyncTearDown(self): + self.patch_settings_chat.stop() + self.patch_settings_registry.stop() + if os.path.exists(self.test_root): shutil.rmtree(self.test_root) + EdgeRegistry._instance = None + app.routers.chat._DECISION_CONFIG_CACHE = None - try: - with open(full_path, "r", encoding="utf-8") as f: - for line in f: - match = pattern.search(line) - if match: - canonical = match.group(1).strip() - aliases_str = match.group(2).strip() - - self.valid_types.add(canonical) - self.canonical_map[canonical] = canonical - - if aliases_str and "Kein Alias" not in aliases_str: - aliases = [a.strip() for a in aliases_str.split(",") if a.strip()] - for alias in aliases: - clean_alias = alias.replace("`", "") - self.canonical_map[clean_alias] = canonical - - logger.info(f"EdgeRegistry loaded from {full_path}: {len(self.valid_types)} types.") + def test_registry_resolution(self): + print("\n🔵 TEST 1: Registry Resolution") + self.assertTrue(len(self.registry.valid_types) > 0) + self.assertEqual(self.registry.resolve("ursache_ist"), "caused_by") + print("✅ Registry OK.") - except Exception as e: - logger.error(f"Failed to parse Edge Vocabulary at {full_path}: {e}") + def test_scoring_math(self): + print("\n🔵 TEST 2: Scoring Math (Lifecycle)") + with patch("app.core.retriever._get_scoring_weights", return_value=(1.0, 1.0, 0.0)): + self.assertEqual(_get_status_multiplier({"status": "stable"}), 1.2) + self.assertEqual(_get_status_multiplier({"status": "draft"}), 0.8) + print("✅ Scoring OK.") - def resolve(self, edge_type: str) -> str: - if not edge_type: return "related_to" - clean_type = edge_type.lower().strip().replace(" ", "_") + async def test_full_flow(self): + print("\n🔵 TEST 3: Pipeline flow") + mock_llm = AsyncMock(); mock_llm.prompts = {}; mock_llm.generate_raw_response.return_value = "Ok" + mock_ret = AsyncMock() + mock_hit = QueryHit(node_id="c1", note_id="n1", semantic_score=0.8, edge_bonus=0.0, centrality_bonus=0.0, total_score=0.8, source={"text": "t"}, payload={"status": "active"}) + mock_ret.search.return_value.results = [mock_hit] - if clean_type in self.canonical_map: - return self.canonical_map[clean_type] - - self._log_unknown(clean_type) - return clean_type + resp = await chat_endpoint(ChatRequest(message="Warum?"), llm=mock_llm, retriever=mock_ret) + self.assertEqual(resp.intent, "CAUSAL") + called_req = mock_ret.search.call_args[0][0] + self.assertEqual(called_req.boost_edges.get("caused_by"), 3.0) + print("✅ Full Flow OK.") - def _log_unknown(self, edge_type: str): - try: - os.makedirs(os.path.dirname(self.unknown_log_path), exist_ok=True) - entry = {"unknown_type": edge_type, "status": "new"} - with open(self.unknown_log_path, "a", encoding="utf-8") as f: - f.write(json.dumps(entry) + "\n") - except Exception: - pass - -# Default Instanz -registry = EdgeRegistry() \ No newline at end of file +if __name__ == '__main__': + unittest.main() \ No newline at end of file