from __future__ import annotations

import logging
import os
import time
from functools import lru_cache
from typing import Any, Dict, List, Tuple, Iterable

from app.config import get_settings
from app.models.dto import QueryRequest, QueryResponse, QueryHit
import app.core.qdrant as qdr
import app.core.qdrant_points as qp
import app.services.embeddings_client as ec
import app.core.graph_adapter as ga

try:
    import yaml  # type: ignore[import]
except Exception:  # pragma: no cover - fallback when PyYAML is not installed
    yaml = None  # type: ignore[assignment]

logger = logging.getLogger(__name__)


@lru_cache
def _get_scoring_weights() -> Tuple[float, float, float]:
    """Return ``(semantic_weight, edge_weight, centrality_weight)`` for scoring.

    Priority:
      1. Values from the YAML file named by ``MINDNET_RETRIEVER_CONFIG``
         (default ``config/retriever.yaml``), if PyYAML is available and the
         file exists and parses, read from the section::

             scoring:
               semantic_weight: 1.0
               edge_weight: 0.5
               centrality_weight: 0.5

      2. Fallback to ``Settings.RETRIEVER_W_SEM`` / ``RETRIEVER_W_EDGE`` /
         ``RETRIEVER_W_CENT`` (defaults 1.0 / 0.0 / 0.0 when missing).

    The YAML override is applied all-or-nothing: if any part of the file is
    unreadable or invalid, the settings-based weights are returned unchanged.
    The result is memoized by ``lru_cache``, so later changes to the file or
    the environment are not picked up until the cache is cleared.
    """
    settings = get_settings()
    defaults = (
        float(getattr(settings, "RETRIEVER_W_SEM", 1.0)),
        float(getattr(settings, "RETRIEVER_W_EDGE", 0.0)),
        float(getattr(settings, "RETRIEVER_W_CENT", 0.0)),
    )
    if yaml is None:
        return defaults

    config_path = os.getenv("MINDNET_RETRIEVER_CONFIG", "config/retriever.yaml")
    try:
        if not os.path.exists(config_path):
            return defaults
        with open(config_path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
        scoring = data.get("scoring", {}) or {}
        # Build the full tuple before returning so that one bad value cannot
        # leave a half-applied override behind.
        return (
            float(scoring.get("semantic_weight", defaults[0])),
            float(scoring.get("edge_weight", defaults[1])),
            float(scoring.get("centrality_weight", defaults[2])),
        )
    except Exception:
        # A broken config file must not take the retriever down.
        logger.warning(
            "Ignoring invalid retriever config %r; using settings-based weights",
            config_path,
            exc_info=True,
        )
        return defaults


def _get_client_and_prefix() -> Tuple[Any, str]:
    """Return ``(client, prefix)`` built from ``qdr.QdrantConfig.from_env()``."""
    cfg = qdr.QdrantConfig.from_env()
    client = qdr.get_client(cfg)
    return client, cfg.prefix


def _get_query_vector(req: QueryRequest) -> List[float]:
    """Return the query vector for *req*.

    - If ``req.query_vector`` is set, a list copy of it is returned as-is.
    - Otherwise ``req.query`` is embedded via ``ec.embed_text`` using
      ``settings.MODEL_NAME``.

    Raises:
        ValueError: if neither ``query_vector`` nor ``query`` is provided.
    """
    if req.query_vector:
        return list(req.query_vector)

    if not req.query:
        raise ValueError("QueryRequest benötigt entweder query oder query_vector")

    settings = get_settings()
    model_name = settings.MODEL_NAME
    return ec.embed_text(req.query, model_name=model_name)


def _semantic_hits(
    client: Any,
    prefix: str,
    vector: List[float],
    top_k: int,
    filters: Dict[str, Any] | None = None,
) -> List[Tuple[str, float, Dict[str, Any]]]:
    """Run a vector search via ``qp.search_chunks_by_vector`` and return raw hits.

    Returns:
        A list of ``(point_id, score, payload)`` tuples; the payload is a
        shallow dict copy (empty dict when the point has no payload).
    """
    # An empty filter dict is normalized to None before it reaches the search.
    hits = qp.search_chunks_by_vector(
        client, prefix, vector, top=top_k, filters=filters or None
    )
    return [
        (str(point.id), float(point.score), dict(point.payload or {}))
        for point in hits
    ]


def _get_semantic_score_and_payload(
    hit: Tuple[str, float, Dict[str, Any]]
) -> Tuple[float, Dict[str, Any]]:
    """Extract ``(semantic_score, payload)`` from a raw hit tuple."""
    _, score, payload = hit
    return float(score), payload or {}


def _compute_total_score(
    semantic_score: float,
    payload: Dict[str, Any],
    edge_bonus: float = 0.0,
    cent_bonus: float = 0.0,
) -> Tuple[float, float, float]:
    """Combine semantic score, payload weight and graph bonuses.

    Formula::

        total = W_sem * semantic_score * max(retriever_weight, 0.0)
                + W_edge * edge_bonus
                + W_cent * cent_bonus

    - ``retriever_weight`` is read from *payload* (default 1.0; values that
      cannot be converted to float are treated as 1.0, negatives as 0.0).
    - ``W_sem`` / ``W_edge`` / ``W_cent`` come from ``_get_scoring_weights()``.

    Returns:
        ``(total_score, edge_bonus, cent_bonus)`` as floats.
    """
    raw_weight = payload.get("retriever_weight", 1.0)
    try:
        weight = float(raw_weight)
    except (TypeError, ValueError):
        weight = 1.0
    if weight < 0.0:
        weight = 0.0

    sem_w, edge_w, cent_w = _get_scoring_weights()
    total = (
        (sem_w * float(semantic_score) * weight)
        + (edge_w * edge_bonus)
        + (cent_w * cent_bonus)
    )
    return float(total), float(edge_bonus), float(cent_bonus)


def _coerce_depth(raw: Any) -> int:
    """Convert a raw ``expand.depth`` value to an int.

    An explicit integer ``0`` is honored (meaning "no expansion"). Every other
    falsy value (``None``, ``""``, ...) defaults to 1; anything else goes
    through ``int()``.
    """
    if isinstance(raw, int) and not isinstance(raw, bool) and raw == 0:
        return 0
    return int(raw or 1)


def _extract_expand_options(req: QueryRequest) -> Tuple[int, List[str] | None]:
    """Extract ``(depth, edge_types)`` from ``req.expand`` if present.

    - If ``expand`` is missing or falsy: ``(0, None)`` (no expansion).
    - Supports both attribute-style objects (e.g. Pydantic models) and plain
      dicts.
    - A missing or ``None`` depth defaults to 1; an explicit integer 0 stays 0
      so callers can switch expansion off.
    """
    expand = getattr(req, "expand", None)
    if not expand:
        return 0, None

    # Object with attributes (e.g. a Pydantic model).
    if hasattr(expand, "depth") or hasattr(expand, "edge_types"):
        depth = _coerce_depth(getattr(expand, "depth", 1))
        types_val = getattr(expand, "edge_types", None)
        edge_types = list(types_val) if types_val else None
        return depth, edge_types

    # Plain dict.
    if isinstance(expand, dict):
        depth = _coerce_depth(expand.get("depth"))
        edge_types = None
        if expand.get("edge_types") is not None:
            edge_types = list(expand["edge_types"])
        return depth, edge_types

    return 0, None


def _node_key(payload: Dict[str, Any]) -> Any:
    """Return the graph key for a payload: ``chunk_id`` if truthy, else ``note_id``."""
    return payload.get("chunk_id") or payload.get("note_id")


def _graph_bonus(subgraph: Any, method: str, node_key: Any) -> float:
    """Call ``subgraph.<method>(node_key)`` and return it as float, or 0.0 on error.

    Graph bonuses are optional; a failure is logged at debug level and must
    not fail the whole query.
    """
    try:
        return float(getattr(subgraph, method)(node_key))
    except Exception:
        logger.debug(
            "Graph bonus %s failed for node %r", method, node_key, exc_info=True
        )
        return 0.0


def _build_hits_from_semantic(
    hits: Iterable[Tuple[str, float, Dict[str, Any]]],
    top_k: int,
    used_mode: str,
    subgraph: ga.Subgraph | None = None,
) -> QueryResponse:
    """Turn raw hits (plus an optional subgraph) into a ``QueryResponse``.

    For every hit the total score is computed with ``_compute_total_score``.
    When a subgraph is given, its ``edge_bonus`` / ``centrality_bonus`` for
    the hit's node key are included. Hits are sorted by total score in
    descending order and truncated to ``max(1, top_k)`` entries.

    NOTE: ``latency_ms`` only covers the time spent inside this function
    (scoring and assembling), not the preceding embedding or vector search.
    """
    t0 = time.perf_counter()  # monotonic clock for interval measurement

    enriched: List[Tuple[str, float, Dict[str, Any], float, float, float]] = []
    for pid, semantic_score, payload in hits:
        edge_bonus = 0.0
        cent_bonus = 0.0
        if subgraph is not None:
            node_key = _node_key(payload)
            if node_key:
                edge_bonus = _graph_bonus(subgraph, "edge_bonus", node_key)
                cent_bonus = _graph_bonus(subgraph, "centrality_bonus", node_key)

        total, edge_bonus, cent_bonus = _compute_total_score(
            semantic_score,
            payload,
            edge_bonus=edge_bonus,
            cent_bonus=cent_bonus,
        )
        enriched.append(
            (pid, float(semantic_score), payload, total, edge_bonus, cent_bonus)
        )

    # Highest total score first; ties keep their incoming order.
    enriched_sorted = sorted(enriched, key=lambda h: h[3], reverse=True)
    limited = enriched_sorted[: max(1, top_k)]

    results: List[QueryHit] = []
    for pid, semantic_score, payload, total, edge_bonus, cent_bonus in limited:
        note_id = payload.get("note_id")
        path = payload.get("path")
        # Prefer "section"; fall back to "section_title" when it is missing.
        section = payload.get("section") or payload.get("section_title")

        results.append(
            QueryHit(
                node_id=str(pid),
                note_id=note_id,
                semantic_score=float(semantic_score),
                edge_bonus=edge_bonus,
                centrality_bonus=cent_bonus,
                total_score=total,
                paths=None,
                source={
                    "path": path,
                    "section": section,
                },
            )
        )

    dt = int((time.perf_counter() - t0) * 1000)
    return QueryResponse(results=results, used_mode=used_mode, latency_ms=dt)


def semantic_retrieve(req: QueryRequest) -> QueryResponse:
    """Pure semantic retriever (no edge expansion)."""
    client, prefix = _get_client_and_prefix()
    vector = _get_query_vector(req)
    top_k = req.top_k or get_settings().RETRIEVER_TOP_K

    hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters)
    return _build_hits_from_semantic(
        hits, top_k=top_k, used_mode="semantic", subgraph=None
    )


