#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ app/core/graph_adapter.py — Adjazenzaufbau & Subgraph-Expansion (WP-04) Zweck: Baut aus Qdrant-Edges (Collection: *_edges) einen leichten In-Memory-Graph und liefert Edge-basierte Kennzahlen (In-Degree, Out-Degree, edge_bonus). Kompatibilität: - Python 3.12+, qdrant-client 1.x - Wird von app/core/retriever.py im Hybrid-Modus genutzt. - Signaturen bleiben kompatibel zu den bestehenden Tests (tests/test_retriever_edges.py patcht expand()). Version: 0.3.0 (2025-12-04 – note_id-bewusste Expansion, zusätzliche Edge-Typ-Gewichte, Subgraph.edge_bonus() für Kompatibilität zum Retriever) """ from __future__ import annotations from typing import Dict, List, Optional, DefaultDict from collections import defaultdict from qdrant_client import QdrantClient from qdrant_client.http import models as rest from app.core.qdrant import collection_names # Legacy-Import (wird aktuell nicht mehr verwendet, bleibt aber erhalten, # damit bestehende Importe/Mocks nicht brechen). try: # pragma: no cover from app.core.qdrant_points import get_edges_for_sources # type: ignore except Exception: # pragma: no cover get_edges_for_sources = None # type: ignore # Basisgewichte je Edge-Typ. # Diese Werte werden mit der in der Edge-Payload hinterlegten "confidence" # multipliziert, falls vorhanden. # # Hinweis: # - Strukturkanten (belongs_to, next/prev) sind schwächer gewichtet. # - Wissenskanten (depends_on, related_to, similar_to, references) erhalten # höhere Basisgewichte, damit sie sich im Retriever bemerkbar machen. EDGE_BASE_WEIGHTS: Dict[str, float] = { # Struktur / Navigationskanten "belongs_to": 0.10, "next": 0.06, "prev": 0.06, "backlink": 0.04, "references_at": 0.08, # Wissenskanten "references": 0.20, "depends_on": 0.18, "related_to": 0.15, "similar_to": 0.12, # weitere Typen erhalten per Default 0.0 und wirken nur über centrality } def _edge_weight(pl: Dict) -> float: """Berechnet das effektive Edge-Gewicht aus kind + confidence.""" kind = pl.get("kind", "edge") base = EDGE_BASE_WEIGHTS.get(kind, 0.0) conf_raw = pl.get("confidence", None) try: conf = float(conf_raw) if conf_raw is not None else None except Exception: conf = None if conf is None: return base # Confidence vorsichtig in [0.0, 1.0] clampen if conf < 0.0: conf = 0.0 if conf > 1.0: conf = 1.0 return base * conf def _fetch_edges( client: QdrantClient, prefix: str, seeds: List[str], edge_types: Optional[List[str]] = None, limit: int = 2048, ) -> List[Dict]: """ Holt Edges direkt aus der *_edges Collection. Filter: - source_id IN seeds ODER target_id IN seeds ODER note_id IN seeds - optional: kind IN edge_types """ if not seeds or limit <= 0: return [] _, _, edges_col = collection_names(prefix) # OR über source_id / target_id / note_id für alle Seeds seed_conditions = [] for field in ("source_id", "target_id", "note_id"): for s in seeds: seed_conditions.append( rest.FieldCondition( key=field, match=rest.MatchValue(value=str(s)), ) ) seeds_filter = rest.Filter(should=seed_conditions) if seed_conditions else None # Optional: Filter auf bestimmte Edge-Typen (kind) type_filter = None if edge_types: type_conds = [ rest.FieldCondition( key="kind", match=rest.MatchValue(value=str(k)), ) for k in edge_types ] type_filter = rest.Filter(should=type_conds) flt = None must = [] if seeds_filter: must.append(seeds_filter) if type_filter: must.append(type_filter) if must: flt = rest.Filter(must=must) pts, _ = client.scroll( collection_name=edges_col, scroll_filter=flt, limit=limit, with_payload=True, with_vectors=False, ) out: List[Dict] = [] for p in pts or []: pl = dict(p.payload or {}) if pl: out.append(pl) return out class Subgraph: """Leichtgewichtiger Subgraph mit Adjazenzlisten & einfachen Kennzahlen.""" def __init__(self) -> None: self.adj: DefaultDict[str, List[Dict]] = defaultdict(list) self.in_degree: DefaultDict[str, int] = defaultdict(int) self.out_degree: DefaultDict[str, int] = defaultdict(int) def add_edge(self, e: Dict) -> None: src = e["source"] tgt = e["target"] kind = e["kind"] weight = e.get("weight", EDGE_BASE_WEIGHTS.get(kind, 0.0)) if not src or not tgt: return self.adj[src].append( { "target": tgt, "kind": kind, "weight": weight, } ) self.out_degree[src] += 1 self.in_degree[tgt] += 1 def aggregate_edge_bonus(self, node_id: str) -> float: """ Summe der ausgehenden Kantengewichte für einen Knoten. """ return sum(edge["weight"] for edge in self.adj.get(node_id, [])) def edge_bonus(self, node_id: str) -> float: """ Kompatibilitäts-Methode für den Retriever. Der Retriever ruft subgraph.edge_bonus(node_id) auf. Intern verwenden wir aggregate_edge_bonus(...), um bestehende Tests nicht zu brechen. """ return self.aggregate_edge_bonus(node_id) def centrality_bonus(self, node_id: str) -> float: """ Einfache log-gedämpfte Zentralität auf Basis des In-Degree. Obergrenze: 0.15 """ import math indeg = self.in_degree.get(node_id, 0) if indeg <= 0: return 0.0 return min(math.log1p(indeg) / 10.0, 0.15) def expand( client: QdrantClient, prefix: str, seeds: List[str], depth: int = 1, edge_types: Optional[List[str]] = None, ) -> Subgraph: """ Expandiert ab Seeds entlang von Edges (bis `depth`), optional gefiltert nach Edge-Typen. Seeds sind stabile payload-IDs (in unserem Fall vor allem note_id). Es werden Edges berücksichtigt, bei denen source_id ODER target_id ODER note_id einem der Seeds entspricht. """ sg = Subgraph() frontier = set(seeds) visited = set() max_depth = max(depth, 0) for _ in range(max_depth): if not frontier: break edges_payloads = _fetch_edges( client=client, prefix=prefix, seeds=list(frontier), edge_types=edge_types, limit=2048, ) next_frontier = set() for pl in edges_payloads: src = pl.get("source_id") tgt = pl.get("target_id") kind = pl.get("kind", "edge") e = { "source": src, "target": tgt, "kind": kind, "weight": _edge_weight(pl), } sg.add_edge(e) if tgt: next_frontier.add(tgt) visited |= frontier frontier = next_frontier - visited return sg