""" FILE: app/core/retriever.py DESCRIPTION: Implementiert die Hybrid-Suche (Vektor + Graph-Expansion) und das Scoring-Modell (Explainability). WP-22 Update: Dynamic Edge Boosting, Lifecycle Scoring & Provenance Awareness. Enthält detaillierte Debug-Informationen für die mathematische Verifizierung. VERSION: 0.6.8 (WP-22 Debug & Verifiability) STATUS: Active DEPENDENCIES: app.config, app.models.dto, app.core.qdrant*, app.services.embeddings_client, app.core.graph_adapter LAST_ANALYSIS: 2025-12-18 """ from __future__ import annotations import os import time import logging from functools import lru_cache from typing import Any, Dict, List, Tuple, Iterable, Optional from app.config import get_settings from app.models.dto import ( QueryRequest, QueryResponse, QueryHit, Explanation, ScoreBreakdown, Reason, EdgeDTO ) import app.core.qdrant as qdr import app.core.qdrant_points as qp import app.services.embeddings_client as ec import app.core.graph_adapter as ga try: import yaml # type: ignore[import] except Exception: # pragma: no cover yaml = None # type: ignore[assignment] logger = logging.getLogger(__name__) @lru_cache def _get_scoring_weights() -> Tuple[float, float, float]: """ Liefert die Basis-Gewichtung (semantic_weight, edge_weight, centrality_weight) aus der Config. Priorität: 1. retriever.yaml -> 2. Environment/Settings -> 3. Hardcoded Defaults """ settings = get_settings() sem = float(getattr(settings, "RETRIEVER_W_SEM", 1.0)) edge = float(getattr(settings, "RETRIEVER_W_EDGE", 0.0)) cent = float(getattr(settings, "RETRIEVER_W_CENT", 0.0)) config_path = os.getenv("MINDNET_RETRIEVER_CONFIG", "config/retriever.yaml") if yaml is None: return sem, edge, cent try: if os.path.exists(config_path): with open(config_path, "r", encoding="utf-8") as f: data = yaml.safe_load(f) or {} scoring = data.get("scoring", {}) or {} sem = float(scoring.get("semantic_weight", sem)) edge = float(scoring.get("edge_weight", edge)) cent = float(scoring.get("centrality_weight", cent)) except Exception as e: logger.warning(f"Failed to load retriever weights from {config_path}: {e}") return sem, edge, cent return sem, edge, cent def _get_client_and_prefix() -> Tuple[Any, str]: """Liefert das initialisierte Qdrant-Client-Objekt und das aktuelle Collection-Präfix.""" cfg = qdr.QdrantConfig.from_env() client = qdr.get_client(cfg) return client, cfg.prefix def _get_query_vector(req: QueryRequest) -> List[float]: """ Stellt sicher, dass ein Query-Vektor vorhanden ist. Wandelt Text-Queries via EmbeddingsClient um, falls kein Vektor im Request liegt. """ if req.query_vector: return list(req.query_vector) if not req.query: raise ValueError("QueryRequest benötigt entweder 'query' oder 'query_vector'") settings = get_settings() model_name = settings.MODEL_NAME try: # Versuch mit modernem Interface (WP-03 kompatibel) return ec.embed_text(req.query, model_name=model_name) except TypeError: # Fallback für ältere EmbeddingsClient-Signaturen return ec.embed_text(req.query) def _semantic_hits( client: Any, prefix: str, vector: List[float], top_k: int, filters: Dict[str, Any] | None = None, ) -> List[Tuple[str, float, Dict[str, Any]]]: """Führt eine reine Vektorsuche in Qdrant aus und gibt die Roh-Treffer zurück.""" flt = filters or None raw_hits = qp.search_chunks_by_vector(client, prefix, vector, top=top_k, filters=flt) results: List[Tuple[str, float, Dict[str, Any]]] = [] for pid, score, payload in raw_hits: results.append((str(pid), float(score), dict(payload or {}))) return results # --- WP-22 Helper: Lifecycle Multipliers (Teil A) --- def _get_status_multiplier(payload: Dict[str, Any]) -> float: """ Ermittelt den Multiplikator basierend auf dem Content-Status. - stable: 1.2 (Belohnung für validiertes Wissen) - active/default: 1.0 - draft: 0.5 (Bestrafung für Unfertiges) """ status = str(payload.get("status", "active")).lower().strip() if status == "stable": return 1.2 if status == "draft": return 0.5 return 1.0 # --- WP-22: Dynamic Scoring Formula (Teil C) --- def _compute_total_score( semantic_score: float, payload: Dict[str, Any], edge_bonus_raw: float = 0.0, cent_bonus_raw: float = 0.0, dynamic_edge_boosts: Dict[str, float] = None ) -> Dict[str, Any]: """ Die zentrale mathematische Scoring-Formel von WP-22. FORMEL: Score = (SemanticScore * StatusMultiplier) * (1 + (Weight-1) + DynamicGraphBoost) Hierbei gilt: - BaseScore: semantic_similarity * status_multiplier - TypeImpact: retriever_weight (z.B. 1.1 für Decisions) - DynamicBoost: (EdgeW * EdgeBonus) + (CentW * CentBonus) """ # 1. Basis-Parameter laden _sem_w, edge_w_cfg, cent_w_cfg = _get_scoring_weights() status_mult = _get_status_multiplier(payload) node_weight = float(payload.get("retriever_weight", 1.0)) # 2. Base Score (Semantik gewichtet durch Lifecycle) base_val = float(semantic_score) * status_mult # 3. Graph-Intelligence Boost (WP-22 C) # Globaler Verstärker für Graph-Signale bei spezifischen Intents (z.B. WHY/EMPATHY) graph_boost_factor = 1.5 if dynamic_edge_boosts and (edge_bonus_raw > 0 or cent_bonus_raw > 0) else 1.0 edge_contribution_raw = edge_w_cfg * edge_bonus_raw cent_contribution_raw = cent_w_cfg * cent_bonus_raw dynamic_graph_impact = (edge_contribution_raw + cent_contribution_raw) * graph_boost_factor # 4. Zusammenführung (Die "Dicke" des Knotens und die Verknüpfung) # (node_weight - 1.0) ermöglicht negative oder positive Type-Impacts relativ zu 1.0 total = base_val * (1.0 + (node_weight - 1.0) + dynamic_graph_impact) # Schutz vor negativen Scores (Floor) final_score = max(0.001, float(total)) # Debug-Daten für den Explanation-Layer sammeln return { "total": final_score, "edge_bonus": float(edge_bonus_raw), "cent_bonus": float(cent_bonus_raw), "status_multiplier": status_mult, "graph_boost_factor": graph_boost_factor, "type_impact": node_weight - 1.0, "base_val": base_val } # --- WP-04b Explanation Logic --- def _build_explanation( semantic_score: float, payload: Dict[str, Any], scoring_debug: Dict[str, Any], subgraph: Optional[ga.Subgraph], target_note_id: Optional[str], applied_boosts: Optional[Dict[str, float]] = None ) -> Explanation: """ Erstellt ein detailliertes Explanation-Objekt für maximale Transparenz (WP-04b). Enthält nun WP-22 Debug-Metriken wie StatusMultiplier und GraphBoostFactor. """ _, edge_w_cfg, cent_w_cfg = _get_scoring_weights() type_weight = float(payload.get("retriever_weight", 1.0)) status_mult = scoring_debug["status_multiplier"] graph_bf = scoring_debug["graph_boost_factor"] note_type = payload.get("type", "unknown") base_val = scoring_debug["base_val"] # 1. Score Breakdown Objekt breakdown = ScoreBreakdown( semantic_contribution=base_val, edge_contribution=base_val * (edge_w_cfg * scoring_debug["edge_bonus"] * graph_bf), centrality_contribution=base_val * (cent_w_cfg * scoring_debug["cent_bonus"] * graph_bf), raw_semantic=semantic_score, raw_edge_bonus=scoring_debug["edge_bonus"], raw_centrality=scoring_debug["cent_bonus"], node_weight=type_weight, status_multiplier=status_mult, graph_boost_factor=graph_bf ) reasons: List[Reason] = [] edges_dto: List[EdgeDTO] = [] # 2. Gründe generieren if semantic_score > 0.85: reasons.append(Reason(kind="semantic", message="Herausragende inhaltliche Übereinstimmung.", score_impact=base_val)) elif semantic_score > 0.70: reasons.append(Reason(kind="semantic", message="Gute inhaltliche Übereinstimmung.", score_impact=base_val)) if type_weight != 1.0: direction = "Bevorzugt" if type_weight > 1.0 else "Abgewertet" reasons.append(Reason(kind="type", message=f"{direction} durch Typ-Profil '{note_type}'.", score_impact=base_val * (type_weight - 1.0))) if status_mult != 1.0: impact_txt = "Belohnt" if status_mult > 1.0 else "Zurückgestellt" reasons.append(Reason(kind="lifecycle", message=f"{impact_txt} (Status: {payload.get('status', 'draft')}).", score_impact=0.0)) # 3. Kanten-Details extrahieren (Incoming + Outgoing für volle Sichtbarkeit) if subgraph and target_note_id and scoring_debug["edge_bonus"] > 0: raw_edges = [] if hasattr(subgraph, "get_incoming_edges"): raw_edges.extend(subgraph.get_incoming_edges(target_note_id) or []) if hasattr(subgraph, "get_outgoing_edges"): raw_edges.extend(subgraph.get_outgoing_edges(target_note_id) or []) for edge in raw_edges: src, tgt = edge.get("source"), edge.get("target") k = edge.get("kind", "edge") prov = edge.get("provenance", "rule") conf = float(edge.get("confidence", 1.0)) # Richtung und Nachbar bestimmen is_incoming = (tgt == target_note_id) neighbor = src if is_incoming else tgt edge_obj = EdgeDTO( id=f"{src}->{tgt}:{k}", kind=k, source=src, target=tgt, weight=conf, direction="in" if is_incoming else "out", provenance=prov, confidence=conf ) edges_dto.append(edge_obj) # Die 3 stärksten Signale als Gründe formulieren top_edges = sorted(edges_dto, key=lambda e: e.confidence, reverse=True) for e in top_edges[:3]: prov_label = "Explizite" if e.provenance == "explicit" else "Heuristische" boost_label = f" [Boost x{applied_boosts.get(e.kind)}]" if applied_boosts and e.kind in applied_boosts else "" msg = f"{prov_label} Verbindung ({e.kind}){boost_label} zu '{neighbor}'." reasons.append(Reason(kind="edge", message=msg, score_impact=edge_w_cfg * e.confidence)) if scoring_debug["cent_bonus"] > 0.01: reasons.append(Reason(kind="centrality", message="Knoten ist ein zentraler Hub im Kontext.", score_impact=breakdown.centrality_contribution)) return Explanation( breakdown=breakdown, reasons=reasons, related_edges=edges_dto if edges_dto else None, applied_intent=getattr(ga, "_LAST_INTENT", "UNKNOWN"), # Debugging-Zweck applied_boosts=applied_boosts ) def _extract_expand_options(req: QueryRequest) -> Tuple[int, List[str] | None]: """Extrahiert Expansion-Tiefe und Kanten-Filter aus dem Request.""" expand = getattr(req, "expand", None) if not expand: return 0, None depth = 1 edge_types = None if isinstance(expand, dict): depth = int(expand.get("depth", 1)) edge_types = expand.get("edge_types") if edge_types: edge_types = list(edge_types) return depth, edge_types # Fallback für Pydantic Objekte if hasattr(expand, "depth"): return int(getattr(expand, "depth", 1)), getattr(expand, "edge_types", None) return 0, None def _build_hits_from_semantic( hits: Iterable[Tuple[str, float, Dict[str, Any]]], top_k: int, used_mode: str, subgraph: ga.Subgraph | None = None, explain: bool = False, dynamic_edge_boosts: Dict[str, float] = None ) -> QueryResponse: """ Wandelt semantische Roh-Treffer in strukturierte QueryHits um. Berechnet den finalen Score pro Hit unter Einbeziehung des Subgraphen. """ t0 = time.time() enriched = [] for pid, semantic_score, payload in hits: edge_bonus = 0.0 cent_bonus = 0.0 # Graph-Abfrage erfolgt IMMER über die Note-ID target_note_id = payload.get("note_id") if subgraph is not None and target_note_id: try: edge_bonus = float(subgraph.edge_bonus(target_note_id)) cent_bonus = float(subgraph.centrality_bonus(target_note_id)) except Exception as e: logger.debug(f"Graph signal failed for {target_note_id}: {e}") # Messbare Scoring-Daten via WP-22 Formel debug_data = _compute_total_score( semantic_score, payload, edge_bonus_raw=edge_bonus, cent_bonus_raw=cent_bonus, dynamic_edge_boosts=dynamic_edge_boosts ) enriched.append((pid, float(semantic_score), payload, debug_data)) # Sortierung nach berechnetem Total Score enriched_sorted = sorted(enriched, key=lambda h: h[3]["total"], reverse=True) limited_hits = enriched_sorted[: max(1, top_k)] results: List[QueryHit] = [] for pid, semantic_score, payload, debug in limited_hits: explanation_obj = None if explain: explanation_obj = _build_explanation( semantic_score=float(semantic_score), payload=payload, scoring_debug=debug, subgraph=subgraph, target_note_id=payload.get("note_id"), applied_boosts=dynamic_edge_boosts ) text_content = payload.get("page_content") or payload.get("text") or payload.get("content") results.append(QueryHit( node_id=str(pid), note_id=payload.get("note_id", "unknown"), semantic_score=float(semantic_score), edge_bonus=debug["edge_bonus"], centrality_bonus=debug["cent_bonus"], total_score=debug["total"], source={ "path": payload.get("path"), "section": payload.get("section") or payload.get("section_title"), "text": text_content }, payload=payload, explanation=explanation_obj )) dt_ms = int((time.time() - t0) * 1000) return QueryResponse(results=results, used_mode=used_mode, latency_ms=dt_ms) def semantic_retrieve(req: QueryRequest) -> QueryResponse: """Standard-Vektorsuche ohne Graph-Einfluss (WP-02).""" client, prefix = _get_client_and_prefix() vector = _get_query_vector(req) top_k = req.top_k or 10 hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters) return _build_hits_from_semantic(hits, top_k=top_k, used_mode="semantic", subgraph=None, explain=req.explain) def hybrid_retrieve(req: QueryRequest) -> QueryResponse: """ Hybrid-Suche: Kombiniert Semantik mit WP-22 Graph Intelligence. Führt Expansion durch, gewichtet nach Provenance und appliziert Intent-Boosts. """ client, prefix = _get_client_and_prefix() vector = list(req.query_vector) if req.query_vector else _get_query_vector(req) top_k = req.top_k or 10 # 1. Semantische Seed-Suche hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters) # 2. Graph Expansion & Custom Weighting expand_depth, edge_types = _extract_expand_options(req) boost_edges = getattr(req, "boost_edges", {}) or {} subgraph: ga.Subgraph | None = None if expand_depth > 0 and hits: # Extrahiere Note-IDs der Treffer als Startpunkte für den Graphen seed_ids = list({h[2].get("note_id") for h in hits if h[2].get("note_id")}) if seed_ids: try: # Subgraph aus Qdrant laden subgraph = ga.expand(client, prefix, seed_ids, depth=expand_depth, edge_types=edge_types) # WP-22: Transformation der Gewichte im RAM-Graphen vor Bonus-Berechnung if subgraph and hasattr(subgraph, "graph"): for u, v, data in subgraph.graph.edges(data=True): # A. Provenance Weighting (WP-22 Herkunfts-Bonus) prov = data.get("provenance", "rule") # Explicit=1.0, Smart=0.9, Rule=0.7 prov_w = 1.0 if prov == "explicit" else (0.9 if prov == "smart" else 0.7) # B. Intent Boost Multiplikator (Vom Router geladen) k = data.get("kind") intent_multiplier = boost_edges.get(k, 1.0) # Finales Kanten-Gewicht im Graphen setzen data["weight"] = data.get("weight", 1.0) * prov_w * intent_multiplier except Exception as e: logger.error(f"Graph expansion failed: {e}") subgraph = None # 3. Scoring & Explanation Generierung return _build_hits_from_semantic( hits, top_k, "hybrid", subgraph, req.explain, boost_edges ) class Retriever: """Wrapper-Klasse für die konsolidierte Retrieval-Logik.""" async def search(self, request: QueryRequest) -> QueryResponse: """Führt eine hybride Suche aus. Asynchron für FastAPI-Integration.""" return hybrid_retrieve(request)