#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Module: app/core/edges.py
Version: 1.0.0
Date: 2025-09-09

Purpose
-------
Central, consistent construction of edge payloads in the **new schema**:
  - kind      : "belongs_to" | "next" | "prev" | "references" | "backlink"
  - source_id : ID of the source node (chunk or note ID)
  - target_id : ID of the target node
  - scope     : "chunk" | "note"
  - note_id   : owner note (for efficient filtering/deletion)
  - seq       : optional (e.g. order of occurrences); not set by
                `build_edges_for_note`

Notes
-----
- Edges are deduplicated (key=(kind, source_id, target_id, scope)).
- For chunk edges, `note_id` is taken from the chunk payload, falling back
  to the `note_id` argument when the payload has none.
- For note-scope edges, `note_id` is the ID of the note being processed.
"""
from __future__ import annotations

from typing import Dict, List


def _edge(kind: str, source_id: str, target_id: str, scope: str, note_id: str) -> Dict:
    """Return a single edge payload dict in the new schema."""
    return {
        "kind": kind,
        "source_id": source_id,
        "target_id": target_id,
        "scope": scope,
        "note_id": note_id,
    }


def build_edges_for_note(
    note_id: str,
    chunk_payloads: List[Dict],
    note_level_refs: List[str] | None,
    *,
    include_note_scope_refs: bool = False,
) -> List[Dict]:
    """Build the deduplicated edge payloads for one note and its chunks.

    Args:
        note_id: ID of the note the edges are built for.
        chunk_payloads: Chunk payload dicts. Each must contain ``"id"``;
            ``"note_id"``, ``"neighbors"`` (a dict with optional ``"prev"``
            / ``"next"`` IDs) and ``"references"`` (a list of dicts with a
            ``"target_id"``) are optional.
        note_level_refs: Target IDs referenced by the note as a whole, or
            ``None``. Duplicates and empty/``None`` entries are ignored.
        include_note_scope_refs: When true, also emit a note-scope
            ``"references"`` edge for every note-level target (a
            ``"backlink"`` edge is always emitted).

    Returns:
        Edge dicts with the keys ``kind``, ``source_id``, ``target_id``,
        ``scope`` and ``note_id``, unique per
        ``(kind, source_id, target_id, scope)``.

    Raises:
        KeyError: If a chunk payload has no ``"id"`` key.
    """
    edges: List[Dict] = []

    # Chunk scope: belongs_to / prev / next / references
    for ch in chunk_payloads:
        cid = ch["id"]
        owner = ch.get("note_id") or note_id

        edges.append(_edge("belongs_to", cid, note_id, "chunk", owner))

        # Neighbours: every link is emitted in both directions, so a chunk
        # and its neighbour may produce the same edge twice (deduped below).
        nb = ch.get("neighbors") or {}
        prev_id = nb.get("prev")
        next_id = nb.get("next")
        if prev_id:
            edges.append(_edge("prev", cid, prev_id, "chunk", owner))
            edges.append(_edge("next", prev_id, cid, "chunk", owner))
        if next_id:
            edges.append(_edge("next", cid, next_id, "chunk", owner))
            edges.append(_edge("prev", next_id, cid, "chunk", owner))

        # References found inside the chunk; entries without a target are skipped.
        for ref in ch.get("references") or []:
            tid = ref.get("target_id")
            if not tid:
                continue
            edges.append(_edge("references", cid, tid, "chunk", owner))

    # Note scope: backlink (always); references (optional).
    # dict.fromkeys removes duplicates while keeping first-seen order.
    for tid in dict.fromkeys(note_level_refs or []):
        if not tid:
            # Same rule as for chunk references: never emit an edge whose
            # endpoint is empty or None.
            continue
        if include_note_scope_refs:
            edges.append(_edge("references", note_id, tid, "note", note_id))
        edges.append(_edge("backlink", tid, note_id, "note", note_id))

    # Dedupe: the first occurrence fixes the position in the result, the
    # last occurrence supplies the payload.
    dedup: Dict[tuple, Dict] = {}
    for e in edges:
        dedup[(e["kind"], e["source_id"], e["target_id"], e["scope"])] = e
    return list(dedup.values())