#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Module: app/graph/service.py
Version: 0.1.0
Date: 2025-09-10

Purpose
-------
Lightweight graph layer on top of Qdrant:
  - get_note(note_id)
  - get_chunks(note_id)
  - neighbors(source_id, kinds=[...], scope=['note','chunk'], depth=1)
  - walk_bfs(source_id, kinds, max_depth)
  - context_for_note(note_id, max_neighbors): heuristic context collection

Notes
-----
- Uses the existing collections _notes/_chunks/_edges.
- Edges are queried via payload fields (`kind`, `source_id`, `target_id`).
"""
from __future__ import annotations

import copy
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

from qdrant_client.http import models as rest

from app.core.qdrant import QdrantConfig, get_client


def _cols(prefix: str) -> Tuple[str, str, str]:
    """Return the (notes, chunks, edges) collection names for *prefix*."""
    return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"


def _to_int(value: Any) -> int:
    """Coerce *value* to ``int`` for sorting; unusable values count as 0."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return 0


class GraphService:
    """Read-only graph queries over the notes, chunks and edges collections."""

    def __init__(self, cfg: Optional[QdrantConfig] = None, prefix: Optional[str] = None):
        """Create the client and resolve the three collection names.

        Args:
            cfg: Qdrant configuration; loaded via ``QdrantConfig.from_env()``
                when omitted.
            prefix: Optional collection prefix that overrides ``cfg.prefix``.
        """
        if cfg:
            # Copy before overriding the prefix so the caller's config object
            # is not modified behind its back.
            resolved = copy.copy(cfg) if prefix else cfg
        else:
            resolved = QdrantConfig.from_env()
        if prefix:
            resolved.prefix = prefix
        self.cfg = resolved
        self.client = get_client(self.cfg)
        self.notes_col, self.chunks_col, self.edges_col = _cols(self.cfg.prefix)

    # ------------------------ fetch helpers ------------------------
    @staticmethod
    def _note_filter(note_id: str) -> rest.Filter:
        """Build a filter matching points whose ``note_id`` payload equals *note_id*."""
        return rest.Filter(
            must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
        )

    def _scroll(self, col: str, flt: Optional[rest.Filter] = None, limit: int = 256) -> List[Any]:
        """Collect every point of *col* matching *flt*, paging *limit* at a time.

        Payloads are requested, vectors are not.
        """
        records: List[Any] = []
        offset = None
        while True:
            page, offset = self.client.scroll(
                collection_name=col,
                with_payload=True,
                with_vectors=False,
                limit=limit,
                offset=offset,
                scroll_filter=flt,
            )
            records.extend(page)
            # Stop on an empty page or when no next-page offset is returned.
            if not page or offset is None:
                break
        return records

    def _edges(self, field: str, node_id: str, kinds: Optional[List[str]]) -> List[Dict[str, Any]]:
        """Return payloads of edges whose *field* equals *node_id*.

        Args:
            field: Payload key to match, ``"source_id"`` or ``"target_id"``.
            node_id: Value the payload key must equal.
            kinds: Optional list of edge kinds; falsy means no kind filter.
        """
        must = [rest.FieldCondition(key=field, match=rest.MatchValue(value=node_id))]
        if kinds:
            must.append(rest.FieldCondition(key="kind", match=rest.MatchAny(any=kinds)))
        points = self._scroll(self.edges_col, rest.Filter(must=must))
        return [pt.payload or {} for pt in points]

    @staticmethod
    def _chunk_sort_key(point: Any) -> Tuple[int, int, int]:
        """Sort key ``(seq, chunk_index, numeric suffix after '#' in chunk_id)``."""
        payload = point.payload or {}
        chunk_id = payload.get("chunk_id") or ""
        suffix = 0
        if isinstance(chunk_id, str) and "#" in chunk_id:
            suffix = _to_int(chunk_id.rsplit("#", 1)[-1])
        return (_to_int(payload.get("seq")), _to_int(payload.get("chunk_index")), suffix)

    # ------------------------ public API ---------------------------
    def get_note(self, note_id: str) -> Optional[Dict[str, Any]]:
        """Return the payload of the first note matching *note_id*.

        Returns ``None`` when no point matches or the payload is empty.
        """
        pts, _ = self.client.scroll(
            collection_name=self.notes_col,
            with_payload=True,
            with_vectors=False,
            limit=1,
            scroll_filter=self._note_filter(note_id),
        )
        return (pts[0].payload or None) if pts else None

    def get_chunks(self, note_id: str) -> List[Dict[str, Any]]:
        """Return all chunk payloads of *note_id* in a stable order.

        Chunks are ordered by ``seq``, then ``chunk_index``, then the numeric
        suffix after ``#`` in ``chunk_id`` (the original author notes this
        mirrors the export ordering). Missing or non-numeric values sort as 0
        instead of aborting the whole call.
        """
        pts = self._scroll(self.chunks_col, self._note_filter(note_id))
        return [p.payload or {} for p in sorted(pts, key=self._chunk_sort_key)]

    def neighbors(self, source_id: str, kinds: Optional[Iterable[str]] = None,
                  scope: Optional[Iterable[str]] = None, depth: int = 1) -> Dict[str, List[Dict[str, Any]]]:
        """Return the direct outgoing and incoming edges of *source_id*.

        Args:
            source_id: Node id matched against ``source_id`` (outgoing) and
                ``target_id`` (incoming) of the edge payloads.
            kinds: Optional edge kinds to keep; falsy means all kinds.
            scope: Accepted for interface compatibility; currently ignored.
            depth: Accepted for interface compatibility; currently ignored,
                only direct edges (depth 1) are returned.

        Returns:
            ``{"out": [...], "in": [...]}`` with the edge payloads.
        """
        kind_list = list(kinds) if kinds else None
        return {
            "out": self._edges("source_id", source_id, kind_list),
            # Inverse direction: edges pointing at source_id.
            "in": self._edges("target_id", source_id, kind_list),
        }

    def walk_bfs(self, source_id: str, kinds: Iterable[str], max_depth: int = 2) -> Set[str]:
        """Breadth-first walk along outgoing edges, at most *max_depth* hops.

        Args:
            source_id: Start node; always part of the result.
            kinds: Edge kinds to follow; empty or ``None`` follows all kinds.
            max_depth: Maximum number of hops from *source_id*.

        Returns:
            The set of visited ids (string ``target_id`` values only).
        """
        kind_list = list(kinds) if kinds else None
        visited: Set[str] = {source_id}
        frontier: Set[str] = {source_id}
        for _ in range(max_depth):
            reached: Set[str] = set()
            for node in frontier:
                # Only outgoing edges are followed, so the incoming direction
                # is not queried at all.
                for edge in self._edges("source_id", node, kind_list):
                    target = edge.get("target_id")
                    if isinstance(target, str) and target not in visited:
                        visited.add(target)
                        reached.add(target)
            if not reached:
                break
            frontier = reached
        return visited

    def context_for_note(self, note_id: str, kinds: Iterable[str] = ("references", "backlink"),
                         max_neighbors: int = 12) -> Dict[str, Any]:
        """Heuristic context: own chunks plus de-duplicated neighbour notes.

        Args:
            note_id: Note whose context is collected.
            kinds: Edge kinds used to find neighbours.
            max_neighbors: Upper bound on neighbour ids that are looked up;
                ids without a matching note still count against it, so fewer
                notes may be returned. Negative values are treated as 0.

        Returns:
            Dict with ``note``, ``chunks``, ``neighbors``, ``edges_out`` and
            ``edges_in``.
        """
        note = self.get_note(note_id) or {}
        chunks = self.get_chunks(note_id)
        neigh = self.neighbors(note_id, kinds=list(kinds))

        candidates = [e.get("target_id") for e in neigh["out"]]
        candidates += [e.get("source_id") for e in neigh["in"]]
        # Order-preserving de-dupe; a self-loop must not make the note its
        # own neighbour.
        unique_ids = list(dict.fromkeys(
            c for c in candidates if isinstance(c, str) and c != note_id
        ))
        unique_ids = unique_ids[:max(0, max_neighbors)]

        neighbor_notes = [self.get_note(nid) for nid in unique_ids]
        return {
            "note": note,
            "chunks": chunks,
            "neighbors": [n for n in neighbor_notes if n],
            "edges_out": neigh["out"],
            "edges_in": neigh["in"],
        }


def _main() -> None:  # pragma: no cover
    """Mini CLI: print a note's neighbours or its full context as JSON."""
    import argparse
    import json

    ap = argparse.ArgumentParser()
    ap.add_argument("--prefix", help="Collection-Prefix (überschreibt ENV)")
    ap.add_argument("--note-id", required=True)
    ap.add_argument("--neighbors", action="store_true", help="Nur Nachbarn anzeigen")
    args = ap.parse_args()

    svc = GraphService(prefix=args.prefix)
    if args.neighbors:
        out = svc.neighbors(args.note_id, kinds=["references", "backlink", "prev", "next", "belongs_to"])
    else:
        out = svc.context_for_note(args.note_id)
    print(json.dumps(out, ensure_ascii=False, indent=2))


# Optional: mini CLI
if __name__ == "__main__":  # pragma: no cover
    _main()