stark gekürzter retriever

This commit is contained in:
Lars 2025-12-18 16:53:29 +01:00
parent babab3167b
commit cbfdd96152
2 changed files with 96 additions and 175 deletions

View File

@ -3,7 +3,7 @@ FILE: app/core/retriever.py
DESCRIPTION: Implementiert die Hybrid-Suche (Vektor + Graph-Expansion) und das Scoring-Modell (Explainability).
WP-22 Update: Dynamic Edge Boosting, Lifecycle Scoring & Provenance Awareness.
Enthält detaillierte Debug-Informationen für die mathematische Verifizierung.
VERSION: 0.6.8 (WP-22 Debug & Verifiability)
VERSION: 0.6.10 (WP-22 Full, Debug & Stable)
STATUS: Active
DEPENDENCIES: app.config, app.models.dto, app.core.qdrant*, app.services.embeddings_client, app.core.graph_adapter
LAST_ANALYSIS: 2025-12-18
@ -38,11 +38,15 @@ except Exception: # pragma: no cover
logger = logging.getLogger(__name__)
# ==============================================================================
# 1. CORE HELPERS & CONFIG LOADERS
# ==============================================================================
@lru_cache
def _get_scoring_weights() -> Tuple[float, float, float]:
"""
Liefert die Basis-Gewichtung (semantic_weight, edge_weight, centrality_weight) aus der Config.
Priorität: 1. retriever.yaml -> 2. Environment/Settings -> 3. Hardcoded Defaults
Liefert die Basis-Gewichtung (semantic_weight, edge_weight, centrality_weight).
Prio: 1. retriever.yaml -> 2. Environment -> 3. Hardcoded Defaults
"""
settings = get_settings()
sem = float(getattr(settings, "RETRIEVER_W_SEM", 1.0))
@ -61,23 +65,20 @@ def _get_scoring_weights() -> Tuple[float, float, float]:
edge = float(scoring.get("edge_weight", edge))
cent = float(scoring.get("centrality_weight", cent))
except Exception as e:
logger.warning(f"Failed to load retriever weights from {config_path}: {e}")
logger.warning(f"Failed to load weights from {config_path}: {e}")
return sem, edge, cent
return sem, edge, cent
def _get_client_and_prefix() -> Tuple[Any, str]:
"""Liefert das initialisierte Qdrant-Client-Objekt und das aktuelle Collection-Präfix."""
"""Liefert das initialisierte Qdrant-Client-Objekt und das Collection-Präfix."""
cfg = qdr.QdrantConfig.from_env()
client = qdr.get_client(cfg)
return client, cfg.prefix
def _get_query_vector(req: QueryRequest) -> List[float]:
"""
Stellt sicher, dass ein Query-Vektor vorhanden ist.
Wandelt Text-Queries via EmbeddingsClient um, falls kein Vektor im Request liegt.
"""
"""Wandelt Text-Queries via EmbeddingsClient um oder nutzt vorhandenen Vektor."""
if req.query_vector:
return list(req.query_vector)
@ -88,10 +89,8 @@ def _get_query_vector(req: QueryRequest) -> List[float]:
model_name = settings.MODEL_NAME
try:
# Versuch mit modernem Interface (WP-03 kompatibel)
return ec.embed_text(req.query, model_name=model_name)
except TypeError:
# Fallback für ältere EmbeddingsClient-Signaturen
return ec.embed_text(req.query)
@ -102,7 +101,7 @@ def _semantic_hits(
top_k: int,
filters: Dict[str, Any] | None = None,
) -> List[Tuple[str, float, Dict[str, Any]]]:
"""Führt eine reine Vektorsuche in Qdrant aus und gibt die Roh-Treffer zurück."""
"""Führt eine Vektorsuche in Qdrant aus."""
flt = filters or None
raw_hits = qp.search_chunks_by_vector(client, prefix, vector, top=top_k, filters=flt)
results: List[Tuple[str, float, Dict[str, Any]]] = []
@ -110,14 +109,15 @@ def _semantic_hits(
results.append((str(pid), float(score), dict(payload or {})))
return results
# --- WP-22 Helper: Lifecycle Multipliers (Teil A) ---
# ==============================================================================
# 2. WP-22 SCORING LOGIC (LIFECYCLE & FORMULA)
# ==============================================================================
def _get_status_multiplier(payload: Dict[str, Any]) -> float:
"""
Ermittelt den Multiplikator basierend auf dem Content-Status.
- stable: 1.2 (Belohnung für validiertes Wissen)
- active/default: 1.0
- draft: 0.5 (Bestrafung für Unfertiges)
WP-22 A: Lifecycle-Scoring.
- stable: 1.2 (Validiertes Wissen fördern)
- draft: 0.5 (Entwürfe de-priorisieren)
"""
status = str(payload.get("status", "active")).lower().strip()
if status == "stable":
@ -126,7 +126,6 @@ def _get_status_multiplier(payload: Dict[str, Any]) -> float:
return 0.5
return 1.0
# --- WP-22: Dynamic Scoring Formula (Teil C) ---
def _compute_total_score(
semantic_score: float,
@ -137,53 +136,41 @@ def _compute_total_score(
) -> Dict[str, Any]:
"""
Die zentrale mathematische Scoring-Formel von WP-22.
FORMEL:
Score = (SemanticScore * StatusMultiplier) * (1 + (Weight-1) + DynamicGraphBoost)
Hierbei gilt:
- BaseScore: semantic_similarity * status_multiplier
- TypeImpact: retriever_weight (z.B. 1.1 für Decisions)
- DynamicBoost: (EdgeW * EdgeBonus) + (CentW * CentBonus)
Score = (Similarity * StatusMult) * (1 + (Weight-1) + DynamicBoost)
"""
# 1. Basis-Parameter laden
_sem_w, edge_w_cfg, cent_w_cfg = _get_scoring_weights()
status_mult = _get_status_multiplier(payload)
node_weight = float(payload.get("retriever_weight", 1.0))
# 2. Base Score (Semantik gewichtet durch Lifecycle)
# 1. Base Score (Semantik * Lifecycle)
base_val = float(semantic_score) * status_mult
# 3. Graph-Intelligence Boost (WP-22 C)
# Globaler Verstärker für Graph-Signale bei spezifischen Intents (z.B. WHY/EMPATHY)
# 2. Graph Boost Factor (WP-22 C)
graph_boost_factor = 1.5 if dynamic_edge_boosts and (edge_bonus_raw > 0 or cent_bonus_raw > 0) else 1.0
edge_contribution_raw = edge_w_cfg * edge_bonus_raw
cent_contribution_raw = cent_w_cfg * cent_bonus_raw
# 3. Graph Contributions
edge_impact = (edge_w_cfg * edge_bonus_raw) * graph_boost_factor
cent_impact = (cent_w_cfg * cent_bonus_raw) * graph_boost_factor
dynamic_graph_impact = edge_impact + cent_impact
dynamic_graph_impact = (edge_contribution_raw + cent_contribution_raw) * graph_boost_factor
# 4. Zusammenführung (Die "Dicke" des Knotens und die Verknüpfung)
# (node_weight - 1.0) ermöglicht negative oder positive Type-Impacts relativ zu 1.0
# 4. Final Merge
total = base_val * (1.0 + (node_weight - 1.0) + dynamic_graph_impact)
# Schutz vor negativen Scores (Floor)
final_score = max(0.001, float(total))
# Debug-Daten für den Explanation-Layer sammeln
return {
"total": final_score,
"total": max(0.001, float(total)),
"edge_bonus": float(edge_bonus_raw),
"cent_bonus": float(cent_bonus_raw),
"status_multiplier": status_mult,
"graph_boost_factor": graph_boost_factor,
"type_impact": node_weight - 1.0,
"base_val": base_val
"base_val": base_val,
"edge_impact_final": edge_impact,
"cent_impact_final": cent_impact
}
# --- WP-04b Explanation Logic ---
# ==============================================================================
# 3. EXPLANATION LAYER (DEBUG & VERIFIABILITY)
# ==============================================================================
def _build_explanation(
semantic_score: float,
@ -193,10 +180,7 @@ def _build_explanation(
target_note_id: Optional[str],
applied_boosts: Optional[Dict[str, float]] = None
) -> Explanation:
"""
Erstellt ein detailliertes Explanation-Objekt für maximale Transparenz (WP-04b).
Enthält nun WP-22 Debug-Metriken wie StatusMultiplier und GraphBoostFactor.
"""
"""Erstellt ein detailliertes Explanation-Objekt mit WP-22 Metriken."""
_, edge_w_cfg, cent_w_cfg = _get_scoring_weights()
type_weight = float(payload.get("retriever_weight", 1.0))
@ -205,11 +189,11 @@ def _build_explanation(
note_type = payload.get("type", "unknown")
base_val = scoring_debug["base_val"]
# 1. Score Breakdown Objekt
# 1. Score Breakdown
breakdown = ScoreBreakdown(
semantic_contribution=base_val,
edge_contribution=base_val * (edge_w_cfg * scoring_debug["edge_bonus"] * graph_bf),
centrality_contribution=base_val * (cent_w_cfg * scoring_debug["cent_bonus"] * graph_bf),
edge_contribution=base_val * scoring_debug["edge_impact_final"],
centrality_contribution=base_val * scoring_debug["cent_impact_final"],
raw_semantic=semantic_score,
raw_edge_bonus=scoring_debug["edge_bonus"],
raw_centrality=scoring_debug["cent_bonus"],
@ -221,21 +205,19 @@ def _build_explanation(
reasons: List[Reason] = []
edges_dto: List[EdgeDTO] = []
# 2. Gründe generieren
if semantic_score > 0.85:
reasons.append(Reason(kind="semantic", message="Herausragende inhaltliche Übereinstimmung.", score_impact=base_val))
elif semantic_score > 0.70:
reasons.append(Reason(kind="semantic", message="Gute inhaltliche Übereinstimmung.", score_impact=base_val))
# 2. Reasons generieren
if semantic_score > 0.70:
reasons.append(Reason(kind="semantic", message="Textuelle Übereinstimmung.", score_impact=base_val))
if type_weight != 1.0:
direction = "Bevorzugt" if type_weight > 1.0 else "Abgewertet"
reasons.append(Reason(kind="type", message=f"{direction} durch Typ-Profil '{note_type}'.", score_impact=base_val * (type_weight - 1.0)))
msg = "Bevorzugt" if type_weight > 1.0 else "Abgewertet"
reasons.append(Reason(kind="type", message=f"{msg} durch Typ '{note_type}'.", score_impact=base_val * (type_weight - 1.0)))
if status_mult != 1.0:
impact_txt = "Belohnt" if status_mult > 1.0 else "Zurückgestellt"
reasons.append(Reason(kind="lifecycle", message=f"{impact_txt} (Status: {payload.get('status', 'draft')}).", score_impact=0.0))
txt = "Bonus" if status_mult > 1.0 else "Malus"
reasons.append(Reason(kind="lifecycle", message=f"Status-{txt} ({payload.get('status')}).", score_impact=0.0))
# 3. Kanten-Details extrahieren (Incoming + Outgoing für volle Sichtbarkeit)
# 3. Kanten-Details (WP-22 B)
if subgraph and target_note_id and scoring_debug["edge_bonus"] > 0:
raw_edges = []
if hasattr(subgraph, "get_incoming_edges"):
@ -249,59 +231,49 @@ def _build_explanation(
prov = edge.get("provenance", "rule")
conf = float(edge.get("confidence", 1.0))
# Richtung und Nachbar bestimmen
is_incoming = (tgt == target_note_id)
neighbor = src if is_incoming else tgt
direction = "in" if is_incoming else "out"
# neighbor_id Scope-Fix
neighbor_id = src if is_incoming else tgt
edge_obj = EdgeDTO(
id=f"{src}->{tgt}:{k}", kind=k, source=src, target=tgt,
weight=conf, direction="in" if is_incoming else "out",
weight=conf, direction=direction,
provenance=prov, confidence=conf
)
edges_dto.append(edge_obj)
# Die 3 stärksten Signale als Gründe formulieren
top_edges = sorted(edges_dto, key=lambda e: e.confidence, reverse=True)
for e in top_edges[:3]:
prov_label = "Explizite" if e.provenance == "explicit" else "Heuristische"
boost_label = f" [Boost x{applied_boosts.get(e.kind)}]" if applied_boosts and e.kind in applied_boosts else ""
prov_txt = "Explizite" if e.provenance == "explicit" else "Heuristische"
boost_txt = f" [Boost x{applied_boosts.get(e.kind)}]" if applied_boosts and e.kind in applied_boosts else ""
msg = f"{prov_label} Verbindung ({e.kind}){boost_label} zu '{neighbor}'."
# Nachbar-ID innerhalb der Loop sicherstellen
target_name = e.source if e.direction == "in" else e.target
msg = f"{prov_txt} Kante '{e.kind}'{boost_txt} von/zu '{target_name}'."
reasons.append(Reason(kind="edge", message=msg, score_impact=edge_w_cfg * e.confidence))
if scoring_debug["cent_bonus"] > 0.01:
reasons.append(Reason(kind="centrality", message="Knoten ist ein zentraler Hub im Kontext.", score_impact=breakdown.centrality_contribution))
reasons.append(Reason(kind="centrality", message="Knoten liegt zentral im Kontext.", score_impact=breakdown.centrality_contribution))
return Explanation(
breakdown=breakdown,
reasons=reasons,
related_edges=edges_dto if edges_dto else None,
applied_intent=getattr(ga, "_LAST_INTENT", "UNKNOWN"), # Debugging-Zweck
applied_boosts=applied_boosts
)
def _extract_expand_options(req: QueryRequest) -> Tuple[int, List[str] | None]:
"""Extrahiert Expansion-Tiefe und Kanten-Filter aus dem Request."""
"""Extrahiert Expansion-Tiefe und Kanten-Filter."""
expand = getattr(req, "expand", None)
if not expand:
return 0, None
depth = 1
edge_types = None
if not expand: return 0, None
if isinstance(expand, dict):
depth = int(expand.get("depth", 1))
edge_types = expand.get("edge_types")
if edge_types:
edge_types = list(edge_types)
return depth, edge_types
# Fallback für Pydantic Objekte
return int(expand.get("depth", 1)), expand.get("edge_types")
if hasattr(expand, "depth"):
return int(getattr(expand, "depth", 1)), getattr(expand, "edge_types", None)
return 0, None
return 1, None
def _build_hits_from_semantic(
@ -312,37 +284,26 @@ def _build_hits_from_semantic(
explain: bool = False,
dynamic_edge_boosts: Dict[str, float] = None
) -> QueryResponse:
"""
Wandelt semantische Roh-Treffer in strukturierte QueryHits um.
Berechnet den finalen Score pro Hit unter Einbeziehung des Subgraphen.
"""
"""Wandelt semantische Roh-Treffer in strukturierte QueryHits um."""
t0 = time.time()
enriched = []
for pid, semantic_score, payload in hits:
edge_bonus = 0.0
cent_bonus = 0.0
# Graph-Abfrage erfolgt IMMER über die Note-ID
edge_bonus, cent_bonus = 0.0, 0.0
target_note_id = payload.get("note_id")
if subgraph is not None and target_note_id:
try:
edge_bonus = float(subgraph.edge_bonus(target_note_id))
cent_bonus = float(subgraph.centrality_bonus(target_note_id))
except Exception as e:
logger.debug(f"Graph signal failed for {target_note_id}: {e}")
except Exception: pass
# Messbare Scoring-Daten via WP-22 Formel
debug_data = _compute_total_score(
semantic_score,
payload,
edge_bonus_raw=edge_bonus,
cent_bonus_raw=cent_bonus,
dynamic_edge_boosts=dynamic_edge_boosts
semantic_score, payload, edge_bonus_raw=edge_bonus,
cent_bonus_raw=cent_bonus, dynamic_edge_boosts=dynamic_edge_boosts
)
enriched.append((pid, float(semantic_score), payload, debug_data))
# Sortierung nach berechnetem Total Score
enriched_sorted = sorted(enriched, key=lambda h: h[3]["total"], reverse=True)
limited_hits = enriched_sorted[: max(1, top_k)]
@ -352,15 +313,11 @@ def _build_hits_from_semantic(
if explain:
explanation_obj = _build_explanation(
semantic_score=float(semantic_score),
payload=payload,
scoring_debug=debug,
subgraph=subgraph,
target_note_id=payload.get("note_id"),
payload=payload, scoring_debug=debug,
subgraph=subgraph, target_note_id=payload.get("note_id"),
applied_boosts=dynamic_edge_boosts
)
text_content = payload.get("page_content") or payload.get("text") or payload.get("content")
results.append(QueryHit(
node_id=str(pid),
note_id=payload.get("note_id", "unknown"),
@ -368,87 +325,49 @@ def _build_hits_from_semantic(
edge_bonus=debug["edge_bonus"],
centrality_bonus=debug["cent_bonus"],
total_score=debug["total"],
source={
"path": payload.get("path"),
"section": payload.get("section") or payload.get("section_title"),
"text": text_content
},
source={"path": payload.get("path"), "text": payload.get("page_content") or payload.get("text")},
payload=payload,
explanation=explanation_obj
))
dt_ms = int((time.time() - t0) * 1000)
return QueryResponse(results=results, used_mode=used_mode, latency_ms=dt_ms)
def semantic_retrieve(req: QueryRequest) -> QueryResponse:
"""Standard-Vektorsuche ohne Graph-Einfluss (WP-02)."""
client, prefix = _get_client_and_prefix()
vector = _get_query_vector(req)
top_k = req.top_k or 10
hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters)
return _build_hits_from_semantic(hits, top_k=top_k, used_mode="semantic", subgraph=None, explain=req.explain)
return QueryResponse(results=results, used_mode=used_mode, latency_ms=int((time.time() - t0) * 1000))
# ==============================================================================
# 4. PUBLIC INTERFACE
# ==============================================================================
def hybrid_retrieve(req: QueryRequest) -> QueryResponse:
"""
Hybrid-Suche: Kombiniert Semantik mit WP-22 Graph Intelligence.
Führt Expansion durch, gewichtet nach Provenance und appliziert Intent-Boosts.
"""
"""Hybrid-Suche: Semantik + WP-22 Graph Intelligence."""
client, prefix = _get_client_and_prefix()
vector = list(req.query_vector) if req.query_vector else _get_query_vector(req)
top_k = req.top_k or 10
# 1. Semantische Seed-Suche
hits = _semantic_hits(client, prefix, vector, top_k=top_k, filters=req.filters)
# 2. Graph Expansion & Custom Weighting
expand_depth, edge_types = _extract_expand_options(req)
boost_edges = getattr(req, "boost_edges", {}) or {}
subgraph: ga.Subgraph | None = None
if expand_depth > 0 and hits:
# Extrahiere Note-IDs der Treffer als Startpunkte für den Graphen
seed_ids = list({h[2].get("note_id") for h in hits if h[2].get("note_id")})
if seed_ids:
try:
# Subgraph aus Qdrant laden
subgraph = ga.expand(client, prefix, seed_ids, depth=expand_depth, edge_types=edge_types)
# WP-22: Transformation der Gewichte im RAM-Graphen vor Bonus-Berechnung
if subgraph and hasattr(subgraph, "graph"):
for u, v, data in subgraph.graph.edges(data=True):
# A. Provenance Weighting (WP-22 Herkunfts-Bonus)
# Provenance Weighting (Concept 2.6)
prov = data.get("provenance", "rule")
# Explicit=1.0, Smart=0.9, Rule=0.7
prov_w = 1.0 if prov == "explicit" else (0.9 if prov == "smart" else 0.7)
# B. Intent Boost Multiplikator (Vom Router geladen)
# Intent Boost mapping
k = data.get("kind")
intent_multiplier = boost_edges.get(k, 1.0)
intent_b = boost_edges.get(k, 1.0)
data["weight"] = data.get("weight", 1.0) * prov_w * intent_b
except Exception: subgraph = None
# Finales Kanten-Gewicht im Graphen setzen
data["weight"] = data.get("weight", 1.0) * prov_w * intent_multiplier
except Exception as e:
logger.error(f"Graph expansion failed: {e}")
subgraph = None
# 3. Scoring & Explanation Generierung
return _build_hits_from_semantic(
hits,
top_k,
"hybrid",
subgraph,
req.explain,
boost_edges
)
return _build_hits_from_semantic(hits, top_k, "hybrid", subgraph, req.explain, boost_edges)
class Retriever:
"""Wrapper-Klasse für die konsolidierte Retrieval-Logik."""
"""Asynchroner Wrapper für FastAPI-Integration."""
async def search(self, request: QueryRequest) -> QueryResponse:
"""Führt eine hybride Suche aus. Asynchron für FastAPI-Integration."""
return hybrid_retrieve(request)

View File

@ -1,7 +1,7 @@
"""
FILE: app/models/dto.py
DESCRIPTION: Pydantic-Modelle (DTOs) für Request/Response Bodies. Definiert das API-Schema.
VERSION: 0.6.5 (WP-22 Debug & Verifiability Update)
VERSION: 0.6.6 (WP-22 Debug & Stability Update)
STATUS: Active
DEPENDENCIES: pydantic, typing, uuid
LAST_ANALYSIS: 2025-12-18
@ -12,6 +12,7 @@ from pydantic import BaseModel, Field
from typing import List, Literal, Optional, Dict, Any
import uuid
# Gültige Kanten-Typen gemäß Manual
EdgeKind = Literal["references", "references_at", "backlink", "next", "prev", "belongs_to", "depends_on", "related_to", "similar_to", "caused_by", "derived_from", "based_on", "solves", "blocks", "uses", "guides"]
@ -58,17 +59,18 @@ class QueryRequest(BaseModel):
filters: Optional[Dict] = None
ret: Dict = {"with_paths": True, "with_notes": True, "with_chunks": True}
explain: bool = False
# WP-22: Semantic Graph Routing
boost_edges: Optional[Dict[str, float]] = None
class FeedbackRequest(BaseModel):
"""
User-Feedback zu einem spezifischen Treffer oder der Gesamtantwort.
Basis für WP-08 (Self-Tuning).
User-Feedback zu einem spezifischen Treffer oder der Gesamtantwort (WP-08 Basis).
"""
query_id: str = Field(..., description="ID der ursprünglichen Suche")
node_id: str = Field(..., description="ID des bewerteten Treffers oder 'generated_answer'")
score: int = Field(..., ge=1, le=5, description="1 (Irrelevant/Falsch) bis 5 (Perfekt)")
score: int = Field(..., ge=1, le=5, description="1 (Irrelevant) bis 5 (Perfekt)")
comment: Optional[str] = None
@ -77,7 +79,7 @@ class ChatRequest(BaseModel):
WP-05: Request für /chat.
"""
message: str = Field(..., description="Die Nachricht des Users")
conversation_id: Optional[str] = Field(None, description="Optional: ID für Chat-Verlauf (noch nicht implementiert)")
conversation_id: Optional[str] = Field(None, description="ID für Chat-Verlauf")
top_k: int = 5
explain: bool = False
@ -93,7 +95,7 @@ class ScoreBreakdown(BaseModel):
raw_edge_bonus: float
raw_centrality: float
node_weight: float
# WP-22 Debug Fields
# WP-22 Debug Fields für Messbarkeit
status_multiplier: float = 1.0
graph_boost_factor: float = 1.0
@ -121,7 +123,7 @@ class Explanation(BaseModel):
class QueryHit(BaseModel):
"""Einzelnes Trefferobjekt für /query."""
node_id: str
note_id: Optional[str]
note_id: str
semantic_score: float
edge_bonus: float
centrality_bonus: float
@ -152,9 +154,9 @@ class ChatResponse(BaseModel):
"""
WP-05/06: Antwortstruktur für /chat.
"""
query_id: str = Field(..., description="Traceability ID (dieselbe wie für Search)")
query_id: str = Field(..., description="Traceability ID")
answer: str = Field(..., description="Generierte Antwort vom LLM")
sources: List[QueryHit] = Field(..., description="Die für die Antwort genutzten Quellen")
sources: List[QueryHit] = Field(..., description="Die genutzten Quellen")
latency_ms: int
intent: Optional[str] = Field("FACT", description="WP-06: Erkannter Intent (FACT/DECISION)")
intent_source: Optional[str] = Field("Unknown", description="WP-06: Quelle der Intent-Erkennung (Keyword vs. LLM)")
intent: Optional[str] = Field("FACT", description="WP-06: Erkannter Intent")
intent_source: Optional[str] = Field("Unknown", description="Quelle der Intent-Erkennung")