def hybrid_retrieve(req: QueryRequest) -> QueryResponse:
    """Hybrid retriever: semantic search plus optional edge expansion.

    - Runs the same semantic search as ``semantic_retrieve``.
    - If ``req.expand`` yields a depth > 0, a subgraph is built with
      ``ga.expand`` from the hits' node keys, and its edge / centrality
      bonuses enter the scoring.
    - Expansion is best-effort: if ``ga.expand`` raises, the error is logged
      and the result is scored without graph bonuses.
    """
    client, prefix = _get_client_and_prefix()

    # _get_query_vector already prefers an explicit req.query_vector.
    vector = _get_query_vector(req)
    top_k = req.top_k or get_settings().RETRIEVER_TOP_K

    hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters)

    depth, edge_types = _extract_expand_options(req)
    subgraph: ga.Subgraph | None = None
    if depth and depth > 0:
        # Seeds: chunk_id preferred, otherwise note_id; de-duplicated while
        # keeping first-seen order.
        keys = (_node_key(payload) for _, _score, payload in hits)
        seed_ids: List[str] = list(dict.fromkeys(key for key in keys if key))

        if seed_ids:
            try:
                subgraph = ga.expand(
                    client, prefix, seed_ids, depth=depth, edge_types=edge_types
                )
            except Exception:
                # Edge expansion is optional: continue without graph bonuses.
                logger.warning(
                    "Edge expansion failed; continuing without graph bonuses",
                    exc_info=True,
                )
                subgraph = None

    return _build_hits_from_semantic(
        hits, top_k=top_k, used_mode="hybrid", subgraph=subgraph
    )