mindnet/app/core/graph/graph_subgraph.py
2025-12-27 14:44:44 +01:00

106 lines
3.6 KiB
Python

"""
FILE: app/core/graph/graph_subgraph.py
DESCRIPTION: In-Memory Repräsentation eines Graphen für Scoring und Analyse.
"""
import math
from collections import defaultdict
from typing import Dict, List, Optional, DefaultDict, Any, Set
from qdrant_client import QdrantClient
from .graph_weights import EDGE_BASE_WEIGHTS, calculate_edge_weight
from .graph_db_adapter import fetch_edges_from_qdrant
class Subgraph:
"""Leichtgewichtiger Subgraph mit Adjazenzlisten & Kennzahlen."""
def __init__(self) -> None:
self.adj: DefaultDict[str, List[Dict]] = defaultdict(list)
self.reverse_adj: DefaultDict[str, List[Dict]] = defaultdict(list)
self.in_degree: DefaultDict[str, int] = defaultdict(int)
self.out_degree: DefaultDict[str, int] = defaultdict(int)
def add_edge(self, e: Dict) -> None:
"""Fügt eine Kante hinzu und aktualisiert Indizes."""
src = e.get("source")
tgt = e.get("target")
kind = e.get("kind")
weight = e.get("weight", EDGE_BASE_WEIGHTS.get(kind, 0.0))
owner = e.get("note_id")
if not src or not tgt:
return
# 1. Forward
self.adj[src].append({"target": tgt, "kind": kind, "weight": weight})
self.out_degree[src] += 1
self.in_degree[tgt] += 1
# 2. Reverse (WP-04b Explanation)
self.reverse_adj[tgt].append({"source": src, "kind": kind, "weight": weight})
# 3. Kontext-Note Handling
if owner and owner != src:
self.adj[owner].append({"target": tgt, "kind": kind, "weight": weight})
self.out_degree[owner] += 1
if owner != tgt:
self.reverse_adj[tgt].append({"source": owner, "kind": kind, "weight": weight, "via_context": True})
self.in_degree[owner] += 1
def aggregate_edge_bonus(self, node_id: str) -> float:
"""Summe der ausgehenden Kantengewichte (Hub-Score)."""
return sum(edge["weight"] for edge in self.adj.get(node_id, []))
def edge_bonus(self, node_id: str) -> float:
"""API für Retriever (WP-04a Kompatibilität)."""
return self.aggregate_edge_bonus(node_id)
def centrality_bonus(self, node_id: str) -> float:
"""Log-gedämpfte Zentralität (In-Degree)."""
indeg = self.in_degree.get(node_id, 0)
if indeg <= 0:
return 0.0
return min(math.log1p(indeg) / 10.0, 0.15)
def get_outgoing_edges(self, node_id: str) -> List[Dict[str, Any]]:
return self.adj.get(node_id, [])
def get_incoming_edges(self, node_id: str) -> List[Dict[str, Any]]:
return self.reverse_adj.get(node_id, [])
def expand(
client: QdrantClient,
prefix: str,
seeds: List[str],
depth: int = 1,
edge_types: Optional[List[str]] = None,
) -> Subgraph:
"""Expandiert ab Seeds entlang von Edges bis zu einer bestimmten Tiefe."""
sg = Subgraph()
frontier = set(seeds)
visited = set()
for _ in range(max(depth, 0)):
if not frontier:
break
payloads = fetch_edges_from_qdrant(client, prefix, list(frontier), edge_types)
next_frontier: Set[str] = set()
for pl in payloads:
src, tgt = pl.get("source_id"), pl.get("target_id")
if not src or not tgt: continue
sg.add_edge({
"source": src, "target": tgt,
"kind": pl.get("kind", "edge"),
"weight": calculate_edge_weight(pl),
"note_id": pl.get("note_id"),
})
if tgt not in visited:
next_frontier.add(str(tgt))
visited |= frontier
frontier = next_frontier - visited
return sg