mindnet/app/core/graph/graph_subgraph.py
2025-12-27 20:30:24 +01:00

129 lines
4.5 KiB
Python

"""
FILE: app/core/graph/graph_subgraph.py
DESCRIPTION: In-Memory Repräsentation eines Graphen für Scoring und Analyse.
Zentrale Komponente für die Graph-Expansion (BFS) und Bonus-Berechnung.
MODULARISIERUNG: Teil des graph-Pakets (WP-14).
VERSION: 1.1.0
STATUS: Active
"""
import math
from collections import defaultdict
from typing import Dict, List, Optional, DefaultDict, Any, Set
from qdrant_client import QdrantClient
# Lokale Paket-Imports
from .graph_weights import EDGE_BASE_WEIGHTS, calculate_edge_weight
from .graph_db_adapter import fetch_edges_from_qdrant
class Subgraph:
"""
Leichtgewichtiger Subgraph mit Adjazenzlisten & Kennzahlen.
Wird für die Berechnung von Graph-Boni im Retriever genutzt.
"""
def __init__(self) -> None:
self.adj: DefaultDict[str, List[Dict]] = defaultdict(list)
self.reverse_adj: DefaultDict[str, List[Dict]] = defaultdict(list)
self.in_degree: DefaultDict[str, int] = defaultdict(int)
self.out_degree: DefaultDict[str, int] = defaultdict(int)
def add_edge(self, e: Dict) -> None:
"""
Fügt eine Kante hinzu und aktualisiert Indizes.
Unterstützt Kontext-Notes für verbesserte Graph-Konnektivität.
"""
src = e.get("source")
tgt = e.get("target")
kind = e.get("kind")
weight = e.get("weight", EDGE_BASE_WEIGHTS.get(kind, 0.0))
owner = e.get("note_id")
if not src or not tgt:
return
# 1. Forward-Kante
self.adj[src].append({"target": tgt, "kind": kind, "weight": weight})
self.out_degree[src] += 1
self.in_degree[tgt] += 1
# 2. Reverse-Kante (für WP-04b Explanation Layer)
self.reverse_adj[tgt].append({"source": src, "kind": kind, "weight": weight})
# 3. Kontext-Note Handling (erhöht die Zentralität der Parent-Note)
if owner and owner != src:
self.adj[owner].append({"target": tgt, "kind": kind, "weight": weight})
self.out_degree[owner] += 1
if owner != tgt:
self.reverse_adj[tgt].append({"source": owner, "kind": kind, "weight": weight, "via_context": True})
self.in_degree[owner] += 1
def aggregate_edge_bonus(self, node_id: str) -> float:
"""Summe der ausgehenden Kantengewichte (Hub-Score)."""
return sum(edge["weight"] for edge in self.adj.get(node_id, []))
def edge_bonus(self, node_id: str) -> float:
"""API für Retriever (WP-04a Kompatibilität)."""
return self.aggregate_edge_bonus(node_id)
def centrality_bonus(self, node_id: str) -> float:
"""
Log-gedämpfte Zentralität basierend auf dem In-Degree.
Begrenzt auf einen maximalen Boost von 0.15.
"""
indeg = self.in_degree.get(node_id, 0)
if indeg <= 0:
return 0.0
return min(math.log1p(indeg) / 10.0, 0.15)
def get_outgoing_edges(self, node_id: str) -> List[Dict[str, Any]]:
"""Gibt alle ausgehenden Kanten einer Node zurück."""
return self.adj.get(node_id, [])
def get_incoming_edges(self, node_id: str) -> List[Dict[str, Any]]:
"""Gibt alle eingehenden Kanten einer Node zurück."""
return self.reverse_adj.get(node_id, [])
def expand(
client: QdrantClient,
prefix: str,
seeds: List[str],
depth: int = 1,
edge_types: Optional[List[str]] = None,
) -> Subgraph:
"""
Expandiert ab Seeds entlang von Edges bis zu einer bestimmten Tiefe.
Nutzt fetch_edges_from_qdrant für den Datenbankzugriff.
"""
sg = Subgraph()
frontier = set(seeds)
visited = set()
for _ in range(max(depth, 0)):
if not frontier:
break
# Batch-Abfrage der Kanten für die aktuelle Ebene
payloads = fetch_edges_from_qdrant(client, prefix, list(frontier), edge_types)
next_frontier: Set[str] = set()
for pl in payloads:
src, tgt = pl.get("source_id"), pl.get("target_id")
if not src or not tgt: continue
sg.add_edge({
"source": src,
"target": tgt,
"kind": pl.get("kind", "edge"),
"weight": calculate_edge_weight(pl),
"note_id": pl.get("note_id"),
})
# BFS Logik: Neue Ziele in die nächste Frontier aufnehmen
if tgt not in visited:
next_frontier.add(str(tgt))
visited |= frontier
frontier = next_frontier - visited
return sg