From fb2de78d6277d33e701de062452df8586be40c5d Mon Sep 17 00:00:00 2001 From: Lars Date: Mon, 17 Nov 2025 21:42:26 +0100 Subject: [PATCH] Dateien nach "app/core" hochladen --- app/core/derive_edges.py | 629 ++++++++++++++++++++++++--------------- 1 file changed, 383 insertions(+), 246 deletions(-) diff --git a/app/core/derive_edges.py b/app/core/derive_edges.py index 0a10426..616f5d9 100644 --- a/app/core/derive_edges.py +++ b/app/core/derive_edges.py @@ -1,286 +1,423 @@ -# app/core/derive_edges.py +#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -Mindnet V2 — Edge derivation +Modul: app/core/derive_edges.py +Zweck: +- Bewahrt bestehende Edgelogik (belongs_to, prev/next, references, backlink) +- Ergänzt typenbasierte Default-Kanten (edge_defaults aus config/types.yaml) +- Unterstützt "typed inline relations" ([[rel:KIND | Target]] / [[rel:KIND Target]]) +- Unterstützt Obsidian-Callouts (> [!edge] KIND: [[Target]] [[Target2]] ...) -Features preserved & extended: -- Structure edges: belongs_to, next, prev (deterministic order by index/ord/chunk_id) -- Explicit references from wikilinks [[Title]] -> kind="references", rule_id="explicit:wikilink", confidence=1.0 -- Inline relations: - A) [[rel: ]] -> kind from link, rule_id="inline:rel", confidence=0.95 - B) rel: [[Target1]] [[Target2]] ... -> supports multiple wikilinks on a single line -- Callout relations: - > [!edge] : [[Target1]] [[Target2]] -> rule_id="callout:edge", confidence=0.9 -- Type-based defaults (from types.yaml): - For each note type's `edge_defaults`, derive additional edges for every explicit/inline/callout target. - rule_id="edge_defaults::", confidence=0.7 - Symmetric kinds ("related_to","similar_to") also add reversed edges. -- De-duplication across all sources by (kind, scope, source_id, target_id, rule_id) -- Backward-compatible function signature - -Return: list[dict] with at least: - kind, scope, source_id, target_id, note_id, chunk_id, rule_id, confidence +Kompatibilität: +- build_edges_for_note(...) Signatur unverändert +- rule_id Werte exakt wie zuvor erwartet (ohne Versionssuffix): + * structure:belongs_to + * structure:order + * explicit:wikilink + * inline:rel + * callout:edge + * edge_defaults:: + * derived:backlink """ from __future__ import annotations +import os import re -from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple +from typing import Iterable, List, Optional, Tuple, Set, Dict -# Kanten, die symmetrisch interpretiert werden (Rückkante wird erzeugt) -SYMMETRIC_KINDS = {"related_to", "similar_to"} +try: + import yaml # optional, nur für types.yaml +except Exception: # pragma: no cover + yaml = None -# ----------------------------------------------------------------------------- +# --------------------------------------------------------------------------- # # Utilities -# ----------------------------------------------------------------------------- +# --------------------------------------------------------------------------- # -def _get(d: Dict[str, Any], *keys: str, default: Any = None) -> Any: +def _get(d: dict, *keys, default=None): for k in keys: - if isinstance(d, dict) and k in d: + if k in d and d[k] is not None: return d[k] return default -def _safe_int(x: Any, default: int = 10**9) -> int: - try: - return int(x) - except Exception: - return default - -def _sort_chunks(chs: Sequence[Dict[str, Any]]) -> List[Dict[str, Any]]: - """Sort chunks primarily by index, then ord, then chunk_id to guarantee stable next/prev.""" - return sorted(chs, key=lambda ch: ( - _get(ch, "index") is None, _safe_int(_get(ch, "index")), - _get(ch, "ord") is None, _safe_int(_get(ch, "ord")), - str(_get(ch, "chunk_id") or _get(ch, "id") or "") - )) - -def _make_edge(kind: str, - source_id: str, - target_id: str, - note_id: str, - chunk_id: str, - rule_id: str, - confidence: float, - scope: str = "chunk") -> Dict[str, Any]: - return { - "kind": kind, - "scope": scope, - "source_id": source_id, - "target_id": target_id, - "note_id": note_id, - "chunk_id": chunk_id, - "rule_id": rule_id, - "confidence": float(confidence), - } - -def _dedupe_key(e: Dict[str, Any]) -> Tuple[Any, ...]: +def _chunk_text_for_refs(chunk: dict) -> str: + # bevorzugt 'window' → dann 'text' → 'content' → 'raw' return ( - e.get("kind"), - e.get("scope"), - e.get("source_id"), - e.get("target_id"), - e.get("rule_id"), + _get(chunk, "window") + or _get(chunk, "text") + or _get(chunk, "content") + or _get(chunk, "raw") + or "" ) -# ----------------------------------------------------------------------------- -# Parsing helpers -# ----------------------------------------------------------------------------- +def _dedupe_seq(seq: Iterable[str]) -> List[str]: + seen: Set[str] = set() + out: List[str] = [] + for s in seq: + if s not in seen: + seen.add(s) + out.append(s) + return out -RE_WIKILINK = re.compile(r"\[\[([^\]]+?)\]\]") -RE_INLINE_BRACKET = re.compile( - r"\[\[\s*rel\s*:\s*(?P[a-z_][a-z0-9_]*)\s+(?P[^\]]+?)\s*\]\]", - flags=re.IGNORECASE, -) -RE_INLINE_PREFIX = re.compile( - r"(?m)\brel\s*:\s*(?P[a-z_][a-z0-9_]*)\s+(?P(?:\[\[[^\]]+\]\]\s*)+)", - flags=re.IGNORECASE, -) -RE_CALLOUT = re.compile( - r"(?m)^\s*>\s*\[!edge\]\s*(?P[a-z_][a-z0-9_]*)\s*:\s*(?P.+)$", - flags=re.IGNORECASE, -) +def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict: + pl = { + "kind": kind, + "relation": kind, # Alias (v2) + "scope": scope, # "chunk" | "note" + "source_id": source_id, + "target_id": target_id, + "note_id": note_id, # Träger-Note der Kante + } + if extra: + pl.update(extra) + return pl + +def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str: + base = f"{kind}:{s}->{t}#{scope}" + if rule_id: + base += f"|{rule_id}" + try: + import hashlib + return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest() + except Exception: # pragma: no cover + return base + +# --------------------------------------------------------------------------- # +# Typen-Registry (types.yaml) +# --------------------------------------------------------------------------- # + +def _env(n: str, default: Optional[str] = None) -> str: + v = os.getenv(n) + return v if v is not None else (default or "") + +def _load_types_registry() -> dict: + """Lädt die YAML-Registry aus MINDNET_TYPES_FILE oder ./config/types.yaml""" + p = _env("MINDNET_TYPES_FILE", "./config/types.yaml") + if not os.path.isfile(p) or yaml is None: + return {} + try: + with open(p, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + return data + except Exception: + return {} + +def _get_types_map(reg: dict) -> dict: + if isinstance(reg, dict) and isinstance(reg.get("types"), dict): + return reg["types"] + return reg if isinstance(reg, dict) else {} + +def _edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]: + """ + Liefert die edge_defaults-Liste für den gegebenen Notiztyp. + Fallback-Reihenfolge: + 1) reg['types'][note_type]['edge_defaults'] + 2) reg['defaults']['edge_defaults'] (oder 'default'/'global') + 3) [] + """ + types_map = _get_types_map(reg) + if note_type and isinstance(types_map, dict): + t = types_map.get(note_type) + if isinstance(t, dict) and isinstance(t.get("edge_defaults"), list): + return [str(x) for x in t["edge_defaults"] if isinstance(x, str)] + for key in ("defaults", "default", "global"): + v = reg.get(key) + if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list): + return [str(x) for x in v["edge_defaults"] if isinstance(x, str)] + return [] + +# --------------------------------------------------------------------------- # +# Parser für Links / Relationen +# --------------------------------------------------------------------------- # + +# Normale Wikilinks (Fallback) +_WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:. ]+)\]\]") + +# Getypte Inline-Relationen: +# [[rel:depends_on | Target]] +# [[rel:related_to Target]] +_REL_PIPE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s*\|\s*(?P[^\]]+?)\s*\]\]", re.IGNORECASE) +_REL_SPACE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s+(?P[^\]]+?)\s*\]\]", re.IGNORECASE) + +def _extract_typed_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: + """ + Gibt Liste (kind, target) zurück und den Text mit entfernten getypten Relation-Links, + damit die generische Wikilink-Erkennung sie nicht doppelt zählt. + """ + pairs: List[Tuple[str,str]] = [] + def _collect(m): + k = (m.group("kind") or "").strip().lower() + t = (m.group("target") or "").strip() + if k and t: + pairs.append((k, t)) + return "" # Link entfernen + text = _REL_PIPE.sub(_collect, text) + text = _REL_SPACE.sub(_collect, text) + return pairs, text + +# Obsidian Callout Parser +_CALLOUT_START = re.compile(r"^\s*>\s*\[!edge\]\s*(.*)$", re.IGNORECASE) +_REL_LINE = re.compile(r"^(?P[a-z_]+)\s*:\s*(?P.+?)\s*$", re.IGNORECASE) +_WIKILINKS_IN_LINE = re.compile(r"\[\[([^\]]+)\]\]") + +def _extract_callout_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: + """ + Findet [!edge]-Callouts und extrahiert (kind, target). Entfernt den gesamten + Callout-Block aus dem Text (damit Wikilinks daraus nicht zusätzlich als + "references" gezählt werden). + """ + if not text: + return [], text + + lines = text.splitlines() + out_pairs: List[Tuple[str,str]] = [] + keep_lines: List[str] = [] + i = 0 + + while i < len(lines): + m = _CALLOUT_START.match(lines[i]) + if not m: + keep_lines.append(lines[i]) + i += 1 + continue + + block_lines: List[str] = [] + first_rest = m.group(1) or "" + if first_rest.strip(): + block_lines.append(first_rest) + + i += 1 + while i < len(lines) and lines[i].lstrip().startswith('>'): + block_lines.append(lines[i].lstrip()[1:].lstrip()) + i += 1 + + for bl in block_lines: + mrel = _REL_LINE.match(bl) + if not mrel: + continue + kind = (mrel.group("kind") or "").strip().lower() + targets = mrel.group("targets") or "" + found = _WIKILINKS_IN_LINE.findall(targets) + if found: + for t in found: + t = t.strip() + if t: + out_pairs.append((kind, t)) + else: + for raw in re.split(r"[,;]", targets): + t = raw.strip() + if t: + out_pairs.append((kind, t)) + + # Callout wird NICHT in keep_lines übernommen + continue + + remainder = "\n".join(keep_lines) + return out_pairs, remainder def _extract_wikilinks(text: str) -> List[str]: - """All [[Title]] except those starting with 'rel:' (reserved for inline relations).""" - targets: List[str] = [] - for m in RE_WIKILINK.finditer(text or ""): - label = m.group(1).strip() - if label.lower().startswith("rel:"): - # handled by explicit inline relation parser - continue - if label: - targets.append(label) - return targets + ids: List[str] = [] + for m in _WIKILINK_RE.finditer(text or ""): + ids.append(m.group(1).strip()) + return ids -def _extract_inline_relations(text: str) -> List[Tuple[str, str]]: - """Find inline relations from: - A) [[rel:kind Target]] - B) rel: kind [[Target1]] [[Target2]] (supports multiple targets on one line) +# --------------------------------------------------------------------------- # +# Hauptfunktion +# --------------------------------------------------------------------------- # + +def build_edges_for_note( + note_id: str, + chunks: List[dict], + note_level_references: Optional[List[str]] = None, + include_note_scope_refs: bool = False, +) -> List[dict]: """ - out: List[Tuple[str, str]] = [] + Erzeugt Kanten für eine Note. - # A) Bracket form - for m in RE_INLINE_BRACKET.finditer(text or ""): - kind = m.group("kind").strip().lower() - target = m.group("target").strip() - if kind and target: - out.append((kind, target)) + - belongs_to: für jeden Chunk (chunk -> note) + - next / prev: zwischen aufeinanderfolgenden Chunks + - references: pro Chunk aus window/text (via Wikilinks) + - typed inline relations: [[rel:KIND | Target]] oder [[rel:KIND Target]] + - Obsidian Callouts: > [!edge] KIND: [[Target]] [[Target2]] + - optional note-scope references/backlinks: dedupliziert über alle Chunk-Funde + note_level_references + - typenbasierte Default-Kanten (edge_defaults) je gefundener Referenz + """ + edges: List[dict] = [] - # B) Prefix form with multiple wikilinks - for m in RE_INLINE_PREFIX.finditer(text or ""): - kind = m.group("kind").strip().lower() - links = m.group("links") - for lm in RE_WIKILINK.finditer(links or ""): - target = lm.group(1).strip() - if target: - out.append((kind, target)) + # Note-Typ (aus erstem Chunk erwartet) + note_type = None + if chunks: + note_type = _get(chunks[0], "type") - return out - -def _extract_callout_relations(text: str) -> List[Tuple[str, str]]: - """Find callout > [!edge] kind: [[Target1]] [[Target2]] | Target, Target2""" - out: List[Tuple[str, str]] = [] - for m in RE_CALLOUT.finditer(text or ""): - kind = (m.group("kind") or "").strip().lower() - body = m.group("body") or "" - # Prefer wikilinks; fallback: split by comma - wl = [mm.group(1).strip() for mm in RE_WIKILINK.finditer(body)] - if wl: - out.extend((kind, t) for t in wl if t) - else: - for raw in body.split(","): - t = raw.strip() - if t: - out.append((kind, t)) - return out - -# ----------------------------------------------------------------------------- -# Edge builder -# ----------------------------------------------------------------------------- - -def _edge_defaults_for_type(types_cfg: Optional[Dict[str, Any]], note_type: Optional[str]) -> List[str]: - """Read defaults from types.yaml structure: {'types': {: {'edge_defaults': [..]}}}""" - if not types_cfg or not note_type: - return [] - t = types_cfg.get("types", {}).get(note_type, {}) - vals = t.get("edge_defaults", []) or [] - return [str(v) for v in vals if isinstance(v, (str,))] - -def _append_with_dedupe(edges: List[Dict[str, Any]], new_edges: Iterable[Dict[str, Any]]) -> None: - seen = { _dedupe_key(e) for e in edges } - for e in new_edges: - k = _dedupe_key(e) - if k in seen: - continue - edges.append(e) - seen.add(k) - -def _structure_edges(note_id: str, title: str, chunks: Sequence[Dict[str, Any]]) -> List[Dict[str, Any]]: - edges: List[Dict[str, Any]] = [] - ordered = _sort_chunks(chunks) - # belongs_to - for ch in ordered: + # 1) belongs_to + for ch in chunks: cid = _get(ch, "chunk_id", "id") - if not cid: + if not cid: continue - edges.append(_make_edge("belongs_to", cid, note_id, note_id, cid, "structure:belongs_to", 1.0)) - # next/prev - for i in range(len(ordered) - 1): - a, b = ordered[i], ordered[i + 1] - a_id = _get(a, "chunk_id", "id"); b_id = _get(b, "chunk_id", "id") + edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to"), + "provenance": "rule", + "rule_id": "structure:belongs_to", + "confidence": 1.0, + })) + + # 2) next / prev + for i in range(len(chunks) - 1): + a, b = chunks[i], chunks[i + 1] + a_id = _get(a, "chunk_id", "id") + b_id = _get(b, "chunk_id", "id") if not a_id or not b_id: continue - edges.append(_make_edge("next", a_id, b_id, note_id, a_id, "structure:next", 1.0)) - edges.append(_make_edge("prev", b_id, a_id, note_id, b_id, "structure:prev", 1.0)) - return edges + edges.append(_edge("next", "chunk", a_id, b_id, note_id, { + "chunk_id": a_id, + "edge_id": _mk_edge_id("next", a_id, b_id, "chunk", "structure:order"), + "provenance": "rule", + "rule_id": "structure:order", + "confidence": 0.95, + })) + edges.append(_edge("prev", "chunk", b_id, a_id, note_id, { + "chunk_id": b_id, + "edge_id": _mk_edge_id("prev", b_id, a_id, "chunk", "structure:order"), + "provenance": "rule", + "rule_id": "structure:order", + "confidence": 0.95, + })) -def _explicit_and_inline_edges(note_id: str, - title: str, - note_type: Optional[str], - chunks: Sequence[Dict[str, Any]], - types_cfg: Optional[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[str]]: - """Return (edges, referenced_targets)""" - edges: List[Dict[str, Any]] = [] - referenced: List[str] = [] + # 3) references + typed inline + callouts + defaults (chunk-scope) + reg = _load_types_registry() + defaults = _edge_defaults_for(note_type, reg) + refs_all: List[str] = [] - defaults_kinds = set(_edge_defaults_for_type(types_cfg, note_type)) - - for ch in _sort_chunks(chunks): + for ch in chunks: cid = _get(ch, "chunk_id", "id") - text = _get(ch, "text", "window", default="") or "" + if not cid: + continue + raw = _chunk_text_for_refs(ch) - # --- Explicit wikilinks -> references - for target in _extract_wikilinks(text): - referenced.append(target) - edges.append(_make_edge("references", cid, target, note_id, cid, "explicit:wikilink", 1.0)) - - # --- Inline relations (both forms) - for kind, target in _extract_inline_relations(text): - referenced.append(target) - # forward - edges.append(_make_edge(kind, cid, target, note_id, cid, "inline:rel", 0.95)) - # symmetric reverse - if kind in SYMMETRIC_KINDS: - edges.append(_make_edge(kind, target, cid, note_id, cid, "inline:rel", 0.95)) - - # --- Callout relations - for kind, target in _extract_callout_relations(text): - referenced.append(target) - edges.append(_make_edge(kind, cid, target, note_id, cid, "callout:edge", 0.90)) - if kind in SYMMETRIC_KINDS: - edges.append(_make_edge(kind, target, cid, note_id, cid, "callout:edge", 0.90)) - - # --- Type-based defaults: apply to the union of referenced targets - if defaults_kinds and referenced: - uniq_targets = list(dict.fromkeys(referenced)) # preserve order, drop dups - for ch in _sort_chunks(chunks): - cid = _get(ch, "chunk_id", "id") - if not cid: + # 3a) typed inline relations + typed, remainder = _extract_typed_relations(raw) + for kind, target in typed: + kind = kind.strip().lower() + if not kind or not target: continue - for kind in defaults_kinds: - for target in uniq_targets: - edges.append(_make_edge(kind, cid, target, note_id, cid, f"edge_defaults:{note_type}:{kind}", 0.70)) - if kind in SYMMETRIC_KINDS: - edges.append(_make_edge(kind, target, cid, note_id, cid, f"edge_defaults:{note_type}:{kind}", 0.70)) + edges.append(_edge(kind, "chunk", cid, target, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(kind, cid, target, "chunk", "inline:rel"), + "provenance": "explicit", + "rule_id": "inline:rel", + "confidence": 0.95, + })) + if kind in {"related_to", "similar_to"}: + edges.append(_edge(kind, "chunk", target, cid, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(kind, target, cid, "chunk", "inline:rel"), + "provenance": "explicit", + "rule_id": "inline:rel", + "confidence": 0.95, + })) - return edges, referenced + # 3b) callouts + call_pairs, remainder2 = _extract_callout_relations(remainder) + for kind, target in call_pairs: + k = (kind or "").strip().lower() + if not k or not target: + continue + edges.append(_edge(k, "chunk", cid, target, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(k, cid, target, "chunk", "callout:edge"), + "provenance": "explicit", + "rule_id": "callout:edge", + "confidence": 0.95, + })) + if k in {"related_to", "similar_to"}: + edges.append(_edge(k, "chunk", target, cid, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(k, target, cid, "chunk", "callout:edge"), + "provenance": "explicit", + "rule_id": "callout:edge", + "confidence": 0.95, + })) -# ----------------------------------------------------------------------------- -# Public API (backward-compatible) -# ----------------------------------------------------------------------------- + # 3c) generische Wikilinks → references (+ defaults je Ref) + refs = _extract_wikilinks(remainder2) + for r in refs: + edges.append(_edge("references", "chunk", cid, r, note_id, { + "chunk_id": cid, + "ref_text": r, + "edge_id": _mk_edge_id("references", cid, r, "chunk", "explicit:wikilink"), + "provenance": "explicit", + "rule_id": "explicit:wikilink", + "confidence": 1.0, + })) + for rel in defaults: + if rel == "references": + continue + edges.append(_edge(rel, "chunk", cid, r, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(rel, cid, r, "chunk", f"edge_defaults:{note_type}:{rel}"), + "provenance": "rule", + "rule_id": f"edge_defaults:{note_type}:{rel}", + "confidence": 0.7, + })) + if rel in {"related_to", "similar_to"}: + edges.append(_edge(rel, "chunk", r, cid, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(rel, r, cid, "chunk", f"edge_defaults:{note_type}:{rel}"), + "provenance": "rule", + "rule_id": f"edge_defaults:{note_type}:{rel}", + "confidence": 0.7, + })) + refs_all.extend(refs) -def build_edges_for_note(*args, **kwargs) -> List[Dict[str, Any]]: - """ - Backward-compatible entry point. + # 4) optional note-scope refs/backlinks (+ defaults) + if include_note_scope_refs: + refs_note = list(refs_all or []) + if note_level_references: + refs_note.extend([r for r in note_level_references if isinstance(r, str) and r]) + refs_note = _dedupe_seq(refs_note) + for r in refs_note: + edges.append(_edge("references", "note", note_id, r, note_id, { + "edge_id": _mk_edge_id("references", note_id, r, "note", "explicit:note_scope"), + "provenance": "explicit", + "rule_id": "explicit:note_scope", + "confidence": 1.0, + })) + edges.append(_edge("backlink", "note", r, note_id, note_id, { + "edge_id": _mk_edge_id("backlink", r, note_id, "note", "derived:backlink"), + "provenance": "rule", + "rule_id": "derived:backlink", + "confidence": 0.9, + })) + for rel in defaults: + if rel == "references": + continue + edges.append(_edge(rel, "note", note_id, r, note_id, { + "edge_id": _mk_edge_id(rel, note_id, r, "note", f"edge_defaults:{note_type}:{rel}"), + "provenance": "rule", + "rule_id": f"edge_defaults:{note_type}:{rel}", + "confidence": 0.7, + })) + if rel in {"related_to", "similar_to"}: + edges.append(_edge(rel, "note", r, note_id, note_id, { + "edge_id": _mk_edge_id(rel, r, note_id, "note", f"edge_defaults:{note_type}:{rel}"), + "provenance": "rule", + "rule_id": f"edge_defaults:{note_type}:{rel}", + "confidence": 0.7, + })) - Supported call styles: - 1) build_edges_for_note(note_id, title, note_type, chunks, types_cfg) - 2) build_edges_for_note(note_payload=dict, chunks=[...], types_cfg=dict) - where note_payload contains: note_id/id, title, type - """ - # Detect signature form - if args and isinstance(args[0], dict) and ("title" in args[0] or "type" in args[0] or "note_id" in args[0] or "id" in args[0]): - note = args[0] - chunks = args[1] if len(args) > 1 else kwargs.get("chunks", []) - types_cfg = args[2] if len(args) > 2 else kwargs.get("types_cfg") - note_id = _get(note, "note_id", "id") - title = _get(note, "title") or "" - note_type = _get(note, "type") - else: - # Positional legacy form - note_id = args[0] if len(args) > 0 else kwargs.get("note_id") - title = args[1] if len(args) > 1 else kwargs.get("title", "") - note_type = args[2] if len(args) > 2 else kwargs.get("note_type") - chunks = args[3] if len(args) > 3 else kwargs.get("chunks", []) - types_cfg = args[4] if len(args) > 4 else kwargs.get("types_cfg") - - chunks = list(chunks or []) - - # Structure edges - edges = _structure_edges(note_id, title, chunks) - - # Explicit / inline / callout / defaults - more, _ = _explicit_and_inline_edges(note_id, title, note_type, chunks, types_cfg) - _append_with_dedupe(edges, more) - - return edges + # 5) De-Dupe (source_id, target_id, relation, rule_id) + seen: Set[Tuple[str,str,str,str]] = set() + out: List[dict] = [] + for e in edges: + s = str(e.get("source_id") or "") + t = str(e.get("target_id") or "") + rel = str(e.get("relation") or e.get("kind") or "edge") + rule = str(e.get("rule_id") or "") + key = (s, t, rel, rule) + if key in seen: + continue + seen.add(key) + out.append(e) + return out