397 lines
14 KiB
Python
397 lines
14 KiB
Python
"""
|
|
FILE: app/core/retriever.py
|
|
DESCRIPTION: Implementiert die Hybrid-Suche (Vektor + Graph-Expansion) und das Scoring-Modell (Explainability).
|
|
WP-22 Update: Dynamic Edge Boosting & Lifecycle Scoring.
|
|
VERSION: 0.6.5 (WP-22 Scoring Formula)
|
|
STATUS: Active
|
|
DEPENDENCIES: app.config, app.models.dto, app.core.qdrant*, app.services.embeddings_client, app.core.graph_adapter
|
|
LAST_ANALYSIS: 2025-12-18
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import time
|
|
from functools import lru_cache
|
|
from typing import Any, Dict, List, Tuple, Iterable, Optional
|
|
|
|
from app.config import get_settings
|
|
from app.models.dto import (
|
|
QueryRequest,
|
|
QueryResponse,
|
|
QueryHit,
|
|
Explanation,
|
|
ScoreBreakdown,
|
|
Reason,
|
|
EdgeDTO
|
|
)
|
|
import app.core.qdrant as qdr
|
|
import app.core.qdrant_points as qp
|
|
import app.services.embeddings_client as ec
|
|
import app.core.graph_adapter as ga
|
|
|
|
try:
|
|
import yaml # type: ignore[import]
|
|
except Exception: # pragma: no cover
|
|
yaml = None # type: ignore[assignment]
|
|
|
|
|
|
@lru_cache
|
|
def _get_scoring_weights() -> Tuple[float, float, float]:
|
|
"""Liefert (semantic_weight, edge_weight, centrality_weight) für den Retriever."""
|
|
settings = get_settings()
|
|
sem = float(getattr(settings, "RETRIEVER_W_SEM", 1.0))
|
|
edge = float(getattr(settings, "RETRIEVER_W_EDGE", 0.0))
|
|
cent = float(getattr(settings, "RETRIEVER_W_CENT", 0.0))
|
|
|
|
config_path = os.getenv("MINDNET_RETRIEVER_CONFIG", "config/retriever.yaml")
|
|
if yaml is None:
|
|
return sem, edge, cent
|
|
try:
|
|
if os.path.exists(config_path):
|
|
with open(config_path, "r", encoding="utf-8") as f:
|
|
data = yaml.safe_load(f) or {}
|
|
scoring = data.get("scoring", {}) or {}
|
|
sem = float(scoring.get("semantic_weight", sem))
|
|
edge = float(scoring.get("edge_weight", edge))
|
|
cent = float(scoring.get("centrality_weight", cent))
|
|
except Exception:
|
|
return sem, edge, cent
|
|
return sem, edge, cent
|
|
|
|
|
|
def _get_client_and_prefix() -> Tuple[Any, str]:
|
|
"""Liefert (QdrantClient, prefix)."""
|
|
cfg = qdr.QdrantConfig.from_env()
|
|
client = qdr.get_client(cfg)
|
|
return client, cfg.prefix
|
|
|
|
|
|
def _get_query_vector(req: QueryRequest) -> List[float]:
|
|
"""Liefert den Query-Vektor aus dem Request."""
|
|
if req.query_vector:
|
|
return list(req.query_vector)
|
|
|
|
if not req.query:
|
|
raise ValueError("QueryRequest benötigt entweder query oder query_vector")
|
|
|
|
settings = get_settings()
|
|
model_name = settings.MODEL_NAME
|
|
|
|
try:
|
|
return ec.embed_text(req.query, model_name=model_name)
|
|
except TypeError:
|
|
return ec.embed_text(req.query)
|
|
|
|
|
|
def _semantic_hits(
|
|
client: Any,
|
|
prefix: str,
|
|
vector: List[float],
|
|
top_k: int,
|
|
filters: Dict[str, Any] | None = None,
|
|
) -> List[Tuple[str, float, Dict[str, Any]]]:
|
|
"""Führt eine semantische Suche aus."""
|
|
flt = filters or None
|
|
raw_hits = qp.search_chunks_by_vector(client, prefix, vector, top=top_k, filters=flt)
|
|
results: List[Tuple[str, float, Dict[str, Any]]] = []
|
|
for pid, score, payload in raw_hits:
|
|
results.append((str(pid), float(score), dict(payload or {})))
|
|
return results
|
|
|
|
# --- WP-22 Helper: Lifecycle Multipliers (Teil A) ---
|
|
def _get_status_multiplier(payload: Dict[str, Any]) -> float:
|
|
"""
|
|
WP-22: stable (1.2), active/default (1.0), draft (0.5).
|
|
"""
|
|
status = str(payload.get("status", "active")).lower()
|
|
if status == "stable": return 1.2
|
|
if status == "draft": return 0.5
|
|
return 1.0
|
|
|
|
# --- WP-22: Dynamic Scoring Formula (Teil C) ---
|
|
def _compute_total_score(
|
|
semantic_score: float,
|
|
payload: Dict[str, Any],
|
|
edge_bonus_raw: float = 0.0,
|
|
cent_bonus_raw: float = 0.0,
|
|
dynamic_edge_boosts: Dict[str, float] = None
|
|
) -> Tuple[float, float, float]:
|
|
"""
|
|
WP-22 Mathematische Logik:
|
|
Score = BaseScore * (1 + ConfigWeight + DynamicBoost)
|
|
|
|
Hierbei gilt:
|
|
- BaseScore: semantic_similarity * status_multiplier
|
|
- ConfigWeight: retriever_weight (Type Boost)
|
|
- DynamicBoost: (edge_weight * edge_bonus) + (centrality_weight * centrality_bonus)
|
|
"""
|
|
|
|
# 1. Base Score (Semantik * Lifecycle)
|
|
status_mult = _get_status_multiplier(payload)
|
|
base_score = float(semantic_score) * status_mult
|
|
|
|
# 2. Config Weight (Static Type Boost)
|
|
config_weight = float(payload.get("retriever_weight", 1.0)) - 1.0 # 1.0 ist neutral
|
|
|
|
# 3. Dynamic Boost (Graph-Signale)
|
|
_sem_w, edge_w_cfg, cent_w_cfg = _get_scoring_weights()
|
|
dynamic_boost = (edge_w_cfg * edge_bonus_raw) + (cent_w_cfg * cent_bonus_raw)
|
|
|
|
# Falls Intent-Boosts vorliegen, verstärken wir den Dynamic Boost
|
|
if dynamic_edge_boosts and (edge_bonus_raw > 0 or cent_bonus_raw > 0):
|
|
dynamic_boost *= 1.5
|
|
|
|
total = base_score * (1.0 + config_weight + dynamic_boost)
|
|
return float(total), float(edge_bonus_raw), float(cent_bonus_raw)
|
|
|
|
|
|
# --- WP-04b Explanation Logic ---
|
|
|
|
def _build_explanation(
|
|
semantic_score: float,
|
|
payload: Dict[str, Any],
|
|
edge_bonus: float,
|
|
cent_bonus: float,
|
|
subgraph: Optional[ga.Subgraph],
|
|
node_key: Optional[str]
|
|
) -> Explanation:
|
|
"""Erstellt ein Explanation-Objekt (WP-04b)."""
|
|
_, edge_w_cfg, cent_w_cfg = _get_scoring_weights()
|
|
|
|
type_weight = float(payload.get("retriever_weight", 1.0))
|
|
status_mult = _get_status_multiplier(payload)
|
|
note_type = payload.get("type", "unknown")
|
|
|
|
# Breakdown für Explanation (Muss die Scoring Formel spiegeln)
|
|
config_w_impact = type_weight - 1.0
|
|
dynamic_b_impact = (edge_w_cfg * edge_bonus) + (cent_w_cfg * cent_bonus)
|
|
base_val = semantic_score * status_mult
|
|
|
|
breakdown = ScoreBreakdown(
|
|
semantic_contribution=base_val,
|
|
edge_contribution=base_val * dynamic_b_impact,
|
|
centrality_contribution=0.0, # In dynamic_b_impact enthalten
|
|
raw_semantic=semantic_score,
|
|
raw_edge_bonus=edge_bonus,
|
|
raw_centrality=cent_bonus,
|
|
node_weight=type_weight
|
|
)
|
|
|
|
reasons: List[Reason] = []
|
|
edges_dto: List[EdgeDTO] = []
|
|
|
|
if semantic_score > 0.85:
|
|
reasons.append(Reason(kind="semantic", message="Sehr hohe textuelle Übereinstimmung.", score_impact=breakdown.semantic_contribution))
|
|
elif semantic_score > 0.70:
|
|
reasons.append(Reason(kind="semantic", message="Gute textuelle Übereinstimmung.", score_impact=breakdown.semantic_contribution))
|
|
|
|
if type_weight != 1.0:
|
|
msg = "Bevorzugt" if type_weight > 1.0 else "Leicht abgewertet"
|
|
reasons.append(Reason(kind="type", message=f"{msg} aufgrund des Typs '{note_type}'.", score_impact=base_val * config_w_impact))
|
|
|
|
if status_mult != 1.0:
|
|
msg = "Status-Bonus" if status_mult > 1.0 else "Status-Malus"
|
|
reasons.append(Reason(kind="lifecycle", message=f"{msg} ({payload.get('status', 'unknown')}).", score_impact=0.0))
|
|
|
|
if subgraph and node_key and edge_bonus > 0:
|
|
if hasattr(subgraph, "get_outgoing_edges"):
|
|
outgoing = subgraph.get_outgoing_edges(node_key)
|
|
for edge in outgoing:
|
|
target = edge.get("target", "Unknown")
|
|
kind = edge.get("kind", "edge")
|
|
weight = edge.get("weight", 0.0)
|
|
if weight > 0.05:
|
|
edges_dto.append(EdgeDTO(id=f"{node_key}->{target}:{kind}", kind=kind, source=node_key, target=target, weight=weight, direction="out"))
|
|
|
|
if hasattr(subgraph, "get_incoming_edges"):
|
|
incoming = subgraph.get_incoming_edges(node_key)
|
|
for edge in incoming:
|
|
src = edge.get("source", "Unknown")
|
|
kind = edge.get("kind", "edge")
|
|
weight = edge.get("weight", 0.0)
|
|
if weight > 0.05:
|
|
edges_dto.append(EdgeDTO(id=f"{src}->{node_key}:{kind}", kind=kind, source=src, target=node_key, weight=weight, direction="in"))
|
|
|
|
all_edges = sorted(edges_dto, key=lambda e: e.weight, reverse=True)
|
|
for top_edge in all_edges[:3]:
|
|
impact = edge_w_cfg * top_edge.weight
|
|
dir_txt = "Verweist auf" if top_edge.direction == "out" else "Referenziert von"
|
|
tgt_txt = top_edge.target if top_edge.direction == "out" else top_edge.source
|
|
reasons.append(Reason(kind="edge", message=f"{dir_txt} '{tgt_txt}' via '{top_edge.kind}'", score_impact=impact, details={"kind": top_edge.kind}))
|
|
|
|
if cent_bonus > 0.01:
|
|
reasons.append(Reason(kind="centrality", message="Knoten liegt zentral im Kontext.", score_impact=cent_w_cfg * cent_bonus))
|
|
|
|
return Explanation(breakdown=breakdown, reasons=reasons, related_edges=edges_dto if edges_dto else None)
|
|
|
|
|
|
def _extract_expand_options(req: QueryRequest) -> Tuple[int, List[str] | None]:
|
|
"""Extrahiert depth und edge_types für die Expansion."""
|
|
expand = getattr(req, "expand", None)
|
|
if not expand:
|
|
return 0, None
|
|
|
|
depth = 1
|
|
edge_types: List[str] | None = None
|
|
|
|
if hasattr(expand, "depth") or hasattr(expand, "edge_types"):
|
|
depth = int(getattr(expand, "depth", 1) or 1)
|
|
types_val = getattr(expand, "edge_types", None)
|
|
if types_val:
|
|
edge_types = list(types_val)
|
|
return depth, edge_types
|
|
|
|
if isinstance(expand, dict):
|
|
if "depth" in expand:
|
|
depth = int(expand.get("depth") or 1)
|
|
if "edge_types" in expand and expand["edge_types"] is not None:
|
|
edge_types = list(expand["edge_types"])
|
|
return depth, edge_types
|
|
|
|
return 0, None
|
|
|
|
|
|
def _build_hits_from_semantic(
|
|
hits: Iterable[Tuple[str, float, Dict[str, Any]]],
|
|
top_k: int,
|
|
used_mode: str,
|
|
subgraph: ga.Subgraph | None = None,
|
|
explain: bool = False,
|
|
dynamic_edge_boosts: Dict[str, float] = None
|
|
) -> QueryResponse:
|
|
"""Baut strukturierte QueryHits basierend auf Hybrid-Scoring."""
|
|
t0 = time.time()
|
|
enriched: List[Tuple[str, float, Dict[str, Any], float, float, float]] = []
|
|
|
|
for pid, semantic_score, payload in hits:
|
|
edge_bonus = 0.0
|
|
cent_bonus = 0.0
|
|
node_key = payload.get("chunk_id") or payload.get("note_id")
|
|
|
|
if subgraph is not None and node_key:
|
|
try:
|
|
edge_bonus = float(subgraph.edge_bonus(node_key))
|
|
except Exception:
|
|
edge_bonus = 0.0
|
|
try:
|
|
cent_bonus = float(subgraph.centrality_bonus(node_key))
|
|
except Exception:
|
|
cent_bonus = 0.0
|
|
|
|
total, eb, cb = _compute_total_score(
|
|
semantic_score,
|
|
payload,
|
|
edge_bonus_raw=edge_bonus,
|
|
cent_bonus_raw=cent_bonus,
|
|
dynamic_edge_boosts=dynamic_edge_boosts
|
|
)
|
|
enriched.append((pid, float(semantic_score), payload, total, eb, cb))
|
|
|
|
enriched_sorted = sorted(enriched, key=lambda h: h[3], reverse=True)
|
|
limited = enriched_sorted[: max(1, top_k)]
|
|
|
|
results: List[QueryHit] = []
|
|
for pid, semantic_score, payload, total, eb, cb in limited:
|
|
explanation_obj = None
|
|
if explain:
|
|
explanation_obj = _build_explanation(
|
|
semantic_score=float(semantic_score),
|
|
payload=payload,
|
|
edge_bonus=eb,
|
|
cent_bonus=cb,
|
|
subgraph=subgraph,
|
|
node_key=payload.get("chunk_id") or payload.get("note_id")
|
|
)
|
|
|
|
text_content = payload.get("page_content") or payload.get("text") or payload.get("content")
|
|
|
|
results.append(QueryHit(
|
|
node_id=str(pid),
|
|
note_id=payload.get("note_id", "unknown"),
|
|
semantic_score=float(semantic_score),
|
|
edge_bonus=eb,
|
|
centrality_bonus=cb,
|
|
total_score=total,
|
|
paths=None,
|
|
source={
|
|
"path": payload.get("path"),
|
|
"section": payload.get("section") or payload.get("section_title"),
|
|
"text": text_content
|
|
},
|
|
payload=payload,
|
|
explanation=explanation_obj
|
|
))
|
|
|
|
dt = int((time.time() - t0) * 1000)
|
|
return QueryResponse(results=results, used_mode=used_mode, latency_ms=dt)
|
|
|
|
|
|
def semantic_retrieve(req: QueryRequest) -> QueryResponse:
|
|
"""Reiner semantischer Retriever (WP-02)."""
|
|
client, prefix = _get_client_and_prefix()
|
|
vector = _get_query_vector(req)
|
|
top_k = req.top_k or get_settings().RETRIEVER_TOP_K
|
|
|
|
hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters)
|
|
return _build_hits_from_semantic(hits, top_k=top_k, used_mode="semantic", subgraph=None, explain=req.explain)
|
|
|
|
|
|
def hybrid_retrieve(req: QueryRequest) -> QueryResponse:
|
|
"""Hybrid-Retriever: semantische Suche + optionale Edge-Expansion (WP-04a)."""
|
|
client, prefix = _get_client_and_prefix()
|
|
if req.query_vector:
|
|
vector = list(req.query_vector)
|
|
else:
|
|
vector = _get_query_vector(req)
|
|
|
|
top_k = req.top_k or get_settings().RETRIEVER_TOP_K
|
|
hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters)
|
|
|
|
depth, edge_types = _extract_expand_options(req)
|
|
|
|
# WP-22: Dynamic Boosts aus dem Request (vom Router)
|
|
boost_edges = getattr(req, "boost_edges", {})
|
|
|
|
subgraph: ga.Subgraph | None = None
|
|
if depth and depth > 0:
|
|
seed_ids: List[str] = []
|
|
for _pid, _score, payload in hits:
|
|
key = payload.get("note_id")
|
|
if key and key not in seed_ids:
|
|
seed_ids.append(key)
|
|
if seed_ids:
|
|
try:
|
|
# Subgraph laden
|
|
subgraph = ga.expand(client, prefix, seed_ids, depth=depth, edge_types=edge_types)
|
|
|
|
# --- WP-22: Kanten-Boosts im RAM-Graphen anwenden ---
|
|
# Dies manipuliert die Gewichte im Graphen, bevor der 'edge_bonus' berechnet wird.
|
|
if boost_edges and subgraph and hasattr(subgraph, "graph"):
|
|
for u, v, data in subgraph.graph.edges(data=True):
|
|
k = data.get("kind")
|
|
if k in boost_edges:
|
|
# Gewicht multiplizieren (z.B. caused_by * 3.0)
|
|
data["weight"] = data.get("weight", 1.0) * boost_edges[k]
|
|
|
|
except Exception:
|
|
subgraph = None
|
|
|
|
return _build_hits_from_semantic(
|
|
hits,
|
|
top_k=top_k,
|
|
used_mode="hybrid",
|
|
subgraph=subgraph,
|
|
explain=req.explain,
|
|
dynamic_edge_boosts=boost_edges
|
|
)
|
|
|
|
|
|
class Retriever:
|
|
"""
|
|
Wrapper-Klasse für Suchoperationen.
|
|
"""
|
|
def __init__(self):
|
|
pass
|
|
|
|
async def search(self, request: QueryRequest) -> QueryResponse:
|
|
return hybrid_retrieve(request) |