app/core/retriever.py aktualisiert
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 6s

This commit is contained in:
Lars 2025-12-02 17:31:49 +01:00
parent fafa74b0b8
commit c67eb4b2f3

View File

@ -1,121 +1,186 @@
"""
app/core/retriever.py Semantischer/Edge-Aware/Hybrid Retriever (WP-04)
app/core/retriever.py Semantischer/Edge-Aware/Hybrid Retriever (WP-04 / Step 4a)
Zweck:
Kandidatenfindung via Vektorsuche in *_chunks, optionale Edge-Expansion
und kombiniertes Ranking zur Rückgabe von Top-K Treffern.
Erweiterung (0.2.0): TextEmbedding, falls kein query_vector übergeben wurde.
Zweck
-----
- Kandidatenfindung via Vektorsuche in *_chunks (Qdrant)
- perspektivisch: Edge-Expansion & Graph-Heuristiken (graph_adapter)
- kombiniertes Ranking zur Rückgabe von Top-K Treffern
Kompatibilität:
Python 3.12+, qdrant-client 1.x
Version:
0.2.0 (TextEmbedding ergänzt; bestehendes Verhalten unverändert)
Stand:
2025-10-07
Bezug:
- app/core/graph_adapter.py (expand)
- app/core/ranking.py (combine_scores)
- app/core/qdrant_points.py (search_chunks_by_vector)
- app/services/embeddings_client.py (embed_text)
- app/models/dto.py (QueryRequest/Response)
Änderungsverlauf:
0.2.0 (2025-10-07) TextEmbedding (embed_text_if_needed).
0.1.0 (2025-10-07) Erstanlage.
Dieser Stand (Step 4a Schritt 1) implementiert zunächst:
- reine semantische Chunk-Suche (ohne Edge-Expansion)
- Hybrid-Modus als Alias der semantischen Suche
- saubere Nutzung der vorhandenen DTOs (QueryRequest/QueryResponse/QueryHit)
- kompatibles Verhalten zu den bestehenden Tests in tests/test_query_unit.py
und tests/test_query_text_embed_unit.py
Weitere Schritte (separat umzusetzen):
- Einbezug von retriever_weight (Note-/Chunk-Metadaten)
- Edge-Expansion über mindnet_edges + graph_adapter
- ausführliche Provenienzpfade (paths) pro Treffer
Kompatibilität
--------------
- Python 3.12+
- qdrant-client 1.x
"""
from __future__ import annotations
import time
from typing import Dict, List, Optional, Tuple
from qdrant_client import QdrantClient
from app.models.dto import QueryRequest, QueryResponse, QueryHit
from app.core.ranking import combine_scores
from app.core.graph_adapter import expand
from app.core import qdrant_points as qp
import time
from typing import Any, Dict, List, Tuple
from app.config import get_settings
from app.core.qdrant import QdrantConfig, get_client
from app.core.qdrant_points import search_chunks_by_vector
from app.models.dto import QueryRequest, QueryResponse, QueryHit
from app.services.embeddings_client import embed_text
def _vector_from_request(req: QueryRequest) -> List[float]:
def _get_client_and_prefix() -> Tuple[Any, str]:
"""
Query-Vektor bestimmen:
- Falls query_vector gesetzt: unverändert verwenden (Back-compat, Tests).
- Sonst, falls query gesetzt: serverseitig einbetten.
- Andernfalls: Fehler.
Liefert (QdrantClient, prefix).
QdrantConfig.from_env() ist hier die zentrale Stelle für alle
Qdrant-bezogenen ENV-Parameter (URL, API-KEY, Prefix, Dim).
"""
if req.query_vector:
cfg = QdrantConfig.from_env()
client = get_client(cfg)
return client, cfg.prefix
def _get_query_vector(req: QueryRequest) -> List[float]:
"""
Liefert einen Query-Vektor basierend auf QueryRequest:
- Falls req.query_vector gesetzt ist, wird dieser unverändert genutzt.
- Falls req.query (Text) gesetzt ist, wird embed_text() aufgerufen.
- Andernfalls wird ein ValueError geworfen.
"""
if req.query_vector is not None:
if not isinstance(req.query_vector, list):
raise ValueError("query_vector muss eine Liste von floats sein")
return req.query_vector
if req.query:
# Lazy-Load des Modells passiert im embeddings_client selbst.
return embed_text(req.query)
raise ValueError("query_vector fehlt. Alternativ 'query' (Text) übergeben, wird serverseitig eingebettet.")
raise ValueError("Weder query_vector noch query gesetzt mindestens eines ist erforderlich")
def _semantic_hits(
client: Any,
prefix: str,
vector: List[float],
top_k: int,
filters: Dict | None,
) -> List[Tuple[str, float, Dict[str, Any]]]:
"""
Ruft die eigentliche Qdrant-Suche auf.
Nutzt app.core.qdrant_points.search_chunks_by_vector als Single Source of Truth
für das Search-API gegen mindnet_chunks.
"""
flt = filters or None
hits = search_chunks_by_vector(client, prefix, vector, top=top_k, filters=flt)
# Erwartete Struktur laut bisherigen Tests:
# [
# ("chunk:1", 0.9, {"note_id": "...", "path": "...", "section_title": "..."}),
# ...
# ]
return hits
def _build_hits_from_semantic(
hits: List[Tuple[str, float, Dict[str, Any]]],
top_k: int,
used_mode: str,
) -> QueryResponse:
"""
Formt rohe Qdrant-Treffer in QueryResponse um.
Aktueller Schritt:
- edge_bonus = 0.0
- centrality_bonus = 0.0
- total_score = semantic_score
Sortierung: absteigend nach total_score.
"""
t0 = time.time()
# defensiv: sortieren, auch wenn Qdrant bereits sortiert liefert
sorted_hits = sorted(hits, key=lambda h: float(h[1]), reverse=True)
limited = sorted_hits[: max(1, top_k)]
results: List[QueryHit] = []
for pid, semantic_score, payload in limited:
note_id = payload.get("note_id")
path = payload.get("path")
section = payload.get("section_title")
edge_bonus = 0.0
cent_bonus = 0.0
total = float(semantic_score) + edge_bonus + cent_bonus
results.append(
QueryHit(
node_id=str(pid),
note_id=note_id,
semantic_score=float(semantic_score),
edge_bonus=edge_bonus,
centrality_bonus=cent_bonus,
total_score=total,
paths=None, # Edge-Expansion folgt in späteren Schritten
source={"path": path, "section": section},
)
)
dt = int((time.time() - t0) * 1000)
return QueryResponse(results=results, used_mode=used_mode, latency_ms=dt)
def _resolve_top_k(req: QueryRequest) -> int:
"""
Ermittelt ein sinnvolles top_k auf Basis von Request und Settings.
"""
if isinstance(req.top_k, int) and req.top_k > 0:
return req.top_k
s = get_settings()
return max(1, int(getattr(s, "RETRIEVER_TOP_K", 10)))
def semantic_retrieve(req: QueryRequest) -> QueryResponse:
"""Nur semantische Kandidaten, keine Edge-Expansion (depth=0)."""
t0 = time.time()
s = get_settings()
client = QdrantClient(url=s.QDRANT_URL, api_key=s.QDRANT_API_KEY)
"""
Reiner semantischer Retriever (ohne Edge-Expansion).
q_vec = _vector_from_request(req)
raw_hits = qp.search_chunks_by_vector(client, s.COLLECTION_PREFIX, q_vec, top=req.top_k, filters=req.filters)
- nutzt entweder query_vector oder embed_text(query)
- ruft search_chunks_by_vector auf
- sortiert nach semantic_score
"""
top_k = _resolve_top_k(req)
vector = _get_query_vector(req)
client, prefix = _get_client_and_prefix()
hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters)
results: List[QueryHit] = []
for pid, s_score, payload in raw_hits:
results.append(QueryHit(
node_id=pid,
note_id=payload.get("note_id"),
semantic_score=float(s_score),
edge_bonus=0.0,
centrality_bonus=0.0,
total_score=float(s_score), # un-normalisiert: ok für quick semantic mode
paths=None,
source={"path": payload.get("path"), "section": payload.get("section_title")}
))
dt = int((time.time() - t0) * 1000)
return QueryResponse(results=results, used_mode="semantic", latency_ms=dt)
# used_mode = "semantic" für den expliziten Semantic-Mode
return _build_hits_from_semantic(hits, top_k=top_k, used_mode="semantic")
def hybrid_retrieve(req: QueryRequest) -> QueryResponse:
"""Semantik + Edge-Expansion + kombiniertes Ranking."""
t0 = time.time()
s = get_settings()
client = QdrantClient(url=s.QDRANT_URL, api_key=s.QDRANT_API_KEY)
"""
Hybrid-Retriever.
q_vec = _vector_from_request(req)
Aktueller Step-1-Stand:
- nutzt die gleiche reine semantische Kandidatenliste wie semantic_retrieve
- Edge-Expansion & Centrality-Bewertungen folgen in einem späteren Schritt
- used_mode wird auf "hybrid" gesetzt (Tests erwarten dies explizit)
# 1) Semantische Seeds (top_k * 3 für breitere Basis)
raw_hits = qp.search_chunks_by_vector(client, s.COLLECTION_PREFIX, q_vec, top=req.top_k * 3, filters=req.filters)
id2payload = {pid: payload for (pid, _, payload) in raw_hits}
seeds = [pid for (pid, _, _) in raw_hits]
Damit bleiben bestehende Tests und Aufrufer kompatibel, während wir
die Edge-Logik iterativ ergänzen können.
"""
top_k = _resolve_top_k(req)
vector = _get_query_vector(req)
client, prefix = _get_client_and_prefix()
hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters)
# 2) Edge-Expansion
edge_types = req.expand.get("edge_types") if req.expand else None
depth = req.expand.get("depth", 1) if req.expand else 1
sg = expand(client, s.COLLECTION_PREFIX, seeds, depth=depth, edge_types=edge_types)
edge_bonus_map = {pid: sg.aggregate_edge_bonus(pid) for pid in seeds}
centrality_map = {pid: sg.centrality_bonus(pid) for pid in seeds}
# 3) Combined Ranking
scored = combine_scores(
raw_hits, edge_bonus_map, centrality_map,
w_sem=s.RETRIEVER_W_SEM, w_edge=s.RETRIEVER_W_EDGE, w_cent=s.RETRIEVER_W_CENT
)
# 4) Antwortobjekte (Chunk-Ebene)
results: List[QueryHit] = []
for pid, total, e, c, s_score in scored[: req.top_k]:
payload = id2payload[pid]
results.append(QueryHit(
node_id=pid,
note_id=payload.get("note_id"),
semantic_score=float(s_score),
edge_bonus=float(e),
centrality_bonus=float(c),
total_score=float(total),
paths=None,
source={"path": payload.get("path"), "section": payload.get("section_title")}
))
dt = int((time.time() - t0) * 1000)
return QueryResponse(results=results, used_mode="hybrid", latency_ms=dt)
return _build_hits_from_semantic(hits, top_k=top_k, used_mode="hybrid")