From 13b7c8858a24c1a7ee705b0100b458bb2586e70b Mon Sep 17 00:00:00 2001 From: Lars Date: Wed, 3 Dec 2025 18:40:06 +0100 Subject: [PATCH] app/core/graph_adapter.py aktualisiert --- app/core/graph_adapter.py | 226 +++++++++++++++++++++++++++++++------- 1 file changed, 188 insertions(+), 38 deletions(-) diff --git a/app/core/graph_adapter.py b/app/core/graph_adapter.py index 0c39771..d324b96 100644 --- a/app/core/graph_adapter.py +++ b/app/core/graph_adapter.py @@ -1,90 +1,240 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- """ app/core/graph_adapter.py — Adjazenzaufbau & Subgraph-Expansion (WP-04) Zweck: Baut aus Qdrant-Edges (Collection: *_edges) einen leichten In-Memory-Graph und liefert Edge-basierte Kennzahlen (In-Degree, Out-Degree, edge_bonus). + Kompatibilität: - Python 3.12+, qdrant-client 1.x + - Python 3.12+, qdrant-client 1.x + - Wird von app/core/retriever.py im Hybrid-Modus genutzt. + - Signaturen bleiben kompatibel zu den bestehenden Tests + (tests/test_retriever_edges.py patcht expand()). + Version: - 0.1.0 (Erstanlage) -Stand: - 2025-10-07 -Bezug: - - WP-04 Edge-Gewichtungen und Heuristiken - - app/core/qdrant_points.py (get_edges_for_sources) -Nutzung: - from app.core.graph_adapter import expand -Änderungsverlauf: - 0.1.0 (2025-10-07) – Erstanlage. + 0.2.0 (2025-11-30 – direkte Qdrant-Abfrage, confidence-basiertes Gewicht) """ from __future__ import annotations + from typing import Dict, List, Optional, DefaultDict from collections import defaultdict -from qdrant_client import QdrantClient -from app.core.qdrant_points import get_edges_for_sources -EDGE_BASE_WEIGHTS = { - "references": 0.20, - "belongs_to": 0.10, - "next": 0.06, - "prev": 0.06, - "backlink": 0.04, +from qdrant_client import QdrantClient +from qdrant_client.http import models as rest + +from app.core.qdrant import collection_names + +# Legacy-Import (wird aktuell nicht mehr verwendet, bleibt aber erhalten, +# damit bestehende Importe/Mocks nicht brechen). +try: # pragma: no cover + from app.core.qdrant_points import get_edges_for_sources # type: ignore +except Exception: # pragma: no cover + get_edges_for_sources = None # type: ignore + + +# Basisgewichte je Edge-Typ. +# Diese Werte werden mit der in der Edge-Payload hinterlegten "confidence" +# multipliziert, falls vorhanden. +EDGE_BASE_WEIGHTS: Dict[str, float] = { + "references": 0.20, + "belongs_to": 0.10, + "next": 0.06, + "prev": 0.06, + "backlink": 0.04, "references_at": 0.08, + # weitere Typen erhalten per Default 0.0 und wirken nur über centrality } +def _edge_weight(pl: Dict) -> float: + """Berechnet das effektive Edge-Gewicht aus kind + confidence.""" + kind = pl.get("kind", "edge") + base = EDGE_BASE_WEIGHTS.get(kind, 0.0) + + conf_raw = pl.get("confidence", None) + try: + conf = float(conf_raw) if conf_raw is not None else None + except Exception: + conf = None + + if conf is None: + return base + + # Confidence vorsichtig in [0.0, 1.0] clampen + if conf < 0.0: + conf = 0.0 + if conf > 1.0: + conf = 1.0 + + return base * conf + + +def _fetch_edges( + client: QdrantClient, + prefix: str, + seeds: List[str], + edge_types: Optional[List[str]] = None, + limit: int = 2048, +) -> List[Dict]: + """ + Holt Edges direkt aus der *_edges Collection. + + Filter: + - source_id IN seeds ODER target_id IN seeds ODER note_id IN seeds + - optional: kind IN edge_types + """ + if not seeds or limit <= 0: + return [] + + _, _, edges_col = collection_names(prefix) + + # OR über source_id / target_id / note_id für alle Seeds + seed_conditions = [] + for field in ("source_id", "target_id", "note_id"): + for s in seeds: + seed_conditions.append( + rest.FieldCondition( + key=field, + match=rest.MatchValue(value=str(s)), + ) + ) + seeds_filter = rest.Filter(should=seed_conditions) if seed_conditions else None + + # Optional: Filter auf bestimmte Edge-Typen (kind) + type_filter = None + if edge_types: + type_conds = [ + rest.FieldCondition( + key="kind", + match=rest.MatchValue(value=str(k)), + ) + for k in edge_types + ] + type_filter = rest.Filter(should=type_conds) + + flt = None + must = [] + if seeds_filter: + must.append(seeds_filter) + if type_filter: + must.append(type_filter) + if must: + flt = rest.Filter(must=must) + + pts, _ = client.scroll( + collection_name=edges_col, + scroll_filter=flt, + limit=limit, + with_payload=True, + with_vectors=False, + ) + + out: List[Dict] = [] + for p in pts or []: + pl = dict(p.payload or {}) + if pl: + out.append(pl) + return out + + class Subgraph: """Leichtgewichtiger Subgraph mit Adjazenzlisten & einfachen Kennzahlen.""" - def __init__(self): + def __init__(self) -> None: self.adj: DefaultDict[str, List[Dict]] = defaultdict(list) self.in_degree: DefaultDict[str, int] = defaultdict(int) self.out_degree: DefaultDict[str, int] = defaultdict(int) - def add_edge(self, e: Dict): - src = e["source"]; tgt = e["target"]; kind = e["kind"] + def add_edge(self, e: Dict) -> None: + src = e["source"] + tgt = e["target"] + kind = e["kind"] weight = e.get("weight", EDGE_BASE_WEIGHTS.get(kind, 0.0)) - self.adj[src].append({"target": tgt, "kind": kind, "weight": weight}) + + if not src or not tgt: + return + + self.adj[src].append( + { + "target": tgt, + "kind": kind, + "weight": weight, + } + ) self.out_degree[src] += 1 self.in_degree[tgt] += 1 def aggregate_edge_bonus(self, node_id: str) -> float: + """ + Summe der ausgehenden Kantengewichte für einen Knoten. + """ return sum(edge["weight"] for edge in self.adj.get(node_id, [])) def centrality_bonus(self, node_id: str) -> float: + """ + Einfache log-gedämpfte Zentralität auf Basis der In-Degree. + Obergrenze: 0.15 + """ import math - # Log-gedämpfter Bonus, hart begrenzt - return min(math.log1p(self.in_degree.get(node_id, 0)) / 10.0, 0.15) + + indeg = self.in_degree.get(node_id, 0) + if indeg <= 0: + return 0.0 + return min(math.log1p(indeg) / 10.0, 0.15) -def expand(client: QdrantClient, prefix: str, seeds: List[str], - depth: int = 1, edge_types: Optional[List[str]] = None) -> Subgraph: +def expand( + client: QdrantClient, + prefix: str, + seeds: List[str], + depth: int = 1, + edge_types: Optional[List[str]] = None, +) -> Subgraph: """ - Expandiert ab Seeds entlang von Edges (bis depth), optional gefiltert nach Typen. - Seeds sind stabile payload-IDs (z. B. note_id, chunk_id). + Expandiert ab Seeds entlang von Edges (bis `depth`), optional gefiltert + nach Edge-Typen. + + Seeds sind stabile payload-IDs (z. B. note_id, chunk_id). Es werden Edges + berücksichtigt, bei denen source_id ODER target_id ODER note_id einem der + Seeds entspricht. """ sg = Subgraph() frontier = set(seeds) visited = set() - for _ in range(max(depth, 0)): + max_depth = max(depth, 0) + + for _ in range(max_depth): if not frontier: break - edges = get_edges_for_sources(client, prefix, list(frontier), edge_types=edge_types, limit=2048) + edges_payloads = _fetch_edges( + client=client, + prefix=prefix, + seeds=list(frontier), + edge_types=edge_types, + limit=2048, + ) + next_frontier = set() - for pl in edges: + for pl in edges_payloads: + src = pl.get("source_id") + tgt = pl.get("target_id") + kind = pl.get("kind", "edge") + e = { - "source": pl.get("source_id"), - "target": pl.get("target_id"), - "kind": pl.get("kind", "edge"), - "weight": pl.get("weight", EDGE_BASE_WEIGHTS.get(pl.get("kind", "edge"), 0.0)), + "source": src, + "target": tgt, + "kind": kind, + "weight": _edge_weight(pl), } sg.add_edge(e) - if e["target"]: - next_frontier.add(e["target"]) + + if tgt: + next_frontier.add(tgt) visited |= frontier frontier = next_frontier - visited