mindnet/app/core/graph_adapter.py
Lars 13b7c8858a
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
app/core/graph_adapter.py aktualisiert
2025-12-03 18:40:06 +01:00

243 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
app/core/graph_adapter.py — Adjazenzaufbau & Subgraph-Expansion (WP-04)
Zweck:
Baut aus Qdrant-Edges (Collection: *_edges) einen leichten In-Memory-Graph
und liefert Edge-basierte Kennzahlen (In-Degree, Out-Degree, edge_bonus).
Kompatibilität:
- Python 3.12+, qdrant-client 1.x
- Wird von app/core/retriever.py im Hybrid-Modus genutzt.
- Signaturen bleiben kompatibel zu den bestehenden Tests
(tests/test_retriever_edges.py patcht expand()).
Version:
0.2.0 (2025-11-30 direkte Qdrant-Abfrage, confidence-basiertes Gewicht)
"""
from __future__ import annotations
from typing import Dict, List, Optional, DefaultDict
from collections import defaultdict
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
from app.core.qdrant import collection_names
# Legacy-Import (wird aktuell nicht mehr verwendet, bleibt aber erhalten,
# damit bestehende Importe/Mocks nicht brechen).
try: # pragma: no cover
from app.core.qdrant_points import get_edges_for_sources # type: ignore
except Exception: # pragma: no cover
get_edges_for_sources = None # type: ignore
# Base weight per edge type (the "kind" field of an edge payload).
# _edge_weight() multiplies these values by the "confidence" stored in the
# edge payload, when one is present.
EDGE_BASE_WEIGHTS: Dict[str, float] = {
    "references": 0.20,
    "belongs_to": 0.10,
    "next": 0.06,
    "prev": 0.06,
    "backlink": 0.04,
    "references_at": 0.08,
    # Any other kind falls back to 0.0 at lookup time and therefore only
    # contributes through the centrality bonus.
}
def _edge_weight(pl: Dict) -> float:
    """Compute the effective edge weight from an edge payload.

    The base weight is looked up in ``EDGE_BASE_WEIGHTS`` by the payload's
    ``kind`` (unknown kinds get 0.0). If the payload carries a usable
    ``confidence`` value, it is clamped to ``[0.0, 1.0]`` and multiplied
    with the base weight.

    Args:
        pl: Edge payload; the keys ``kind`` and ``confidence`` are read,
            both are optional.

    Returns:
        ``base`` when confidence is missing, not convertible to a float,
        or NaN; otherwise ``base * clamp(confidence, 0.0, 1.0)``.
    """
    import math

    kind = pl.get("kind", "edge")
    base = EDGE_BASE_WEIGHTS.get(kind, 0.0)

    conf_raw = pl.get("confidence")
    if conf_raw is None:
        return base

    try:
        conf = float(conf_raw)
    except (TypeError, ValueError, OverflowError):
        # Unparseable confidence: behave as if none had been stored.
        return base

    # NaN compares False against both clamp bounds and would otherwise leak
    # out as ``base * nan``; treat it like a missing confidence instead.
    if math.isnan(conf):
        return base

    # Clamp into [0.0, 1.0]; this also tames +/-inf.
    return base * min(max(conf, 0.0), 1.0)
def _fetch_edges(
client: QdrantClient,
prefix: str,
seeds: List[str],
edge_types: Optional[List[str]] = None,
limit: int = 2048,
) -> List[Dict]:
"""
Holt Edges direkt aus der *_edges Collection.
Filter:
- source_id IN seeds ODER target_id IN seeds ODER note_id IN seeds
- optional: kind IN edge_types
"""
if not seeds or limit <= 0:
return []
_, _, edges_col = collection_names(prefix)
# OR über source_id / target_id / note_id für alle Seeds
seed_conditions = []
for field in ("source_id", "target_id", "note_id"):
for s in seeds:
seed_conditions.append(
rest.FieldCondition(
key=field,
match=rest.MatchValue(value=str(s)),
)
)
seeds_filter = rest.Filter(should=seed_conditions) if seed_conditions else None
# Optional: Filter auf bestimmte Edge-Typen (kind)
type_filter = None
if edge_types:
type_conds = [
rest.FieldCondition(
key="kind",
match=rest.MatchValue(value=str(k)),
)
for k in edge_types
]
type_filter = rest.Filter(should=type_conds)
flt = None
must = []
if seeds_filter:
must.append(seeds_filter)
if type_filter:
must.append(type_filter)
if must:
flt = rest.Filter(must=must)
pts, _ = client.scroll(
collection_name=edges_col,
scroll_filter=flt,
limit=limit,
with_payload=True,
with_vectors=False,
)
out: List[Dict] = []
for p in pts or []:
pl = dict(p.payload or {})
if pl:
out.append(pl)
return out
class Subgraph:
    """Lightweight subgraph holding adjacency lists and simple degree metrics."""

    def __init__(self) -> None:
        # Source node id -> list of outgoing edge records, each a dict with
        # the keys "target", "kind" and "weight".
        self.adj: DefaultDict[str, List[Dict]] = defaultdict(list)
        # Number of incoming / outgoing edges registered per node id.
        self.in_degree: DefaultDict[str, int] = defaultdict(int)
        self.out_degree: DefaultDict[str, int] = defaultdict(int)

    def add_edge(self, e: Dict) -> None:
        """
        Register one directed edge.

        ``e`` must provide "source", "target" and "kind"; "weight" is
        optional and defaults to the base weight of the edge kind (0.0 for
        unknown kinds). Edges with a falsy source or target are ignored.
        """
        source, target, kind = e["source"], e["target"], e["kind"]
        weight = e.get("weight", EDGE_BASE_WEIGHTS.get(kind, 0.0))
        if not (source and target):
            return

        record = {"target": target, "kind": kind, "weight": weight}
        self.adj[source].append(record)
        self.out_degree[source] += 1
        self.in_degree[target] += 1

    def aggregate_edge_bonus(self, node_id: str) -> float:
        """
        Return the sum of the weights of all outgoing edges of ``node_id``.
        """
        # .get() keeps the defaultdict from creating an entry for unknown nodes.
        outgoing = self.adj.get(node_id, [])
        return sum(record["weight"] for record in outgoing)

    def centrality_bonus(self, node_id: str) -> float:
        """
        Return a log-damped centrality score based on the node's in-degree.

        The score is ``log1p(in_degree) / 10`` capped at 0.15; nodes without
        incoming edges score 0.0.
        """
        import math

        incoming = self.in_degree.get(node_id, 0)
        if incoming > 0:
            return min(math.log1p(incoming) / 10.0, 0.15)
        return 0.0
def expand(
    client: QdrantClient,
    prefix: str,
    seeds: List[str],
    depth: int = 1,
    edge_types: Optional[List[str]] = None,
) -> Subgraph:
    """
    Expand from ``seeds`` along edges for up to ``depth`` rounds.

    Seeds are stable payload IDs (e.g. note_id, chunk_id). In every round
    all edges are fetched whose source_id OR target_id OR note_id equals one
    of the current frontier IDs (optionally restricted to ``edge_types``),
    added to the subgraph, and their ``target_id`` values form the next
    frontier (minus IDs that were already used as seeds).

    An edge fetched in an earlier round is not added again: with
    ``depth >= 2`` the edge A -> B is returned once for seed A and once more
    for seed B (it matches on target_id), which would otherwise double its
    weight and both degree counters.

    Args:
        client: Qdrant client used for the edge lookups.
        prefix: Collection prefix of the edges collection.
        seeds: Start IDs; ``None`` or an empty list yields an empty subgraph.
        depth: Number of expansion rounds; values below 0 are treated as 0.
        edge_types: Optional whitelist of edge kinds.

    Returns:
        The populated ``Subgraph``.
    """
    import json

    sg = Subgraph()
    frontier = set(seeds or ())
    visited = set()
    # Fingerprints of payloads consumed in *previous* rounds.
    seen_payloads = set()

    for _ in range(max(depth, 0)):
        if not frontier:
            break

        edges_payloads = _fetch_edges(
            client=client,
            prefix=prefix,
            seeds=list(frontier),
            edge_types=edge_types,
            limit=2048,
        )

        round_payloads = set()
        next_frontier = set()
        for pl in edges_payloads:
            try:
                fingerprint = json.dumps(pl, sort_keys=True, default=str)
            except (TypeError, ValueError):
                # Payload cannot be fingerprinted; never drop such an edge.
                fingerprint = None

            # Skip only re-fetches from earlier rounds; edges within the
            # same round are kept exactly as delivered.
            if fingerprint is not None:
                if fingerprint in seen_payloads:
                    continue
                round_payloads.add(fingerprint)

            tgt = pl.get("target_id")
            sg.add_edge(
                {
                    "source": pl.get("source_id"),
                    "target": tgt,
                    "kind": pl.get("kind", "edge"),
                    "weight": _edge_weight(pl),
                }
            )
            if tgt:
                next_frontier.add(tgt)

        seen_payloads |= round_payloads
        visited |= frontier
        frontier = next_frontier - visited

    return sg