mindnet/app/core/graph_adapter.py
Lars 5baac082f9
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
app/core/graph_adapter.py aktualisiert
2025-12-04 11:52:18 +01:00

306 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
app/core/graph_adapter.py — Adjazenzaufbau & Subgraph-Expansion (WP-04)
Zweck:
Baut aus Qdrant-Edges (Collection: *_edges) einen leichten In-Memory-Graph
und liefert Edge-basierte Kennzahlen (In-Degree, Out-Degree, edge_bonus).
Kompatibilität:
- Python 3.12+, qdrant-client 1.x
- Wird von app/core/retriever.py im Hybrid-Modus genutzt.
- Signaturen bleiben kompatibel zu den bestehenden Tests
(tests/test_retriever_edges.py patcht expand()).
Version:
0.3.1 (2025-12-04 note_id-bewusste Expansion:
* Seeds = Note-IDs
* Subgraph pflegt zusätzlich Adjazenz auf note_id-Basis,
damit der Retriever mit note_id arbeiten kann)
"""
from __future__ import annotations
from typing import Dict, List, Optional, DefaultDict
from collections import defaultdict
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
from app.core.qdrant import collection_names
# Legacy-Import (wird aktuell nicht mehr verwendet, bleibt aber erhalten,
# damit bestehende Importe/Mocks nicht brechen).
try: # pragma: no cover
from app.core.qdrant_points import get_edges_for_sources # type: ignore
except Exception: # pragma: no cover
get_edges_for_sources = None # type: ignore
# Basisgewichte je Edge-Typ.
# Diese Werte werden mit der in der Edge-Payload hinterlegten "confidence"
# multipliziert, falls vorhanden.
#
# Hinweis:
# - Strukturkanten (belongs_to, next/prev) sind schwächer gewichtet.
# - Wissenskanten (depends_on, related_to, similar_to, references) erhalten
# höhere Basisgewichte, damit sie sich im Retriever bemerkbar machen.
EDGE_BASE_WEIGHTS: Dict[str, float] = {
# Struktur / Navigationskanten
"belongs_to": 0.10,
"next": 0.06,
"prev": 0.06,
"backlink": 0.04,
"references_at": 0.08,
# Wissenskanten
"references": 0.20,
"depends_on": 0.18,
"related_to": 0.15,
"similar_to": 0.12,
# weitere Typen erhalten per Default 0.0 und wirken nur über centrality
}
def _edge_weight(pl: Dict) -> float:
"""Berechnet das effektive Edge-Gewicht aus kind + confidence."""
kind = pl.get("kind", "edge")
base = EDGE_BASE_WEIGHTS.get(kind, 0.0)
conf_raw = pl.get("confidence", None)
try:
conf = float(conf_raw) if conf_raw is not None else None
except Exception:
conf = None
if conf is None:
return base
# Confidence vorsichtig in [0.0, 1.0] clampen
if conf < 0.0:
conf = 0.0
if conf > 1.0:
conf = 1.0
return base * conf
def _fetch_edges(
client: QdrantClient,
prefix: str,
seeds: List[str],
edge_types: Optional[List[str]] = None,
limit: int = 2048,
) -> List[Dict]:
"""
Holt Edges direkt aus der *_edges Collection.
Filter:
- source_id IN seeds ODER target_id IN seeds ODER note_id IN seeds
- optional: kind IN edge_types
"""
if not seeds or limit <= 0:
return []
_, _, edges_col = collection_names(prefix)
# OR über source_id / target_id / note_id für alle Seeds
seed_conditions = []
for field in ("source_id", "target_id", "note_id"):
for s in seeds:
seed_conditions.append(
rest.FieldCondition(
key=field,
match=rest.MatchValue(value=str(s)),
)
)
seeds_filter = rest.Filter(should=seed_conditions) if seed_conditions else None
# Optional: Filter auf bestimmte Edge-Typen (kind)
type_filter = None
if edge_types:
type_conds = [
rest.FieldCondition(
key="kind",
match=rest.MatchValue(value=str(k)),
)
for k in edge_types
]
type_filter = rest.Filter(should=type_conds)
flt = None
must = []
if seeds_filter:
must.append(seeds_filter)
if type_filter:
must.append(type_filter)
if must:
flt = rest.Filter(must=must)
pts, _ = client.scroll(
collection_name=edges_col,
scroll_filter=flt,
limit=limit,
with_payload=True,
with_vectors=False,
)
out: List[Dict] = []
for p in pts or []:
pl = dict(p.payload or {})
if pl:
out.append(pl)
return out
class Subgraph:
"""Leichtgewichtiger Subgraph mit Adjazenzlisten & einfachen Kennzahlen."""
def __init__(self) -> None:
self.adj: DefaultDict[str, List[Dict]] = defaultdict(list)
self.in_degree: DefaultDict[str, int] = defaultdict(int)
self.out_degree: DefaultDict[str, int] = defaultdict(int)
def add_edge(self, e: Dict) -> None:
"""
Fügt eine Kante zum Subgraph hinzu.
e enthält:
- source: Knoten-ID (z. B. source_id)
- target: Knoten-ID (z. B. target_id)
- kind: Kantentyp
- weight: bereits berechnetes Gewicht
- note_id: (optional) Kontext-Note dieser Kante
Wichtig für den Retriever:
- Wir pflegen Adjazenz primär unter `source`.
- Zusätzlich pflegen wir, falls vorhanden, eine Adjazenz
unter `note_id`, damit der Retriever mit note_id arbeiten kann.
"""
src = e.get("source")
tgt = e.get("target")
kind = e.get("kind")
weight = e.get("weight", EDGE_BASE_WEIGHTS.get(kind, 0.0))
owner = e.get("note_id")
if not src or not tgt:
return
# Primäre Adjazenz (z. B. source_id → target_id)
self.adj[src].append(
{
"target": tgt,
"kind": kind,
"weight": weight,
}
)
self.out_degree[src] += 1
self.in_degree[tgt] += 1
# Sekundäre Adjazenz auf Note-Ebene:
# Falls eine Kontext-Note existiert, bekommt diese ebenfalls
# "Ausgangskanten" angerechnet. So kann der Retriever über note_id
# edge_bonus(...) und centrality_bonus(...) abfragen.
if owner:
# same edge unter owner registrieren (falls nicht identisch mit src),
# damit edge_bonus(owner) > 0 werden kann.
if owner != src:
self.adj[owner].append(
{
"target": tgt,
"kind": kind,
"weight": weight,
}
)
self.out_degree[owner] += 1
# Zentralität der Kontext-Note leicht erhöhen
if owner != tgt:
self.in_degree[owner] += 1
def aggregate_edge_bonus(self, node_id: str) -> float:
"""
Summe der ausgehenden Kantengewichte für einen Knoten.
"""
return sum(edge["weight"] for edge in self.adj.get(node_id, []))
def edge_bonus(self, node_id: str) -> float:
"""
Kompatibilitäts-Methode für den Retriever.
Der Retriever ruft subgraph.edge_bonus(node_id) auf. Intern verwenden
wir aggregate_edge_bonus(...), um bestehende Tests nicht zu brechen.
"""
return self.aggregate_edge_bonus(node_id)
def centrality_bonus(self, node_id: str) -> float:
"""
Einfache log-gedämpfte Zentralität auf Basis des In-Degree.
Obergrenze: 0.15
"""
import math
indeg = self.in_degree.get(node_id, 0)
if indeg <= 0:
return 0.0
return min(math.log1p(indeg) / 10.0, 0.15)
def expand(
client: QdrantClient,
prefix: str,
seeds: List[str],
depth: int = 1,
edge_types: Optional[List[str]] = None,
) -> Subgraph:
"""
Expandiert ab Seeds entlang von Edges (bis `depth`), optional gefiltert
nach Edge-Typen.
Seeds sind stabile payload-IDs (in unserem Fall vor allem note_id).
Es werden Edges berücksichtigt, bei denen source_id ODER target_id ODER
note_id einem der Seeds entspricht.
"""
sg = Subgraph()
frontier = set(seeds)
visited = set()
max_depth = max(depth, 0)
for _ in range(max_depth):
if not frontier:
break
edges_payloads = _fetch_edges(
client=client,
prefix=prefix,
seeds=list(frontier),
edge_types=edge_types,
limit=2048,
)
next_frontier = set()
for pl in edges_payloads:
src = pl.get("source_id")
tgt = pl.get("target_id")
kind = pl.get("kind", "edge")
owner = pl.get("note_id")
e = {
"source": src,
"target": tgt,
"kind": kind,
"weight": _edge_weight(pl),
"note_id": owner,
}
sg.add_edge(e)
if tgt:
next_frontier.add(tgt)
visited |= frontier
frontier = next_frontier - visited
return sg