#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Module: app/graph/service.py
Version: 0.1.0
Date: 2025-09-10

Purpose
-------
Lightweight graph layer on top of Qdrant:
  - get_note(note_id)
  - get_chunks(note_id)
  - neighbors(source_id, kinds=[...], scope=['note','chunk'], depth=1)
  - walk_bfs(source_id, kinds, max_depth)
  - context_for_note(note_id, max_neighbors): heuristic context gathering

Notes
-----
  - Uses the existing collections <prefix>_notes / <prefix>_chunks / <prefix>_edges.
  - Edges are queried via payload fields (`kind`, `source_id`, `target_id`).
"""
from __future__ import annotations

import copy
from typing import List, Dict, Any, Optional, Iterable, Set, Tuple

from qdrant_client.http import models as rest

from app.core.qdrant import QdrantConfig, get_client


def _cols(prefix: str) -> Tuple[str, str, str]:
    """Return the (notes, chunks, edges) collection names for *prefix*."""
    return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"


def _as_int(value: Any, default: int = 0) -> int:
    """Coerce *value* to ``int``; return *default* if that is not possible.

    Used for sort keys so that a single malformed payload field cannot make
    the whole sort raise.
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return default


def _normalize_kinds(kinds: Optional[Iterable[str]]) -> Optional[List[str]]:
    """Turn a ``kinds`` argument into a list, or ``None`` meaning "no kind filter".

    A bare string is treated as a single kind (``list("references")`` would
    otherwise split it into characters). An empty iterable means "no filter".
    """
    if kinds is None:
        return None
    if isinstance(kinds, str):
        return [kinds] if kinds else None
    return list(kinds) or None


