""" app/core/retriever.py — Semantischer/Edge-Aware/Hybrid Retriever (WP-04) Zweck: Kandidatenfindung via Vektorsuche in *_chunks, optionale Edge-Expansion und kombiniertes Ranking zur Rückgabe von Top-K Treffern. Erweiterung (0.2.0): Text→Embedding, falls kein query_vector übergeben wurde. Kompatibilität: Python 3.12+, qdrant-client 1.x Version: 0.2.0 (Text→Embedding ergänzt; bestehendes Verhalten unverändert) Stand: 2025-10-07 Bezug: - app/core/graph_adapter.py (expand) - app/core/ranking.py (combine_scores) - app/core/qdrant_points.py (search_chunks_by_vector) - app/services/embeddings_client.py (embed_text) - app/models/dto.py (QueryRequest/Response) Änderungsverlauf: 0.2.0 (2025-10-07) – Text→Embedding (embed_text_if_needed). 0.1.0 (2025-10-07) – Erstanlage. """ from __future__ import annotations import time from typing import Dict, List, Optional, Tuple from qdrant_client import QdrantClient from app.models.dto import QueryRequest, QueryResponse, QueryHit from app.core.ranking import combine_scores from app.core.graph_adapter import expand from app.core import qdrant_points as qp from app.config import get_settings from app.services.embeddings_client import embed_text def _vector_from_request(req: QueryRequest) -> List[float]: """ Query-Vektor bestimmen: - Falls query_vector gesetzt: unverändert verwenden (Back-compat, Tests). - Sonst, falls query gesetzt: serverseitig einbetten. - Andernfalls: Fehler. """ if req.query_vector: return req.query_vector if req.query: return embed_text(req.query) raise ValueError("query_vector fehlt. Alternativ 'query' (Text) übergeben, wird serverseitig eingebettet.") def semantic_retrieve(req: QueryRequest) -> QueryResponse: """Nur semantische Kandidaten, keine Edge-Expansion (depth=0).""" t0 = time.time() s = get_settings() client = QdrantClient(url=s.QDRANT_URL, api_key=s.QDRANT_API_KEY) q_vec = _vector_from_request(req) raw_hits = qp.search_chunks_by_vector(client, s.COLLECTION_PREFIX, q_vec, top=req.top_k, filters=req.filters) results: List[QueryHit] = [] for pid, s_score, payload in raw_hits: results.append(QueryHit( node_id=pid, note_id=payload.get("note_id"), semantic_score=float(s_score), edge_bonus=0.0, centrality_bonus=0.0, total_score=float(s_score), # un-normalisiert: ok für quick semantic mode paths=None, source={"path": payload.get("path"), "section": payload.get("section_title")} )) dt = int((time.time() - t0) * 1000) return QueryResponse(results=results, used_mode="semantic", latency_ms=dt) def hybrid_retrieve(req: QueryRequest) -> QueryResponse: """Semantik + Edge-Expansion + kombiniertes Ranking.""" t0 = time.time() s = get_settings() client = QdrantClient(url=s.QDRANT_URL, api_key=s.QDRANT_API_KEY) q_vec = _vector_from_request(req) # 1) Semantische Seeds (top_k * 3 für breitere Basis) raw_hits = qp.search_chunks_by_vector(client, s.COLLECTION_PREFIX, q_vec, top=req.top_k * 3, filters=req.filters) id2payload = {pid: payload for (pid, _, payload) in raw_hits} seeds = [pid for (pid, _, _) in raw_hits] # 2) Edge-Expansion edge_types = req.expand.get("edge_types") if req.expand else None depth = req.expand.get("depth", 1) if req.expand else 1 sg = expand(client, s.COLLECTION_PREFIX, seeds, depth=depth, edge_types=edge_types) edge_bonus_map = {pid: sg.aggregate_edge_bonus(pid) for pid in seeds} centrality_map = {pid: sg.centrality_bonus(pid) for pid in seeds} # 3) Combined Ranking scored = combine_scores( raw_hits, edge_bonus_map, centrality_map, w_sem=s.RETRIEVER_W_SEM, w_edge=s.RETRIEVER_W_EDGE, w_cent=s.RETRIEVER_W_CENT ) # 4) Antwortobjekte (Chunk-Ebene) results: List[QueryHit] = [] for pid, total, e, c, s_score in scored[: req.top_k]: payload = id2payload[pid] results.append(QueryHit( node_id=pid, note_id=payload.get("note_id"), semantic_score=float(s_score), edge_bonus=float(e), centrality_bonus=float(c), total_score=float(total), paths=None, source={"path": payload.get("path"), "section": payload.get("section_title")} )) dt = int((time.time() - t0) * 1000) return QueryResponse(results=results, used_mode="hybrid", latency_ms=dt)