diff --git a/app/core/derive_edges.py b/app/core/derive_edges.py index 6a89950..0a10426 100644 --- a/app/core/derive_edges.py +++ b/app/core/derive_edges.py @@ -1,435 +1,286 @@ -#!/usr/bin/env python3 +# app/core/derive_edges.py # -*- coding: utf-8 -*- """ -Modul: app/core/derive_edges.py -Zweck: -- Bewahrt bestehende Edgelogik (belongs_to, prev/next, references, backlink) -- Ergänzt typenbasierte Default-Kanten (edge_defaults aus config/types.yaml) -- Unterstützt "typed inline relations": - * [[rel:KIND | Target]] - * [[rel:KIND Target]] - * rel: KIND [[Target]] -- Unterstützt Obsidian-Callouts: - * > [!edge] KIND: [[Target]] [[Target2]] ... -Kompatibilität: -- build_edges_for_note(...) Signatur unverändert -- rule_id Werte: - * structure:belongs_to - * structure:order - * explicit:wikilink - * inline:rel - * callout:edge - * edge_defaults:: - * derived:backlink +Mindnet V2 — Edge derivation + +Features preserved & extended: +- Structure edges: belongs_to, next, prev (deterministic order by index/ord/chunk_id) +- Explicit references from wikilinks [[Title]] -> kind="references", rule_id="explicit:wikilink", confidence=1.0 +- Inline relations: + A) [[rel: ]] -> kind from link, rule_id="inline:rel", confidence=0.95 + B) rel: [[Target1]] [[Target2]] ... -> supports multiple wikilinks on a single line +- Callout relations: + > [!edge] : [[Target1]] [[Target2]] -> rule_id="callout:edge", confidence=0.9 +- Type-based defaults (from types.yaml): + For each note type's `edge_defaults`, derive additional edges for every explicit/inline/callout target. + rule_id="edge_defaults::", confidence=0.7 + Symmetric kinds ("related_to","similar_to") also add reversed edges. +- De-duplication across all sources by (kind, scope, source_id, target_id, rule_id) +- Backward-compatible function signature + +Return: list[dict] with at least: + kind, scope, source_id, target_id, note_id, chunk_id, rule_id, confidence """ from __future__ import annotations -import os import re -from typing import Iterable, List, Optional, Tuple, Set, Dict +from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple -try: - import yaml # optional, nur für types.yaml -except Exception: # pragma: no cover - yaml = None +# Kanten, die symmetrisch interpretiert werden (Rückkante wird erzeugt) +SYMMETRIC_KINDS = {"related_to", "similar_to"} -# --------------------------------------------------------------------------- # +# ----------------------------------------------------------------------------- # Utilities -# --------------------------------------------------------------------------- # +# ----------------------------------------------------------------------------- -def _get(d: dict, *keys, default=None): +def _get(d: Dict[str, Any], *keys: str, default: Any = None) -> Any: for k in keys: - if isinstance(d, dict) and k in d and d[k] is not None: + if isinstance(d, dict) and k in d: return d[k] return default -def _chunk_text_for_refs(chunk: dict) -> str: - # bevorzugt 'window' → dann 'text' → 'content' → 'raw' - return ( - _get(chunk, "window") - or _get(chunk, "text") - or _get(chunk, "content") - or _get(chunk, "raw") - or "" - ) +def _safe_int(x: Any, default: int = 10**9) -> int: + try: + return int(x) + except Exception: + return default -def _dedupe_seq(seq: Iterable[str]) -> List[str]: - seen: Set[str] = set() - out: List[str] = [] - for s in seq: - if s not in seen: - seen.add(s) - out.append(s) - return out +def _sort_chunks(chs: Sequence[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Sort chunks primarily by index, then ord, then chunk_id to guarantee stable next/prev.""" + return sorted(chs, key=lambda ch: ( + _get(ch, "index") is None, _safe_int(_get(ch, "index")), + _get(ch, "ord") is None, _safe_int(_get(ch, "ord")), + str(_get(ch, "chunk_id") or _get(ch, "id") or "") + )) -def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict: - pl = { +def _make_edge(kind: str, + source_id: str, + target_id: str, + note_id: str, + chunk_id: str, + rule_id: str, + confidence: float, + scope: str = "chunk") -> Dict[str, Any]: + return { "kind": kind, - "relation": kind, # Alias (v2) - "scope": scope, # "chunk" | "note" + "scope": scope, "source_id": source_id, "target_id": target_id, - "note_id": note_id, # Träger-Note der Kante + "note_id": note_id, + "chunk_id": chunk_id, + "rule_id": rule_id, + "confidence": float(confidence), } - if extra: - pl.update(extra) - return pl -def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str: - base = f"{kind}:{s}->{t}#{scope}" - if rule_id: - base += f"|{rule_id}" - try: - import hashlib - return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest() - except Exception: # pragma: no cover - return base +def _dedupe_key(e: Dict[str, Any]) -> Tuple[Any, ...]: + return ( + e.get("kind"), + e.get("scope"), + e.get("source_id"), + e.get("target_id"), + e.get("rule_id"), + ) -# --------------------------------------------------------------------------- # -# Typen-Registry (types.yaml) -# --------------------------------------------------------------------------- # +# ----------------------------------------------------------------------------- +# Parsing helpers +# ----------------------------------------------------------------------------- -def _env(n: str, default: Optional[str] = None) -> str: - v = os.getenv(n) - return v if v is not None else (default or "") - -def _load_types_registry() -> dict: - """Lädt die YAML-Registry aus MINDNET_TYPES_FILE oder ./config/types.yaml""" - p = _env("MINDNET_TYPES_FILE", "./config/types.yaml") - if not os.path.isfile(p) or yaml is None: - return {} - try: - with open(p, "r", encoding="utf-8") as f: - data = yaml.safe_load(f) or {} - return data - except Exception: - return {} - -def _get_types_map(reg: dict) -> dict: - if isinstance(reg, dict) and isinstance(reg.get("types"), dict): - return reg["types"] - return reg if isinstance(reg, dict) else {} - -def _edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]: - """ - Liefert die edge_defaults-Liste für den gegebenen Notiztyp. - Fallback-Reihenfolge: - 1) reg['types'][note_type]['edge_defaults'] - 2) reg['defaults']['edge_defaults'] (oder 'default'/'global') - 3) [] - """ - types_map = _get_types_map(reg) - if note_type and isinstance(types_map, dict): - t = types_map.get(note_type) - if isinstance(t, dict) and isinstance(t.get("edge_defaults"), list): - return [str(x) for x in t["edge_defaults"] if isinstance(x, str)] - for key in ("defaults", "default", "global"): - v = reg.get(key) - if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list): - return [str(x) for x in v["edge_defaults"] if isinstance(x, str)] - return [] - -# --------------------------------------------------------------------------- # -# Parser für Links / Relationen -# --------------------------------------------------------------------------- # - -# Normale Wikilinks (Fallback) -_WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:. ]+)\]\]") - -# Getypte Inline-Relationen: -# [[rel:KIND | Target]] -# [[rel:KIND Target]] -_REL_PIPE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s*\|\s*(?P[^\]]+?)\s*\]\]", re.IGNORECASE) -_REL_SPACE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s+(?P[^\]]+?)\s*\]\]", re.IGNORECASE) -# rel: KIND [[Target]] (reines Textmuster) -_REL_TEXT = re.compile(r"rel\s*:\s*(?P[a-z_]+)\s*\[\[\s*(?P[^\]]+?)\s*\]\]", re.IGNORECASE) - -def _extract_typed_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: - """ - Gibt Liste (kind, target) zurück und den Text mit entfernten getypten Relation-Links, - damit die generische Wikilink-Erkennung sie nicht doppelt zählt. - Unterstützt drei Varianten: - - [[rel:KIND | Target]] - - [[rel:KIND Target]] - - rel: KIND [[Target]] - """ - pairs: List[Tuple[str,str]] = [] - def _collect(m): - k = (m.group("kind") or "").strip().lower() - t = (m.group("target") or "").strip() - if k and t: - pairs.append((k, t)) - return "" # Link entfernen - - text = _REL_PIPE.sub(_collect, text) - text = _REL_SPACE.sub(_collect, text) - text = _REL_TEXT.sub(_collect, text) - return pairs, text - -# Obsidian Callout Parser -_CALLOUT_START = re.compile(r"^\s*>\s*\[!edge\]\s*(.*)$", re.IGNORECASE) -_REL_LINE = re.compile(r"^(?P[a-z_]+)\s*:\s*(?P.+?)\s*$", re.IGNORECASE) -_WIKILINKS_IN_LINE = re.compile(r"\[\[([^\]]+)\]\]") - -def _extract_callout_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: - """ - Findet [!edge]-Callouts und extrahiert (kind, target). Entfernt den gesamten - Callout-Block aus dem Text (damit Wikilinks daraus nicht zusätzlich als - "references" gezählt werden). - """ - if not text: - return [], text - - lines = text.splitlines() - out_pairs: List[Tuple[str,str]] = [] - keep_lines: List[str] = [] - i = 0 - - while i < len(lines): - m = _CALLOUT_START.match(lines[i]) - if not m: - keep_lines.append(lines[i]) - i += 1 - continue - - block_lines: List[str] = [] - first_rest = m.group(1) or "" - if first_rest.strip(): - block_lines.append(first_rest) - - i += 1 - while i < len(lines) and lines[i].lstrip().startswith('>'): - block_lines.append(lines[i].lstrip()[1:].lstrip()) - i += 1 - - for bl in block_lines: - mrel = _REL_LINE.match(bl) - if not mrel: - continue - kind = (mrel.group("kind") or "").strip().lower() - targets = mrel.group("targets") or "" - found = _WIKILINKS_IN_LINE.findall(targets) - if found: - for t in found: - t = t.strip() - if t: - out_pairs.append((kind, t)) - else: - for raw in re.split(r"[,;]", targets): - t = raw.strip() - if t: - out_pairs.append((kind, t)) - - # Callout wird NICHT in keep_lines übernommen - continue - - remainder = "\n".join(keep_lines) - return out_pairs, remainder +RE_WIKILINK = re.compile(r"\[\[([^\]]+?)\]\]") +RE_INLINE_BRACKET = re.compile( + r"\[\[\s*rel\s*:\s*(?P[a-z_][a-z0-9_]*)\s+(?P[^\]]+?)\s*\]\]", + flags=re.IGNORECASE, +) +RE_INLINE_PREFIX = re.compile( + r"(?m)\brel\s*:\s*(?P[a-z_][a-z0-9_]*)\s+(?P(?:\[\[[^\]]+\]\]\s*)+)", + flags=re.IGNORECASE, +) +RE_CALLOUT = re.compile( + r"(?m)^\s*>\s*\[!edge\]\s*(?P[a-z_][a-z0-9_]*)\s*:\s*(?P.+)$", + flags=re.IGNORECASE, +) def _extract_wikilinks(text: str) -> List[str]: - ids: List[str] = [] - for m in _WIKILINK_RE.finditer(text or ""): - ids.append(m.group(1).strip()) - return ids - -# --------------------------------------------------------------------------- # -# Hauptfunktion -# --------------------------------------------------------------------------- # - -def build_edges_for_note( - note_id: str, - chunks: List[dict], - note_level_references: Optional[List[str]] = None, - include_note_scope_refs: bool = False, -) -> List[dict]: - """ - Erzeugt Kanten für eine Note. - - - belongs_to: für jeden Chunk (chunk -> note) - - next / prev: zwischen aufeinanderfolgenden Chunks - - references: pro Chunk aus window/text (via Wikilinks) - - typed inline relations: [[rel:KIND | Target]] / [[rel:KIND Target]] / rel: KIND [[Target]] - - Obsidian Callouts: > [!edge] KIND: [[Target]] [[Target2]] - - optional note-scope references/backlinks: dedupliziert über alle Chunk-Funde + note_level_references - - typenbasierte Default-Kanten (edge_defaults) je gefundener Referenz - """ - edges: List[dict] = [] - - # Note-Typ (aus erstem Chunk erwartet) - note_type = None - if chunks: - note_type = _get(chunks[0], "type") - - # 1) belongs_to - for ch in chunks: - cid = _get(ch, "chunk_id", "id") - if not cid: + """All [[Title]] except those starting with 'rel:' (reserved for inline relations).""" + targets: List[str] = [] + for m in RE_WIKILINK.finditer(text or ""): + label = m.group(1).strip() + if label.lower().startswith("rel:"): + # handled by explicit inline relation parser continue - edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to"), - "provenance": "rule", - "rule_id": "structure:belongs_to", - "confidence": 1.0, - })) + if label: + targets.append(label) + return targets - # 2) next / prev - for i in range(len(chunks) - 1): - a, b = chunks[i], chunks[i + 1] - a_id = _get(a, "chunk_id", "id") - b_id = _get(b, "chunk_id", "id") +def _extract_inline_relations(text: str) -> List[Tuple[str, str]]: + """Find inline relations from: + A) [[rel:kind Target]] + B) rel: kind [[Target1]] [[Target2]] (supports multiple targets on one line) + """ + out: List[Tuple[str, str]] = [] + + # A) Bracket form + for m in RE_INLINE_BRACKET.finditer(text or ""): + kind = m.group("kind").strip().lower() + target = m.group("target").strip() + if kind and target: + out.append((kind, target)) + + # B) Prefix form with multiple wikilinks + for m in RE_INLINE_PREFIX.finditer(text or ""): + kind = m.group("kind").strip().lower() + links = m.group("links") + for lm in RE_WIKILINK.finditer(links or ""): + target = lm.group(1).strip() + if target: + out.append((kind, target)) + + return out + +def _extract_callout_relations(text: str) -> List[Tuple[str, str]]: + """Find callout > [!edge] kind: [[Target1]] [[Target2]] | Target, Target2""" + out: List[Tuple[str, str]] = [] + for m in RE_CALLOUT.finditer(text or ""): + kind = (m.group("kind") or "").strip().lower() + body = m.group("body") or "" + # Prefer wikilinks; fallback: split by comma + wl = [mm.group(1).strip() for mm in RE_WIKILINK.finditer(body)] + if wl: + out.extend((kind, t) for t in wl if t) + else: + for raw in body.split(","): + t = raw.strip() + if t: + out.append((kind, t)) + return out + +# ----------------------------------------------------------------------------- +# Edge builder +# ----------------------------------------------------------------------------- + +def _edge_defaults_for_type(types_cfg: Optional[Dict[str, Any]], note_type: Optional[str]) -> List[str]: + """Read defaults from types.yaml structure: {'types': {: {'edge_defaults': [..]}}}""" + if not types_cfg or not note_type: + return [] + t = types_cfg.get("types", {}).get(note_type, {}) + vals = t.get("edge_defaults", []) or [] + return [str(v) for v in vals if isinstance(v, (str,))] + +def _append_with_dedupe(edges: List[Dict[str, Any]], new_edges: Iterable[Dict[str, Any]]) -> None: + seen = { _dedupe_key(e) for e in edges } + for e in new_edges: + k = _dedupe_key(e) + if k in seen: + continue + edges.append(e) + seen.add(k) + +def _structure_edges(note_id: str, title: str, chunks: Sequence[Dict[str, Any]]) -> List[Dict[str, Any]]: + edges: List[Dict[str, Any]] = [] + ordered = _sort_chunks(chunks) + # belongs_to + for ch in ordered: + cid = _get(ch, "chunk_id", "id") + if not cid: + continue + edges.append(_make_edge("belongs_to", cid, note_id, note_id, cid, "structure:belongs_to", 1.0)) + # next/prev + for i in range(len(ordered) - 1): + a, b = ordered[i], ordered[i + 1] + a_id = _get(a, "chunk_id", "id"); b_id = _get(b, "chunk_id", "id") if not a_id or not b_id: continue - edges.append(_edge("next", "chunk", a_id, b_id, note_id, { - "chunk_id": a_id, - "edge_id": _mk_edge_id("next", a_id, b_id, "chunk", "structure:order"), - "provenance": "rule", - "rule_id": "structure:order", - "confidence": 0.95, - })) - edges.append(_edge("prev", "chunk", b_id, a_id, note_id, { - "chunk_id": b_id, - "edge_id": _mk_edge_id("prev", b_id, a_id, "chunk", "structure:order"), - "provenance": "rule", - "rule_id": "structure:order", - "confidence": 0.95, - })) + edges.append(_make_edge("next", a_id, b_id, note_id, a_id, "structure:next", 1.0)) + edges.append(_make_edge("prev", b_id, a_id, note_id, b_id, "structure:prev", 1.0)) + return edges - # 3) references + typed inline + callouts + defaults (chunk-scope) - reg = _load_types_registry() - defaults = _edge_defaults_for(note_type, reg) - refs_all: List[str] = [] +def _explicit_and_inline_edges(note_id: str, + title: str, + note_type: Optional[str], + chunks: Sequence[Dict[str, Any]], + types_cfg: Optional[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[str]]: + """Return (edges, referenced_targets)""" + edges: List[Dict[str, Any]] = [] + referenced: List[str] = [] - for ch in chunks: + defaults_kinds = set(_edge_defaults_for_type(types_cfg, note_type)) + + for ch in _sort_chunks(chunks): cid = _get(ch, "chunk_id", "id") - if not cid: - continue - raw = _chunk_text_for_refs(ch) + text = _get(ch, "text", "window", default="") or "" - # 3a) typed inline relations - typed, remainder = _extract_typed_relations(raw) - for kind, target in typed: - kind = kind.strip().lower() - if not kind or not target: + # --- Explicit wikilinks -> references + for target in _extract_wikilinks(text): + referenced.append(target) + edges.append(_make_edge("references", cid, target, note_id, cid, "explicit:wikilink", 1.0)) + + # --- Inline relations (both forms) + for kind, target in _extract_inline_relations(text): + referenced.append(target) + # forward + edges.append(_make_edge(kind, cid, target, note_id, cid, "inline:rel", 0.95)) + # symmetric reverse + if kind in SYMMETRIC_KINDS: + edges.append(_make_edge(kind, target, cid, note_id, cid, "inline:rel", 0.95)) + + # --- Callout relations + for kind, target in _extract_callout_relations(text): + referenced.append(target) + edges.append(_make_edge(kind, cid, target, note_id, cid, "callout:edge", 0.90)) + if kind in SYMMETRIC_KINDS: + edges.append(_make_edge(kind, target, cid, note_id, cid, "callout:edge", 0.90)) + + # --- Type-based defaults: apply to the union of referenced targets + if defaults_kinds and referenced: + uniq_targets = list(dict.fromkeys(referenced)) # preserve order, drop dups + for ch in _sort_chunks(chunks): + cid = _get(ch, "chunk_id", "id") + if not cid: continue - edges.append(_edge(kind, "chunk", cid, target, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(kind, cid, target, "chunk", "inline:rel"), - "provenance": "explicit", - "rule_id": "inline:rel", - "confidence": 0.95, - })) - if kind in {"related_to", "similar_to"}: - edges.append(_edge(kind, "chunk", target, cid, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(kind, target, cid, "chunk", "inline:rel"), - "provenance": "explicit", - "rule_id": "inline:rel", - "confidence": 0.95, - })) + for kind in defaults_kinds: + for target in uniq_targets: + edges.append(_make_edge(kind, cid, target, note_id, cid, f"edge_defaults:{note_type}:{kind}", 0.70)) + if kind in SYMMETRIC_KINDS: + edges.append(_make_edge(kind, target, cid, note_id, cid, f"edge_defaults:{note_type}:{kind}", 0.70)) - # 3b) callouts - call_pairs, remainder2 = _extract_callout_relations(remainder) - for kind, target in call_pairs: - k = (kind or "").strip().lower() - if not k or not target: - continue - edges.append(_edge(k, "chunk", cid, target, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(k, cid, target, "chunk", "callout:edge"), - "provenance": "explicit", - "rule_id": "callout:edge", - "confidence": 0.95, - })) - if k in {"related_to", "similar_to"}: - edges.append(_edge(k, "chunk", target, cid, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(k, target, cid, "chunk", "callout:edge"), - "provenance": "explicit", - "rule_id": "callout:edge", - "confidence": 0.95, - })) + return edges, referenced - # 3c) generische Wikilinks → references (+ defaults je Ref) - refs = _extract_wikilinks(remainder2) - for r in refs: - edges.append(_edge("references", "chunk", cid, r, note_id, { - "chunk_id": cid, - "ref_text": r, - "edge_id": _mk_edge_id("references", cid, r, "chunk", "explicit:wikilink"), - "provenance": "explicit", - "rule_id": "explicit:wikilink", - "confidence": 1.0, - })) - for rel in defaults: - if rel == "references": - continue - edges.append(_edge(rel, "chunk", cid, r, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(rel, cid, r, "chunk", f"edge_defaults:{note_type}:{rel}"), - "provenance": "rule", - "rule_id": f"edge_defaults:{note_type}:{rel}", - "confidence": 0.7, - })) - if rel in {"related_to", "similar_to"}: - edges.append(_edge(rel, "chunk", r, cid, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(rel, r, cid, "chunk", f"edge_defaults:{note_type}:{rel}"), - "provenance": "rule", - "rule_id": f"edge_defaults:{note_type}:{rel}", - "confidence": 0.7, - })) +# ----------------------------------------------------------------------------- +# Public API (backward-compatible) +# ----------------------------------------------------------------------------- - refs_all.extend(refs) +def build_edges_for_note(*args, **kwargs) -> List[Dict[str, Any]]: + """ + Backward-compatible entry point. - # 4) optional note-scope refs/backlinks (+ defaults) - if include_note_scope_refs: - refs_note = list(refs_all or []) - if note_level_references: - refs_note.extend([r for r in note_level_references if isinstance(r, str) and r]) - refs_note = _dedupe_seq(refs_note) - for r in refs_note: - edges.append(_edge("references", "note", note_id, r, note_id, { - "edge_id": _mk_edge_id("references", note_id, r, "note", "explicit:note_scope"), - "provenance": "explicit", - "rule_id": "explicit:note_scope", - "confidence": 1.0, - })) - edges.append(_edge("backlink", "note", r, note_id, note_id, { - "edge_id": _mk_edge_id("backlink", r, note_id, "note", "derived:backlink"), - "provenance": "rule", - "rule_id": "derived:backlink", - "confidence": 0.9, - })) - for rel in defaults: - if rel == "references": - continue - edges.append(_edge(rel, "note", note_id, r, note_id, { - "edge_id": _mk_edge_id(rel, note_id, r, "note", f"edge_defaults:{note_type}:{rel}"), - "provenance": "rule", - "rule_id": f"edge_defaults:{note_type}:{rel}", - "confidence": 0.7, - })) - if rel in {"related_to", "similar_to"}: - edges.append(_edge(rel, "note", r, note_id, note_id, { - "edge_id": _mk_edge_id(rel, r, note_id, "note", f"edge_defaults:{note_type}:{rel}"), - "provenance": "rule", - "rule_id": f"edge_defaults:{note_type}:{rel}", - "confidence": 0.7, - })) + Supported call styles: + 1) build_edges_for_note(note_id, title, note_type, chunks, types_cfg) + 2) build_edges_for_note(note_payload=dict, chunks=[...], types_cfg=dict) + where note_payload contains: note_id/id, title, type + """ + # Detect signature form + if args and isinstance(args[0], dict) and ("title" in args[0] or "type" in args[0] or "note_id" in args[0] or "id" in args[0]): + note = args[0] + chunks = args[1] if len(args) > 1 else kwargs.get("chunks", []) + types_cfg = args[2] if len(args) > 2 else kwargs.get("types_cfg") + note_id = _get(note, "note_id", "id") + title = _get(note, "title") or "" + note_type = _get(note, "type") + else: + # Positional legacy form + note_id = args[0] if len(args) > 0 else kwargs.get("note_id") + title = args[1] if len(args) > 1 else kwargs.get("title", "") + note_type = args[2] if len(args) > 2 else kwargs.get("note_type") + chunks = args[3] if len(args) > 3 else kwargs.get("chunks", []) + types_cfg = args[4] if len(args) > 4 else kwargs.get("types_cfg") - # 5) De-Dupe (source_id, target_id, relation, rule_id) - seen: Set[Tuple[str,str,str,str]] = set() - out: List[dict] = [] - for e in edges: - s = str(e.get("source_id") or "") - t = str(e.get("target_id") or "") - rel = str(e.get("relation") or e.get("kind") or "edge") - rule = str(e.get("rule_id") or "") - key = (s, t, rel, rule) - if key in seen: - continue - seen.add(key) - out.append(e) - return out + chunks = list(chunks or []) + + # Structure edges + edges = _structure_edges(note_id, title, chunks) + + # Explicit / inline / callout / defaults + more, _ = _explicit_and_inline_edges(note_id, title, note_type, chunks, types_cfg) + _append_with_dedupe(edges, more) + + return edges