aufteilung retriever

This commit is contained in:
Lars 2025-12-18 17:13:36 +01:00
parent 5dd58f49f0
commit 33b0c83c87
2 changed files with 256 additions and 249 deletions

View File

@ -1,185 +1,63 @@
""" """
FILE: app/core/retriever.py FILE: app/core/retriever.py
DESCRIPTION: Implementiert die Hybrid-Suche (Vektor + Graph-Expansion) und das Scoring-Modell (Explainability). DESCRIPTION: Haupt-Schnittstelle für die Suche. Orchestriert Vektorsuche und Graph-Expansion.
WP-22 Update: Dynamic Edge Boosting, Lifecycle Scoring & Provenance Awareness. Nutzt retriever_scoring.py für die WP-22 Logik.
Enthält detaillierte Debug-Informationen für die mathematische Verifizierung. VERSION: 0.6.14 (WP-22 Full, Debug & Stable)
VERSION: 0.6.12 (WP-22 Full, Debug & Stable)
STATUS: Active STATUS: Active
DEPENDENCIES: app.config, app.models.dto, app.core.qdrant*, app.services.embeddings_client, app.core.graph_adapter DEPENDENCIES: app.config, app.models.dto, app.core.qdrant*, app.core.graph_adapter, app.core.retriever_scoring
LAST_ANALYSIS: 2025-12-18
""" """
from __future__ import annotations from __future__ import annotations
import os import os
import time import time
import logging import logging
from functools import lru_cache
from typing import Any, Dict, List, Tuple, Iterable, Optional from typing import Any, Dict, List, Tuple, Iterable, Optional
from app.config import get_settings from app.config import get_settings
from app.models.dto import ( from app.models.dto import (
QueryRequest, QueryRequest, QueryResponse, QueryHit,
QueryResponse, Explanation, ScoreBreakdown, Reason, EdgeDTO
QueryHit,
Explanation,
ScoreBreakdown,
Reason,
EdgeDTO
) )
import app.core.qdrant as qdr import app.core.qdrant as qdr
import app.core.qdrant_points as qp import app.core.qdrant_points as qp
import app.services.embeddings_client as ec import app.services.embeddings_client as ec
import app.core.graph_adapter as ga import app.core.graph_adapter as ga
try: # Mathematische Engine importieren
import yaml # type: ignore[import] from app.core.retriever_scoring import get_weights, compute_wp22_score
except Exception: # pragma: no cover
yaml = None # type: ignore[assignment]
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# ============================================================================== # --- Hilfsfunktionen für Qdrant ---
# 1. CORE HELPERS & CONFIG LOADERS
# ==============================================================================
@lru_cache
def _get_scoring_weights() -> Tuple[float, float, float]:
"""
Liefert die Basis-Gewichtung (semantic_weight, edge_weight, centrality_weight).
Priorität: 1. retriever.yaml -> 2. Environment/Settings -> 3. Hardcoded Defaults
"""
settings = get_settings()
sem = float(getattr(settings, "RETRIEVER_W_SEM", 1.0))
edge = float(getattr(settings, "RETRIEVER_W_EDGE", 0.0))
cent = float(getattr(settings, "RETRIEVER_W_CENT", 0.0))
config_path = os.getenv("MINDNET_RETRIEVER_CONFIG", "config/retriever.yaml")
if yaml is None:
return sem, edge, cent
try:
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
scoring = data.get("scoring", {}) or {}
sem = float(scoring.get("semantic_weight", sem))
edge = float(scoring.get("edge_weight", edge))
cent = float(scoring.get("centrality_weight", cent))
except Exception as e:
logger.warning(f"Failed to load weights from {config_path}: {e}")
return sem, edge, cent
return sem, edge, cent
def _get_client_and_prefix() -> Tuple[Any, str]: def _get_client_and_prefix() -> Tuple[Any, str]:
"""Liefert das initialisierte Qdrant-Client-Objekt und das Collection-Präfix.""" """Initialisiert Qdrant Client und lädt Collection-Prefix."""
cfg = qdr.QdrantConfig.from_env() cfg = qdr.QdrantConfig.from_env()
client = qdr.get_client(cfg) return qdr.get_client(cfg), cfg.prefix
return client, cfg.prefix
def _get_query_vector(req: QueryRequest) -> List[float]: def _get_query_vector(req: QueryRequest) -> List[float]:
"""Wandelt Text-Queries via EmbeddingsClient um oder nutzt vorhandenen Vektor.""" """Vektorisiert die Anfrage oder nutzt vorhandenen Vektor."""
if req.query_vector: if req.query_vector:
return list(req.query_vector) return list(req.query_vector)
if not req.query: if not req.query:
raise ValueError("QueryRequest benötigt entweder 'query' oder 'query_vector'") raise ValueError("Kein Text oder Vektor für die Suche angegeben.")
settings = get_settings() settings = get_settings()
model_name = settings.MODEL_NAME return ec.embed_text(req.query, model_name=settings.MODEL_NAME)
try:
return ec.embed_text(req.query, model_name=model_name)
except TypeError:
return ec.embed_text(req.query)
def _semantic_hits( def _semantic_hits(
client: Any, client: Any,
prefix: str, prefix: str,
vector: List[float], vector: List[float],
top_k: int, top_k: int,
filters: Dict[str, Any] | None = None, filters: Optional[Dict] = None
) -> List[Tuple[str, float, Dict[str, Any]]]: ) -> List[Tuple[str, float, Dict[str, Any]]]:
"""Führt eine Vektorsuche in Qdrant aus.""" """Führt die Vektorsuche durch und konvertiert Qdrant-Points in ein einheitliches Format."""
flt = filters or None raw_hits = qp.search_chunks_by_vector(client, prefix, vector, top=top_k, filters=filters)
raw_hits = qp.search_chunks_by_vector(client, prefix, vector, top=top_k, filters=flt) # Strikte Typkonvertierung für Stabilität
results: List[Tuple[str, float, Dict[str, Any]]] = [] return [(str(hit[0]), float(hit[1]), dict(hit[2] or {})) for hit in raw_hits]
for pid, score, payload in raw_hits:
results.append((str(pid), float(score), dict(payload or {})))
return results
# ============================================================================== # --- Explanation Layer (Detaillierte Begründungen) ---
# 2. WP-22 SCORING LOGIC (LIFECYCLE & FORMULA)
# ==============================================================================
def _get_status_multiplier(payload: Dict[str, Any]) -> float:
"""
WP-22 A: Lifecycle-Scoring.
- stable: 1.2 (Validiertes Wissen fördern)
- draft: 0.5 (Entwürfe de-priorisieren)
"""
status = str(payload.get("status", "active")).lower().strip()
if status == "stable":
return 1.2
if status == "draft":
return 0.5
return 1.0
def _compute_total_score(
semantic_score: float,
payload: Dict[str, Any],
edge_bonus_raw: float = 0.0,
cent_bonus_raw: float = 0.0,
dynamic_edge_boosts: Dict[str, float] = None
) -> Dict[str, Any]:
"""
Die zentrale mathematische Scoring-Formel von WP-22.
Score = (Similarity * StatusMult) * (1 + (Weight-1) + DynamicBoost)
"""
_sem_w, edge_w_cfg, cent_w_cfg = _get_scoring_weights()
status_mult = _get_status_multiplier(payload)
node_weight = float(payload.get("retriever_weight", 1.0))
# 1. Base Score (Semantik * Lifecycle)
base_val = float(semantic_score) * status_mult
# 2. Graph Boost Factor (WP-22 C)
# Globaler Verstärker für Graph-Signale bei spezifischen Intents
graph_boost_factor = 1.5 if dynamic_edge_boosts and (edge_bonus_raw > 0 or cent_bonus_raw > 0) else 1.0
# 3. Graph Contributions
edge_impact_raw = edge_w_cfg * edge_bonus_raw
cent_impact_raw = cent_w_cfg * cent_bonus_raw
# Finaler Impact unter Einbeziehung des Intent-Boosters
edge_impact_final = edge_impact_raw * graph_boost_factor
cent_impact_final = cent_impact_raw * graph_boost_factor
dynamic_graph_impact = edge_impact_final + cent_impact_final
# 4. Final Merge
total = base_val * (1.0 + (node_weight - 1.0) + dynamic_graph_impact)
# Debug Logging für Berechnungs-Validierung
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Scoring Node {payload.get('note_id')}: Base={base_val:.3f}, GraphI={dynamic_graph_impact:.3f} -> Total={total:.3f}")
return {
"total": max(0.001, float(total)),
"edge_bonus": float(edge_bonus_raw),
"cent_bonus": float(cent_bonus_raw),
"status_multiplier": status_mult,
"graph_boost_factor": graph_boost_factor,
"type_impact": node_weight - 1.0,
"base_val": base_val,
"edge_impact_final": edge_impact_final,
"cent_impact_final": cent_impact_final
}
# ==============================================================================
# 3. EXPLANATION LAYER (DEBUG & VERIFIABILITY)
# ==============================================================================
def _build_explanation( def _build_explanation(
semantic_score: float, semantic_score: float,
@ -189,16 +67,14 @@ def _build_explanation(
target_note_id: Optional[str], target_note_id: Optional[str],
applied_boosts: Optional[Dict[str, float]] = None applied_boosts: Optional[Dict[str, float]] = None
) -> Explanation: ) -> Explanation:
"""Erstellt ein detailliertes Explanation-Objekt inkl. WP-22 Metriken.""" """
_, edge_w_cfg, _ = _get_scoring_weights() Transformiert mathematische Scores und Graph-Signale in eine menschenlesbare Erklärung.
Behebt Pydantic ValidationErrors durch explizite String-Sicherung.
type_weight = float(payload.get("retriever_weight", 1.0)) """
status_mult = scoring_debug["status_multiplier"] _, edge_w_cfg, _ = get_weights()
graph_bf = scoring_debug["graph_boost_factor"]
note_type = payload.get("type", "unknown")
base_val = scoring_debug["base_val"] base_val = scoring_debug["base_val"]
# 1. Score Breakdown Objekt # 1. Detaillierter mathematischer Breakdown
breakdown = ScoreBreakdown( breakdown = ScoreBreakdown(
semantic_contribution=base_val, semantic_contribution=base_val,
edge_contribution=base_val * scoring_debug["edge_impact_final"], edge_contribution=base_val * scoring_debug["edge_impact_final"],
@ -206,29 +82,27 @@ def _build_explanation(
raw_semantic=semantic_score, raw_semantic=semantic_score,
raw_edge_bonus=scoring_debug["edge_bonus"], raw_edge_bonus=scoring_debug["edge_bonus"],
raw_centrality=scoring_debug["cent_bonus"], raw_centrality=scoring_debug["cent_bonus"],
node_weight=type_weight, node_weight=float(payload.get("retriever_weight", 1.0)),
status_multiplier=status_mult, status_multiplier=scoring_debug["status_multiplier"],
graph_boost_factor=graph_bf graph_boost_factor=scoring_debug["graph_boost_factor"]
) )
reasons: List[Reason] = [] reasons: List[Reason] = []
edges_dto: List[EdgeDTO] = [] edges_dto: List[EdgeDTO] = []
# 2. Gründe generieren # 2. Gründe für Semantik hinzufügen
if semantic_score > 0.85: if semantic_score > 0.85:
reasons.append(Reason(kind="semantic", message="Herausragende inhaltliche Übereinstimmung.", score_impact=base_val)) reasons.append(Reason(kind="semantic", message="Sehr hohe textuelle Übereinstimmung.", score_impact=base_val))
elif semantic_score > 0.70: elif semantic_score > 0.70:
reasons.append(Reason(kind="semantic", message="Gute inhaltliche Übereinstimmung.", score_impact=base_val)) reasons.append(Reason(kind="semantic", message="Inhaltliche Übereinstimmung.", score_impact=base_val))
# 3. Gründe für Typ und Lifecycle
type_weight = float(payload.get("retriever_weight", 1.0))
if type_weight != 1.0: if type_weight != 1.0:
msg = "Bevorzugt" if type_weight > 1.0 else "Abgewertet" msg = "Bevorzugt" if type_weight > 1.0 else "De-priorisiert"
reasons.append(Reason(kind="type", message=f"{msg} durch Typ '{note_type}'.", score_impact=base_val * (type_weight - 1.0))) reasons.append(Reason(kind="type", message=f"{msg} aufgrund des Notiz-Typs.", score_impact=base_val * (type_weight - 1.0)))
if status_mult != 1.0: # 4. Kanten-Verarbeitung (Graph-Intelligence)
txt = "Bonus" if status_mult > 1.0 else "Malus"
reasons.append(Reason(kind="lifecycle", message=f"Status-{txt} ({payload.get('status')}).", score_impact=0.0))
# 3. Kanten-Details (WP-22 B)
if subgraph and target_note_id and scoring_debug["edge_bonus"] > 0: if subgraph and target_note_id and scoring_debug["edge_bonus"] > 0:
raw_edges = [] raw_edges = []
if hasattr(subgraph, "get_incoming_edges"): if hasattr(subgraph, "get_incoming_edges"):
@ -237,37 +111,42 @@ def _build_explanation(
raw_edges.extend(subgraph.get_outgoing_edges(target_note_id) or []) raw_edges.extend(subgraph.get_outgoing_edges(target_note_id) or [])
for edge in raw_edges: for edge in raw_edges:
src = edge.get("source") # FIX: Zwingende String-Konvertierung für Pydantic-Stabilität
tgt = edge.get("target") src = str(edge.get("source") or "note_root")
k = edge.get("kind", "edge") tgt = str(edge.get("target") or target_note_id or "unknown_target")
prov = edge.get("provenance", "rule") kind = str(edge.get("kind", "related_to"))
prov = str(edge.get("provenance", "rule"))
conf = float(edge.get("confidence", 1.0)) conf = float(edge.get("confidence", 1.0))
is_incoming = (tgt == target_note_id) direction = "in" if tgt == target_note_id else "out"
direction = "in" if is_incoming else "out"
# Peer-ID bestimmen (für die Anzeige)
neighbor_name = src if is_incoming else tgt
edge_obj = EdgeDTO( edge_obj = EdgeDTO(
id=f"{src}->{tgt}:{k}", kind=k, source=src, target=tgt, id=f"{src}->{tgt}:{kind}",
weight=conf, direction=direction, kind=kind,
provenance=prov, confidence=conf source=src,
target=tgt,
weight=conf,
direction=direction,
provenance=prov,
confidence=conf
) )
edges_dto.append(edge_obj) edges_dto.append(edge_obj)
# Die 3 wichtigsten Kanten als Begründung formulieren
top_edges = sorted(edges_dto, key=lambda e: e.confidence, reverse=True) top_edges = sorted(edges_dto, key=lambda e: e.confidence, reverse=True)
for e in top_edges[:3]: for e in top_edges[:3]:
prov_txt = "Explizite" if e.provenance == "explicit" else "Heuristische" peer = e.source if e.direction == "in" else e.target
prov_txt = "Bestätigte" if e.provenance == "explicit" else "KI-basierte"
boost_txt = f" [Boost x{applied_boosts.get(e.kind)}]" if applied_boosts and e.kind in applied_boosts else "" boost_txt = f" [Boost x{applied_boosts.get(e.kind)}]" if applied_boosts and e.kind in applied_boosts else ""
# Richtigen Nachbarn für die Reason-Message finden reasons.append(Reason(
target_name = e.source if e.direction == "in" else e.target kind="edge",
msg = f"{prov_txt} Kante '{e.kind}'{boost_txt} von/zu '{target_name}'." message=f"{prov_txt} Kante '{e.kind}'{boost_txt} von/zu '{peer}'.",
reasons.append(Reason(kind="edge", message=msg, score_impact=edge_w_cfg * e.confidence)) score_impact=edge_w_cfg * e.confidence
))
if scoring_debug["cent_bonus"] > 0.01: if scoring_debug["cent_bonus"] > 0.01:
reasons.append(Reason(kind="centrality", message="Knoten liegt zentral im Kontext.", score_impact=breakdown.centrality_contribution)) reasons.append(Reason(kind="centrality", message="Die Notiz ist ein zentraler Informations-Hub.", score_impact=breakdown.centrality_contribution))
return Explanation( return Explanation(
breakdown=breakdown, breakdown=breakdown,
@ -276,17 +155,7 @@ def _build_explanation(
applied_boosts=applied_boosts applied_boosts=applied_boosts
) )
# --- Kern-Logik für Hybrid-Retrieval ---
def _extract_expand_options(req: QueryRequest) -> Tuple[int, List[str] | None]:
"""Extrahiert Expansion-Tiefe und Kanten-Filter."""
expand = getattr(req, "expand", None)
if not expand: return 0, None
if isinstance(expand, dict):
return int(expand.get("depth", 1)), expand.get("edge_types")
if hasattr(expand, "depth"):
return int(getattr(expand, "depth", 1)), getattr(expand, "edge_types", None)
return 1, None
def _build_hits_from_semantic( def _build_hits_from_semantic(
hits: Iterable[Tuple[str, float, Dict[str, Any]]], hits: Iterable[Tuple[str, float, Dict[str, Any]]],
@ -296,110 +165,128 @@ def _build_hits_from_semantic(
explain: bool = False, explain: bool = False,
dynamic_edge_boosts: Dict[str, float] = None dynamic_edge_boosts: Dict[str, float] = None
) -> QueryResponse: ) -> QueryResponse:
"""Wandelt semantische Roh-Treffer in strukturierte QueryHits um.""" """Wandelt semantische Roh-Treffer in hochgeladene, bewertete QueryHits um."""
t0 = time.time() t0 = time.time()
enriched = [] enriched = []
for pid, semantic_score, payload in hits: for pid, semantic_score, payload in hits:
edge_bonus, cent_bonus = 0.0, 0.0 edge_bonus, cent_bonus = 0.0, 0.0
target_note_id = payload.get("note_id") # Graph-Abfrage erfolgt IMMER über die Note-ID, nicht Chunk-ID
target_id = payload.get("note_id")
if subgraph is not None and target_note_id: if subgraph and target_id:
try: try:
edge_bonus = float(subgraph.edge_bonus(target_note_id)) edge_bonus = float(subgraph.edge_bonus(target_id))
cent_bonus = float(subgraph.centrality_bonus(target_note_id)) cent_bonus = float(subgraph.centrality_bonus(target_id))
except Exception: pass except Exception:
pass
debug_data = _compute_total_score( # Mathematisches Scoring via WP-22 Engine
semantic_score, payload, edge_bonus_raw=edge_bonus, debug_data = compute_wp22_score(
cent_bonus_raw=cent_bonus, dynamic_edge_boosts=dynamic_edge_boosts semantic_score, payload, edge_bonus, cent_bonus, dynamic_edge_boosts
) )
enriched.append((pid, float(semantic_score), payload, debug_data)) enriched.append((pid, semantic_score, payload, debug_data))
# Sortierung nach finalem Score # Sortierung nach finalem mathematischen Score
enriched_sorted = sorted(enriched, key=lambda h: h[3]["total"], reverse=True) enriched_sorted = sorted(enriched, key=lambda h: h[3]["total"], reverse=True)
limited_hits = enriched_sorted[: max(1, top_k)] limited_hits = enriched_sorted[: max(1, top_k)]
results: List[QueryHit] = [] results: List[QueryHit] = []
for pid, semantic_score, payload, debug in limited_hits: for pid, s_score, pl, dbg in limited_hits:
explanation_obj = None explanation_obj = None
if explain: if explain:
explanation_obj = _build_explanation( explanation_obj = _build_explanation(
semantic_score=float(semantic_score), semantic_score=float(s_score),
payload=payload, scoring_debug=debug, payload=pl,
subgraph=subgraph, target_note_id=payload.get("note_id"), scoring_debug=dbg,
subgraph=subgraph,
target_note_id=pl.get("note_id"),
applied_boosts=dynamic_edge_boosts applied_boosts=dynamic_edge_boosts
) )
text_content = payload.get("page_content") or payload.get("text") or payload.get("content") # Payload Text-Feld normalisieren
text_content = pl.get("page_content") or pl.get("text") or pl.get("content", "[Kein Text]")
results.append(QueryHit( results.append(QueryHit(
node_id=str(pid), node_id=str(pid),
note_id=payload.get("note_id", "unknown"), note_id=str(pl.get("note_id", "unknown")),
semantic_score=float(semantic_score), semantic_score=float(s_score),
edge_bonus=debug["edge_bonus"], edge_bonus=dbg["edge_bonus"],
centrality_bonus=debug["cent_bonus"], centrality_bonus=dbg["cent_bonus"],
total_score=debug["total"], total_score=dbg["total"],
source={ source={
"path": payload.get("path"), "path": pl.get("path"),
"section": payload.get("section") or payload.get("section_title"), "section": pl.get("section") or pl.get("section_title"),
"text": text_content "text": text_content
}, },
payload=payload, payload=pl,
explanation=explanation_obj explanation=explanation_obj
)) ))
return QueryResponse(results=results, used_mode=used_mode, latency_ms=int((time.time() - t0) * 1000)) return QueryResponse(results=results, used_mode=used_mode, latency_ms=int((time.time() - t0) * 1000))
# ==============================================================================
# 4. PUBLIC INTERFACE
# ==============================================================================
def semantic_retrieve(req: QueryRequest) -> QueryResponse:
"""Standard-Vektorsuche ohne Graph-Einfluss (WP-02)."""
client, prefix = _get_client_and_prefix()
vector = _get_query_vector(req)
top_k = req.top_k or 10
hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters)
return _build_hits_from_semantic(hits, top_k=top_k, used_mode="semantic", subgraph=None, explain=req.explain)
def hybrid_retrieve(req: QueryRequest) -> QueryResponse: def hybrid_retrieve(req: QueryRequest) -> QueryResponse:
"""Hybrid-Suche: Semantik + WP-22 Graph Intelligence.""" """
Die Haupt-Einstiegsfunktion für die hybride Suche.
Kombiniert Vektorsuche mit Graph-Expansion, Provenance-Weighting und Intent-Boosting.
"""
client, prefix = _get_client_and_prefix() client, prefix = _get_client_and_prefix()
vector = list(req.query_vector) if req.query_vector else _get_query_vector(req) vector = list(req.query_vector) if req.query_vector else _get_query_vector(req)
top_k = req.top_k or 10 top_k = req.top_k or 10
# 1. Semantische Seed-Suche
hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters) hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters)
expand_depth, edge_types = _extract_expand_options(req) # 2. Graph Expansion Konfiguration
expand_cfg = req.expand if isinstance(req.expand, dict) else {}
depth = int(expand_cfg.get("depth", 1))
boost_edges = getattr(req, "boost_edges", {}) or {} boost_edges = getattr(req, "boost_edges", {}) or {}
subgraph: ga.Subgraph | None = None subgraph: ga.Subgraph | None = None
if expand_depth > 0 and hits: if depth > 0 and hits:
# Start-IDs für den Graph-Traversal sammeln
seed_ids = list({h[2].get("note_id") for h in hits if h[2].get("note_id")}) seed_ids = list({h[2].get("note_id") for h in hits if h[2].get("note_id")})
if seed_ids: if seed_ids:
try: try:
subgraph = ga.expand(client, prefix, seed_ids, depth=expand_depth, edge_types=edge_types) # Subgraph aus RAM/DB laden
subgraph = ga.expand(client, prefix, seed_ids, depth=depth, edge_types=expand_cfg.get("edge_types"))
# --- WP-22: Kanten-Gewichtung im RAM-Graphen vor Bonus-Berechnung ---
if subgraph and hasattr(subgraph, "graph"): if subgraph and hasattr(subgraph, "graph"):
for u, v, data in subgraph.graph.edges(data=True): for _, _, data in subgraph.graph.edges(data=True):
# Provenance Weighting (Concept 2.6) # A. Provenance Weighting (WP-22 Bonus für Herkunft)
prov = data.get("provenance", "rule") prov = data.get("provenance", "rule")
# Belohnung: Explizite Links (1.0) > Smart (0.9) > Rule (0.7)
prov_w = 1.0 if prov == "explicit" else (0.9 if prov == "smart" else 0.7) prov_w = 1.0 if prov == "explicit" else (0.9 if prov == "smart" else 0.7)
# Intent Boost Mapping # B. Intent Boost Multiplikator (Vom Router dynamisch injiziert)
k = data.get("kind") kind = data.get("kind")
intent_multiplier = boost_edges.get(k, 1.0) intent_multiplier = boost_edges.get(kind, 1.0)
# Finales Kanten-Gewicht im Graphen setzen # Finales Gewicht setzen (Basis * Provenance * Intent)
data["weight"] = data.get("weight", 1.0) * prov_w * intent_multiplier data["weight"] = data.get("weight", 1.0) * prov_w * intent_multiplier
except Exception as e: except Exception as e:
logger.error(f"Graph expansion failed: {e}") logger.error(f"Graph Expansion failed criticaly: {e}", exc_info=True)
subgraph = None subgraph = None
# 3. Scoring & Explanation Generierung
return _build_hits_from_semantic(hits, top_k, "hybrid", subgraph, req.explain, boost_edges) return _build_hits_from_semantic(hits, top_k, "hybrid", subgraph, req.explain, boost_edges)
def semantic_retrieve(req: QueryRequest) -> QueryResponse:
"""Standard Vektorsuche ohne Graph-Einfluss (WP-02 Fallback)."""
client, prefix = _get_client_and_prefix()
vector = _get_query_vector(req)
hits = _semantic_hits(client, prefix, vector, req.top_k or 10, req.filters)
return _build_hits_from_semantic(hits, req.top_k or 10, "semantic", explain=req.explain)
class Retriever: class Retriever:
"""Asynchroner Wrapper für FastAPI-Integration.""" """Schnittstelle für die asynchrone Suche."""
async def search(self, request: QueryRequest) -> QueryResponse: async def search(self, request: QueryRequest) -> QueryResponse:
return await ga.run_in_threadpool(hybrid_retrieve, request) if hasattr(ga, "run_in_threadpool") else hybrid_retrieve(request) """Führt eine Suche durch. Nutzt hybrid_retrieve als Standard."""
# Standard ist Hybrid-Modus
return hybrid_retrieve(request)

