""" FILE: app/core/graph/graph_subgraph.py DESCRIPTION: In-Memory Repräsentation eines Graphen für Scoring und Analyse. Zentrale Komponente für die Graph-Expansion (BFS) und Bonus-Berechnung. WP-15c Update: Erhalt von Metadaten (target_section, provenance) für präzises Retrieval-Reasoning. VERSION: 1.2.0 STATUS: Active """ import math from collections import defaultdict from typing import Dict, List, Optional, DefaultDict, Any, Set from qdrant_client import QdrantClient # Lokale Paket-Imports from .graph_weights import EDGE_BASE_WEIGHTS, calculate_edge_weight from .graph_db_adapter import fetch_edges_from_qdrant class Subgraph: """ Leichtgewichtiger Subgraph mit Adjazenzlisten & Kennzahlen. Wird für die Berechnung von Graph-Boni im Retriever genutzt. """ def __init__(self) -> None: # adj speichert nun vollständige Payloads statt nur Tripel self.adj: DefaultDict[str, List[Dict]] = defaultdict(list) self.reverse_adj: DefaultDict[str, List[Dict]] = defaultdict(list) self.in_degree: DefaultDict[str, int] = defaultdict(int) self.out_degree: DefaultDict[str, int] = defaultdict(int) def add_edge(self, e: Dict) -> None: """ Fügt eine Kante hinzu und aktualisiert Indizes. WP-15c: Speichert das vollständige Payload für den Explanation Layer. """ src = e.get("source") tgt = e.get("target") kind = e.get("kind") # Das gesamte Payload wird als Kanten-Objekt behalten # Wir stellen sicher, dass alle relevanten Metadaten vorhanden sind edge_data = { "source": src, "target": tgt, "kind": kind, "weight": e.get("weight", EDGE_BASE_WEIGHTS.get(kind, 0.0)), "provenance": e.get("provenance", "rule"), "confidence": e.get("confidence", 1.0), "target_section": e.get("target_section"), # Essentiell für Präzision "is_super_edge": e.get("is_super_edge", False) } owner = e.get("note_id") if not src or not tgt: return # 1. Forward-Kante self.adj[src].append(edge_data) self.out_degree[src] += 1 self.in_degree[tgt] += 1 # 2. Reverse-Kante (für Explanation Layer & Backlinks) self.reverse_adj[tgt].append(edge_data) # 3. Kontext-Note Handling (erhöht die Zentralität der Parent-Note) if owner and owner != src: # Wir erstellen eine virtuelle Kontext-Kante ctx_edge = edge_data.copy() ctx_edge["source"] = owner ctx_edge["via_context"] = True self.adj[owner].append(ctx_edge) self.out_degree[owner] += 1 if owner != tgt: self.reverse_adj[tgt].append(ctx_edge) self.in_degree[owner] += 1 def aggregate_edge_bonus(self, node_id: str) -> float: """Summe der ausgehenden Kantengewichte (Hub-Score).""" return sum(edge["weight"] for edge in self.adj.get(node_id, [])) def edge_bonus(self, node_id: str) -> float: """API für Retriever (WP-04a Kompatibilität).""" return self.aggregate_edge_bonus(node_id) def centrality_bonus(self, node_id: str) -> float: """ Log-gedämpfte Zentralität basierend auf dem In-Degree. Begrenzt auf einen maximalen Boost von 0.15. """ indeg = self.in_degree.get(node_id, 0) if indeg <= 0: return 0.0 # math.log1p(x) entspricht log(1+x) return min(math.log1p(indeg) / 10.0, 0.15) def get_outgoing_edges(self, node_id: str) -> List[Dict[str, Any]]: """Gibt alle ausgehenden Kanten einer Node inkl. Metadaten zurück.""" return self.adj.get(node_id, []) def get_incoming_edges(self, node_id: str) -> List[Dict[str, Any]]: """Gibt alle eingehenden Kanten einer Node inkl. Metadaten zurück.""" return self.reverse_adj.get(node_id, []) def expand( client: QdrantClient, prefix: str, seeds: List[str], depth: int = 1, edge_types: Optional[List[str]] = None, ) -> Subgraph: """ Expandiert ab Seeds entlang von Edges bis zu einer bestimmten Tiefe. Nutzt fetch_edges_from_qdrant für den Datenbankzugriff. """ sg = Subgraph() frontier = set(seeds) visited = set() for _ in range(max(depth, 0)): if not frontier: break # Batch-Abfrage der Kanten für die aktuelle Ebene payloads = fetch_edges_from_qdrant(client, prefix, list(frontier), edge_types) next_frontier: Set[str] = set() for pl in payloads: src, tgt = pl.get("source_id"), pl.get("target_id") if not src or not tgt: continue # WP-15c: Wir übergeben das vollständige Payload an add_edge edge_payload = { "source": src, "target": tgt, "kind": pl.get("kind", "edge"), "weight": calculate_edge_weight(pl), "note_id": pl.get("note_id"), "provenance": pl.get("provenance", "rule"), "confidence": pl.get("confidence", 1.0), "target_section": pl.get("target_section") } sg.add_edge(edge_payload) # BFS Logik: Neue Ziele in die nächste Frontier aufnehmen if tgt not in visited: next_frontier.add(str(tgt)) visited |= frontier frontier = next_frontier - visited return sg