mindnet/app/core/retriever.py
Lars c67eb4b2f3
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 6s
app/core/retriever.py aktualisiert
2025-12-02 17:31:49 +01:00

187 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
app/core/retriever.py — Semantischer/Edge-Aware/Hybrid Retriever (WP-04 / Step 4a)
Zweck
-----
- Kandidatenfindung via Vektorsuche in *_chunks (Qdrant)
- perspektivisch: Edge-Expansion & Graph-Heuristiken (graph_adapter)
- kombiniertes Ranking zur Rückgabe von Top-K Treffern
Dieser Stand (Step 4a Schritt 1) implementiert zunächst:
- reine semantische Chunk-Suche (ohne Edge-Expansion)
- Hybrid-Modus als Alias der semantischen Suche
- saubere Nutzung der vorhandenen DTOs (QueryRequest/QueryResponse/QueryHit)
- kompatibles Verhalten zu den bestehenden Tests in tests/test_query_unit.py
und tests/test_query_text_embed_unit.py
Weitere Schritte (separat umzusetzen):
- Einbezug von retriever_weight (Note-/Chunk-Metadaten)
- Edge-Expansion über mindnet_edges + graph_adapter
- ausführliche Provenienzpfade (paths) pro Treffer
Kompatibilität
--------------
- Python 3.12+
- qdrant-client 1.x
"""
from __future__ import annotations
import time
from typing import Any, Dict, List, Tuple
from app.config import get_settings
from app.core.qdrant import QdrantConfig, get_client
from app.core.qdrant_points import search_chunks_by_vector
from app.models.dto import QueryRequest, QueryResponse, QueryHit
from app.services.embeddings_client import embed_text
def _get_client_and_prefix() -> Tuple[Any, str]:
"""
Liefert (QdrantClient, prefix).
QdrantConfig.from_env() ist hier die zentrale Stelle für alle
Qdrant-bezogenen ENV-Parameter (URL, API-KEY, Prefix, Dim).
"""
cfg = QdrantConfig.from_env()
client = get_client(cfg)
return client, cfg.prefix
def _get_query_vector(req: QueryRequest) -> List[float]:
"""
Liefert einen Query-Vektor basierend auf QueryRequest:
- Falls req.query_vector gesetzt ist, wird dieser unverändert genutzt.
- Falls req.query (Text) gesetzt ist, wird embed_text() aufgerufen.
- Andernfalls wird ein ValueError geworfen.
"""
if req.query_vector is not None:
if not isinstance(req.query_vector, list):
raise ValueError("query_vector muss eine Liste von floats sein")
return req.query_vector
if req.query:
# Lazy-Load des Modells passiert im embeddings_client selbst.
return embed_text(req.query)
raise ValueError("Weder query_vector noch query gesetzt mindestens eines ist erforderlich")
def _semantic_hits(
client: Any,
prefix: str,
vector: List[float],
top_k: int,
filters: Dict | None,
) -> List[Tuple[str, float, Dict[str, Any]]]:
"""
Ruft die eigentliche Qdrant-Suche auf.
Nutzt app.core.qdrant_points.search_chunks_by_vector als Single Source of Truth
für das Search-API gegen mindnet_chunks.
"""
flt = filters or None
hits = search_chunks_by_vector(client, prefix, vector, top=top_k, filters=flt)
# Erwartete Struktur laut bisherigen Tests:
# [
# ("chunk:1", 0.9, {"note_id": "...", "path": "...", "section_title": "..."}),
# ...
# ]
return hits
def _build_hits_from_semantic(
hits: List[Tuple[str, float, Dict[str, Any]]],
top_k: int,
used_mode: str,
) -> QueryResponse:
"""
Formt rohe Qdrant-Treffer in QueryResponse um.
Aktueller Schritt:
- edge_bonus = 0.0
- centrality_bonus = 0.0
- total_score = semantic_score
Sortierung: absteigend nach total_score.
"""
t0 = time.time()
# defensiv: sortieren, auch wenn Qdrant bereits sortiert liefert
sorted_hits = sorted(hits, key=lambda h: float(h[1]), reverse=True)
limited = sorted_hits[: max(1, top_k)]
results: List[QueryHit] = []
for pid, semantic_score, payload in limited:
note_id = payload.get("note_id")
path = payload.get("path")
section = payload.get("section_title")
edge_bonus = 0.0
cent_bonus = 0.0
total = float(semantic_score) + edge_bonus + cent_bonus
results.append(
QueryHit(
node_id=str(pid),
note_id=note_id,
semantic_score=float(semantic_score),
edge_bonus=edge_bonus,
centrality_bonus=cent_bonus,
total_score=total,
paths=None, # Edge-Expansion folgt in späteren Schritten
source={"path": path, "section": section},
)
)
dt = int((time.time() - t0) * 1000)
return QueryResponse(results=results, used_mode=used_mode, latency_ms=dt)
def _resolve_top_k(req: QueryRequest) -> int:
"""
Ermittelt ein sinnvolles top_k auf Basis von Request und Settings.
"""
if isinstance(req.top_k, int) and req.top_k > 0:
return req.top_k
s = get_settings()
return max(1, int(getattr(s, "RETRIEVER_TOP_K", 10)))
def semantic_retrieve(req: QueryRequest) -> QueryResponse:
"""
Reiner semantischer Retriever (ohne Edge-Expansion).
- nutzt entweder query_vector oder embed_text(query)
- ruft search_chunks_by_vector auf
- sortiert nach semantic_score
"""
top_k = _resolve_top_k(req)
vector = _get_query_vector(req)
client, prefix = _get_client_and_prefix()
hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters)
# used_mode = "semantic" für den expliziten Semantic-Mode
return _build_hits_from_semantic(hits, top_k=top_k, used_mode="semantic")
def hybrid_retrieve(req: QueryRequest) -> QueryResponse:
"""
Hybrid-Retriever.
Aktueller Step-1-Stand:
- nutzt die gleiche reine semantische Kandidatenliste wie semantic_retrieve
- Edge-Expansion & Centrality-Bewertungen folgen in einem späteren Schritt
- used_mode wird auf "hybrid" gesetzt (Tests erwarten dies explizit)
Damit bleiben bestehende Tests und Aufrufer kompatibel, während wir
die Edge-Logik iterativ ergänzen können.
"""
top_k = _resolve_top_k(req)
vector = _get_query_vector(req)
client, prefix = _get_client_and_prefix()
hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters)
return _build_hits_from_semantic(hits, top_k=top_k, used_mode="hybrid")