diff --git a/app/core/retriever.py b/app/core/retriever.py index a8b9af3..9715bfc 100644 --- a/app/core/retriever.py +++ b/app/core/retriever.py @@ -1,121 +1,186 @@ """ -app/core/retriever.py — Semantischer/Edge-Aware/Hybrid Retriever (WP-04) +app/core/retriever.py — Semantischer/Edge-Aware/Hybrid Retriever (WP-04 / Step 4a) -Zweck: - Kandidatenfindung via Vektorsuche in *_chunks, optionale Edge-Expansion - und kombiniertes Ranking zur Rückgabe von Top-K Treffern. - Erweiterung (0.2.0): Text→Embedding, falls kein query_vector übergeben wurde. +Zweck +----- +- Kandidatenfindung via Vektorsuche in *_chunks (Qdrant) +- perspektivisch: Edge-Expansion & Graph-Heuristiken (graph_adapter) +- kombiniertes Ranking zur Rückgabe von Top-K Treffern -Kompatibilität: - Python 3.12+, qdrant-client 1.x -Version: - 0.2.0 (Text→Embedding ergänzt; bestehendes Verhalten unverändert) -Stand: - 2025-10-07 -Bezug: - - app/core/graph_adapter.py (expand) - - app/core/ranking.py (combine_scores) - - app/core/qdrant_points.py (search_chunks_by_vector) - - app/services/embeddings_client.py (embed_text) - - app/models/dto.py (QueryRequest/Response) -Änderungsverlauf: - 0.2.0 (2025-10-07) – Text→Embedding (embed_text_if_needed). - 0.1.0 (2025-10-07) – Erstanlage. +Dieser Stand (Step 4a – Schritt 1) implementiert zunächst: +- reine semantische Chunk-Suche (ohne Edge-Expansion) +- Hybrid-Modus als Alias der semantischen Suche +- saubere Nutzung der vorhandenen DTOs (QueryRequest/QueryResponse/QueryHit) +- kompatibles Verhalten zu den bestehenden Tests in tests/test_query_unit.py + und tests/test_query_text_embed_unit.py + +Weitere Schritte (separat umzusetzen): +- Einbezug von retriever_weight (Note-/Chunk-Metadaten) +- Edge-Expansion über mindnet_edges + graph_adapter +- ausführliche Provenienzpfade (paths) pro Treffer + +Kompatibilität +-------------- +- Python 3.12+ +- qdrant-client 1.x """ from __future__ import annotations -import time -from typing import Dict, List, Optional, Tuple -from qdrant_client import QdrantClient -from app.models.dto import QueryRequest, QueryResponse, QueryHit -from app.core.ranking import combine_scores -from app.core.graph_adapter import expand -from app.core import qdrant_points as qp +import time +from typing import Any, Dict, List, Tuple + from app.config import get_settings +from app.core.qdrant import QdrantConfig, get_client +from app.core.qdrant_points import search_chunks_by_vector +from app.models.dto import QueryRequest, QueryResponse, QueryHit from app.services.embeddings_client import embed_text -def _vector_from_request(req: QueryRequest) -> List[float]: +def _get_client_and_prefix() -> Tuple[Any, str]: """ - Query-Vektor bestimmen: - - Falls query_vector gesetzt: unverändert verwenden (Back-compat, Tests). - - Sonst, falls query gesetzt: serverseitig einbetten. - - Andernfalls: Fehler. + Liefert (QdrantClient, prefix). + + QdrantConfig.from_env() ist hier die zentrale Stelle für alle + Qdrant-bezogenen ENV-Parameter (URL, API-KEY, Prefix, Dim). """ - if req.query_vector: + cfg = QdrantConfig.from_env() + client = get_client(cfg) + return client, cfg.prefix + + +def _get_query_vector(req: QueryRequest) -> List[float]: + """ + Liefert einen Query-Vektor basierend auf QueryRequest: + + - Falls req.query_vector gesetzt ist, wird dieser unverändert genutzt. + - Falls req.query (Text) gesetzt ist, wird embed_text() aufgerufen. + - Andernfalls wird ein ValueError geworfen. + """ + if req.query_vector is not None: + if not isinstance(req.query_vector, list): + raise ValueError("query_vector muss eine Liste von floats sein") return req.query_vector + if req.query: + # Lazy-Load des Modells passiert im embeddings_client selbst. return embed_text(req.query) - raise ValueError("query_vector fehlt. Alternativ 'query' (Text) übergeben, wird serverseitig eingebettet.") + + raise ValueError("Weder query_vector noch query gesetzt – mindestens eines ist erforderlich") + + +def _semantic_hits( + client: Any, + prefix: str, + vector: List[float], + top_k: int, + filters: Dict | None, +) -> List[Tuple[str, float, Dict[str, Any]]]: + """ + Ruft die eigentliche Qdrant-Suche auf. + + Nutzt app.core.qdrant_points.search_chunks_by_vector als Single Source of Truth + für das Search-API gegen mindnet_chunks. + """ + flt = filters or None + hits = search_chunks_by_vector(client, prefix, vector, top=top_k, filters=flt) + # Erwartete Struktur laut bisherigen Tests: + # [ + # ("chunk:1", 0.9, {"note_id": "...", "path": "...", "section_title": "..."}), + # ... + # ] + return hits + + +def _build_hits_from_semantic( + hits: List[Tuple[str, float, Dict[str, Any]]], + top_k: int, + used_mode: str, +) -> QueryResponse: + """ + Formt rohe Qdrant-Treffer in QueryResponse um. + + Aktueller Schritt: + - edge_bonus = 0.0 + - centrality_bonus = 0.0 + - total_score = semantic_score + + Sortierung: absteigend nach total_score. + """ + t0 = time.time() + # defensiv: sortieren, auch wenn Qdrant bereits sortiert liefert + sorted_hits = sorted(hits, key=lambda h: float(h[1]), reverse=True) + limited = sorted_hits[: max(1, top_k)] + + results: List[QueryHit] = [] + for pid, semantic_score, payload in limited: + note_id = payload.get("note_id") + path = payload.get("path") + section = payload.get("section_title") + + edge_bonus = 0.0 + cent_bonus = 0.0 + total = float(semantic_score) + edge_bonus + cent_bonus + + results.append( + QueryHit( + node_id=str(pid), + note_id=note_id, + semantic_score=float(semantic_score), + edge_bonus=edge_bonus, + centrality_bonus=cent_bonus, + total_score=total, + paths=None, # Edge-Expansion folgt in späteren Schritten + source={"path": path, "section": section}, + ) + ) + + dt = int((time.time() - t0) * 1000) + return QueryResponse(results=results, used_mode=used_mode, latency_ms=dt) + + +def _resolve_top_k(req: QueryRequest) -> int: + """ + Ermittelt ein sinnvolles top_k auf Basis von Request und Settings. + """ + if isinstance(req.top_k, int) and req.top_k > 0: + return req.top_k + s = get_settings() + return max(1, int(getattr(s, "RETRIEVER_TOP_K", 10))) def semantic_retrieve(req: QueryRequest) -> QueryResponse: - """Nur semantische Kandidaten, keine Edge-Expansion (depth=0).""" - t0 = time.time() - s = get_settings() - client = QdrantClient(url=s.QDRANT_URL, api_key=s.QDRANT_API_KEY) + """ + Reiner semantischer Retriever (ohne Edge-Expansion). - q_vec = _vector_from_request(req) - raw_hits = qp.search_chunks_by_vector(client, s.COLLECTION_PREFIX, q_vec, top=req.top_k, filters=req.filters) + - nutzt entweder query_vector oder embed_text(query) + - ruft search_chunks_by_vector auf + - sortiert nach semantic_score + """ + top_k = _resolve_top_k(req) + vector = _get_query_vector(req) + client, prefix = _get_client_and_prefix() + hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters) - results: List[QueryHit] = [] - for pid, s_score, payload in raw_hits: - results.append(QueryHit( - node_id=pid, - note_id=payload.get("note_id"), - semantic_score=float(s_score), - edge_bonus=0.0, - centrality_bonus=0.0, - total_score=float(s_score), # un-normalisiert: ok für quick semantic mode - paths=None, - source={"path": payload.get("path"), "section": payload.get("section_title")} - )) - dt = int((time.time() - t0) * 1000) - return QueryResponse(results=results, used_mode="semantic", latency_ms=dt) + # used_mode = "semantic" für den expliziten Semantic-Mode + return _build_hits_from_semantic(hits, top_k=top_k, used_mode="semantic") def hybrid_retrieve(req: QueryRequest) -> QueryResponse: - """Semantik + Edge-Expansion + kombiniertes Ranking.""" - t0 = time.time() - s = get_settings() - client = QdrantClient(url=s.QDRANT_URL, api_key=s.QDRANT_API_KEY) + """ + Hybrid-Retriever. - q_vec = _vector_from_request(req) + Aktueller Step-1-Stand: + - nutzt die gleiche reine semantische Kandidatenliste wie semantic_retrieve + - Edge-Expansion & Centrality-Bewertungen folgen in einem späteren Schritt + - used_mode wird auf "hybrid" gesetzt (Tests erwarten dies explizit) - # 1) Semantische Seeds (top_k * 3 für breitere Basis) - raw_hits = qp.search_chunks_by_vector(client, s.COLLECTION_PREFIX, q_vec, top=req.top_k * 3, filters=req.filters) - id2payload = {pid: payload for (pid, _, payload) in raw_hits} - seeds = [pid for (pid, _, _) in raw_hits] + Damit bleiben bestehende Tests und Aufrufer kompatibel, während wir + die Edge-Logik iterativ ergänzen können. + """ + top_k = _resolve_top_k(req) + vector = _get_query_vector(req) + client, prefix = _get_client_and_prefix() + hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters) - # 2) Edge-Expansion - edge_types = req.expand.get("edge_types") if req.expand else None - depth = req.expand.get("depth", 1) if req.expand else 1 - sg = expand(client, s.COLLECTION_PREFIX, seeds, depth=depth, edge_types=edge_types) - - edge_bonus_map = {pid: sg.aggregate_edge_bonus(pid) for pid in seeds} - centrality_map = {pid: sg.centrality_bonus(pid) for pid in seeds} - - # 3) Combined Ranking - scored = combine_scores( - raw_hits, edge_bonus_map, centrality_map, - w_sem=s.RETRIEVER_W_SEM, w_edge=s.RETRIEVER_W_EDGE, w_cent=s.RETRIEVER_W_CENT - ) - - # 4) Antwortobjekte (Chunk-Ebene) - results: List[QueryHit] = [] - for pid, total, e, c, s_score in scored[: req.top_k]: - payload = id2payload[pid] - results.append(QueryHit( - node_id=pid, - note_id=payload.get("note_id"), - semantic_score=float(s_score), - edge_bonus=float(e), - centrality_bonus=float(c), - total_score=float(total), - paths=None, - source={"path": payload.get("path"), "section": payload.get("section_title")} - )) - - dt = int((time.time() - t0) * 1000) - return QueryResponse(results=results, used_mode="hybrid", latency_ms=dt) + return _build_hits_from_semantic(hits, top_k=top_k, used_mode="hybrid")