mindnet/app/core/graph_adapter.py
2025-12-15 15:40:39 +01:00

249 lines
7.4 KiB
Python

"""
FILE: app/core/graph_adapter.py
DESCRIPTION: Lädt Kanten aus Qdrant und baut einen In-Memory Subgraphen für Scoring (Centrality) und Explanation.
VERSION: 0.4.0
STATUS: Active
DEPENDENCIES: qdrant_client, app.core.qdrant
LAST_ANALYSIS: 2025-12-15
"""
from __future__ import annotations
from typing import Dict, List, Optional, DefaultDict, Any
from collections import defaultdict
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
from app.core.qdrant import collection_names
# Legacy-Import Fallback
try: # pragma: no cover
from app.core.qdrant_points import get_edges_for_sources # type: ignore
except Exception: # pragma: no cover
get_edges_for_sources = None # type: ignore
# Basisgewichte je Edge-Typ (WP-04a Config)
EDGE_BASE_WEIGHTS: Dict[str, float] = {
# Struktur
"belongs_to": 0.10,
"next": 0.06,
"prev": 0.06,
"backlink": 0.04,
"references_at": 0.08,
# Wissen
"references": 0.20,
"depends_on": 0.18,
"related_to": 0.15,
"similar_to": 0.12,
}
def _edge_weight(pl: Dict) -> float:
"""Berechnet das effektive Edge-Gewicht aus kind + confidence."""
kind = pl.get("kind", "edge")
base = EDGE_BASE_WEIGHTS.get(kind, 0.0)
conf_raw = pl.get("confidence", None)
try:
conf = float(conf_raw) if conf_raw is not None else None
except Exception:
conf = None
if conf is None:
return base
if conf < 0.0: conf = 0.0
if conf > 1.0: conf = 1.0
return base * conf
def _fetch_edges(
client: QdrantClient,
prefix: str,
seeds: List[str],
edge_types: Optional[List[str]] = None,
limit: int = 2048,
) -> List[Dict]:
"""
Holt Edges direkt aus der *_edges Collection.
Filter: source_id IN seeds OR target_id IN seeds OR note_id IN seeds
"""
if not seeds or limit <= 0:
return []
_, _, edges_col = collection_names(prefix)
seed_conditions = []
for field in ("source_id", "target_id", "note_id"):
for s in seeds:
seed_conditions.append(
rest.FieldCondition(key=field, match=rest.MatchValue(value=str(s)))
)
seeds_filter = rest.Filter(should=seed_conditions) if seed_conditions else None
type_filter = None
if edge_types:
type_conds = [
rest.FieldCondition(key="kind", match=rest.MatchValue(value=str(k)))
for k in edge_types
]
type_filter = rest.Filter(should=type_conds)
must = []
if seeds_filter: must.append(seeds_filter)
if type_filter: must.append(type_filter)
flt = rest.Filter(must=must) if must else None
pts, _ = client.scroll(
collection_name=edges_col,
scroll_filter=flt,
limit=limit,
with_payload=True,
with_vectors=False,
)
out: List[Dict] = []
for p in pts or []:
pl = dict(p.payload or {})
if pl:
out.append(pl)
return out
class Subgraph:
"""Leichtgewichtiger Subgraph mit Adjazenzlisten & Kennzahlen."""
def __init__(self) -> None:
# Forward: source -> [targets]
self.adj: DefaultDict[str, List[Dict]] = defaultdict(list)
# Reverse: target -> [sources] (Neu für WP-04b Explanation)
self.reverse_adj: DefaultDict[str, List[Dict]] = defaultdict(list)
self.in_degree: DefaultDict[str, int] = defaultdict(int)
self.out_degree: DefaultDict[str, int] = defaultdict(int)
def add_edge(self, e: Dict) -> None:
"""
Fügt eine Kante hinzu und aktualisiert Forward/Reverse Indizes.
e muss enthalten: source, target, kind, weight.
"""
src = e.get("source")
tgt = e.get("target")
kind = e.get("kind")
weight = e.get("weight", EDGE_BASE_WEIGHTS.get(kind, 0.0))
owner = e.get("note_id")
if not src or not tgt:
return
# 1. Primäre Adjazenz (Forward)
edge_data = {"target": tgt, "kind": kind, "weight": weight}
self.adj[src].append(edge_data)
self.out_degree[src] += 1
self.in_degree[tgt] += 1
# 2. Reverse Adjazenz (Neu für Explanation)
# Wir speichern, woher die Kante kam.
rev_data = {"source": src, "kind": kind, "weight": weight}
self.reverse_adj[tgt].append(rev_data)
# 3. Kontext-Note Handling (Forward & Reverse)
# Wenn eine Kante "im Kontext einer Note" (owner) definiert ist,
# schreiben wir sie der Note gut, damit der Retriever Scores auf Note-Ebene findet.
if owner and owner != src:
# Forward: Owner -> Target
self.adj[owner].append(edge_data)
self.out_degree[owner] += 1
# Reverse: Target wird vom Owner referenziert (indirekt)
if owner != tgt:
rev_owner_data = {"source": owner, "kind": kind, "weight": weight, "via_context": True}
self.reverse_adj[tgt].append(rev_owner_data)
self.in_degree[owner] += 1 # Leichter Centrality Boost für den Owner
def aggregate_edge_bonus(self, node_id: str) -> float:
"""Summe der ausgehenden Kantengewichte (Hub-Score)."""
return sum(edge["weight"] for edge in self.adj.get(node_id, []))
def edge_bonus(self, node_id: str) -> float:
"""API für Retriever (WP-04a Kompatibilität)."""
return self.aggregate_edge_bonus(node_id)
def centrality_bonus(self, node_id: str) -> float:
"""Log-gedämpfte Zentralität (In-Degree)."""
import math
indeg = self.in_degree.get(node_id, 0)
if indeg <= 0:
return 0.0
return min(math.log1p(indeg) / 10.0, 0.15)
# --- WP-04b Explanation Helpers ---
def get_outgoing_edges(self, node_id: str) -> List[Dict[str, Any]]:
"""Liefert Liste aller Ziele, auf die dieser Knoten zeigt."""
return self.adj.get(node_id, [])
def get_incoming_edges(self, node_id: str) -> List[Dict[str, Any]]:
"""Liefert Liste aller Quellen, die auf diesen Knoten zeigen."""
return self.reverse_adj.get(node_id, [])
def expand(
client: QdrantClient,
prefix: str,
seeds: List[str],
depth: int = 1,
edge_types: Optional[List[str]] = None,
) -> Subgraph:
"""
Expandiert ab Seeds entlang von Edges (bis `depth`).
"""
sg = Subgraph()
frontier = set(seeds)
visited = set()
max_depth = max(depth, 0)
for _ in range(max_depth):
if not frontier:
break
edges_payloads = _fetch_edges(
client=client,
prefix=prefix,
seeds=list(frontier),
edge_types=edge_types,
limit=2048,
)
next_frontier = set()
for pl in edges_payloads:
src = pl.get("source_id")
tgt = pl.get("target_id")
# Skip invalid edges
if not src or not tgt:
continue
e = {
"source": src,
"target": tgt,
"kind": pl.get("kind", "edge"),
"weight": _edge_weight(pl),
"note_id": pl.get("note_id"),
}
sg.add_edge(e)
# Nur weitersuchen, wenn Target noch nicht besucht
if tgt and tgt not in visited:
next_frontier.add(tgt)
visited |= frontier
frontier = next_frontier - visited
return sg