From a571c41c6100e6e3c3bcac6b40b545253c03af7d Mon Sep 17 00:00:00 2001 From: Lars Date: Thu, 4 Dec 2025 15:43:32 +0100 Subject: [PATCH] app/core/retriever.py aktualisiert --- app/core/retriever.py | 206 ++++++++++++++++++++++++++---------------- 1 file changed, 129 insertions(+), 77 deletions(-) diff --git a/app/core/retriever.py b/app/core/retriever.py index f7b2640..119dc9c 100644 --- a/app/core/retriever.py +++ b/app/core/retriever.py @@ -1,7 +1,9 @@ from __future__ import annotations +import os import time -from typing import Any, Dict, List, Tuple +from functools import lru_cache +from typing import Any, Dict, List, Tuple, Iterable from app.config import get_settings from app.models.dto import QueryRequest, QueryResponse, QueryHit @@ -10,6 +12,47 @@ import app.core.qdrant_points as qp import app.services.embeddings_client as ec import app.core.graph_adapter as ga +try: + import yaml # type: ignore[import] +except Exception: # pragma: no cover - Fallback falls PyYAML nicht installiert ist + yaml = None # type: ignore[assignment] + + +@lru_cache +def _get_scoring_weights() -> Tuple[float, float, float]: + """Liefert (semantic_weight, edge_weight, centrality_weight) für den Retriever. + + Priorität: + 1. Werte aus config/retriever.yaml (falls vorhanden und gültig), + Abschnitt: + scoring: + semantic_weight: 1.0 + edge_weight: 0.5 + centrality_weight: 0.5 + 2. Fallback auf Settings.RETRIEVER_W_* (ENV-basiert). + """ + settings = get_settings() + sem = float(getattr(settings, "RETRIEVER_W_SEM", 1.0)) + edge = float(getattr(settings, "RETRIEVER_W_EDGE", 0.0)) + cent = float(getattr(settings, "RETRIEVER_W_CENT", 0.0)) + + # YAML-Override, falls konfiguriert + config_path = os.getenv("MINDNET_RETRIEVER_CONFIG", "config/retriever.yaml") + if yaml is None: + return sem, edge, cent + try: + if os.path.exists(config_path): + with open(config_path, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + scoring = data.get("scoring", {}) or {} + sem = float(scoring.get("semantic_weight", sem)) + edge = float(scoring.get("edge_weight", edge)) + cent = float(scoring.get("centrality_weight", cent)) + except Exception: + # Bei Fehlern in der YAML-Konfiguration defensiv auf Defaults zurückfallen + return sem, edge, cent + return sem, edge, cent + def _get_client_and_prefix() -> Tuple[Any, str]: """Liefert (QdrantClient, prefix) basierend auf QdrantConfig.from_env().""" @@ -23,18 +66,18 @@ def _get_query_vector(req: QueryRequest) -> List[float]: Liefert den Query-Vektor aus dem Request. - Falls req.query_vector gesetzt ist, wird dieser unverändert genutzt. - - Falls req.query (Text) gesetzt ist, wird ec.embed_text(req.query) aufgerufen. - - Andernfalls: ValueError. + - Andernfalls wird req.query über den Embedding-Service in einen Vektor + transformiert. """ - if req.query_vector is not None: - if not isinstance(req.query_vector, list): - raise ValueError("query_vector muss eine Liste von floats sein") - return req.query_vector + if req.query_vector: + return list(req.query_vector) - if req.query: - return ec.embed_text(req.query) + if not req.query: + raise ValueError("QueryRequest benötigt entweder query oder query_vector") - raise ValueError("Weder query_vector noch query gesetzt – mindestens eines ist erforderlich") + settings = get_settings() + model_name = settings.MODEL_NAME + return ec.embed_text(req.query, model_name=model_name) def _semantic_hits( @@ -42,20 +85,29 @@ def _semantic_hits( prefix: str, vector: List[float], top_k: int, - filters: Dict | None, -): - """Kapselt den Aufruf von qp.search_chunks_by_vector.""" + filters: Dict[str, Any] | None = None, +) -> List[Tuple[str, float, Dict[str, Any]]]: + """Führt eine semantische Suche über mindnet_chunks aus und liefert Roh-Treffer. + + Rückgabeformat: Liste von (point_id, score, payload) + """ flt = filters or None hits = qp.search_chunks_by_vector(client, prefix, vector, top=top_k, filters=flt) - return hits + results: List[Tuple[str, float, Dict[str, Any]]] = [] + for point in hits: + pid = str(point.id) + score = float(point.score) + payload = dict(point.payload or {}) + results.append((pid, score, payload)) + return results -def _resolve_top_k(req: QueryRequest) -> int: - """Ermittelt ein sinnvolles top_k.""" - if isinstance(req.top_k, int) and req.top_k > 0: - return req.top_k - s = get_settings() - return max(1, int(getattr(s, "RETRIEVER_TOP_K", 10))) +def _get_semantic_score_and_payload( + hit: Tuple[str, float, Dict[str, Any]] +) -> Tuple[float, Dict[str, Any]]: + """Extrahiert semantic_score und Payload aus einem Raw-Hit.""" + _, score, payload = hit + return float(score), payload or {} def _compute_total_score( @@ -64,13 +116,15 @@ def _compute_total_score( edge_bonus: float = 0.0, cent_bonus: float = 0.0, ) -> Tuple[float, float, float]: - """Berechnet total_score auf Basis von semantic_score, retriever_weight und Graph-Boni. + """Berechnet total_score aus semantic_score, retriever_weight und Graph-Boni. - Aktuelle Formel (Step 3): - total_score = semantic_score * max(retriever_weight, 0.0) + edge_bonus + cent_bonus + Formel (WP-04, konfigurierbar über config/retriever.yaml bzw. ENV): + total_score = W_sem * semantic_score * max(retriever_weight, 0.0) + + W_edge * edge_bonus + + W_cent * cent_bonus - retriever_weight stammt aus dem Chunk-Payload und ist bereits aus types.yaml - abgeleitet. Falls nicht gesetzt oder nicht interpretierbar, wird 1.0 angenommen. + - retriever_weight stammt aus dem Chunk-Payload (types.yaml). + - W_sem / W_edge / W_cent kommen aus _get_scoring_weights(). """ raw_weight = payload.get("retriever_weight", 1.0) try: @@ -80,8 +134,9 @@ def _compute_total_score( if weight < 0.0: weight = 0.0 - total = float(semantic_score) * weight + edge_bonus + cent_bonus - return total, float(edge_bonus), float(cent_bonus) + sem_w, edge_w, cent_w = _get_scoring_weights() + total = (sem_w * float(semantic_score) * weight) + (edge_w * edge_bonus) + (cent_w * cent_bonus) + return float(total), float(edge_bonus), float(cent_bonus) def _extract_expand_options(req: QueryRequest) -> Tuple[int, List[str] | None]: @@ -99,56 +154,45 @@ def _extract_expand_options(req: QueryRequest) -> Tuple[int, List[str] | None]: # Pydantic-Modell oder Objekt mit Attributen if hasattr(expand, "depth") or hasattr(expand, "edge_types"): - try: - depth_val = getattr(expand, "depth", 1) or 1 - depth = int(depth_val) - except Exception: - depth = 1 - edge_types = getattr(expand, "edge_types", None) - # plain dict aus FastAPI/Pydantic - elif isinstance(expand, dict): - try: - depth_val = expand.get("depth", 1) or 1 - depth = int(depth_val) - except Exception: - depth = 1 - edge_types = expand.get("edge_types") + depth = int(getattr(expand, "depth", 1) or 1) + types_val = getattr(expand, "edge_types", None) + if types_val: + edge_types = list(types_val) + return depth, edge_types - if depth < 0: - depth = 0 + # Plain dict + if isinstance(expand, dict): + if "depth" in expand: + depth = int(expand.get("depth") or 1) + if "edge_types" in expand and expand["edge_types"] is not None: + edge_types = list(expand["edge_types"]) + return depth, edge_types - if edge_types is not None and not isinstance(edge_types, list): - try: - edge_types = list(edge_types) - except Exception: - edge_types = None - - return depth, edge_types + return 0, None def _build_hits_from_semantic( - hits: List[Tuple[str, float, Dict[str, Any]]], + hits: Iterable[Tuple[str, float, Dict[str, Any]]], top_k: int, used_mode: str, - subgraph: Any | None = None, + subgraph: ga.Subgraph | None = None, ) -> QueryResponse: - """Formt rohe Treffer in QueryResponse um und wendet das Scoring an.""" + """Baut aus Raw-Hits und optionalem Subgraph strukturierte QueryHits. + + - Aggregation auf Note-Ebene (note_id), + - Berechnung von total_score unter Nutzung von retriever_weight + Graph-Boni, + - Rückgabe als QueryResponse mit Latenz. + """ t0 = time.time() enriched: List[Tuple[str, float, Dict[str, Any], float, float, float]] = [] + for pid, semantic_score, payload in hits: # Graph-Scores, falls Subgraph und stabiler Key vorhanden edge_bonus = 0.0 cent_bonus = 0.0 if subgraph is not None: - # Standard-Key wie im ursprünglichen Verhalten (für Fakes in Tests): node_key = payload.get("chunk_id") or payload.get("note_id") - - # Falls es sich um unseren echten Subgraph-Typ handelt, wissen wir, - # dass Knoten als note_id modelliert sind → dann gezielt note_id nutzen. - if isinstance(subgraph, ga.Subgraph): - node_key = payload.get("note_id") - if node_key: try: edge_bonus = float(subgraph.edge_bonus(node_key)) @@ -175,7 +219,8 @@ def _build_hits_from_semantic( for pid, semantic_score, payload, total, edge_bonus, cent_bonus in limited: note_id = payload.get("note_id") path = payload.get("path") - section = payload.get("section_title") + # mindnet_chunks: section (aktuell genutzt); ältere Stände nutzen ggf. section_title + section = payload.get("section") or payload.get("section_title") results.append( QueryHit( @@ -199,36 +244,43 @@ def _build_hits_from_semantic( def semantic_retrieve(req: QueryRequest) -> QueryResponse: """Reiner semantischer Retriever (ohne Edge-Expansion).""" - top_k = _resolve_top_k(req) - vector = _get_query_vector(req) client, prefix = _get_client_and_prefix() + vector = _get_query_vector(req) + top_k = req.top_k or get_settings().RETRIEVER_TOP_K + hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters) - # semantic mode: keine Edge-Expansion return _build_hits_from_semantic(hits, top_k=top_k, used_mode="semantic", subgraph=None) def hybrid_retrieve(req: QueryRequest) -> QueryResponse: - """Hybrid-Retriever mit optionaler Edge-Expansion. + """Hybrid-Retriever: semantische Suche + optionale Edge-Expansion. - Schritt 3: - - Basis sind die semantischen Chunk-Treffer (wie im Semantic-Modus) - - Zusätzlich wird, falls req.expand gesetzt ist und depth > 0, - ein lokaler Subgraph über ga.expand aufgebaut und zur Score-Berechnung verwendet. + Aktueller Stand (Step 3): + - Semantische Suche über mindnet_chunks (wie semantic_retrieve), + - Bei expand.depth > 0 wird ein Subgraph aus mindnet_edges aufgebaut, + und edge_bonus / centrality_bonus in das Scoring einbezogen. """ - top_k = _resolve_top_k(req) - vector = _get_query_vector(req) client, prefix = _get_client_and_prefix() + + # Query-Vektor: + # - Falls explizit gesetzt, unverändert nutzen. + # - Andernfalls über embed_text erzeugen. + if req.query_vector: + vector = list(req.query_vector) + else: + vector = _get_query_vector(req) + + top_k = req.top_k or get_settings().RETRIEVER_TOP_K + hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters) depth, edge_types = _extract_expand_options(req) - subgraph = None - if depth > 0: - # Seeds: stabile IDs aus dem Payload - # WICHTIG: Wir verwenden note_id als Knoten-ID, da Edges zwischen Notes - # modelliert sind (source_id/target_id = note_id). + subgraph: ga.Subgraph | None = None + if depth and depth > 0: + # Seeds: stabile IDs aus dem Payload (chunk_id bevorzugt, sonst note_id) seed_ids: List[str] = [] for _, _score, payload in hits: - key = payload.get("note_id") + key = payload.get("chunk_id") or payload.get("note_id") if key and key not in seed_ids: seed_ids.append(key)