""" FILE: app/core/retriever.py DESCRIPTION: Haupt-Schnittstelle für die Suche. Orchestriert Vektorsuche und Graph-Expansion. Nutzt retriever_scoring.py für die WP-22 Logik. FIX: TypeError in embed_text (model_name) behoben. FIX: Pydantic ValidationError (Target/Source) behoben. VERSION: 0.6.15 (WP-22 Full & Stable) STATUS: Active DEPENDENCIES: app.config, app.models.dto, app.core.qdrant*, app.core.graph_adapter, app.core.retriever_scoring """ from __future__ import annotations import os import time import logging from typing import Any, Dict, List, Tuple, Iterable, Optional from app.config import get_settings from app.models.dto import ( QueryRequest, QueryResponse, QueryHit, Explanation, ScoreBreakdown, Reason, EdgeDTO ) import app.core.qdrant as qdr import app.core.qdrant_points as qp import app.services.embeddings_client as ec import app.core.graph_adapter as ga # Mathematische Engine importieren from app.core.retriever_scoring import get_weights, compute_wp22_score logger = logging.getLogger(__name__) # ============================================================================== # 1. CORE HELPERS & CONFIG LOADERS # ============================================================================== def _get_client_and_prefix() -> Tuple[Any, str]: """Initialisiert Qdrant Client und lädt Collection-Prefix.""" cfg = qdr.QdrantConfig.from_env() return qdr.get_client(cfg), cfg.prefix def _get_query_vector(req: QueryRequest) -> List[float]: """ Vektorisiert die Anfrage. FIX: Enthält try-except Block für unterschiedliche Signaturen von ec.embed_text. """ if req.query_vector: return list(req.query_vector) if not req.query: raise ValueError("Kein Text oder Vektor für die Suche angegeben.") settings = get_settings() try: # Versuch mit modernem Interface (WP-03 kompatibel) return ec.embed_text(req.query, model_name=settings.MODEL_NAME) except TypeError: # Fallback für Signaturen, die 'model_name' nicht als Keyword akzeptieren logger.debug("ec.embed_text does not accept 'model_name' keyword. Falling back.") return ec.embed_text(req.query) def _semantic_hits( client: Any, prefix: str, vector: List[float], top_k: int, filters: Optional[Dict] = None ) -> List[Tuple[str, float, Dict[str, Any]]]: """Führt die Vektorsuche durch und konvertiert Qdrant-Points in ein einheitliches Format.""" raw_hits = qp.search_chunks_by_vector(client, prefix, vector, top=top_k, filters=filters) # Strikte Typkonvertierung für Stabilität return [(str(hit[0]), float(hit[1]), dict(hit[2] or {})) for hit in raw_hits] # ============================================================================== # 2. EXPLANATION LAYER (DEBUG & VERIFIABILITY) # ============================================================================== def _build_explanation( semantic_score: float, payload: Dict[str, Any], scoring_debug: Dict[str, Any], subgraph: Optional[ga.Subgraph], target_note_id: Optional[str], applied_boosts: Optional[Dict[str, float]] = None ) -> Explanation: """ Transformiert mathematische Scores und Graph-Signale in eine menschenlesbare Erklärung. Behebt Pydantic ValidationErrors durch explizite String-Sicherung. """ _, edge_w_cfg, _ = get_weights() base_val = scoring_debug["base_val"] # 1. Detaillierter mathematischer Breakdown breakdown = ScoreBreakdown( semantic_contribution=base_val, edge_contribution=base_val * scoring_debug["edge_impact_final"], centrality_contribution=base_val * scoring_debug["cent_impact_final"], raw_semantic=semantic_score, raw_edge_bonus=scoring_debug["edge_bonus"], raw_centrality=scoring_debug["cent_bonus"], node_weight=float(payload.get("retriever_weight", 1.0)), status_multiplier=scoring_debug["status_multiplier"], graph_boost_factor=scoring_debug["graph_boost_factor"] ) reasons: List[Reason] = [] edges_dto: List[EdgeDTO] = [] # 2. Gründe für Semantik hinzufügen if semantic_score > 0.85: reasons.append(Reason(kind="semantic", message="Sehr hohe textuelle Übereinstimmung.", score_impact=base_val)) elif semantic_score > 0.70: reasons.append(Reason(kind="semantic", message="Inhaltliche Übereinstimmung.", score_impact=base_val)) # 3. Gründe für Typ und Lifecycle type_weight = float(payload.get("retriever_weight", 1.0)) if type_weight != 1.0: msg = "Bevorzugt" if type_weight > 1.0 else "De-priorisiert" reasons.append(Reason(kind="type", message=f"{msg} durch Typ-Profil.", score_impact=base_val * (type_weight - 1.0))) # 4. Kanten-Verarbeitung (Graph-Intelligence) if subgraph and target_note_id and scoring_debug["edge_bonus"] > 0: raw_edges = [] if hasattr(subgraph, "get_incoming_edges"): raw_edges.extend(subgraph.get_incoming_edges(target_note_id) or []) if hasattr(subgraph, "get_outgoing_edges"): raw_edges.extend(subgraph.get_outgoing_edges(target_note_id) or []) for edge in raw_edges: # FIX: Zwingende String-Konvertierung für Pydantic-Stabilität src = str(edge.get("source") or "note_root") tgt = str(edge.get("target") or target_note_id or "unknown_target") kind = str(edge.get("kind", "related_to")) prov = str(edge.get("provenance", "rule")) conf = float(edge.get("confidence", 1.0)) direction = "in" if tgt == target_note_id else "out" edge_obj = EdgeDTO( id=f"{src}->{tgt}:{kind}", kind=kind, source=src, target=tgt, weight=conf, direction=direction, provenance=prov, confidence=conf ) edges_dto.append(edge_obj) # Die 3 wichtigsten Kanten als Begründung formulieren top_edges = sorted(edges_dto, key=lambda e: e.confidence, reverse=True) for e in top_edges[:3]: peer = e.source if e.direction == "in" else e.target prov_txt = "Bestätigte" if e.provenance == "explicit" else "KI-basierte" boost_txt = f" [Boost x{applied_boosts.get(e.kind)}]" if applied_boosts and e.kind in applied_boosts else "" reasons.append(Reason( kind="edge", message=f"{prov_txt} Kante '{e.kind}'{boost_txt} von/zu '{peer}'.", score_impact=edge_w_cfg * e.confidence )) if scoring_debug["cent_bonus"] > 0.01: reasons.append(Reason(kind="centrality", message="Die Notiz ist ein zentraler Informations-Hub.", score_impact=breakdown.centrality_contribution)) return Explanation( breakdown=breakdown, reasons=reasons, related_edges=edges_dto if edges_dto else None, applied_boosts=applied_boosts ) # ============================================================================== # 3. CORE RETRIEVAL PIPELINE # ============================================================================== def _build_hits_from_semantic( hits: Iterable[Tuple[str, float, Dict[str, Any]]], top_k: int, used_mode: str, subgraph: ga.Subgraph | None = None, explain: bool = False, dynamic_edge_boosts: Dict[str, float] = None ) -> QueryResponse: """Wandelt semantische Roh-Treffer in hochgeladene, bewertete QueryHits um.""" t0 = time.time() enriched = [] for pid, semantic_score, payload in hits: edge_bonus, cent_bonus = 0.0, 0.0 target_id = payload.get("note_id") if subgraph and target_id: try: edge_bonus = float(subgraph.edge_bonus(target_id)) cent_bonus = float(subgraph.centrality_bonus(target_id)) except Exception: pass # Mathematisches Scoring via WP-22 Engine debug_data = compute_wp22_score( semantic_score, payload, edge_bonus, cent_bonus, dynamic_edge_boosts ) enriched.append((pid, semantic_score, payload, debug_data)) # Sortierung nach finalem mathematischen Score enriched_sorted = sorted(enriched, key=lambda h: h[3]["total"], reverse=True) limited_hits = enriched_sorted[: max(1, top_k)] results: List[QueryHit] = [] for pid, s_score, pl, dbg in limited_hits: explanation_obj = None if explain: explanation_obj = _build_explanation( semantic_score=float(s_score), payload=pl, scoring_debug=dbg, subgraph=subgraph, target_note_id=pl.get("note_id"), applied_boosts=dynamic_edge_boosts ) # Payload Text-Feld normalisieren text_content = pl.get("page_content") or pl.get("text") or pl.get("content", "[Kein Text]") results.append(QueryHit( node_id=str(pid), note_id=str(pl.get("note_id", "unknown")), semantic_score=float(s_score), edge_bonus=dbg["edge_bonus"], centrality_bonus=dbg["cent_bonus"], total_score=dbg["total"], source={ "path": pl.get("path"), "section": pl.get("section") or pl.get("section_title"), "text": text_content }, payload=pl, explanation=explanation_obj )) return QueryResponse(results=results, used_mode=used_mode, latency_ms=int((time.time() - t0) * 1000)) def hybrid_retrieve(req: QueryRequest) -> QueryResponse: """ Die Haupt-Einstiegsfunktion für die hybride Suche. Kombiniert Vektorsuche mit Graph-Expansion, Provenance-Weighting und Intent-Boosting. """ client, prefix = _get_client_and_prefix() vector = list(req.query_vector) if req.query_vector else _get_query_vector(req) top_k = req.top_k or 10 # 1. Semantische Seed-Suche hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters) # 2. Graph Expansion Konfiguration expand_cfg = req.expand if isinstance(req.expand, dict) else {} depth = int(expand_cfg.get("depth", 1)) boost_edges = getattr(req, "boost_edges", {}) or {} subgraph: ga.Subgraph | None = None if depth > 0 and hits: # Start-IDs für den Graph-Traversal sammeln seed_ids = list({h[2].get("note_id") for h in hits if h[2].get("note_id")}) if seed_ids: try: # Subgraph aus RAM/DB laden subgraph = ga.expand(client, prefix, seed_ids, depth=depth, edge_types=expand_cfg.get("edge_types")) # --- WP-22: Kanten-Gewichtung im RAM-Graphen vor Bonus-Berechnung --- if subgraph and hasattr(subgraph, "graph"): for _, _, data in subgraph.graph.edges(data=True): # A. Provenance Weighting (WP-22 Bonus für Herkunft) prov = data.get("provenance", "rule") # Belohnung: Explizite Links (1.0) > Smart (0.9) > Rule (0.7) prov_w = 1.0 if prov == "explicit" else (0.9 if prov == "smart" else 0.7) # B. Intent Boost Multiplikator (Vom Router dynamisch injiziert) kind = data.get("kind") intent_multiplier = boost_edges.get(kind, 1.0) # Finales Gewicht setzen (Basis * Provenance * Intent) data["weight"] = data.get("weight", 1.0) * prov_w * intent_multiplier except Exception as e: logger.error(f"Graph Expansion failed: {e}") subgraph = None # 3. Scoring & Explanation Generierung return _build_hits_from_semantic(hits, top_k, "hybrid", subgraph, req.explain, boost_edges) def semantic_retrieve(req: QueryRequest) -> QueryResponse: """Standard Vektorsuche ohne Graph-Einfluss (WP-02 Fallback).""" client, prefix = _get_client_and_prefix() vector = _get_query_vector(req) hits = _semantic_hits(client, prefix, vector, req.top_k or 10, req.filters) return _build_hits_from_semantic(hits, req.top_k or 10, "semantic", explain=req.explain) class Retriever: """Schnittstelle für die asynchrone Suche.""" async def search(self, request: QueryRequest) -> QueryResponse: """Führt eine hybride Suche aus.""" return hybrid_retrieve(request)