View File

@ -0,0 +1,120 @@
"""
FILE: app/core/retriever_scoring.py
DESCRIPTION: Mathematische Kern-Logik für das WP-22 Scoring.
Berechnet Relevanz-Scores basierend auf Semantik, Graph-Intelligence und Content Lifecycle.
VERSION: 1.0.1 (WP-22 Full Math Engine)
STATUS: Active
DEPENDENCIES: app.config, typing
"""
import os
import logging
from functools import lru_cache
from typing import Any, Dict, Tuple, Optional
try:
import yaml
except ImportError:
yaml = None
logger = logging.getLogger(__name__)
@lru_cache
def get_weights() -> Tuple[float, float, float]:
"""
Liefert die Basis-Gewichtung (semantic, edge, centrality) aus der Konfiguration.
Priorität:
1. config/retriever.yaml (Scoring-Sektion)
2. Umgebungsvariablen (RETRIEVER_W_*)
3. System-Defaults (1.0, 0.0, 0.0)
"""
from app.config import get_settings
settings = get_settings()
# Defaults aus Settings laden
sem = float(getattr(settings, "RETRIEVER_W_SEM", 1.0))
edge = float(getattr(settings, "RETRIEVER_W_EDGE", 0.0))
cent = float(getattr(settings, "RETRIEVER_W_CENT", 0.0))
# Optionaler Override via YAML
config_path = os.getenv("MINDNET_RETRIEVER_CONFIG", "config/retriever.yaml")
if yaml and os.path.exists(config_path):
try:
with open(config_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
scoring = data.get("scoring", {})
sem = float(scoring.get("semantic_weight", sem))
edge = float(scoring.get("edge_weight", edge))
cent = float(scoring.get("centrality_weight", cent))
except Exception as e:
logger.warning(f"Retriever Configuration could not be fully loaded from {config_path}: {e}")
return sem, edge, cent
def get_status_multiplier(payload: Dict[str, Any]) -> float:
"""
WP-22 A: Content Lifecycle Multiplier.
Steuert das Ranking basierend auf dem Reifegrad der Information.
- stable: 1.2 (Belohnung für verifiziertes Wissen)
- active: 1.0 (Standard-Gewichtung)
- draft: 0.5 (Bestrafung für unfertige Fragmente)
"""
status = str(payload.get("status", "active")).lower().strip()
if status == "stable":
return 1.2
if status == "draft":
return 0.5
return 1.0
def compute_wp22_score(
semantic_score: float,
payload: Dict[str, Any],
edge_bonus_raw: float = 0.0,
cent_bonus_raw: float = 0.0,
dynamic_edge_boosts: Optional[Dict[str, float]] = None
) -> Dict[str, Any]:
"""
Die zentrale mathematische Scoring-Formel der Mindnet Intelligence.
Implementiert das WP-22 Hybrid-Scoring (Semantic * Lifecycle * Graph).
FORMEL:
Score = (Similarity * StatusMult) * (1 + (TypeWeight - 1) + ((EdgeW * EB + CentW * CB) * IntentBoost))
Returns:
Dict mit dem finalen 'total' Score und allen mathematischen Zwischenwerten für den Explanation Layer.
"""
sem_w, edge_w_cfg, cent_w_cfg = get_weights()
status_mult = get_status_multiplier(payload)
# Retriever Weight (Type Boost aus types.yaml, z.B. 1.1 für Decisions)
node_weight = float(payload.get("retriever_weight", 1.0))
# 1. Berechnung des Base Scores (Semantik gewichtet durch Lifecycle-Status)
base_val = float(semantic_score) * status_mult
# 2. Graph Boost Factor (Teil C: Intent-spezifische Verstärkung)
# Erhöht das Gewicht des gesamten Graphen um 50%, wenn ein spezifischer Intent vorliegt.
graph_boost_factor = 1.5 if dynamic_edge_boosts and (edge_bonus_raw > 0 or cent_bonus_raw > 0) else 1.0
# 3. Einzelne Graph-Komponenten berechnen
edge_impact_final = (edge_w_cfg * edge_bonus_raw) * graph_boost_factor
cent_impact_final = (cent_w_cfg * cent_bonus_raw) * graph_boost_factor
# 4. Finales Zusammenführen (Merging)
# node_weight - 1.0 sorgt dafür, dass ein Gewicht von 1.0 keinen Einfluss hat (neutral).
total = base_val * (1.0 + (node_weight - 1.0) + edge_impact_final + cent_impact_final)
# Sicherstellen, dass der Score niemals 0 oder negativ ist (Floor)
final_score = max(0.0001, float(total))
return {
"total": final_score,
"edge_bonus": float(edge_bonus_raw),
"cent_bonus": float(cent_bonus_raw),
"status_multiplier": status_mult,
"graph_boost_factor": graph_boost_factor,
"type_impact": node_weight - 1.0,
"base_val": base_val,
"edge_impact_final": edge_impact_final,
"cent_impact_final": cent_impact_final
}