""" FILE: app/core/graph/graph_subgraph.py DESCRIPTION: In-Memory Repräsentation eines Graphen für Scoring und Analyse. """ import math from collections import defaultdict from typing import Dict, List, Optional, DefaultDict, Any, Set from qdrant_client import QdrantClient from .graph_weights import EDGE_BASE_WEIGHTS, calculate_edge_weight from .graph_db_adapter import fetch_edges_from_qdrant class Subgraph: """Leichtgewichtiger Subgraph mit Adjazenzlisten & Kennzahlen.""" def __init__(self) -> None: self.adj: DefaultDict[str, List[Dict]] = defaultdict(list) self.reverse_adj: DefaultDict[str, List[Dict]] = defaultdict(list) self.in_degree: DefaultDict[str, int] = defaultdict(int) self.out_degree: DefaultDict[str, int] = defaultdict(int) def add_edge(self, e: Dict) -> None: """Fügt eine Kante hinzu und aktualisiert Indizes.""" src = e.get("source") tgt = e.get("target") kind = e.get("kind") weight = e.get("weight", EDGE_BASE_WEIGHTS.get(kind, 0.0)) owner = e.get("note_id") if not src or not tgt: return # 1. Forward self.adj[src].append({"target": tgt, "kind": kind, "weight": weight}) self.out_degree[src] += 1 self.in_degree[tgt] += 1 # 2. Reverse (WP-04b Explanation) self.reverse_adj[tgt].append({"source": src, "kind": kind, "weight": weight}) # 3. Kontext-Note Handling if owner and owner != src: self.adj[owner].append({"target": tgt, "kind": kind, "weight": weight}) self.out_degree[owner] += 1 if owner != tgt: self.reverse_adj[tgt].append({"source": owner, "kind": kind, "weight": weight, "via_context": True}) self.in_degree[owner] += 1 def aggregate_edge_bonus(self, node_id: str) -> float: """Summe der ausgehenden Kantengewichte (Hub-Score).""" return sum(edge["weight"] for edge in self.adj.get(node_id, [])) def edge_bonus(self, node_id: str) -> float: """API für Retriever (WP-04a Kompatibilität).""" return self.aggregate_edge_bonus(node_id) def centrality_bonus(self, node_id: str) -> float: """Log-gedämpfte Zentralität (In-Degree).""" indeg = self.in_degree.get(node_id, 0) if indeg <= 0: return 0.0 return min(math.log1p(indeg) / 10.0, 0.15) def get_outgoing_edges(self, node_id: str) -> List[Dict[str, Any]]: return self.adj.get(node_id, []) def get_incoming_edges(self, node_id: str) -> List[Dict[str, Any]]: return self.reverse_adj.get(node_id, []) def expand( client: QdrantClient, prefix: str, seeds: List[str], depth: int = 1, edge_types: Optional[List[str]] = None, ) -> Subgraph: """Expandiert ab Seeds entlang von Edges bis zu einer bestimmten Tiefe.""" sg = Subgraph() frontier = set(seeds) visited = set() for _ in range(max(depth, 0)): if not frontier: break payloads = fetch_edges_from_qdrant(client, prefix, list(frontier), edge_types) next_frontier: Set[str] = set() for pl in payloads: src, tgt = pl.get("source_id"), pl.get("target_id") if not src or not tgt: continue sg.add_edge({ "source": src, "target": tgt, "kind": pl.get("kind", "edge"), "weight": calculate_edge_weight(pl), "note_id": pl.get("note_id"), }) if tgt not in visited: next_frontier.add(str(tgt)) visited |= frontier frontier = next_frontier - visited return sg