mindnet/app/core/retriever.py
2025-12-18 16:14:03 +01:00

435 lines
16 KiB
Python

"""
FILE: app/core/retriever.py
DESCRIPTION: Implementiert die Hybrid-Suche (Vektor + Graph-Expansion) und das Scoring-Modell (Explainability).
WP-22 Update: Dynamic Edge Boosting, Lifecycle Scoring & Provenance Awareness.
VERSION: 0.6.7 (WP-22 Scoring & Provenance Fix)
STATUS: Active
DEPENDENCIES: app.config, app.models.dto, app.core.qdrant*, app.services.embeddings_client, app.core.graph_adapter
LAST_ANALYSIS: 2025-12-18
"""
from __future__ import annotations
import os
import time
import logging
from functools import lru_cache
from typing import Any, Dict, List, Tuple, Iterable, Optional
from app.config import get_settings
from app.models.dto import (
QueryRequest,
QueryResponse,
QueryHit,
Explanation,
ScoreBreakdown,
Reason,
EdgeDTO
)
import app.core.qdrant as qdr
import app.core.qdrant_points as qp
import app.services.embeddings_client as ec
import app.core.graph_adapter as ga
try:
import yaml # type: ignore[import]
except Exception: # pragma: no cover
yaml = None # type: ignore[assignment]
logger = logging.getLogger(__name__)
@lru_cache
def _get_scoring_weights() -> Tuple[float, float, float]:
"""Liefert (semantic_weight, edge_weight, centrality_weight) für den Retriever."""
settings = get_settings()
sem = float(getattr(settings, "RETRIEVER_W_SEM", 1.0))
edge = float(getattr(settings, "RETRIEVER_W_EDGE", 0.0))
cent = float(getattr(settings, "RETRIEVER_W_CENT", 0.0))
config_path = os.getenv("MINDNET_RETRIEVER_CONFIG", "config/retriever.yaml")
if yaml is None:
return sem, edge, cent
try:
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
scoring = data.get("scoring", {}) or {}
sem = float(scoring.get("semantic_weight", sem))
edge = float(scoring.get("edge_weight", edge))
cent = float(scoring.get("centrality_weight", cent))
except Exception:
return sem, edge, cent
return sem, edge, cent
def _get_client_and_prefix() -> Tuple[Any, str]:
"""Liefert (QdrantClient, prefix)."""
cfg = qdr.QdrantConfig.from_env()
client = qdr.get_client(cfg)
return client, cfg.prefix
def _get_query_vector(req: QueryRequest) -> List[float]:
"""Liefert den Query-Vektor aus dem Request."""
if req.query_vector:
return list(req.query_vector)
if not req.query:
raise ValueError("QueryRequest benötigt entweder query oder query_vector")
settings = get_settings()
model_name = settings.MODEL_NAME
try:
return ec.embed_text(req.query, model_name=model_name)
except TypeError:
return ec.embed_text(req.query)
def _semantic_hits(
client: Any,
prefix: str,
vector: List[float],
top_k: int,
filters: Dict[str, Any] | None = None,
) -> List[Tuple[str, float, Dict[str, Any]]]:
"""Führt eine semantische Suche aus."""
flt = filters or None
raw_hits = qp.search_chunks_by_vector(client, prefix, vector, top=top_k, filters=flt)
results: List[Tuple[str, float, Dict[str, Any]]] = []
for pid, score, payload in raw_hits:
results.append((str(pid), float(score), dict(payload or {})))
return results
# --- WP-22 Helper: Lifecycle Multipliers (Teil A) ---
def _get_status_multiplier(payload: Dict[str, Any]) -> float:
"""
WP-22: stable (1.2), active/default (1.0), draft (0.5).
"""
status = str(payload.get("status", "active")).lower()
if status == "stable": return 1.2
if status == "draft": return 0.5
return 1.0
# --- WP-22: Dynamic Scoring Formula (Teil C) ---
def _compute_total_score(
semantic_score: float,
payload: Dict[str, Any],
edge_bonus_raw: float = 0.0,
cent_bonus_raw: float = 0.0,
dynamic_edge_boosts: Dict[str, float] = None
) -> Tuple[float, float, float]:
"""
WP-22 Mathematische Logik:
Score = BaseScore * (1 + ConfigWeight + DynamicBoost)
Hierbei gilt:
- BaseScore: semantic_similarity * status_multiplier
- ConfigWeight: retriever_weight (Type Boost) - 1.0
- DynamicBoost: (edge_weight * edge_bonus) + (centrality_weight * centrality_bonus)
"""
# 1. Base Score (Semantik * Lifecycle)
status_mult = _get_status_multiplier(payload)
base_score = float(semantic_score) * status_mult
# 2. Config Weight (Static Type Boost)
# Ein neutrales retriever_weight von 1.0 ergibt 0.0 Einfluss.
config_weight = float(payload.get("retriever_weight", 1.0)) - 1.0
# 3. Dynamic Boost (Graph-Signale)
_sem_w, edge_w_cfg, cent_w_cfg = _get_scoring_weights()
# Multiplikator für Intent-Boosting (Teil C)
graph_boost_factor = 1.5 if dynamic_edge_boosts and (edge_bonus_raw > 0 or cent_bonus_raw > 0) else 1.0
edge_impact = (edge_w_cfg * edge_bonus_raw) * graph_boost_factor
cent_impact = (cent_w_cfg * cent_bonus_raw) * graph_boost_factor
dynamic_boost = edge_impact + cent_impact
total = base_score * (1.0 + config_weight + dynamic_boost)
# Debug Logging für Berechnungs-Validierung
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Scoring Node {payload.get('note_id')}: Base={base_score:.3f}, ConfigW={config_weight:.3f}, GraphB={dynamic_boost:.3f} -> Total={total:.3f}")
return float(total), float(edge_bonus_raw), float(cent_bonus_raw)
# --- WP-04b Explanation Logic ---
def _build_explanation(
semantic_score: float,
payload: Dict[str, Any],
edge_bonus: float,
cent_bonus: float,
subgraph: Optional[ga.Subgraph],
target_note_id: Optional[str]
) -> Explanation:
"""Erstellt ein Explanation-Objekt mit Provenance-Details."""
_, edge_w_cfg, cent_w_cfg = _get_scoring_weights()
type_weight = float(payload.get("retriever_weight", 1.0))
status_mult = _get_status_multiplier(payload)
note_type = payload.get("type", "unknown")
# Breakdown für Explanation (Reflektiert die WP-22 Formel exakt)
base_val = semantic_score * status_mult
config_w_impact = type_weight - 1.0
# Zentrale Berechnung der Kontributionen für den Breakdown
breakdown = ScoreBreakdown(
semantic_contribution=base_val,
edge_contribution=base_val * (edge_w_cfg * edge_bonus),
centrality_contribution=base_val * (cent_w_cfg * cent_bonus),
raw_semantic=semantic_score,
raw_edge_bonus=edge_bonus,
raw_centrality=cent_bonus,
node_weight=type_weight
)
reasons: List[Reason] = []
edges_dto: List[EdgeDTO] = []
# 1. Semantische Gründe
if semantic_score > 0.85:
reasons.append(Reason(kind="semantic", message="Sehr hohe textuelle Übereinstimmung.", score_impact=breakdown.semantic_contribution))
elif semantic_score > 0.70:
reasons.append(Reason(kind="semantic", message="Gute textuelle Übereinstimmung.", score_impact=breakdown.semantic_contribution))
# 2. Typ-Gründe
if type_weight != 1.0:
msg = "Bevorzugt" if type_weight > 1.0 else "Leicht abgewertet"
reasons.append(Reason(kind="type", message=f"{msg} aufgrund des Typs '{note_type}'.", score_impact=base_val * config_w_impact))
# 3. Lifecycle-Gründe
if status_mult != 1.0:
msg = "Status-Bonus" if status_mult > 1.0 else "Status-Malus"
reasons.append(Reason(kind="lifecycle", message=f"{msg} (Notiz ist '{payload.get('status', 'unknown')}').", score_impact=0.0))
# 4. Graph-Gründe (Edges) - FIX: Beachtet eingehende UND ausgehende Kanten
if subgraph and target_note_id and edge_bonus > 0:
# Sammle alle relevanten Kanten (Incoming + Outgoing)
edges_raw = []
if hasattr(subgraph, "get_incoming_edges"):
edges_raw.extend(subgraph.get_incoming_edges(target_note_id) or [])
if hasattr(subgraph, "get_outgoing_edges"):
edges_raw.extend(subgraph.get_outgoing_edges(target_note_id) or [])
for edge in edges_raw:
src = edge.get("source", target_note_id)
tgt = edge.get("target", target_note_id)
k = edge.get("kind", "edge")
prov = edge.get("provenance", "rule")
conf = float(edge.get("confidence", 1.0))
# Richtung bestimmen
direction = "in" if tgt == target_note_id else "out"
peer_id = src if direction == "in" else tgt
edges_dto.append(EdgeDTO(
id=f"{src}->{tgt}:{k}", kind=k, source=src, target=tgt,
weight=conf, direction=direction, provenance=prov, confidence=conf
))
# Die 3 stärksten Kanten als Begründung auflisten
all_edges = sorted(edges_dto, key=lambda e: e.confidence, reverse=True)
for top_e in all_edges[:3]:
prov_txt = "Bestätigte" if top_e.provenance == "explicit" else "Vermutete (KI)"
dir_txt = "Referenz von" if top_e.direction == "in" else "Verweis auf"
reasons.append(Reason(
kind="edge",
message=f"{prov_txt} Kante '{top_e.kind}': {dir_txt} '{top_e.peer_id if hasattr(top_e, 'peer_id') else (top_e.source if top_e.direction=='in' else top_e.target)}'.",
score_impact=edge_w_cfg * top_e.confidence,
details={"provenance": top_e.provenance, "kind": top_e.kind}
))
# 5. Zentralitäts-Gründe
if cent_bonus > 0.01:
reasons.append(Reason(kind="centrality", message="Knoten liegt zentral im aktuellen Kontext-Graphen.", score_impact=breakdown.centrality_contribution))
return Explanation(breakdown=breakdown, reasons=reasons, related_edges=edges_dto if edges_dto else None)
def _extract_expand_options(req: QueryRequest) -> Tuple[int, List[str] | None]:
"""Extrahiert depth und edge_types für die Expansion."""
expand = getattr(req, "expand", None)
if not expand:
return 0, None
depth = 1
edge_types: List[str] | None = None
if hasattr(expand, "depth") or hasattr(expand, "edge_types"):
depth = int(getattr(expand, "depth", 1) or 1)
types_val = getattr(expand, "edge_types", None)
if types_val:
edge_types = list(types_val)
return depth, edge_types
if isinstance(expand, dict):
if "depth" in expand:
depth = int(expand.get("depth") or 1)
if "edge_types" in expand and expand["edge_types"] is not None:
edge_types = list(expand["edge_types"])
return depth, edge_types
return 0, None
def _build_hits_from_semantic(
hits: Iterable[Tuple[str, float, Dict[str, Any]]],
top_k: int,
used_mode: str,
subgraph: ga.Subgraph | None = None,
explain: bool = False,
dynamic_edge_boosts: Dict[str, float] = None
) -> QueryResponse:
"""Baut strukturierte QueryHits basierend auf Hybrid-Scoring."""
t0 = time.time()
enriched: List[Tuple[str, float, Dict[str, Any], float, float, float]] = []
for pid, semantic_score, payload in hits:
edge_bonus = 0.0
cent_bonus = 0.0
# WICHTIG für WP-22: Graph-Abfragen IMMER über die Note-ID, nicht Chunk-ID
target_note_id = payload.get("note_id")
if subgraph is not None and target_note_id:
try:
# edge_bonus nutzt intern bereits die confidence-gewichteten Pfade
edge_bonus = float(subgraph.edge_bonus(target_note_id))
except Exception:
edge_bonus = 0.0
try:
cent_bonus = float(subgraph.centrality_bonus(target_note_id))
except Exception:
cent_bonus = 0.0
total, eb, cb = _compute_total_score(
semantic_score,
payload,
edge_bonus_raw=edge_bonus,
cent_bonus_raw=cent_bonus,
dynamic_edge_boosts=dynamic_edge_boosts
)
enriched.append((pid, float(semantic_score), payload, total, eb, cb))
# Sortierung nach finalem Score
enriched_sorted = sorted(enriched, key=lambda h: h[3], reverse=True)
limited = enriched_sorted[: max(1, top_k)]
results: List[QueryHit] = []
for pid, semantic_score, payload, total, eb, cb in limited:
explanation_obj = None
if explain:
explanation_obj = _build_explanation(
semantic_score=float(semantic_score),
payload=payload,
edge_bonus=eb,
cent_bonus=cb,
subgraph=subgraph,
target_note_id=payload.get("note_id")
)
text_content = payload.get("page_content") or payload.get("text") or payload.get("content")
results.append(QueryHit(
node_id=str(pid),
note_id=payload.get("note_id", "unknown"),
semantic_score=float(semantic_score),
edge_bonus=eb,
centrality_bonus=cb,
total_score=total,
paths=None,
source={
"path": payload.get("path"),
"section": payload.get("section") or payload.get("section_title"),
"text": text_content
},
payload=payload,
explanation=explanation_obj
))
dt = int((time.time() - t0) * 1000)
return QueryResponse(results=results, used_mode=used_mode, latency_ms=dt)
def semantic_retrieve(req: QueryRequest) -> QueryResponse:
"""Reiner semantischer Retriever (WP-02)."""
client, prefix = _get_client_and_prefix()
vector = _get_query_vector(req)
top_k = req.top_k or get_settings().RETRIEVER_TOP_K
hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters)
return _build_hits_from_semantic(hits, top_k=top_k, used_mode="semantic", subgraph=None, explain=req.explain)
def hybrid_retrieve(req: QueryRequest) -> QueryResponse:
"""Hybrid-Retriever: semantische Suche + optionale Edge-Expansion (WP-04a)."""
client, prefix = _get_client_and_prefix()
if req.query_vector:
vector = list(req.query_vector)
else:
vector = _get_query_vector(req)
top_k = req.top_k or get_settings().RETRIEVER_TOP_K
hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters)
depth, edge_types = _extract_expand_options(req)
# WP-22: Dynamic Boosts aus dem Request (vom Router)
boost_edges = getattr(req, "boost_edges", {})
subgraph: ga.Subgraph | None = None
if depth and depth > 0:
seed_ids: List[str] = []
for _pid, _score, payload in hits:
key = payload.get("note_id")
if key and key not in seed_ids:
seed_ids.append(key)
if seed_ids:
try:
# Subgraph laden
subgraph = ga.expand(client, prefix, seed_ids, depth=depth, edge_types=edge_types)
# --- WP-22: Kanten-Boosts & Provenance-Weighting im RAM-Graphen ---
if subgraph and hasattr(subgraph, "graph"):
for u, v, data in subgraph.graph.edges(data=True):
# 1. Herkunfts-Basisgewichtung (Concept 2.6)
prov = data.get("provenance", "rule")
prov_weight = 1.0
if prov == "smart": prov_weight = 0.9
elif prov == "rule": prov_weight = 0.7
# 2. Intent-basierter Multiplikator (Teil C)
k = data.get("kind")
intent_boost = 1.0
if boost_edges and k in boost_edges:
intent_boost = boost_edges[k]
# Finales Gewicht im Graphen setzen
data["weight"] = data.get("weight", 1.0) * prov_weight * intent_boost
except Exception:
subgraph = None
return _build_hits_from_semantic(
hits,
top_k=top_k,
used_mode="hybrid",
subgraph=subgraph,
explain=req.explain,
dynamic_edge_boosts=boost_edges
)
class Retriever:
"""
Wrapper-Klasse für Suchoperationen.
"""
def __init__(self):
pass
async def search(self, request: QueryRequest) -> QueryResponse:
return hybrid_retrieve(request)