class GraphService:
    """Read-only graph queries over the notes/chunks/edges collections of one prefix."""

    def __init__(self, cfg: Optional[QdrantConfig] = None, prefix: Optional[str] = None):
        """Create the service and its Qdrant client.

        Args:
            cfg: Qdrant configuration; defaults to ``QdrantConfig.from_env()``.
            prefix: Optional collection prefix that overrides ``cfg.prefix``.
        """
        base = cfg or QdrantConfig.from_env()
        if prefix:
            # Work on a copy so a caller-supplied config object is not mutated.
            base = copy.copy(base)
            base.prefix = prefix
        self.cfg = base
        self.client = get_client(self.cfg)
        self.notes_col, self.chunks_col, self.edges_col = _cols(self.cfg.prefix)

    # ------------------------ fetch helpers ------------------------

    def _scroll(self, col: str, flt: Optional[rest.Filter] = None, limit: int = 256) -> List[Any]:
        """Return all points of *col* matching *flt*, following scroll pagination.

        Payloads are included and vectors are not. *limit* is the page size
        used for each scroll request.
        """
        out: List[Any] = []
        offset = None
        while True:
            pts, offset = self.client.scroll(
                collection_name=col,
                with_payload=True,
                with_vectors=False,
                limit=limit,
                offset=offset,
                scroll_filter=flt,
            )
            if not pts:
                break
            out.extend(pts)
            if offset is None:
                break
        return out

    def _edges(self, key: str, node_id: str, kinds: Optional[List[str]]) -> List[Dict[str, Any]]:
        """Return payloads of edges whose payload field *key* equals *node_id*.

        *key* is ``"source_id"`` (outgoing edges) or ``"target_id"`` (incoming
        edges). When *kinds* is non-empty, only edges whose ``kind`` is in it
        are returned.
        """
        must = [rest.FieldCondition(key=key, match=rest.MatchValue(value=node_id))]
        if kinds:
            must.append(rest.FieldCondition(key="kind", match=rest.MatchAny(any=kinds)))
        return [p.payload or {} for p in self._scroll(self.edges_col, rest.Filter(must=must))]

    # ------------------------ public API ---------------------------

    def get_note(self, note_id: str) -> Optional[Dict[str, Any]]:
        """Return the payload of the note with ``note_id``, or ``None`` if absent/empty."""
        flt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
        pts, _ = self.client.scroll(
            collection_name=self.notes_col,
            with_payload=True,
            with_vectors=False,
            limit=1,
            scroll_filter=flt,
        )
        return (pts[0].payload or None) if pts else None

    def get_chunks(self, note_id: str) -> List[Dict[str, Any]]:
        """Return the chunk payloads of *note_id* in document order.

        Chunks are sorted by ``(seq, chunk_index, n)``. Here ``n`` is the
        integer after the last ``#`` in ``chunk_id``. Missing or non-numeric
        values count as 0.
        """
        flt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
        pts = self._scroll(self.chunks_col, flt)

        # Sort order is meant to mirror the exporter's ordering.
        def sort_key(point: Any) -> Tuple[int, int, int]:
            payload = point.payload or {}
            seq = _as_int(payload.get("seq") or 0)
            chunk_index = _as_int(payload.get("chunk_index") or 0)
            suffix = 0
            cid = payload.get("chunk_id") or ""
            if isinstance(cid, str) and "#" in cid:
                suffix = _as_int(cid.rsplit("#", 1)[-1])
            return (seq, chunk_index, suffix)

        return [p.payload or {} for p in sorted(pts, key=sort_key)]

    def neighbors(self, source_id: str, kinds: Optional[Iterable[str]] = None,
                  scope: Optional[Iterable[str]] = None, depth: int = 1) -> Dict[str, List[Dict[str, Any]]]:
        """Return the outgoing and incoming edges of *source_id*, optionally filtered by kind.

        Args:
            source_id: Id of the node whose edges are fetched.
            kinds: Edge kinds to keep. ``None`` or empty means all kinds; a
                single string is accepted as one kind.
            scope: Accepted for API compatibility; currently ignored.
            depth: Accepted for API compatibility; currently ignored. Only
                direct (depth 1) edges are returned.

        Returns:
            ``{"out": [...], "in": [...]}``, both lists of edge payload dicts.
            ``out`` holds edges whose ``source_id`` equals *source_id*; ``in``
            holds edges whose ``target_id`` equals *source_id*.
        """
        kind_list = _normalize_kinds(kinds)
        return {
            "out": self._edges("source_id", source_id, kind_list),
            "in": self._edges("target_id", source_id, kind_list),
        }

    def walk_bfs(self, source_id: str, kinds: Iterable[str], max_depth: int = 2) -> Set[str]:
        """Return all node ids reachable from *source_id* via outgoing edges.

        The walk is breadth-first over edges whose kind is in *kinds*
        (``None`` or empty means all kinds), up to *max_depth* hops. The
        result always includes *source_id* itself.
        """
        kind_list = _normalize_kinds(kinds)
        visited: Set[str] = {source_id}
        frontier: Set[str] = {source_id}
        for _ in range(max_depth):
            next_frontier: Set[str] = set()
            for node in frontier:
                # Only outgoing edges are followed, so the incoming-edge query is skipped.
                for edge in self._edges("source_id", node, kind_list):
                    target = edge.get("target_id")
                    if isinstance(target, str) and target not in visited:
                        visited.add(target)
                        next_frontier.add(target)
            frontier = next_frontier
            if not frontier:
                break
        return visited

    def context_for_note(self, note_id: str, kinds: Iterable[str] = ("references", "backlink"),
                         max_neighbors: int = 12) -> Dict[str, Any]:
        """Collect a heuristic context for *note_id*.

        The context contains the note's own chunks and its neighbouring notes,
        found via direct edges of the given *kinds* in either direction.

        Neighbour ids are de-duplicated in first-seen order (outgoing targets
        first, then incoming sources). The note itself is excluded. Ids are
        resolved in that order until *max_neighbors* notes have been found, so
        ids that do not resolve to a note do not use up the limit. A
        *max_neighbors* of 0 or less yields no neighbours.

        Returns:
            A dict with keys ``note`` (payload or ``{}``), ``chunks``,
            ``neighbors`` (note payloads), ``edges_out`` and ``edges_in``.
        """
        note = self.get_note(note_id) or {}
        chunks = self.get_chunks(note_id)
        neigh = self.neighbors(note_id, kinds=kinds)

        candidates = [e.get("target_id") for e in neigh["out"]]
        candidates += [e.get("source_id") for e in neigh["in"]]
        # Keep only string ids other than the note itself; dedupe preserving order.
        ids = [c for c in candidates if isinstance(c, str) and c != note_id]
        unique_ids = list(dict.fromkeys(ids))

        neighbor_notes: List[Dict[str, Any]] = []
        for cand in unique_ids:
            if len(neighbor_notes) >= max_neighbors:
                break
            payload = self.get_note(cand)
            if payload:
                neighbor_notes.append(payload)

        return {
            "note": note,
            "chunks": chunks,
            "neighbors": neighbor_notes,
            "edges_out": neigh["out"],
            "edges_in": neigh["in"],
        }


# Optional: mini CLI
if __name__ == "__main__":  # pragma: no cover
    import argparse
    import json

    ap = argparse.ArgumentParser()
    ap.add_argument("--prefix", help="Collection-Prefix (überschreibt ENV)")
    ap.add_argument("--note-id", required=True)
    ap.add_argument("--neighbors", action="store_true", help="Nur Nachbarn anzeigen")
    args = ap.parse_args()

    svc = GraphService(prefix=args.prefix)
    if args.neighbors:
        out = svc.neighbors(args.note_id, kinds=["references", "backlink", "prev", "next", "belongs_to"])
    else:
        out = svc.context_for_note(args.note_id)
    print(json.dumps(out, ensure_ascii=False, indent=2))