""" FILE: app/core/retrieval/retriever.py DESCRIPTION: Haupt-Schnittstelle für die Suche. Orchestriert Vektorsuche und Graph-Expansion. WP-15c Update: Note-Level Diversity Pooling & Super-Edge Aggregation. WP-24c v4.1.0: Gold-Standard - Scope-Awareness, Section-Filtering, Authority-Priorisierung. VERSION: 0.8.0 (WP-24c: Gold-Standard v4.1.0) STATUS: Active DEPENDENCIES: app.config, app.models.dto, app.core.database*, app.core.graph_adapter """ from __future__ import annotations import os import time import logging from typing import Any, Dict, List, Tuple, Iterable, Optional from collections import defaultdict from app.config import get_settings from app.models.dto import ( QueryRequest, QueryResponse, QueryHit, Explanation, ScoreBreakdown, Reason, EdgeDTO ) # MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene import app.core.database.qdrant as qdr import app.core.database.qdrant_points as qp import app.services.embeddings_client as ec import app.core.graph.graph_subgraph as ga import app.core.graph.graph_db_adapter as gdb from app.core.graph.graph_utils import PROVENANCE_PRIORITY from qdrant_client.http import models as rest # Mathematische Engine importieren from app.core.retrieval.retriever_scoring import get_weights, compute_wp22_score logger = logging.getLogger(__name__) # ============================================================================== # 1. CORE HELPERS & CONFIG LOADERS # ============================================================================== def _get_client_and_prefix() -> Tuple[Any, str]: """Initialisiert Qdrant Client und lädt Collection-Prefix via database-Paket.""" cfg = qdr.QdrantConfig.from_env() return qdr.get_client(cfg), cfg.prefix def _get_query_vector(req: QueryRequest) -> List[float]: """ Vektorisiert die Anfrage. FIX: Enthält try-except Block für unterschiedliche Signaturen von ec.embed_text. """ if req.query_vector: return list(req.query_vector) if not req.query: raise ValueError("Kein Text oder Vektor für die Suche angegeben.") settings = get_settings() try: # Versuch mit modernem Interface (WP-03 kompatibel) return ec.embed_text(req.query, model_name=settings.MODEL_NAME) except TypeError: # Fallback für Signaturen, die 'model_name' nicht als Keyword akzeptieren logger.debug("ec.embed_text does not accept 'model_name' keyword. Falling back.") return ec.embed_text(req.query) def _get_chunk_ids_for_notes( client: Any, prefix: str, note_ids: List[str] ) -> List[str]: """ WP-24c v4.1.0: Lädt alle Chunk-IDs für gegebene Note-IDs. Wird für Scope-Aware Edge Retrieval benötigt. """ if not note_ids: return [] _, chunks_col, _ = qp._names(prefix) chunk_ids = [] try: # Filter: note_id IN note_ids note_filter = rest.Filter(should=[ rest.FieldCondition(key="note_id", match=rest.MatchValue(value=str(nid))) for nid in note_ids ]) pts, _ = client.scroll( collection_name=chunks_col, scroll_filter=note_filter, limit=2048, with_payload=True, with_vectors=False ) for pt in pts: pl = pt.payload or {} cid = pl.get("chunk_id") if cid: chunk_ids.append(str(cid)) except Exception as e: logger.warning(f"Failed to load chunk IDs for notes: {e}") return chunk_ids def _semantic_hits( client: Any, prefix: str, vector: List[float], top_k: int, filters: Optional[Dict] = None, target_section: Optional[str] = None ) -> List[Tuple[str, float, Dict[str, Any]]]: """ Führt die Vektorsuche via database-Points-Modul durch. WP-24c v4.1.0: Unterstützt optionales Section-Filtering. """ # WP-24c v4.1.0: Section-Filtering für präzise Section-Links if target_section and filters: filters = {**filters, "section": target_section} elif target_section: filters = {"section": target_section} raw_hits = qp.search_chunks_by_vector(client, prefix, vector, top=top_k, filters=filters) # WP-24c v4.5.0-DEBUG: Retrieval-Tracer - Protokollierung der rohen Qdrant-Antwort logger.debug(f"📊 [RAW-HITS] Qdrant lieferte {len(raw_hits)} Roh-Treffer (Top-K: {top_k})") if filters: logger.debug(f" ⚙️ [FILTER] Angewandte Filter: {filters}") # Logge die Top 3 Roh-Scores für Diagnose for i, hit in enumerate(raw_hits[:3]): hit_id = str(hit[0]) if hit else "N/A" hit_score = float(hit[1]) if hit and len(hit) > 1 else 0.0 hit_payload = dict(hit[2] or {}) if hit and len(hit) > 2 else {} hit_path = hit_payload.get('path', 'N/A') logger.debug(f" [{i+1}] ID: {hit_id} | Raw-Score: {hit_score:.4f} | Path: {hit_path}") # Strikte Typkonvertierung für Stabilität return [(str(hit[0]), float(hit[1]), dict(hit[2] or {})) for hit in raw_hits] # ============================================================================== # 2. EXPLANATION LAYER (DEBUG & VERIFIABILITY) # ============================================================================== def _build_explanation( semantic_score: float, payload: Dict[str, Any], scoring_debug: Dict[str, Any], subgraph: Optional[ga.Subgraph], target_note_id: Optional[str], applied_boosts: Optional[Dict[str, float]] = None ) -> Explanation: """ Transformiert mathematische Scores und Graph-Signale in eine menschenlesbare Erklärung. """ _, edge_w_cfg, _ = get_weights() base_val = scoring_debug["base_val"] # 1. Detaillierter mathematischer Breakdown breakdown = ScoreBreakdown( semantic_contribution=base_val, edge_contribution=base_val * scoring_debug["edge_impact_final"], centrality_contribution=base_val * scoring_debug["cent_impact_final"], raw_semantic=semantic_score, raw_edge_bonus=scoring_debug["edge_bonus"], raw_centrality=scoring_debug["cent_bonus"], node_weight=float(payload.get("retriever_weight", 1.0)), status_multiplier=scoring_debug["status_multiplier"], graph_boost_factor=scoring_debug["graph_boost_factor"] ) reasons: List[Reason] = [] edges_dto: List[EdgeDTO] = [] # 2. Gründe für Semantik hinzufügen if semantic_score > 0.85: reasons.append(Reason(kind="semantic", message="Sehr hohe textuelle Übereinstimmung.", score_impact=base_val)) elif semantic_score > 0.70: reasons.append(Reason(kind="semantic", message="Inhaltliche Übereinstimmung.", score_impact=base_val)) # 3. Gründe für Typ und Lifecycle (WP-25 Vorbereitung) type_weight = float(payload.get("retriever_weight", 1.0)) if type_weight != 1.0: msg = "Bevorzugt" if type_weight > 1.0 else "De-priorisiert" reasons.append(Reason(kind="type", message=f"{msg} durch Typ-Profil.", score_impact=base_val * (type_weight - 1.0))) # NEU: Explizite Ausweisung des Lifecycle-Status (WP-22) status_mult = scoring_debug.get("status_multiplier", 1.0) if status_mult != 1.0: status_msg = "Belohnt (Stable)" if status_mult > 1.0 else "De-priorisiert (Draft)" reasons.append(Reason( kind="status", message=f"{status_msg} durch Content-Lifecycle.", score_impact=semantic_score * (status_mult - 1.0) )) # 4. Kanten-Verarbeitung (Graph-Intelligence) if subgraph and target_note_id and scoring_debug["edge_bonus"] > 0: raw_edges = [] if hasattr(subgraph, "get_incoming_edges"): raw_edges.extend(subgraph.get_incoming_edges(target_note_id) or []) if hasattr(subgraph, "get_outgoing_edges"): raw_edges.extend(subgraph.get_outgoing_edges(target_note_id) or []) for edge in raw_edges: src = str(edge.get("source") or "note_root") tgt = str(edge.get("target") or target_note_id or "unknown_target") kind = str(edge.get("kind", "related_to")) prov = str(edge.get("provenance", "rule")) conf = float(edge.get("confidence", 1.0)) direction = "in" if tgt == target_note_id else "out" # WP-24c v4.5.10: Robuste EdgeDTO-Erstellung mit Fehlerbehandlung # Falls Provenance-Wert nicht unterstützt wird, verwende Fallback try: edge_obj = EdgeDTO( id=f"{src}->{tgt}:{kind}", kind=kind, source=src, target=tgt, weight=conf, direction=direction, provenance=prov, confidence=conf ) edges_dto.append(edge_obj) except Exception as e: # WP-24c v4.5.10: Fallback bei Validierungsfehler (z.B. alte EdgeDTO-Version im Cache) logger.warning( f"⚠️ [EDGE-DTO] Provenance '{prov}' nicht unterstützt für Edge {src}->{tgt} ({kind}). " f"Fehler: {e}. Verwende Fallback 'explicit'." ) # Fallback: Verwende 'explicit' als sicheren Default try: edge_obj = EdgeDTO( id=f"{src}->{tgt}:{kind}", kind=kind, source=src, target=tgt, weight=conf, direction=direction, provenance="explicit", # Fallback confidence=conf ) edges_dto.append(edge_obj) except Exception as e2: logger.error(f"❌ [EDGE-DTO] Auch Fallback fehlgeschlagen: {e2}. Überspringe Edge.") # Überspringe diese Kante - besser als kompletter Fehler # Die 3 wichtigsten Kanten als Begründung formulieren top_edges = sorted(edges_dto, key=lambda e: e.confidence, reverse=True) for e in top_edges[:3]: peer = e.source if e.direction == "in" else e.target # WP-24c v4.5.3: Unterstütze alle explicit-Varianten (explicit, explicit:callout, etc.) prov_txt = "Bestätigte" if e.provenance and e.provenance.startswith("explicit") else "KI-basierte" boost_txt = f" [Boost x{applied_boosts.get(e.kind)}]" if applied_boosts and e.kind in applied_boosts else "" reasons.append(Reason( kind="edge", message=f"{prov_txt} Kante '{e.kind}'{boost_txt} von/zu '{peer}'.", score_impact=edge_w_cfg * e.confidence )) if scoring_debug["cent_bonus"] > 0.01: reasons.append(Reason(kind="centrality", message="Die Notiz ist ein zentraler Informations-Hub.", score_impact=breakdown.centrality_contribution)) return Explanation( breakdown=breakdown, reasons=reasons, related_edges=edges_dto if edges_dto else None, applied_boosts=applied_boosts ) # ============================================================================== # 3. CORE RETRIEVAL PIPELINE # ============================================================================== def _build_hits_from_semantic( hits: Iterable[Tuple[str, float, Dict[str, Any]]], top_k: int, used_mode: str, subgraph: ga.Subgraph | None = None, explain: bool = False, dynamic_edge_boosts: Dict[str, float] = None ) -> QueryResponse: """ Wandelt semantische Roh-Treffer in bewertete QueryHits um. WP-15c: Implementiert Note-Level Diversity Pooling. """ t0 = time.time() enriched = [] # Erstes Scoring für alle Kandidaten for pid, semantic_score, payload in hits: edge_bonus, cent_bonus = 0.0, 0.0 target_id = payload.get("note_id") if subgraph and target_id: try: edge_bonus = float(subgraph.edge_bonus(target_id)) cent_bonus = float(subgraph.centrality_bonus(target_id)) except Exception: pass debug_data = compute_wp22_score( semantic_score, payload, edge_bonus, cent_bonus, dynamic_edge_boosts ) enriched.append((pid, semantic_score, payload, debug_data)) # 1. Sortierung nach finalem mathematischen Score enriched_sorted = sorted(enriched, key=lambda h: h[3]["total"], reverse=True) # 2. WP-15c: Note-Level Diversity Pooling # Wir behalten pro note_id nur den Hit mit dem höchsten total_score. # Dies verhindert, dass 10 Chunks derselben Note andere KeyNotes verdrängen. unique_note_hits = [] seen_notes = set() for item in enriched_sorted: _, _, payload, _ = item note_id = str(payload.get("note_id", "unknown")) if note_id not in seen_notes: unique_note_hits.append(item) seen_notes.add(note_id) # 3. Begrenzung auf top_k nach dem Diversity-Pooling limited_hits = unique_note_hits[: max(1, top_k)] results: List[QueryHit] = [] for pid, s_score, pl, dbg in limited_hits: explanation_obj = None if explain: explanation_obj = _build_explanation( semantic_score=float(s_score), payload=pl, scoring_debug=dbg, subgraph=subgraph, target_note_id=pl.get("note_id"), applied_boosts=dynamic_edge_boosts ) text_content = pl.get("page_content") or pl.get("text") or pl.get("content", "[Kein Text]") # WP-24c v4.1.0: RAG-Kontext - source_chunk_id aus Edge-Payload extrahieren source_chunk_id = None if explanation_obj and explanation_obj.related_edges: # Finde die erste Edge mit chunk_id als source for edge in explanation_obj.related_edges: # Prüfe, ob source eine Chunk-ID ist (enthält # oder ist chunk_id) if edge.source and ("#" in edge.source or edge.source.startswith("chunk:")): source_chunk_id = edge.source break results.append(QueryHit( node_id=str(pid), note_id=str(pl.get("note_id", "unknown")), semantic_score=float(s_score), edge_bonus=dbg["edge_bonus"], centrality_bonus=dbg["cent_bonus"], total_score=dbg["total"], source={ "path": pl.get("path"), "section": pl.get("section") or pl.get("section_title"), "text": text_content }, payload=pl, explanation=explanation_obj, source_chunk_id=source_chunk_id # WP-24c v4.1.0: RAG-Kontext )) # WP-24c v4.5.0-DEBUG: Retrieval-Tracer - Finale Ergebnisse latency_ms = int((time.time() - t0) * 1000) if not results: logger.warning(f"⚠️ [EMPTY] Hybride Suche lieferte 0 Ergebnisse (Latency: {latency_ms}ms)") else: logger.info(f"✨ [SUCCESS] Hybride Suche lieferte {len(results)} Treffer (Latency: {latency_ms}ms)") # Top 3 finale Scores loggen # WP-24c v4.5.4: QueryHit hat kein chunk_id Feld - verwende node_id (enthält die Chunk-ID) for i, hit in enumerate(results[:3]): chunk_id = hit.node_id # node_id ist die Chunk-ID (pid) logger.debug(f" [{i+1}] Final: Chunk={chunk_id} | Total-Score={hit.total_score:.4f} | Semantic={hit.semantic_score:.4f} | Edge={hit.edge_bonus:.4f}") return QueryResponse(results=results, used_mode=used_mode, latency_ms=latency_ms) def hybrid_retrieve(req: QueryRequest) -> QueryResponse: """ Die Haupt-Einstiegsfunktion für die hybride Suche. WP-15c: Implementiert Edge-Aggregation (Super-Kanten). WP-24c v4.5.0-DEBUG: Retrieval-Tracer für Diagnose. """ # WP-24c v4.5.0-DEBUG: Retrieval-Tracer - Start der hybriden Suche logger.info(f"🔍 [RETRIEVAL] Starte hybride Suche") logger.info(f" -> Query: '{req.query[:100]}...' (Länge: {len(req.query)})") logger.debug(f" ⚙️ [FILTER] Request-Filter: {req.filters}") logger.debug(f" ⚙️ [FILTER] Top-K: {req.top_k}, Expand: {req.expand}, Target-Section: {req.target_section}") client, prefix = _get_client_and_prefix() vector = list(req.query_vector) if req.query_vector else _get_query_vector(req) top_k = req.top_k or 10 # 1. Semantische Seed-Suche (Wir laden etwas mehr für das Pooling) # WP-24c v4.1.0: Section-Filtering unterstützen target_section = getattr(req, "target_section", None) # WP-24c v4.5.0-DEBUG: Retrieval-Tracer - Vor semantischer Suche logger.debug(f"🔍 [RETRIEVAL] Starte semantische Seed-Suche (Top-K: {top_k * 3}, Target-Section: {target_section})") hits = _semantic_hits(client, prefix, vector, top_k=top_k * 3, filters=req.filters, target_section=target_section) # WP-24c v4.5.0-DEBUG: Retrieval-Tracer - Nach semantischer Suche logger.debug(f"📊 [SEED-HITS] Semantische Suche lieferte {len(hits)} Seed-Treffer") # 2. Graph Expansion Konfiguration expand_cfg = req.expand if isinstance(req.expand, dict) else {} depth = int(expand_cfg.get("depth", 1)) boost_edges = getattr(req, "boost_edges", {}) or {} subgraph: ga.Subgraph | None = None if depth > 0 and hits: # WP-24c v4.5.2: Chunk-Aware Graph Traversal # Extrahiere sowohl note_id als auch chunk_id (pid) direkt aus den Hits # Dies stellt sicher, dass Chunk-Scope Edges gefunden werden seed_note_ids = list({h[2].get("note_id") for h in hits if h[2].get("note_id")}) seed_chunk_ids = list({h[0] for h in hits if h[0]}) # pid ist die Chunk-ID # Kombiniere beide Sets für vollständige Seed-Abdeckung # Chunk-IDs können auch als Note-IDs fungieren (für Note-Scope Edges) all_seed_ids = list(set(seed_note_ids + seed_chunk_ids)) if all_seed_ids: try: # WP-24c v4.5.2: Chunk-IDs sind bereits aus Hits extrahiert # Zusätzlich können wir noch weitere Chunk-IDs für die Note-IDs laden # (für den Fall, dass nicht alle Chunks in den Top-K Hits sind) additional_chunk_ids = _get_chunk_ids_for_notes(client, prefix, seed_note_ids) # Kombiniere direkte Chunk-IDs aus Hits mit zusätzlich geladenen all_chunk_ids = list(set(seed_chunk_ids + additional_chunk_ids)) # WP-24c v4.5.2: Erweiterte Edge-Retrieval mit Chunk-Scope und Section-Filtering # Verwende all_seed_ids (enthält sowohl note_id als auch chunk_id) # und all_chunk_ids für explizite Chunk-Scope Edge-Suche subgraph = ga.expand( client, prefix, all_seed_ids, depth=depth, edge_types=expand_cfg.get("edge_types"), chunk_ids=all_chunk_ids, target_section=target_section ) # WP-24c v4.5.2: Debug-Logging für Chunk-Awareness logger.debug(f"🔍 [SEEDS] Note-IDs: {len(seed_note_ids)}, Chunk-IDs: {len(seed_chunk_ids)}, Total Seeds: {len(all_seed_ids)}") logger.debug(f" -> Zusätzliche Chunk-IDs geladen: {len(additional_chunk_ids)}, Total Chunk-IDs: {len(all_chunk_ids)}") # --- WP-24c v4.1.0: Chunk-Level Edge-Aggregation & Deduplizierung --- # Verhindert Score-Explosion durch multiple Links auf versch. Abschnitte. # Logik: 1. Kante zählt voll, weitere dämpfen auf Faktor 0.1. # Erweitert um Chunk-Level Tracking für präzise In-Degree-Berechnung. if subgraph and hasattr(subgraph, "adj"): # WP-24c v4.1.0: Chunk-Level In-Degree Tracking chunk_level_in_degree = defaultdict(int) # target -> count of chunk sources for src, edge_list in subgraph.adj.items(): # Gruppiere Kanten nach Ziel-Note (Deduplizierung ID_A -> ID_B) by_target = defaultdict(list) for e in edge_list: by_target[e["target"]].append(e) # WP-24c v4.1.0: Chunk-Level In-Degree Tracking # Wenn source eine Chunk-ID ist, zähle für Chunk-Level In-Degree if e.get("chunk_id") or (src and ("#" in src or src.startswith("chunk:"))): chunk_level_in_degree[e["target"]] += 1 aggregated_list = [] for tgt, edges in by_target.items(): if len(edges) > 1: # Sortiere: Stärkste Kante zuerst (Authority-Priorisierung) sorted_edges = sorted( edges, key=lambda x: ( x.get("weight", 0.0) * (1.0 if not x.get("virtual", False) else 0.5) * # Virtual-Penalty float(x.get("confidence", 1.0)) # Confidence-Boost ), reverse=True ) primary = sorted_edges[0] # Aggregiertes Gewicht berechnen (Sättigungs-Logik) total_w = primary.get("weight", 0.0) chunk_count = 0 for secondary in sorted_edges[1:]: total_w += secondary.get("weight", 0.0) * 0.1 if secondary.get("chunk_id") or (secondary.get("source") and ("#" in secondary.get("source", "") or secondary.get("source", "").startswith("chunk:"))): chunk_count += 1 primary["weight"] = total_w primary["is_super_edge"] = True # Flag für Explanation Layer primary["edge_count"] = len(edges) primary["chunk_source_count"] = chunk_count + (1 if (primary.get("chunk_id") or (primary.get("source") and ("#" in primary.get("source", "") or primary.get("source", "").startswith("chunk:")))) else 0) aggregated_list.append(primary) else: edge = edges[0] # WP-24c v4.1.0: Chunk-Count auch für einzelne Edges if edge.get("chunk_id") or (edge.get("source") and ("#" in edge.get("source", "") or edge.get("source", "").startswith("chunk:"))): edge["chunk_source_count"] = 1 aggregated_list.append(edge) # In-Place Update der Adjazenzliste des Graphen subgraph.adj[src] = aggregated_list # Re-Sync der In-Degrees für Centrality-Bonus (Aggregation konsistent halten) subgraph.in_degree = defaultdict(int) for src, edges in subgraph.adj.items(): for e in edges: subgraph.in_degree[e["target"]] += 1 # WP-24c v4.1.0: Chunk-Level In-Degree als Attribut speichern subgraph.chunk_level_in_degree = chunk_level_in_degree # --- WP-24c v4.1.0: Authority-Priorisierung (Provenance & Confidence) --- if subgraph and hasattr(subgraph, "adj"): for src, edges in subgraph.adj.items(): for e in edges: # A. Provenance Weighting (nutzt PROVENANCE_PRIORITY aus graph_utils) prov = e.get("provenance", "rule") prov_key = f"{prov}:{e.get('kind', 'related_to')}" if ":" not in prov else prov prov_w = PROVENANCE_PRIORITY.get(prov_key, PROVENANCE_PRIORITY.get(prov, 0.7)) # B. Confidence-Weighting (aus Edge-Payload) confidence = float(e.get("confidence", 1.0)) # C. Virtual-Flag De-Priorisierung is_virtual = e.get("virtual", False) virtual_penalty = 0.5 if is_virtual else 1.0 # D. Intent Boost Multiplikator kind = e.get("kind") intent_multiplier = boost_edges.get(kind, 1.0) # Gewichtung anpassen (Authority-Priorisierung) e["weight"] = e.get("weight", 1.0) * prov_w * confidence * virtual_penalty * intent_multiplier except Exception as e: logger.error(f"Graph Expansion failed: {e}") subgraph = None # 3. Scoring & Explanation Generierung # top_k wird erst hier final angewandt # WP-24c v4.5.0-DEBUG: Retrieval-Tracer - Vor finaler Hit-Erstellung if subgraph: # WP-24c v4.5.1: Subgraph hat kein .edges Attribut, sondern .adj (Adjazenzliste) # Zähle alle Kanten aus der Adjazenzliste edge_count = sum(len(edges) for edges in subgraph.adj.values()) if hasattr(subgraph, 'adj') else 0 logger.debug(f"📊 [GRAPH] Subgraph enthält {edge_count} Kanten") else: logger.debug(f"📊 [GRAPH] Kein Subgraph (depth=0 oder keine Seed-IDs)") result = _build_hits_from_semantic(hits, top_k, "hybrid", subgraph, req.explain, boost_edges) # WP-24c v4.5.0-DEBUG: Retrieval-Tracer - Nach finaler Hit-Erstellung if not result.results: logger.warning(f"⚠️ [EMPTY] Hybride Suche lieferte nach Scoring 0 finale Ergebnisse") else: logger.info(f"✨ [SUCCESS] Hybride Suche lieferte {len(result.results)} finale Treffer (Mode: {result.used_mode})") return result def semantic_retrieve(req: QueryRequest) -> QueryResponse: """Standard Vektorsuche ohne Graph-Einfluss.""" client, prefix = _get_client_and_prefix() vector = _get_query_vector(req) hits = _semantic_hits(client, prefix, vector, req.top_k or 10, req.filters) return _build_hits_from_semantic(hits, req.top_k or 10, "semantic", explain=req.explain) class Retriever: """Schnittstelle für die asynchrone Suche.""" async def search(self, request: QueryRequest) -> QueryResponse: return hybrid_retrieve(request)