# Scrape artifact (web UI header), preserved as comments so the module parses:
# All checks were successful
# Deploy mindnet to llm-node / deploy (push) Successful in 3s
# 161 lines · 4.9 KiB · Python
"""
|
||
app/core/retriever.py — Semantischer/Hybrid-Retriever (WP-04 / Step 4a, Schritt 1)
|
||
|
||
Aktueller Stand:
|
||
- Reine Chunk-Vektorsuche gegen *_chunks (Qdrant)
|
||
- Zwei Modi:
|
||
- semantic_retrieve: used_mode = "semantic"
|
||
- hybrid_retrieve: used_mode = "hybrid" (aktuell gleiche Kandidatenliste)
|
||
- Noch keine Edge-Expansion, kein retriever_weight
|
||
|
||
Wichtige Design-Entscheidung:
|
||
- Wir importieren die benötigten Funktionen NICHT direkt,
|
||
sondern die Module (qdr, qp, ec). Dadurch funktionieren
|
||
Monkeypatches in den Tests (monkeypatch.setattr(qp, "..."), etc.).
|
||
"""
from __future__ import annotations

import time
from typing import Any, Dict, List, Tuple

from app.config import get_settings
from app.models.dto import QueryRequest, QueryResponse, QueryHit
import app.core.qdrant as qdr
import app.core.qdrant_points as qp
import app.services.embeddings_client as ec
def _get_client_and_prefix() -> Tuple[Any, str]:
    """Return a ``(QdrantClient, prefix)`` pair based on ``QdrantConfig.from_env()``."""
    config = qdr.QdrantConfig.from_env()
    return qdr.get_client(config), config.prefix
def _get_query_vector(req: QueryRequest) -> List[float]:
|
||
"""
|
||
Liefert den Query-Vektor aus dem Request.
|
||
|
||
- Falls req.query_vector gesetzt ist, wird dieser unverändert genutzt.
|
||
- Falls req.query (Text) gesetzt ist, wird ec.embed_text(req.query) aufgerufen.
|
||
- Andernfalls: ValueError.
|
||
"""
|
||
if req.query_vector is not None:
|
||
if not isinstance(req.query_vector, list):
|
||
raise ValueError("query_vector muss eine Liste von floats sein")
|
||
return req.query_vector
|
||
|
||
if req.query:
|
||
return ec.embed_text(req.query)
|
||
|
||
raise ValueError("Weder query_vector noch query gesetzt – mindestens eines ist erforderlich")
|
||
|
||
|
||
def _semantic_hits(
    client: Any,
    prefix: str,
    vector: List[float],
    top_k: int,
    filters: Dict | None,
):
    """Thin wrapper around ``qp.search_chunks_by_vector``.

    Result format (per qdrant_points.search_chunks_by_vector):
        [
            (point_id: str, score: float, payload: dict),
            ...
        ]
    """
    # Normalize falsy filters ({} etc.) to None before delegating.
    return qp.search_chunks_by_vector(
        client, prefix, vector, top=top_k, filters=filters or None
    )
def _build_hits_from_semantic(
    hits: List[Tuple[str, float, Dict[str, Any]]],
    top_k: int,
    used_mode: str,
) -> QueryResponse:
    """Shape raw (id, score, payload) hits into a ``QueryResponse``.

    Current step-1 behaviour:
    - edge_bonus = 0.0
    - centrality_bonus = 0.0
    - total_score = semantic_score
    """
    started = time.time()

    # Sort defensively by score even though Qdrant itself returns sorted hits,
    # then cap the list at top_k (at least 1).
    ranked = sorted(hits, key=lambda item: float(item[1]), reverse=True)
    ranked = ranked[: max(1, top_k)]

    results: List[QueryHit] = []
    for point_id, raw_score, payload in ranked:
        semantic = float(raw_score)
        results.append(
            QueryHit(
                node_id=str(point_id),
                note_id=payload.get("note_id"),
                semantic_score=semantic,
                edge_bonus=0.0,
                centrality_bonus=0.0,
                total_score=semantic,  # no bonuses yet in step 1
                paths=None,  # edge paths arrive in a later step
                source={
                    "path": payload.get("path"),
                    "section": payload.get("section_title"),
                },
            )
        )

    elapsed_ms = int((time.time() - started) * 1000)
    return QueryResponse(results=results, used_mode=used_mode, latency_ms=elapsed_ms)
def _resolve_top_k(req: QueryRequest) -> int:
|
||
"""
|
||
Ermittelt ein sinnvolles top_k:
|
||
|
||
- bevorzugt req.top_k, falls > 0
|
||
- sonst Settings.RETRIEVER_TOP_K (Default 10)
|
||
"""
|
||
if isinstance(req.top_k, int) and req.top_k > 0:
|
||
return req.top_k
|
||
s = get_settings()
|
||
return max(1, int(getattr(s, "RETRIEVER_TOP_K", 10)))
|
||
|
||
|
||
def semantic_retrieve(req: QueryRequest) -> QueryResponse:
    """Purely semantic retriever (no edge expansion)."""
    # Resolve inputs in the same order as before: k, vector, then client.
    k = _resolve_top_k(req)
    vector = _get_query_vector(req)
    client, prefix = _get_client_and_prefix()

    candidates = _semantic_hits(client, prefix, vector, top_k=k, filters=req.filters)
    return _build_hits_from_semantic(candidates, top_k=k, used_mode="semantic")
def hybrid_retrieve(req: QueryRequest) -> QueryResponse:
    """Hybrid retriever.

    Step-1 implementation:
    - uses the same semantic candidate list as ``semantic_retrieve``
    - merely sets used_mode = "hybrid"
    - edge expansion & score modification follow in later steps.
    """
    k = _resolve_top_k(req)
    vector = _get_query_vector(req)
    client, prefix = _get_client_and_prefix()

    candidates = _semantic_hits(client, prefix, vector, top_k=k, filters=req.filters)
    return _build_hits_from_semantic(candidates, top_k=k, used_mode="hybrid")