""" FILE: app/core/retriever.py DESCRIPTION: Implementiert die Hybrid-Suche (Vektor + Graph-Expansion) und das Scoring-Modell (Explainability). WP-22 Update: Dynamic Edge Boosting & Lifecycle Scoring. VERSION: 0.6.0 (WP-22 Dynamic Scoring) STATUS: Active DEPENDENCIES: app.config, app.models.dto, app.core.qdrant*, app.services.embeddings_client, app.core.graph_adapter LAST_ANALYSIS: 2025-12-18 """ from __future__ import annotations import os import time from functools import lru_cache from typing import Any, Dict, List, Tuple, Iterable, Optional from app.config import get_settings from app.models.dto import ( QueryRequest, QueryResponse, QueryHit, Explanation, ScoreBreakdown, Reason, EdgeDTO ) import app.core.qdrant as qdr import app.core.qdrant_points as qp import app.services.embeddings_client as ec import app.core.graph_adapter as ga try: import yaml # type: ignore[import] except Exception: # pragma: no cover yaml = None # type: ignore[assignment] @lru_cache def _get_scoring_weights() -> Tuple[float, float, float]: """Liefert (semantic_weight, edge_weight, centrality_weight) für den Retriever.""" settings = get_settings() sem = float(getattr(settings, "RETRIEVER_W_SEM", 1.0)) edge = float(getattr(settings, "RETRIEVER_W_EDGE", 0.0)) cent = float(getattr(settings, "RETRIEVER_W_CENT", 0.0)) config_path = os.getenv("MINDNET_RETRIEVER_CONFIG", "config/retriever.yaml") if yaml is None: return sem, edge, cent try: if os.path.exists(config_path): with open(config_path, "r", encoding="utf-8") as f: data = yaml.safe_load(f) or {} scoring = data.get("scoring", {}) or {} sem = float(scoring.get("semantic_weight", sem)) edge = float(scoring.get("edge_weight", edge)) cent = float(scoring.get("centrality_weight", cent)) except Exception: return sem, edge, cent return sem, edge, cent def _get_client_and_prefix() -> Tuple[Any, str]: """Liefert (QdrantClient, prefix).""" cfg = qdr.QdrantConfig.from_env() client = qdr.get_client(cfg) return client, cfg.prefix def _get_query_vector(req: QueryRequest) -> List[float]: """Liefert den Query-Vektor aus dem Request.""" if req.query_vector: return list(req.query_vector) if not req.query: raise ValueError("QueryRequest benötigt entweder query oder query_vector") settings = get_settings() model_name = settings.MODEL_NAME try: return ec.embed_text(req.query, model_name=model_name) except TypeError: return ec.embed_text(req.query) def _semantic_hits( client: Any, prefix: str, vector: List[float], top_k: int, filters: Dict[str, Any] | None = None, ) -> List[Tuple[str, float, Dict[str, Any]]]: """Führt eine semantische Suche aus.""" flt = filters or None raw_hits = qp.search_chunks_by_vector(client, prefix, vector, top=top_k, filters=flt) results: List[Tuple[str, float, Dict[str, Any]]] = [] for pid, score, payload in raw_hits: results.append((str(pid), float(score), dict(payload or {}))) return results # --- WP-22 Helper: Lifecycle Multipliers (Teil A) --- def _get_status_multiplier(payload: Dict[str, Any]) -> float: """ WP-22: Drafts werden bestraft, Stable Notes belohnt. """ status = str(payload.get("status", "draft")).lower() if status == "stable": return 1.2 if status == "active": return 1.0 if status == "draft": return 0.5 # Malus für Entwürfe # Fallback für andere oder leere Status return 1.0 # --- WP-22: Dynamic Scoring Formula (Teil C) --- def _compute_total_score( semantic_score: float, payload: Dict[str, Any], edge_bonus: float = 0.0, cent_bonus: float = 0.0, dynamic_edge_boosts: Dict[str, float] = None ) -> Tuple[float, float, float]: """ Berechnet total_score nach WP-22 Formel. Score = (Sem * Type * Status) + (Weighted_Edge + Cent) """ raw_weight = payload.get("retriever_weight", 1.0) try: weight = float(raw_weight) except (TypeError, ValueError): weight = 1.0 if weight < 0.0: weight = 0.0 sem_w, edge_w, cent_w = _get_scoring_weights() status_mult = _get_status_multiplier(payload) # Dynamic Edge Boosting (Teil C) # Wenn dynamische Boosts aktiv sind (durch den Router), verstärken wir den Graph-Bonus global. # Der konkrete kanten-spezifische Boost passiert bereits im Subgraph (hybrid_retrieve). final_edge_score = edge_w * edge_bonus if dynamic_edge_boosts and edge_bonus > 0: # Globaler Boost-Faktor falls Intention (z.B. WHY) vorliegt final_edge_score *= 1.5 total = (sem_w * float(semantic_score) * weight * status_mult) + final_edge_score + (cent_w * cent_bonus) return float(total), float(edge_bonus), float(cent_bonus) # --- WP-04b Explanation Logic --- def _build_explanation( semantic_score: float, payload: Dict[str, Any], edge_bonus: float, cent_bonus: float, subgraph: Optional[ga.Subgraph], node_key: Optional[str] ) -> Explanation: """Erstellt ein Explanation-Objekt (WP-04b).""" sem_w, _edge_w, _cent_w = _get_scoring_weights() _, edge_w_cfg, cent_w_cfg = _get_scoring_weights() try: type_weight = float(payload.get("retriever_weight", 1.0)) except (TypeError, ValueError): type_weight = 1.0 status_mult = _get_status_multiplier(payload) note_type = payload.get("type", "unknown") # Breakdown Berechnung (muss mit _compute_total_score korrelieren) breakdown = ScoreBreakdown( semantic_contribution=(sem_w * semantic_score * type_weight * status_mult), edge_contribution=(edge_w_cfg * edge_bonus), centrality_contribution=(cent_w_cfg * cent_bonus), raw_semantic=semantic_score, raw_edge_bonus=edge_bonus, raw_centrality=cent_bonus, node_weight=type_weight ) reasons: List[Reason] = [] edges_dto: List[EdgeDTO] = [] # Reason Generation Logik (WP-04b) if semantic_score > 0.85: reasons.append(Reason(kind="semantic", message="Sehr hohe textuelle Übereinstimmung.", score_impact=breakdown.semantic_contribution)) elif semantic_score > 0.70: reasons.append(Reason(kind="semantic", message="Gute textuelle Übereinstimmung.", score_impact=breakdown.semantic_contribution)) if type_weight != 1.0: msg = "Bevorzugt" if type_weight > 1.0 else "Leicht abgewertet" reasons.append(Reason(kind="type", message=f"{msg} aufgrund des Typs '{note_type}'.", score_impact=(sem_w * semantic_score * (type_weight - 1.0)))) # NEU: WP-22 Status Reason if status_mult != 1.0: msg = "Status-Bonus" if status_mult > 1.0 else "Status-Malus" reasons.append(Reason(kind="lifecycle", message=f"{msg} ({payload.get('status')}).", score_impact=0.0)) if subgraph and node_key and edge_bonus > 0: # Extrahiere Top-Kanten für die Erklärung if hasattr(subgraph, "get_outgoing_edges"): outgoing = subgraph.get_outgoing_edges(node_key) for edge in outgoing: target = edge.get("target", "Unknown") kind = edge.get("kind", "edge") weight = edge.get("weight", 0.0) if weight > 0.05: edges_dto.append(EdgeDTO(id=f"{node_key}->{target}:{kind}", kind=kind, source=node_key, target=target, weight=weight, direction="out")) if hasattr(subgraph, "get_incoming_edges"): incoming = subgraph.get_incoming_edges(node_key) for edge in incoming: src = edge.get("source", "Unknown") kind = edge.get("kind", "edge") weight = edge.get("weight", 0.0) if weight > 0.05: edges_dto.append(EdgeDTO(id=f"{src}->{node_key}:{kind}", kind=kind, source=src, target=node_key, weight=weight, direction="in")) all_edges = sorted(edges_dto, key=lambda e: e.weight, reverse=True) for top_edge in all_edges[:3]: impact = edge_w_cfg * top_edge.weight dir_txt = "Verweist auf" if top_edge.direction == "out" else "Referenziert von" tgt_txt = top_edge.target if top_edge.direction == "out" else top_edge.source reasons.append(Reason(kind="edge", message=f"{dir_txt} '{tgt_txt}' via '{top_edge.kind}'", score_impact=impact, details={"kind": top_edge.kind})) if cent_bonus > 0.01: reasons.append(Reason(kind="centrality", message="Knoten liegt zentral im Kontext.", score_impact=breakdown.centrality_contribution)) return Explanation(breakdown=breakdown, reasons=reasons, related_edges=edges_dto if edges_dto else None) def _extract_expand_options(req: QueryRequest) -> Tuple[int, List[str] | None]: """Extrahiert depth und edge_types für Graph-Expansion.""" expand = getattr(req, "expand", None) if not expand: return 0, None depth = 1 edge_types: List[str] | None = None if hasattr(expand, "depth") or hasattr(expand, "edge_types"): depth = int(getattr(expand, "depth", 1) or 1) types_val = getattr(expand, "edge_types", None) if types_val: edge_types = list(types_val) return depth, edge_types if isinstance(expand, dict): if "depth" in expand: depth = int(expand.get("depth") or 1) if "edge_types" in expand and expand["edge_types"] is not None: edge_types = list(expand["edge_types"]) return depth, edge_types return 0, None def _build_hits_from_semantic( hits: Iterable[Tuple[str, float, Dict[str, Any]]], top_k: int, used_mode: str, subgraph: ga.Subgraph | None = None, explain: bool = False, dynamic_edge_boosts: Dict[str, float] = None ) -> QueryResponse: """Baut strukturierte QueryHits basierend auf Scoring (WP-22 & WP-04b).""" t0 = time.time() enriched: List[Tuple[str, float, Dict[str, Any], float, float, float]] = [] for pid, semantic_score, payload in hits: edge_bonus = 0.0 cent_bonus = 0.0 node_key = payload.get("chunk_id") or payload.get("note_id") if subgraph is not None and node_key: try: edge_bonus = float(subgraph.edge_bonus(node_key)) except Exception: edge_bonus = 0.0 try: cent_bonus = float(subgraph.centrality_bonus(node_key)) except Exception: cent_bonus = 0.0 total, eb, cb = _compute_total_score( semantic_score, payload, edge_bonus=edge_bonus, cent_bonus=cent_bonus, dynamic_edge_boosts=dynamic_edge_boosts ) enriched.append((pid, float(semantic_score), payload, total, eb, cb)) # Sort & Limit enriched_sorted = sorted(enriched, key=lambda h: h[3], reverse=True) limited = enriched_sorted[: max(1, top_k)] results: List[QueryHit] = [] for pid, semantic_score, payload, total, eb, cb in limited: explanation_obj = None if explain: explanation_obj = _build_explanation( semantic_score=float(semantic_score), payload=payload, edge_bonus=eb, cent_bonus=cb, subgraph=subgraph, node_key=payload.get("chunk_id") or payload.get("note_id") ) text_content = payload.get("page_content") or payload.get("text") or payload.get("content") results.append(QueryHit( node_id=str(pid), note_id=payload.get("note_id", "unknown"), semantic_score=float(semantic_score), edge_bonus=eb, centrality_bonus=cb, total_score=total, paths=None, source={ "path": payload.get("path"), "section": payload.get("section") or payload.get("section_title"), "text": text_content }, payload=payload, explanation=explanation_obj )) dt = int((time.time() - t0) * 1000) return QueryResponse(results=results, used_mode=used_mode, latency_ms=dt) def semantic_retrieve(req: QueryRequest) -> QueryResponse: """Reiner semantischer Retriever (WP-02).""" client, prefix = _get_client_and_prefix() vector = _get_query_vector(req) top_k = req.top_k or get_settings().RETRIEVER_TOP_K hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters) return _build_hits_from_semantic(hits, top_k=top_k, used_mode="semantic", subgraph=None, explain=req.explain) def hybrid_retrieve(req: QueryRequest) -> QueryResponse: """Hybrid-Retriever: semantische Suche + optionale Edge-Expansion (WP-04a).""" client, prefix = _get_client_and_prefix() # 1. Semantische Suche vector = list(req.query_vector) if req.query_vector else _get_query_vector(req) top_k = req.top_k or get_settings().RETRIEVER_TOP_K hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters) # 2. Graph Expansion & Custom Boosting (WP-22 Teil C) depth, edge_types = _extract_expand_options(req) boost_edges = getattr(req, "boost_edges", {}) subgraph: ga.Subgraph | None = None if depth and depth > 0: seed_ids: List[str] = [] for _pid, _score, payload in hits: key = payload.get("note_id") if key and key not in seed_ids: seed_ids.append(key) if seed_ids: try: # Subgraph laden subgraph = ga.expand(client, prefix, seed_ids, depth=depth, edge_types=edge_types) # --- WP-22: Kanten-Boosts im RAM-Graphen anwenden --- # Dies manipuliert die Gewichte im Graphen, bevor der 'edge_bonus' berechnet wird. if boost_edges and subgraph and hasattr(subgraph, "graph"): for u, v, data in subgraph.graph.edges(data=True): k = data.get("kind") if k in boost_edges: # Gewicht multiplizieren (z.B. caused_by * 3.0) data["weight"] = data.get("weight", 1.0) * boost_edges[k] except Exception: subgraph = None # 3. Scoring & Re-Ranking return _build_hits_from_semantic( hits, top_k=top_k, used_mode="hybrid", subgraph=subgraph, explain=req.explain, dynamic_edge_boosts=boost_edges ) class Retriever: """Wrapper-Klasse für Suchoperationen.""" async def search(self, request: QueryRequest) -> QueryResponse: return hybrid_retrieve(request)