mindnet/app/core/retriever.py
Lars 9d9239b11e
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 4s
app/core/retriever.py aktualisiert
2025-12-04 15:55:39 +01:00

301 lines
10 KiB
Python

from __future__ import annotations
import os
import time
from functools import lru_cache
from typing import Any, Dict, List, Tuple, Iterable
from app.config import get_settings
from app.models.dto import QueryRequest, QueryResponse, QueryHit
import app.core.qdrant as qdr
import app.core.qdrant_points as qp
import app.services.embeddings_client as ec
import app.core.graph_adapter as ga
try:
import yaml # type: ignore[import]
except Exception: # pragma: no cover - Fallback, falls PyYAML nicht installiert ist
yaml = None # type: ignore[assignment]
@lru_cache
def _get_scoring_weights() -> Tuple[float, float, float]:
"""Liefert (semantic_weight, edge_weight, centrality_weight) für den Retriever.
Priorität:
1. Werte aus config/retriever.yaml (falls vorhanden und gültig),
Abschnitt:
scoring:
semantic_weight: 1.0
edge_weight: 0.5
centrality_weight: 0.5
2. Fallback auf Settings.RETRIEVER_W_* (ENV-basiert).
"""
settings = get_settings()
sem = float(getattr(settings, "RETRIEVER_W_SEM", 1.0))
edge = float(getattr(settings, "RETRIEVER_W_EDGE", 0.0))
cent = float(getattr(settings, "RETRIEVER_W_CENT", 0.0))
# YAML-Override, falls konfiguriert
config_path = os.getenv("MINDNET_RETRIEVER_CONFIG", "config/retriever.yaml")
if yaml is None:
return sem, edge, cent
try:
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
scoring = data.get("scoring", {}) or {}
sem = float(scoring.get("semantic_weight", sem))
edge = float(scoring.get("edge_weight", edge))
cent = float(scoring.get("centrality_weight", cent))
except Exception:
# Bei Fehlern in der YAML-Konfiguration defensiv auf Defaults zurückfallen
return sem, edge, cent
return sem, edge, cent
def _get_client_and_prefix() -> Tuple[Any, str]:
"""Liefert (QdrantClient, prefix) basierend auf QdrantConfig.from_env()."""
cfg = qdr.QdrantConfig.from_env()
client = qdr.get_client(cfg)
return client, cfg.prefix
def _get_query_vector(req: QueryRequest) -> List[float]:
"""
Liefert den Query-Vektor aus dem Request.
- Falls req.query_vector gesetzt ist, wird dieser unverändert genutzt.
- Andernfalls wird req.query über den Embedding-Service in einen Vektor
transformiert.
"""
if req.query_vector:
return list(req.query_vector)
if not req.query:
raise ValueError("QueryRequest benötigt entweder query oder query_vector")
settings = get_settings()
model_name = settings.MODEL_NAME
# Kompatibel mit Fakes in Unit-Tests (ohne model_name-Parameter)
try:
return ec.embed_text(req.query, model_name=model_name) # type: ignore[call-arg]
except TypeError:
# Fallback: einfache Signatur embed_text(text)
return ec.embed_text(req.query) # type: ignore[call-arg]
def _semantic_hits(
client: Any,
prefix: str,
vector: List[float],
top_k: int,
filters: Dict[str, Any] | None = None,
) -> List[Tuple[str, float, Dict[str, Any]]]:
"""Führt eine semantische Suche über mindnet_chunks aus und liefert Roh-Treffer.
Rückgabeformat: Liste von (point_id, score, payload)
Erwartetes Format von qp.search_chunks_by_vector:
List[Tuple[str, float, dict]]
"""
flt = filters or None
raw_hits = qp.search_chunks_by_vector(client, prefix, vector, top=top_k, filters=flt)
results: List[Tuple[str, float, Dict[str, Any]]] = []
for pid, score, payload in raw_hits:
results.append((str(pid), float(score), dict(payload or {})))
return results
def _get_semantic_score_and_payload(
hit: Tuple[str, float, Dict[str, Any]]
) -> Tuple[float, Dict[str, Any]]:
"""Extrahiert semantic_score und Payload aus einem Raw-Hit."""
_, score, payload = hit
return float(score), payload or {}
def _compute_total_score(
semantic_score: float,
payload: Dict[str, Any],
edge_bonus: float = 0.0,
cent_bonus: float = 0.0,
) -> Tuple[float, float, float]:
"""Berechnet total_score aus semantic_score, retriever_weight und Graph-Boni.
Formel (WP-04, konfigurierbar über config/retriever.yaml bzw. ENV):
total_score = W_sem * semantic_score * max(retriever_weight, 0.0)
+ W_edge * edge_bonus
+ W_cent * cent_bonus
- retriever_weight stammt aus dem Chunk-Payload (types.yaml).
- W_sem / W_edge / W_cent kommen aus _get_scoring_weights().
"""
raw_weight = payload.get("retriever_weight", 1.0)
try:
weight = float(raw_weight)
except (TypeError, ValueError):
weight = 1.0
if weight < 0.0:
weight = 0.0
sem_w, edge_w, cent_w = _get_scoring_weights()
total = (sem_w * float(semantic_score) * weight) + (edge_w * edge_bonus) + (cent_w * cent_bonus)
return float(total), float(edge_bonus), float(cent_bonus)
def _extract_expand_options(req: QueryRequest) -> Tuple[int, List[str] | None]:
"""Extrahiert depth und edge_types aus req.expand, falls vorhanden.
- Falls expand nicht gesetzt ist: depth=0, edge_types=None (keine Expansion).
- Unterstützt sowohl Pydantic-Modelle als auch plain dicts.
"""
expand = getattr(req, "expand", None)
if not expand:
return 0, None
depth = 1
edge_types: List[str] | None = None
# Pydantic-Modell oder Objekt mit Attributen
if hasattr(expand, "depth") or hasattr(expand, "edge_types"):
depth = int(getattr(expand, "depth", 1) or 1)
types_val = getattr(expand, "edge_types", None)
if types_val:
edge_types = list(types_val)
return depth, edge_types
# Plain dict
if isinstance(expand, dict):
if "depth" in expand:
depth = int(expand.get("depth") or 1)
if "edge_types" in expand and expand["edge_types"] is not None:
edge_types = list(expand["edge_types"])
return depth, edge_types
return 0, None
def _build_hits_from_semantic(
hits: Iterable[Tuple[str, float, Dict[str, Any]]],
top_k: int,
used_mode: str,
subgraph: ga.Subgraph | None = None,
) -> QueryResponse:
"""Baut aus Raw-Hits und optionalem Subgraph strukturierte QueryHits.
- Aggregation auf Note-Ebene (note_id),
- Berechnung von total_score unter Nutzung von retriever_weight + Graph-Boni,
- Rückgabe als QueryResponse mit Latenz.
"""
t0 = time.time()
enriched: List[Tuple[str, float, Dict[str, Any], float, float, float]] = []
for pid, semantic_score, payload in hits:
# Graph-Scores, falls Subgraph und stabiler Key vorhanden
edge_bonus = 0.0
cent_bonus = 0.0
if subgraph is not None:
node_key = payload.get("chunk_id") or payload.get("note_id")
if node_key:
try:
edge_bonus = float(subgraph.edge_bonus(node_key))
except Exception:
edge_bonus = 0.0
try:
cent_bonus = float(subgraph.centrality_bonus(node_key))
except Exception:
cent_bonus = 0.0
total, edge_bonus, cent_bonus = _compute_total_score(
semantic_score,
payload,
edge_bonus=edge_bonus,
cent_bonus=cent_bonus,
)
enriched.append((pid, float(semantic_score), payload, total, edge_bonus, cent_bonus))
# Sortierung nach total_score absteigend
enriched_sorted = sorted(enriched, key=lambda h: h[3], reverse=True)
limited = enriched_sorted[: max(1, top_k)]
results: List[QueryHit] = []
for pid, semantic_score, payload, total, edge_bonus, cent_bonus in limited:
note_id = payload.get("note_id")
path = payload.get("path")
# mindnet_chunks: section (aktuell genutzt); ältere Stände nutzen ggf. section_title
section = payload.get("section") or payload.get("section_title")
results.append(
QueryHit(
node_id=str(pid),
note_id=note_id,
semantic_score=float(semantic_score),
edge_bonus=edge_bonus,
centrality_bonus=cent_bonus,
total_score=total,
paths=None,
source={
"path": path,
"section": section,
},
)
)
dt = int((time.time() - t0) * 1000)
return QueryResponse(results=results, used_mode=used_mode, latency_ms=dt)
def semantic_retrieve(req: QueryRequest) -> QueryResponse:
"""Reiner semantischer Retriever (ohne Edge-Expansion)."""
client, prefix = _get_client_and_prefix()
vector = _get_query_vector(req)
top_k = req.top_k or get_settings().RETRIEVER_TOP_K
hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters)
return _build_hits_from_semantic(hits, top_k=top_k, used_mode="semantic", subgraph=None)
def hybrid_retrieve(req: QueryRequest) -> QueryResponse:
"""Hybrid-Retriever: semantische Suche + optionale Edge-Expansion.
Aktueller Stand (Step 3):
- Semantische Suche über mindnet_chunks (wie semantic_retrieve),
- Bei expand.depth > 0 wird ein Subgraph aus mindnet_edges aufgebaut,
und edge_bonus / centrality_bonus in das Scoring einbezogen.
"""
client, prefix = _get_client_and_prefix()
# Query-Vektor:
# - Falls explizit gesetzt, unverändert nutzen.
# - Andernfalls über embed_text erzeugen.
if req.query_vector:
vector = list(req.query_vector)
else:
vector = _get_query_vector(req)
top_k = req.top_k or get_settings().RETRIEVER_TOP_K
hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters)
depth, edge_types = _extract_expand_options(req)
subgraph: ga.Subgraph | None = None
if depth and depth > 0:
# Seeds: stabile IDs aus dem Payload (chunk_id bevorzugt, sonst note_id)
seed_ids: List[str] = []
for _pid, _score, payload in hits:
key = payload.get("chunk_id") or payload.get("note_id")
if key and key not in seed_ids:
seed_ids.append(key)
if seed_ids:
try:
subgraph = ga.expand(client, prefix, seed_ids, depth=depth, edge_types=edge_types)
except Exception:
# Edge-Expansion ist optional: bei Fehlern weiter ohne Graph-Boni
subgraph = None
return _build_hits_from_semantic(hits, top_k=top_k, used_mode="hybrid", subgraph=subgraph)