#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ app/core/graph_adapter.py — Adjazenzaufbau & Subgraph-Expansion Zweck: Baut aus Qdrant-Edges (Collection: *_edges) einen leichten In-Memory-Graph. Kompatibilität: - WP-04a: Liefert Scores (edge_bonus, centrality). - WP-04b: Liefert jetzt auch Struktur-Daten für Erklärungen (Reverse-Lookup). Version: 0.4.0 (Update für WP-04b: Reverse Adjacency für Explainability) """ from __future__ import annotations from typing import Dict, List, Optional, DefaultDict, Any from collections import defaultdict from qdrant_client import QdrantClient from qdrant_client.http import models as rest from app.core.qdrant import collection_names # Legacy-Import Fallback try: # pragma: no cover from app.core.qdrant_points import get_edges_for_sources # type: ignore except Exception: # pragma: no cover get_edges_for_sources = None # type: ignore # Basisgewichte je Edge-Typ (WP-04a Config) EDGE_BASE_WEIGHTS: Dict[str, float] = { # Struktur "belongs_to": 0.10, "next": 0.06, "prev": 0.06, "backlink": 0.04, "references_at": 0.08, # Wissen "references": 0.20, "depends_on": 0.18, "related_to": 0.15, "similar_to": 0.12, } def _edge_weight(pl: Dict) -> float: """Berechnet das effektive Edge-Gewicht aus kind + confidence.""" kind = pl.get("kind", "edge") base = EDGE_BASE_WEIGHTS.get(kind, 0.0) conf_raw = pl.get("confidence", None) try: conf = float(conf_raw) if conf_raw is not None else None except Exception: conf = None if conf is None: return base if conf < 0.0: conf = 0.0 if conf > 1.0: conf = 1.0 return base * conf def _fetch_edges( client: QdrantClient, prefix: str, seeds: List[str], edge_types: Optional[List[str]] = None, limit: int = 2048, ) -> List[Dict]: """ Holt Edges direkt aus der *_edges Collection. Filter: source_id IN seeds OR target_id IN seeds OR note_id IN seeds """ if not seeds or limit <= 0: return [] _, _, edges_col = collection_names(prefix) seed_conditions = [] for field in ("source_id", "target_id", "note_id"): for s in seeds: seed_conditions.append( rest.FieldCondition(key=field, match=rest.MatchValue(value=str(s))) ) seeds_filter = rest.Filter(should=seed_conditions) if seed_conditions else None type_filter = None if edge_types: type_conds = [ rest.FieldCondition(key="kind", match=rest.MatchValue(value=str(k))) for k in edge_types ] type_filter = rest.Filter(should=type_conds) must = [] if seeds_filter: must.append(seeds_filter) if type_filter: must.append(type_filter) flt = rest.Filter(must=must) if must else None pts, _ = client.scroll( collection_name=edges_col, scroll_filter=flt, limit=limit, with_payload=True, with_vectors=False, ) out: List[Dict] = [] for p in pts or []: pl = dict(p.payload or {}) if pl: out.append(pl) return out class Subgraph: """Leichtgewichtiger Subgraph mit Adjazenzlisten & Kennzahlen.""" def __init__(self) -> None: # Forward: source -> [targets] self.adj: DefaultDict[str, List[Dict]] = defaultdict(list) # Reverse: target -> [sources] (Neu für WP-04b Explanation) self.reverse_adj: DefaultDict[str, List[Dict]] = defaultdict(list) self.in_degree: DefaultDict[str, int] = defaultdict(int) self.out_degree: DefaultDict[str, int] = defaultdict(int) def add_edge(self, e: Dict) -> None: """ Fügt eine Kante hinzu und aktualisiert Forward/Reverse Indizes. e muss enthalten: source, target, kind, weight. """ src = e.get("source") tgt = e.get("target") kind = e.get("kind") weight = e.get("weight", EDGE_BASE_WEIGHTS.get(kind, 0.0)) owner = e.get("note_id") if not src or not tgt: return # 1. Primäre Adjazenz (Forward) edge_data = {"target": tgt, "kind": kind, "weight": weight} self.adj[src].append(edge_data) self.out_degree[src] += 1 self.in_degree[tgt] += 1 # 2. Reverse Adjazenz (Neu für Explanation) # Wir speichern, woher die Kante kam. rev_data = {"source": src, "kind": kind, "weight": weight} self.reverse_adj[tgt].append(rev_data) # 3. Kontext-Note Handling (Forward & Reverse) # Wenn eine Kante "im Kontext einer Note" (owner) definiert ist, # schreiben wir sie der Note gut, damit der Retriever Scores auf Note-Ebene findet. if owner and owner != src: # Forward: Owner -> Target self.adj[owner].append(edge_data) self.out_degree[owner] += 1 # Reverse: Target wird vom Owner referenziert (indirekt) if owner != tgt: rev_owner_data = {"source": owner, "kind": kind, "weight": weight, "via_context": True} self.reverse_adj[tgt].append(rev_owner_data) self.in_degree[owner] += 1 # Leichter Centrality Boost für den Owner def aggregate_edge_bonus(self, node_id: str) -> float: """Summe der ausgehenden Kantengewichte (Hub-Score).""" return sum(edge["weight"] for edge in self.adj.get(node_id, [])) def edge_bonus(self, node_id: str) -> float: """API für Retriever (WP-04a Kompatibilität).""" return self.aggregate_edge_bonus(node_id) def centrality_bonus(self, node_id: str) -> float: """Log-gedämpfte Zentralität (In-Degree).""" import math indeg = self.in_degree.get(node_id, 0) if indeg <= 0: return 0.0 return min(math.log1p(indeg) / 10.0, 0.15) # --- WP-04b Explanation Helpers --- def get_outgoing_edges(self, node_id: str) -> List[Dict[str, Any]]: """Liefert Liste aller Ziele, auf die dieser Knoten zeigt.""" return self.adj.get(node_id, []) def get_incoming_edges(self, node_id: str) -> List[Dict[str, Any]]: """Liefert Liste aller Quellen, die auf diesen Knoten zeigen.""" return self.reverse_adj.get(node_id, []) def expand( client: QdrantClient, prefix: str, seeds: List[str], depth: int = 1, edge_types: Optional[List[str]] = None, ) -> Subgraph: """ Expandiert ab Seeds entlang von Edges (bis `depth`). """ sg = Subgraph() frontier = set(seeds) visited = set() max_depth = max(depth, 0) for _ in range(max_depth): if not frontier: break edges_payloads = _fetch_edges( client=client, prefix=prefix, seeds=list(frontier), edge_types=edge_types, limit=2048, ) next_frontier = set() for pl in edges_payloads: src = pl.get("source_id") tgt = pl.get("target_id") # Skip invalid edges if not src or not tgt: continue e = { "source": src, "target": tgt, "kind": pl.get("kind", "edge"), "weight": _edge_weight(pl), "note_id": pl.get("note_id"), } sg.add_edge(e) # Nur weitersuchen, wenn Target noch nicht besucht if tgt and tgt not in visited: next_frontier.add(tgt) visited |= frontier frontier = next_frontier - visited return sg