559 lines
25 KiB
Python
559 lines
25 KiB
Python
"""
|
|
FILE: app/core/retrieval/retriever.py
DESCRIPTION: Main search interface. Orchestrates vector search and graph expansion.
WP-15c update: note-level diversity pooling & super-edge aggregation.
WP-24c v4.1.0: gold standard - scope awareness, section filtering, authority prioritisation.
VERSION: 0.8.0 (WP-24c: Gold-Standard v4.1.0)
STATUS: Active
DEPENDENCIES: app.config, app.models.dto, app.core.database*, app.core.graph_adapter
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import time
|
|
import logging
|
|
from typing import Any, Dict, List, Tuple, Iterable, Optional
|
|
from collections import defaultdict
|
|
|
|
from app.config import get_settings
|
|
from app.models.dto import (
|
|
QueryRequest, QueryResponse, QueryHit,
|
|
Explanation, ScoreBreakdown, Reason, EdgeDTO
|
|
)
|
|
|
|
# MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene
|
|
import app.core.database.qdrant as qdr
|
|
import app.core.database.qdrant_points as qp
|
|
|
|
import app.services.embeddings_client as ec
|
|
import app.core.graph.graph_subgraph as ga
|
|
import app.core.graph.graph_db_adapter as gdb
|
|
from app.core.graph.graph_utils import PROVENANCE_PRIORITY
|
|
from qdrant_client.http import models as rest
|
|
|
|
# Mathematische Engine importieren
|
|
from app.core.retrieval.retriever_scoring import get_weights, compute_wp22_score
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ==============================================================================
|
|
# 1. CORE HELPERS & CONFIG LOADERS
|
|
# ==============================================================================
|
|
|
|
def _get_client_and_prefix() -> Tuple[Any, str]:
    """Create the Qdrant client from the environment-based configuration.

    Returns:
        A ``(client, prefix)`` tuple: the object returned by
        ``qdr.get_client`` for the loaded ``QdrantConfig`` and that
        configuration's ``prefix`` attribute.
    """
    config = qdr.QdrantConfig.from_env()
    client = qdr.get_client(config)
    return client, config.prefix
|
|
|
|
|
|
def _get_query_vector(req: QueryRequest) -> List[float]:
    """Return the embedding vector to search with for *req*.

    A vector already supplied on the request wins and is returned as a
    list copy. Otherwise the query text is embedded via
    ``ec.embed_text``.

    The embedding call is first attempted with the ``model_name`` keyword;
    if that raises ``TypeError`` (older ``embed_text`` signatures), the
    call is repeated with the text only.

    Raises:
        ValueError: If the request carries neither a vector nor a text.
    """
    supplied_vector = req.query_vector
    if supplied_vector:
        return list(supplied_vector)

    text = req.query
    if not text:
        raise ValueError("Kein Text oder Vektor für die Suche angegeben.")

    settings = get_settings()
    try:
        # Modern interface (WP-03 compatible).
        embedded = ec.embed_text(text, model_name=settings.MODEL_NAME)
    except TypeError:
        # Signature without the 'model_name' keyword: retry positionally.
        logger.debug("ec.embed_text does not accept 'model_name' keyword. Falling back.")
        embedded = ec.embed_text(text)
    return embedded
|
|
|
|
|
|
def _get_chunk_ids_for_notes(
    client: Any,
    prefix: str,
    note_ids: List[str]
) -> List[str]:
    """Load the chunk IDs of all chunks that belong to the given notes.

    WP-24c v4.1.0: required for scope-aware edge retrieval.

    The chunks collection (name resolved through ``qp._names(prefix)``) is
    scrolled with a ``should`` filter that matches any of the note IDs.
    Every page is read: the offset returned by ``client.scroll`` is fed
    into the next call until no further page is reported, so results are
    no longer cut off after the first 2048 points.

    Args:
        client: Qdrant client exposing ``scroll``.
        prefix: Collection prefix used to resolve the chunks collection.
        note_ids: Note IDs whose chunks should be collected.

    Returns:
        The ``chunk_id`` payload values (as strings) of all matching
        points. Empty when ``note_ids`` is empty. The lookup is
        best-effort: on any error a warning is logged and the IDs gathered
        so far are returned.
    """
    if not note_ids:
        return []

    _, chunks_col, _ = qp._names(prefix)
    chunk_ids: List[str] = []

    try:
        # Filter: note_id IN note_ids
        note_filter = rest.Filter(should=[
            rest.FieldCondition(key="note_id", match=rest.MatchValue(value=str(nid)))
            for nid in note_ids
        ])

        next_offset = None
        while True:
            points, next_offset = client.scroll(
                collection_name=chunks_col,
                scroll_filter=note_filter,
                limit=2048,
                offset=next_offset,
                with_payload=True,
                with_vectors=False,
            )

            for point in points:
                chunk_id = (point.payload or {}).get("chunk_id")
                if chunk_id:
                    chunk_ids.append(str(chunk_id))

            # No offset means the last page was read; the empty-page check
            # guards against looping forever on a misbehaving client.
            if next_offset is None or not points:
                break
    except Exception as e:
        logger.warning(f"Failed to load chunk IDs for notes: {e}")

    return chunk_ids
|
|
|
|
def _semantic_hits(
    client: Any,
    prefix: str,
    vector: List[float],
    top_k: int,
    filters: Optional[Dict] = None,
    target_section: Optional[str] = None
) -> List[Tuple[str, float, Dict[str, Any]]]:
    """Run the vector search through the database points module.

    WP-24c v4.1.0: supports optional section filtering. When
    ``target_section`` is given, a ``"section"`` entry is added to a copy
    of the filter dict before searching.

    Returns:
        One ``(id, score, payload)`` tuple per raw hit, coerced to
        ``(str, float, dict)``.
    """
    if target_section:
        # Build a new dict so the caller's filter mapping stays untouched.
        filters = {**(filters or {}), "section": target_section}

    raw_hits = qp.search_chunks_by_vector(client, prefix, vector, top=top_k, filters=filters)

    # WP-24c v4.5.0-DEBUG: retrieval tracer - log the raw Qdrant answer.
    logger.debug(f"📊 [RAW-HITS] Qdrant lieferte {len(raw_hits)} Roh-Treffer (Top-K: {top_k})")
    if filters:
        logger.debug(f" ⚙️ [FILTER] Angewandte Filter: {filters}")

    # Trace the three best raw scores for diagnostics.
    for rank, raw in enumerate(raw_hits[:3], start=1):
        if raw:
            hit_id = str(raw[0])
            hit_score = float(raw[1]) if len(raw) > 1 else 0.0
            hit_payload = dict(raw[2] or {}) if len(raw) > 2 else {}
        else:
            hit_id, hit_score, hit_payload = "N/A", 0.0, {}
        hit_path = hit_payload.get('path', 'N/A')
        logger.debug(f" [{rank}] ID: {hit_id} | Raw-Score: {hit_score:.4f} | Path: {hit_path}")

    # Strict type coercion for downstream stability.
    return [(str(raw[0]), float(raw[1]), dict(raw[2] or {})) for raw in raw_hits]
|
|
|
|
# ==============================================================================
|
|
# 2. EXPLANATION LAYER (DEBUG & VERIFIABILITY)
|
|
# ==============================================================================
|
|
|
|
def _build_explanation(
    semantic_score: float,
    payload: Dict[str, Any],
    scoring_debug: Dict[str, Any],
    subgraph: Optional[ga.Subgraph],
    target_note_id: Optional[str],
    applied_boosts: Optional[Dict[str, float]] = None
) -> Explanation:
    """Turn scores and graph signals into a human-readable explanation.

    Args:
        semantic_score: Raw similarity score of the hit.
        payload: Chunk payload; ``retriever_weight`` is read from it.
        scoring_debug: Scoring details; the keys ``base_val``,
            ``edge_impact_final``, ``cent_impact_final``, ``edge_bonus``,
            ``cent_bonus``, ``status_multiplier`` and
            ``graph_boost_factor`` are read.
        subgraph: Expanded graph, or ``None`` when no expansion happened.
        target_note_id: Note whose incoming/outgoing edges are explained.
        applied_boosts: Edge-kind boosts mentioned in the edge reasons.

    Returns:
        An ``Explanation`` holding the score breakdown, the collected
        reasons, the related edges (``None`` if there are none) and the
        applied boosts.
    """
    _, edge_weight_cfg, _ = get_weights()
    base = scoring_debug["base_val"]

    # 1. Detailed mathematical breakdown.
    breakdown = ScoreBreakdown(
        semantic_contribution=base,
        edge_contribution=base * scoring_debug["edge_impact_final"],
        centrality_contribution=base * scoring_debug["cent_impact_final"],
        raw_semantic=semantic_score,
        raw_edge_bonus=scoring_debug["edge_bonus"],
        raw_centrality=scoring_debug["cent_bonus"],
        node_weight=float(payload.get("retriever_weight", 1.0)),
        status_multiplier=scoring_debug["status_multiplier"],
        graph_boost_factor=scoring_debug["graph_boost_factor"]
    )

    reasons: List[Reason] = []
    edges_dto: List[EdgeDTO] = []

    # 2. Semantic reason: only scores above 0.70 produce an entry.
    if semantic_score > 0.85:
        semantic_msg = "Sehr hohe textuelle Übereinstimmung."
    elif semantic_score > 0.70:
        semantic_msg = "Inhaltliche Übereinstimmung."
    else:
        semantic_msg = None
    if semantic_msg is not None:
        reasons.append(Reason(kind="semantic", message=semantic_msg, score_impact=base))

    # 3. Type profile and lifecycle reasons (WP-25 preparation).
    type_weight = float(payload.get("retriever_weight", 1.0))
    if type_weight != 1.0:
        if type_weight > 1.0:
            type_label = "Bevorzugt"
        else:
            type_label = "De-priorisiert"
        reasons.append(Reason(
            kind="type",
            message=f"{type_label} durch Typ-Profil.",
            score_impact=base * (type_weight - 1.0)
        ))

    # Explicit statement of the lifecycle status (WP-22).
    status_mult = scoring_debug.get("status_multiplier", 1.0)
    if status_mult != 1.0:
        if status_mult > 1.0:
            status_label = "Belohnt (Stable)"
        else:
            status_label = "De-priorisiert (Draft)"
        reasons.append(Reason(
            kind="status",
            message=f"{status_label} durch Content-Lifecycle.",
            score_impact=semantic_score * (status_mult - 1.0)
        ))

    # 4. Edge processing (graph intelligence).
    if subgraph and target_note_id and scoring_debug["edge_bonus"] > 0:
        raw_edges = []
        for accessor in ("get_incoming_edges", "get_outgoing_edges"):
            if hasattr(subgraph, accessor):
                raw_edges.extend(getattr(subgraph, accessor)(target_note_id) or [])

        for raw in raw_edges:
            source_id = str(raw.get("source") or "note_root")
            target_id = str(raw.get("target") or target_note_id or "unknown_target")
            edge_kind = str(raw.get("kind", "related_to"))
            edge_prov = str(raw.get("provenance", "rule"))
            edge_conf = float(raw.get("confidence", 1.0))

            # An edge pointing at the explained note is incoming.
            if target_id == target_note_id:
                edge_direction = "in"
            else:
                edge_direction = "out"

            edges_dto.append(EdgeDTO(
                id=f"{source_id}->{target_id}:{edge_kind}",
                kind=edge_kind,
                source=source_id,
                target=target_id,
                weight=edge_conf,
                direction=edge_direction,
                provenance=edge_prov,
                confidence=edge_conf
            ))

        # Phrase the three most confident edges as reasons.
        ranked_edges = sorted(edges_dto, key=lambda dto: dto.confidence, reverse=True)
        for dto in ranked_edges[:3]:
            peer = dto.source if dto.direction == "in" else dto.target
            if dto.provenance == "explicit":
                prov_txt = "Bestätigte"
            else:
                prov_txt = "KI-basierte"
            if applied_boosts and dto.kind in applied_boosts:
                boost_txt = f" [Boost x{applied_boosts.get(dto.kind)}]"
            else:
                boost_txt = ""

            reasons.append(Reason(
                kind="edge",
                message=f"{prov_txt} Kante '{dto.kind}'{boost_txt} von/zu '{peer}'.",
                score_impact=edge_weight_cfg * dto.confidence
            ))

    if scoring_debug["cent_bonus"] > 0.01:
        reasons.append(Reason(
            kind="centrality",
            message="Die Notiz ist ein zentraler Informations-Hub.",
            score_impact=breakdown.centrality_contribution
        ))

    return Explanation(
        breakdown=breakdown,
        reasons=reasons,
        related_edges=edges_dto or None,
        applied_boosts=applied_boosts
    )
|
|
|
|
# ==============================================================================
|
|
# 3. CORE RETRIEVAL PIPELINE
|
|
# ==============================================================================
|
|
|
|
def _build_hits_from_semantic(
    hits: Iterable[Tuple[str, float, Dict[str, Any]]],
    top_k: int,
    used_mode: str,
    subgraph: ga.Subgraph | None = None,
    explain: bool = False,
    dynamic_edge_boosts: Optional[Dict[str, float]] = None
) -> QueryResponse:
    """Convert raw semantic hits into scored ``QueryHit`` objects.

    WP-15c: implements note-level diversity pooling. After scoring, only
    the best-scoring chunk per ``note_id`` is kept, then the list is cut to
    ``top_k`` (at least one entry).

    Args:
        hits: ``(point_id, semantic_score, payload)`` tuples.
        top_k: Maximum number of hits to return after pooling.
        used_mode: Mode label copied into the response.
        subgraph: Optional expanded graph providing ``edge_bonus`` and
            ``centrality_bonus`` per note.
        explain: When true, an explanation is built for every returned hit.
        dynamic_edge_boosts: Optional per-edge-kind boosts forwarded to the
            scoring function and the explanation.

    Returns:
        A ``QueryResponse`` with the pooled hits, the mode and the
        processing latency in milliseconds.
    """
    # Monotonic clock: the latency cannot go negative on clock adjustments.
    started = time.perf_counter()
    scored = []

    # First scoring pass over all candidates.
    for pid, semantic_score, payload in hits:
        edge_bonus, cent_bonus = 0.0, 0.0
        target_id = payload.get("note_id")

        if subgraph and target_id:
            try:
                edge_bonus = float(subgraph.edge_bonus(target_id))
                cent_bonus = float(subgraph.centrality_bonus(target_id))
            except Exception as exc:
                # Best effort: graph bonuses are optional, but leave a trace
                # instead of swallowing the failure silently.
                logger.debug(f"Graph bonus lookup failed for note '{target_id}': {exc}")

        debug_data = compute_wp22_score(
            semantic_score, payload, edge_bonus, cent_bonus, dynamic_edge_boosts
        )
        scored.append((pid, semantic_score, payload, debug_data))

    # 1. Sort by the final mathematical score.
    scored.sort(key=lambda item: item[3]["total"], reverse=True)

    # 2. WP-15c: note-level diversity pooling.
    # Keep only the best-scoring hit per note_id so that many chunks of one
    # note cannot crowd out other notes. Hits without a note_id all share
    # the "unknown" bucket.
    pooled = []
    seen_notes = set()
    for item in scored:
        note_id = str(item[2].get("note_id", "unknown"))
        if note_id not in seen_notes:
            seen_notes.add(note_id)
            pooled.append(item)

    # 3. Apply top_k after the pooling step.
    limited_hits = pooled[: max(1, top_k)]

    results: List[QueryHit] = []
    for pid, s_score, pl, dbg in limited_hits:
        explanation_obj = None
        if explain:
            explanation_obj = _build_explanation(
                semantic_score=float(s_score),
                payload=pl,
                scoring_debug=dbg,
                subgraph=subgraph,
                target_note_id=pl.get("note_id"),
                applied_boosts=dynamic_edge_boosts
            )

        # Fall back to the placeholder whenever no non-empty text field
        # exists (including a present-but-empty "content" key).
        text_content = (
            pl.get("page_content")
            or pl.get("text")
            or pl.get("content")
            or "[Kein Text]"
        )

        # WP-24c v4.1.0: RAG context - take source_chunk_id from the first
        # related edge whose source looks like a chunk ID.
        source_chunk_id = None
        if explanation_obj and explanation_obj.related_edges:
            source_chunk_id = next(
                (
                    edge.source
                    for edge in explanation_obj.related_edges
                    if edge.source and ("#" in edge.source or edge.source.startswith("chunk:"))
                ),
                None,
            )

        results.append(QueryHit(
            node_id=str(pid),
            note_id=str(pl.get("note_id", "unknown")),
            semantic_score=float(s_score),
            edge_bonus=dbg["edge_bonus"],
            centrality_bonus=dbg["cent_bonus"],
            total_score=dbg["total"],
            source={
                "path": pl.get("path"),
                "section": pl.get("section") or pl.get("section_title"),
                "text": text_content
            },
            payload=pl,
            explanation=explanation_obj,
            source_chunk_id=source_chunk_id  # WP-24c v4.1.0: RAG context
        ))

    # WP-24c v4.5.0-DEBUG: retrieval tracer - final results.
    latency_ms = int((time.perf_counter() - started) * 1000)
    if not results:
        logger.warning(f"⚠️ [EMPTY] Hybride Suche lieferte 0 Ergebnisse (Latency: {latency_ms}ms)")
    else:
        logger.info(f"✨ [SUCCESS] Hybride Suche lieferte {len(results)} Treffer (Latency: {latency_ms}ms)")
        # Log the three best final scores.
        for i, hit in enumerate(results[:3]):
            # The hit is built with node_id only; whether QueryHit also has
            # a chunk_id attribute is not visible here, so fall back safely.
            chunk_ref = getattr(hit, "chunk_id", None) or hit.node_id
            logger.debug(f" [{i+1}] Final: Chunk={chunk_ref} | Total-Score={hit.total_score:.4f} | Semantic={hit.semantic_score:.4f} | Edge={hit.edge_bonus:.4f}")

    return QueryResponse(results=results, used_mode=used_mode, latency_ms=latency_ms)
|
|
|
|
|
|
def _is_chunk_ref(ref: Any) -> bool:
    """Return True when *ref* is a string that looks like a chunk ID."""
    return isinstance(ref, str) and bool(ref) and ("#" in ref or ref.startswith("chunk:"))


def _aggregate_super_edges(subgraph: Any) -> None:
    """Collapse parallel edges per (source, target) pair into one super edge.

    WP-24c v4.1.0: prevents score explosion caused by several links to
    different sections of the same target. The strongest edge counts in
    full; every further edge adds only 10% of its weight. Afterwards
    ``subgraph.in_degree`` is rebuilt from the aggregated adjacency list
    and the chunk-level in-degree is stored on the subgraph.
    """
    chunk_level_in_degree = defaultdict(int)  # target -> count of chunk sources

    # Iterate over a snapshot because the adjacency list is rewritten below.
    for src, edge_list in list(subgraph.adj.items()):
        by_target = defaultdict(list)
        for edge in edge_list:
            by_target[edge["target"]].append(edge)
            # Count the edge when it is chunk-scoped or its source is a chunk.
            if edge.get("chunk_id") or _is_chunk_ref(src):
                chunk_level_in_degree[edge["target"]] += 1

        aggregated = []
        for edges in by_target.values():
            if len(edges) > 1:
                # Strongest edge first (authority prioritisation): weight,
                # halved for virtual edges, scaled by confidence.
                ranked = sorted(
                    edges,
                    key=lambda x: (
                        x.get("weight", 0.0)
                        * (1.0 if not x.get("virtual", False) else 0.5)
                        * float(x.get("confidence", 1.0))
                    ),
                    reverse=True
                )
                primary = ranked[0]

                # Saturating aggregation of the remaining edges.
                total_w = primary.get("weight", 0.0)
                chunk_count = 0
                for secondary in ranked[1:]:
                    total_w += secondary.get("weight", 0.0) * 0.1
                    if secondary.get("chunk_id") or _is_chunk_ref(secondary.get("source")):
                        chunk_count += 1
                if primary.get("chunk_id") or _is_chunk_ref(primary.get("source")):
                    chunk_count += 1

                primary["weight"] = total_w
                primary["is_super_edge"] = True  # flag for the explanation layer
                primary["edge_count"] = len(edges)
                primary["chunk_source_count"] = chunk_count
                aggregated.append(primary)
            else:
                edge = edges[0]
                # Chunk count for single edges as well.
                if edge.get("chunk_id") or _is_chunk_ref(edge.get("source")):
                    edge["chunk_source_count"] = 1
                aggregated.append(edge)

        # In-place update of the graph's adjacency list.
        subgraph.adj[src] = aggregated

    # Re-sync in-degrees so the centrality bonus matches the aggregation.
    subgraph.in_degree = defaultdict(int)
    for edges in subgraph.adj.values():
        for edge in edges:
            subgraph.in_degree[edge["target"]] += 1

    subgraph.chunk_level_in_degree = chunk_level_in_degree


def _apply_authority_weights(subgraph: Any, boost_edges: Dict[str, float]) -> None:
    """Scale every edge weight by provenance, confidence, virtual flag and intent boost.

    WP-24c v4.1.0: authority prioritisation. The provenance factor is
    looked up in ``PROVENANCE_PRIORITY`` (first by ``"<provenance>:<kind>"``,
    then by the bare provenance, default 0.7); virtual edges are halved;
    ``boost_edges`` supplies an optional multiplier per edge kind.
    """
    for edges in subgraph.adj.values():
        for edge in edges:
            prov = edge.get("provenance", "rule")
            prov_key = f"{prov}:{edge.get('kind', 'related_to')}" if ":" not in prov else prov
            prov_w = PROVENANCE_PRIORITY.get(prov_key, PROVENANCE_PRIORITY.get(prov, 0.7))

            confidence = float(edge.get("confidence", 1.0))
            virtual_penalty = 0.5 if edge.get("virtual", False) else 1.0
            intent_multiplier = boost_edges.get(edge.get("kind"), 1.0)

            edge["weight"] = edge.get("weight", 1.0) * prov_w * confidence * virtual_penalty * intent_multiplier


def hybrid_retrieve(req: QueryRequest) -> QueryResponse:
    """Main entry point for the hybrid (vector + graph) search.

    WP-15c: implements edge aggregation (super edges).
    WP-24c v4.5.0-DEBUG: retrieval tracer for diagnostics.

    Steps: semantic seed search with ``3 * top_k`` candidates, optional
    graph expansion around the seed notes/chunks, super-edge aggregation,
    authority weighting, then scoring and note-level pooling.

    Requests that carry only a query vector (no text) are supported; the
    tracing log treats a missing query text as an empty string instead of
    failing on it. Graph expansion is best-effort: any error is logged and
    the search continues without a subgraph.

    Args:
        req: The query request (text or vector, filters, top_k, expand
            configuration, optional ``target_section`` / ``boost_edges``).

    Returns:
        The scored ``QueryResponse`` with ``used_mode="hybrid"``.
    """
    query_text = req.query or ""
    # WP-24c v4.1.0: section filtering; read defensively, same as boost_edges.
    target_section = getattr(req, "target_section", None)

    # WP-24c v4.5.0-DEBUG: retrieval tracer - start of the hybrid search.
    logger.info("🔍 [RETRIEVAL] Starte hybride Suche")
    logger.info(f" -> Query: '{query_text[:100]}...' (Länge: {len(query_text)})")
    logger.debug(f" ⚙️ [FILTER] Request-Filter: {req.filters}")
    logger.debug(f" ⚙️ [FILTER] Top-K: {req.top_k}, Expand: {req.expand}, Target-Section: {target_section}")

    client, prefix = _get_client_and_prefix()
    # _get_query_vector already prefers a vector supplied on the request.
    vector = _get_query_vector(req)
    top_k = req.top_k or 10

    # 1. Semantic seed search (over-fetch so pooling has enough candidates).
    seed_limit = top_k * 3
    logger.debug(f"🔍 [RETRIEVAL] Starte semantische Seed-Suche (Top-K: {seed_limit}, Target-Section: {target_section})")
    hits = _semantic_hits(client, prefix, vector, top_k=seed_limit, filters=req.filters, target_section=target_section)
    logger.debug(f"📊 [SEED-HITS] Semantische Suche lieferte {len(hits)} Seed-Treffer")

    # 2. Graph expansion configuration.
    expand_cfg = req.expand if isinstance(req.expand, dict) else {}
    depth = int(expand_cfg.get("depth", 1))
    boost_edges = getattr(req, "boost_edges", {}) or {}

    subgraph: ga.Subgraph | None = None
    if depth > 0 and hits:
        # WP-24c v4.5.2: chunk-aware traversal. Seeds are both the note IDs
        # and the point IDs (chunk IDs) of the hits, so chunk-scope edges
        # are found as well.
        seed_note_ids = list({h[2].get("note_id") for h in hits if h[2].get("note_id")})
        seed_chunk_ids = list({h[0] for h in hits if h[0]})
        all_seed_ids = list(set(seed_note_ids + seed_chunk_ids))

        if all_seed_ids:
            try:
                # Also load chunks of the seed notes that did not make it
                # into the seed hits themselves.
                additional_chunk_ids = _get_chunk_ids_for_notes(client, prefix, seed_note_ids)
                all_chunk_ids = list(set(seed_chunk_ids + additional_chunk_ids))

                subgraph = ga.expand(
                    client, prefix, all_seed_ids,
                    depth=depth,
                    edge_types=expand_cfg.get("edge_types"),
                    chunk_ids=all_chunk_ids,
                    target_section=target_section
                )

                logger.debug(f"🔍 [SEEDS] Note-IDs: {len(seed_note_ids)}, Chunk-IDs: {len(seed_chunk_ids)}, Total Seeds: {len(all_seed_ids)}")
                logger.debug(f" -> Zusätzliche Chunk-IDs geladen: {len(additional_chunk_ids)}, Total Chunk-IDs: {len(all_chunk_ids)}")

                if subgraph and hasattr(subgraph, "adj"):
                    _aggregate_super_edges(subgraph)
                    _apply_authority_weights(subgraph, boost_edges)

            except Exception as e:
                logger.error(f"Graph Expansion failed: {e}")
                subgraph = None

    # 3. Scoring & explanation; top_k is applied only at this stage.
    if subgraph:
        # WP-24c v4.5.1: the subgraph exposes .adj (adjacency list), not .edges.
        edge_count = sum(len(edges) for edges in subgraph.adj.values()) if hasattr(subgraph, 'adj') else 0
        logger.debug(f"📊 [GRAPH] Subgraph enthält {edge_count} Kanten")
    else:
        logger.debug("📊 [GRAPH] Kein Subgraph (depth=0 oder keine Seed-IDs)")

    result = _build_hits_from_semantic(hits, top_k, "hybrid", subgraph, req.explain, boost_edges)

    if not result.results:
        logger.warning("⚠️ [EMPTY] Hybride Suche lieferte nach Scoring 0 finale Ergebnisse")
    else:
        logger.info(f"✨ [SUCCESS] Hybride Suche lieferte {len(result.results)} finale Treffer (Mode: {result.used_mode})")

    return result
|
|
|
|
|
|
def semantic_retrieve(req: QueryRequest) -> QueryResponse:
    """Run a plain vector search without any graph influence.

    The request's ``top_k`` (default 10) is used both as the search limit
    and as the final result limit; no subgraph is passed to the scoring
    step.
    """
    client, prefix = _get_client_and_prefix()
    query_vector = _get_query_vector(req)
    seed_hits = _semantic_hits(client, prefix, query_vector, req.top_k or 10, req.filters)
    response = _build_hits_from_semantic(
        seed_hits,
        req.top_k or 10,
        "semantic",
        explain=req.explain,
    )
    return response
|
|
|
|
|
|
class Retriever:
    """Interface for asynchronous search callers."""

    async def search(self, request: QueryRequest) -> QueryResponse:
        """Answer *request* by delegating to ``hybrid_retrieve``.

        NOTE(review): ``hybrid_retrieve`` is a regular synchronous function
        and is called directly here, so the coroutine does not yield while
        it runs - confirm this is acceptable for the event loop in use.
        """
        response = hybrid_retrieve(request)
        return response