From aab010ff17b43a6546987918b76e3e6187b64b1e Mon Sep 17 00:00:00 2001 From: Lars Date: Tue, 18 Nov 2025 07:50:46 +0100 Subject: [PATCH] app/core/derive_edges.py aktualisiert --- app/core/derive_edges.py | 731 ++++++++++++++++++--------------------- 1 file changed, 330 insertions(+), 401 deletions(-) diff --git a/app/core/derive_edges.py b/app/core/derive_edges.py index 6a89950..a55a62c 100644 --- a/app/core/derive_edges.py +++ b/app/core/derive_edges.py @@ -1,435 +1,364 @@ -#!/usr/bin/env python3 +# app/core/derive_edges.py # -*- coding: utf-8 -*- """ -Modul: app/core/derive_edges.py -Zweck: -- Bewahrt bestehende Edgelogik (belongs_to, prev/next, references, backlink) -- Ergänzt typenbasierte Default-Kanten (edge_defaults aus config/types.yaml) -- Unterstützt "typed inline relations": - * [[rel:KIND | Target]] - * [[rel:KIND Target]] - * rel: KIND [[Target]] -- Unterstützt Obsidian-Callouts: - * > [!edge] KIND: [[Target]] [[Target2]] ... -Kompatibilität: -- build_edges_for_note(...) Signatur unverändert -- rule_id Werte: - * structure:belongs_to - * structure:order - * explicit:wikilink - * inline:rel - * callout:edge - * edge_defaults:: - * derived:backlink +Edge-Ableitung (V2) +Beibehaltung der bestehenden Funktionalität + Erweiterung: +- Mehrere Inline-Referenzen in einer Zeile: rel: [[A]] [[B]] ... +Kompatibel mit: +- Strukturkanten: belongs_to / next / prev +- Explizite Wikilinks -> references +- Inline-Relationen -> inline:rel +- Callout-Kanten -> callout:edge +- Typbasierte Default-Kanten (edge_defaults aus types.yaml) """ from __future__ import annotations - -import os import re -from typing import Iterable, List, Optional, Tuple, Set, Dict +from typing import Dict, List, Iterable, Tuple, Set -try: - import yaml # optional, nur für types.yaml -except Exception: # pragma: no cover - yaml = None +# ---------------------------------------------------------------------- +# Regex-Bausteine +# ---------------------------------------------------------------------- -# --------------------------------------------------------------------------- # +# Wikilinks: [[Title]] oder [[Title|Alias]] +RE_WIKILINK = re.compile(r"\[\[([^\]|#]+)(?:#[^\]|]+)?(?:\|[^\]]+)?\]\]") + +# Inline-Relationen (Variante B – von dir im Einsatz): +# rel: [[Target A]] [[Target B]] ... +RE_INLINE_REL_LINE = re.compile( + r"(?i)\brel\s*:\s*(?P[a-z_][a-z0-9_]+)\s+(?P.+)$" +) + +# Callout: +# > [!edge] : [[A]] [[B]] +RE_CALLOUT_HEADER = re.compile(r"^\s{0,3}>\s*\[\!edge\]\s*(?P[a-z_][a-z0-9_]+)\s*:\s*(?P.*)$", re.IGNORECASE) + +# ---------------------------------------------------------------------- # Utilities -# --------------------------------------------------------------------------- # +# ---------------------------------------------------------------------- -def _get(d: dict, *keys, default=None): - for k in keys: - if isinstance(d, dict) and k in d and d[k] is not None: - return d[k] - return default +def _neighbors_chain(chunk_ids: List[str]) -> Iterable[Tuple[str, str]]: + """Erzeugt (prev, next) Paare entlang der Chunk-Sequenz.""" + for i in range(len(chunk_ids) - 1): + yield chunk_ids[i], chunk_ids[i + 1] -def _chunk_text_for_refs(chunk: dict) -> str: - # bevorzugt 'window' → dann 'text' → 'content' → 'raw' - return ( - _get(chunk, "window") - or _get(chunk, "text") - or _get(chunk, "content") - or _get(chunk, "raw") - or "" - ) - -def _dedupe_seq(seq: Iterable[str]) -> List[str]: - seen: Set[str] = set() - out: List[str] = [] - for s in seq: - if s not in seen: - seen.add(s) - out.append(s) - return out - -def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict: +def _mk_edge_payload( + *, + kind: str, + scope: str, + note_id: str, + chunk_id: str | None = None, + source_id: str, + target_id: str, + rule_id: str, + confidence: float, +) -> Dict: + """ + Einheitliches Edge-Payload-Format. + """ pl = { - "kind": kind, - "relation": kind, # Alias (v2) - "scope": scope, # "chunk" | "note" - "source_id": source_id, - "target_id": target_id, - "note_id": note_id, # Träger-Note der Kante + "kind": kind, # z.B. references, depends_on, related_to, similar_to + "scope": scope, # "chunk" oder "note" + "note_id": note_id, # Note-Kontext (Quelle) + "source_id": source_id, # id der Quelle (Chunk-ID oder Note-ID) + "target_id": target_id, # Ziel (Note-ID oder Titel, falls Auflösung extern erfolgt) + "rule_id": rule_id, + "confidence": confidence, } - if extra: - pl.update(extra) + if chunk_id: + pl["chunk_id"] = chunk_id return pl -def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str: - base = f"{kind}:{s}->{t}#{scope}" - if rule_id: - base += f"|{rule_id}" - try: - import hashlib - return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest() - except Exception: # pragma: no cover - return base - -# --------------------------------------------------------------------------- # -# Typen-Registry (types.yaml) -# --------------------------------------------------------------------------- # - -def _env(n: str, default: Optional[str] = None) -> str: - v = os.getenv(n) - return v if v is not None else (default or "") - -def _load_types_registry() -> dict: - """Lädt die YAML-Registry aus MINDNET_TYPES_FILE oder ./config/types.yaml""" - p = _env("MINDNET_TYPES_FILE", "./config/types.yaml") - if not os.path.isfile(p) or yaml is None: - return {} - try: - with open(p, "r", encoding="utf-8") as f: - data = yaml.safe_load(f) or {} - return data - except Exception: - return {} - -def _get_types_map(reg: dict) -> dict: - if isinstance(reg, dict) and isinstance(reg.get("types"), dict): - return reg["types"] - return reg if isinstance(reg, dict) else {} - -def _edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]: - """ - Liefert die edge_defaults-Liste für den gegebenen Notiztyp. - Fallback-Reihenfolge: - 1) reg['types'][note_type]['edge_defaults'] - 2) reg['defaults']['edge_defaults'] (oder 'default'/'global') - 3) [] - """ - types_map = _get_types_map(reg) - if note_type and isinstance(types_map, dict): - t = types_map.get(note_type) - if isinstance(t, dict) and isinstance(t.get("edge_defaults"), list): - return [str(x) for x in t["edge_defaults"] if isinstance(x, str)] - for key in ("defaults", "default", "global"): - v = reg.get(key) - if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list): - return [str(x) for x in v["edge_defaults"] if isinstance(x, str)] - return [] - -# --------------------------------------------------------------------------- # -# Parser für Links / Relationen -# --------------------------------------------------------------------------- # - -# Normale Wikilinks (Fallback) -_WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:. ]+)\]\]") - -# Getypte Inline-Relationen: -# [[rel:KIND | Target]] -# [[rel:KIND Target]] -_REL_PIPE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s*\|\s*(?P[^\]]+?)\s*\]\]", re.IGNORECASE) -_REL_SPACE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s+(?P[^\]]+?)\s*\]\]", re.IGNORECASE) -# rel: KIND [[Target]] (reines Textmuster) -_REL_TEXT = re.compile(r"rel\s*:\s*(?P[a-z_]+)\s*\[\[\s*(?P[^\]]+?)\s*\]\]", re.IGNORECASE) - -def _extract_typed_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: - """ - Gibt Liste (kind, target) zurück und den Text mit entfernten getypten Relation-Links, - damit die generische Wikilink-Erkennung sie nicht doppelt zählt. - Unterstützt drei Varianten: - - [[rel:KIND | Target]] - - [[rel:KIND Target]] - - rel: KIND [[Target]] - """ - pairs: List[Tuple[str,str]] = [] - def _collect(m): - k = (m.group("kind") or "").strip().lower() - t = (m.group("target") or "").strip() - if k and t: - pairs.append((k, t)) - return "" # Link entfernen - - text = _REL_PIPE.sub(_collect, text) - text = _REL_SPACE.sub(_collect, text) - text = _REL_TEXT.sub(_collect, text) - return pairs, text - -# Obsidian Callout Parser -_CALLOUT_START = re.compile(r"^\s*>\s*\[!edge\]\s*(.*)$", re.IGNORECASE) -_REL_LINE = re.compile(r"^(?P[a-z_]+)\s*:\s*(?P.+?)\s*$", re.IGNORECASE) -_WIKILINKS_IN_LINE = re.compile(r"\[\[([^\]]+)\]\]") - -def _extract_callout_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: - """ - Findet [!edge]-Callouts und extrahiert (kind, target). Entfernt den gesamten - Callout-Block aus dem Text (damit Wikilinks daraus nicht zusätzlich als - "references" gezählt werden). - """ - if not text: - return [], text - - lines = text.splitlines() - out_pairs: List[Tuple[str,str]] = [] - keep_lines: List[str] = [] - i = 0 - - while i < len(lines): - m = _CALLOUT_START.match(lines[i]) - if not m: - keep_lines.append(lines[i]) - i += 1 - continue - - block_lines: List[str] = [] - first_rest = m.group(1) or "" - if first_rest.strip(): - block_lines.append(first_rest) - - i += 1 - while i < len(lines) and lines[i].lstrip().startswith('>'): - block_lines.append(lines[i].lstrip()[1:].lstrip()) - i += 1 - - for bl in block_lines: - mrel = _REL_LINE.match(bl) - if not mrel: - continue - kind = (mrel.group("kind") or "").strip().lower() - targets = mrel.group("targets") or "" - found = _WIKILINKS_IN_LINE.findall(targets) - if found: - for t in found: - t = t.strip() - if t: - out_pairs.append((kind, t)) - else: - for raw in re.split(r"[,;]", targets): - t = raw.strip() - if t: - out_pairs.append((kind, t)) - - # Callout wird NICHT in keep_lines übernommen - continue - - remainder = "\n".join(keep_lines) - return out_pairs, remainder - def _extract_wikilinks(text: str) -> List[str]: - ids: List[str] = [] - for m in _WIKILINK_RE.finditer(text or ""): - ids.append(m.group(1).strip()) - return ids - -# --------------------------------------------------------------------------- # -# Hauptfunktion -# --------------------------------------------------------------------------- # - -def build_edges_for_note( - note_id: str, - chunks: List[dict], - note_level_references: Optional[List[str]] = None, - include_note_scope_refs: bool = False, -) -> List[dict]: """ - Erzeugt Kanten für eine Note. - - - belongs_to: für jeden Chunk (chunk -> note) - - next / prev: zwischen aufeinanderfolgenden Chunks - - references: pro Chunk aus window/text (via Wikilinks) - - typed inline relations: [[rel:KIND | Target]] / [[rel:KIND Target]] / rel: KIND [[Target]] - - Obsidian Callouts: > [!edge] KIND: [[Target]] [[Target2]] - - optional note-scope references/backlinks: dedupliziert über alle Chunk-Funde + note_level_references - - typenbasierte Default-Kanten (edge_defaults) je gefundener Referenz + Extrahiert alle Wikilink-Ziele (als Titel-Strings). """ - edges: List[dict] = [] + return [m.group(1).strip() for m in RE_WIKILINK.finditer(text or "")] - # Note-Typ (aus erstem Chunk erwartet) - note_type = None - if chunks: - note_type = _get(chunks[0], "type") +def _extract_inline_relations_lines(text: str) -> List[Tuple[str, List[str]]]: + """ + Findet Inline-Relationen in Zeilen wie: + rel: [[Target A]] [[Target B]] + Liefert Liste von (relation, [targets...]). + """ + out: List[Tuple[str, List[str]]] = [] + if not text: + return out + for line in text.splitlines(): + m = RE_INLINE_REL_LINE.search(line) + if not m: + continue + rel = m.group("rel").strip().lower() + body = m.group("body") + # alle [[...]] Ziele aus body herausziehen: + targets = _extract_wikilinks(body) + # falls im Body keine [[...]] vorkommen, versuche verbleibenden Text als ein Ziel (robust): + if not targets: + cleaned = body.strip() + if cleaned: + targets = [cleaned] + if targets: + out.append((rel, targets)) + return out - # 1) belongs_to - for ch in chunks: - cid = _get(ch, "chunk_id", "id") +def _extract_callout_edges(text: str) -> List[Tuple[str, List[str]]]: + """ + Callout-Edges: + > [!edge] : [[A]] [[B]] + pro Zeile eine Relation + 1..n Ziele + """ + out: List[Tuple[str, List[str]]] = [] + if not text: + return out + for line in text.splitlines(): + m = RE_CALLOUT_HEADER.match(line) + if not m: + continue + rel = m.group("rel").strip().lower() + body = m.group("body") + targets = _extract_wikilinks(body) + # Robustheit: wenn keine [[...]] vorhanden, restlicher body als ein Ziel + if not targets: + cleaned = body.strip() + if cleaned: + targets = [cleaned] + if targets: + out.append((rel, targets)) + return out + +# ---------------------------------------------------------------------- +# Haupt-API +# ---------------------------------------------------------------------- + +def derive_edges( + note: Dict, + chunks: List[Dict], + types_cfg: Dict | None = None, +) -> List[Dict]: + """ + Leitet Kanten für eine Note ab. + + Erwartete Felder: + note: { + "note_id": str, + "title": str, + "type": str, + "text": str + } + chunks: [{ + "chunk_id": str, + "index": int, + "text": str, + ... + }, ...] + + types_cfg (aus types.yaml geladen) mit: + types_cfg["types"][]["edge_defaults"] = [relation, ...] + (optional) + """ + edges: List[Dict] = [] + + note_id = note.get("note_id") or note.get("id") + note_title = note.get("title") or "" + note_type = (note.get("type") or "").strip().lower() + note_text = note.get("text") or "" + + # ------------------------------------------------------------------ + # 1) Strukturkanten je Chunk: belongs_to / next / prev + # ------------------------------------------------------------------ + chunk_ids = [c.get("chunk_id") for c in chunks if c.get("chunk_id")] + # belongs_to + for c in chunks: + cid = c.get("chunk_id") if not cid: continue - edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to"), - "provenance": "rule", - "rule_id": "structure:belongs_to", - "confidence": 1.0, - })) + edges.append( + _mk_edge_payload( + kind="belongs_to", + scope="chunk", + note_id=note_id, + chunk_id=cid, + source_id=cid, + target_id=note_id, + rule_id="structure:belongs_to", + confidence=1.0, + ) + ) + # next/prev + for prev_id, next_id in _neighbors_chain(chunk_ids): + # next + edges.append( + _mk_edge_payload( + kind="next", + scope="chunk", + note_id=note_id, + chunk_id=prev_id, + source_id=prev_id, + target_id=next_id, + rule_id="structure:next", + confidence=1.0, + ) + ) + # prev + edges.append( + _mk_edge_payload( + kind="prev", + scope="chunk", + note_id=note_id, + chunk_id=next_id, + source_id=next_id, + target_id=prev_id, + rule_id="structure:prev", + confidence=1.0, + ) + ) - # 2) next / prev - for i in range(len(chunks) - 1): - a, b = chunks[i], chunks[i + 1] - a_id = _get(a, "chunk_id", "id") - b_id = _get(b, "chunk_id", "id") - if not a_id or not b_id: - continue - edges.append(_edge("next", "chunk", a_id, b_id, note_id, { - "chunk_id": a_id, - "edge_id": _mk_edge_id("next", a_id, b_id, "chunk", "structure:order"), - "provenance": "rule", - "rule_id": "structure:order", - "confidence": 0.95, - })) - edges.append(_edge("prev", "chunk", b_id, a_id, note_id, { - "chunk_id": b_id, - "edge_id": _mk_edge_id("prev", b_id, a_id, "chunk", "structure:order"), - "provenance": "rule", - "rule_id": "structure:order", - "confidence": 0.95, - })) + # ------------------------------------------------------------------ + # 2) Explizite Referenzen (Wikilinks) + Inline-Relationen + Callouts + # - Alles chunk-scope, Quelle = chunk_id (falls vorhanden), + # sonst Note-scope als Fallback. + # ------------------------------------------------------------------ + # Sammle alle expliziten Ziele (für spätere edge_defaults) + explicit_targets: Set[str] = set() - # 3) references + typed inline + callouts + defaults (chunk-scope) - reg = _load_types_registry() - defaults = _edge_defaults_for(note_type, reg) - refs_all: List[str] = [] + # pro Chunk prüfen + for c in chunks: + cid = c.get("chunk_id") + ctxt = c.get("text") or "" - for ch in chunks: - cid = _get(ch, "chunk_id", "id") - if not cid: - continue - raw = _chunk_text_for_refs(ch) + # 2a) Wikilinks -> references + for tgt in _extract_wikilinks(ctxt): + explicit_targets.add(tgt) + edges.append( + _mk_edge_payload( + kind="references", + scope="chunk", + note_id=note_id, + chunk_id=cid, + source_id=cid, + target_id=tgt, + rule_id="explicit:wikilink", + confidence=1.0, + ) + ) - # 3a) typed inline relations - typed, remainder = _extract_typed_relations(raw) - for kind, target in typed: - kind = kind.strip().lower() - if not kind or not target: - continue - edges.append(_edge(kind, "chunk", cid, target, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(kind, cid, target, "chunk", "inline:rel"), - "provenance": "explicit", - "rule_id": "inline:rel", - "confidence": 0.95, - })) - if kind in {"related_to", "similar_to"}: - edges.append(_edge(kind, "chunk", target, cid, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(kind, target, cid, "chunk", "inline:rel"), - "provenance": "explicit", - "rule_id": "inline:rel", - "confidence": 0.95, - })) + # 2b) Inline-Relationen (mehrere Ziele erlaubt) + for rel, targets in _extract_inline_relations_lines(ctxt): + for tgt in targets: + explicit_targets.add(tgt) + edges.append( + _mk_edge_payload( + kind=rel, + scope="chunk", + note_id=note_id, + chunk_id=cid, + source_id=cid, + target_id=tgt, + rule_id="inline:rel", + confidence=0.95, + ) + ) - # 3b) callouts - call_pairs, remainder2 = _extract_callout_relations(remainder) - for kind, target in call_pairs: - k = (kind or "").strip().lower() - if not k or not target: - continue - edges.append(_edge(k, "chunk", cid, target, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(k, cid, target, "chunk", "callout:edge"), - "provenance": "explicit", - "rule_id": "callout:edge", - "confidence": 0.95, - })) - if k in {"related_to", "similar_to"}: - edges.append(_edge(k, "chunk", target, cid, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(k, target, cid, "chunk", "callout:edge"), - "provenance": "explicit", - "rule_id": "callout:edge", - "confidence": 0.95, - })) + # 2c) Callout-Edges (mehrere Ziele erlaubt) + for rel, targets in _extract_callout_edges(ctxt): + for tgt in targets: + explicit_targets.add(tgt) + edges.append( + _mk_edge_payload( + kind=rel, + scope="chunk", + note_id=note_id, + chunk_id=cid, + source_id=cid, + target_id=tgt, + rule_id="callout:edge", + confidence=0.9, + ) + ) - # 3c) generische Wikilinks → references (+ defaults je Ref) - refs = _extract_wikilinks(remainder2) - for r in refs: - edges.append(_edge("references", "chunk", cid, r, note_id, { - "chunk_id": cid, - "ref_text": r, - "edge_id": _mk_edge_id("references", cid, r, "chunk", "explicit:wikilink"), - "provenance": "explicit", - "rule_id": "explicit:wikilink", - "confidence": 1.0, - })) + # Fallback: Falls Note keinen Chunk-Text enthielt (theoretisch), + # prüfe Note-Text einmal global (liefert note-scope Kanten). + if not chunks and note_text: + # Wikilinks + for tgt in _extract_wikilinks(note_text): + explicit_targets.add(tgt) + edges.append( + _mk_edge_payload( + kind="references", + scope="note", + note_id=note_id, + source_id=note_id, + target_id=tgt, + rule_id="explicit:wikilink", + confidence=1.0, + ) + ) + # Inline + for rel, targets in _extract_inline_relations_lines(note_text): + for tgt in targets: + explicit_targets.add(tgt) + edges.append( + _mk_edge_payload( + kind=rel, + scope="note", + note_id=note_id, + source_id=note_id, + target_id=tgt, + rule_id="inline:rel", + confidence=0.95, + ) + ) + # Callouts + for rel, targets in _extract_callout_edges(note_text): + for tgt in targets: + explicit_targets.add(tgt) + edges.append( + _mk_edge_payload( + kind=rel, + scope="note", + note_id=note_id, + source_id=note_id, + target_id=tgt, + rule_id="callout:edge", + confidence=0.9, + ) + ) + + # ------------------------------------------------------------------ + # 3) Typbasierte Default-Kanten (edge_defaults) + # - nur, wenn es explizite Ziele gibt (sonst kein Ableitungsanker) + # ------------------------------------------------------------------ + if types_cfg and explicit_targets: + type_entry = (types_cfg.get("types") or {}).get(note_type) or {} + defaults: List[str] = type_entry.get("edge_defaults") or [] + defaults = [str(d).strip().lower() for d in defaults if str(d).strip()] + if defaults: + # default-Kanten als "note"-Scope (Konzeption: vom Note-Kontext aus) for rel in defaults: - if rel == "references": - continue - edges.append(_edge(rel, "chunk", cid, r, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(rel, cid, r, "chunk", f"edge_defaults:{note_type}:{rel}"), - "provenance": "rule", - "rule_id": f"edge_defaults:{note_type}:{rel}", - "confidence": 0.7, - })) - if rel in {"related_to", "similar_to"}: - edges.append(_edge(rel, "chunk", r, cid, note_id, { - "chunk_id": cid, - "edge_id": _mk_edge_id(rel, r, cid, "chunk", f"edge_defaults:{note_type}:{rel}"), - "provenance": "rule", - "rule_id": f"edge_defaults:{note_type}:{rel}", - "confidence": 0.7, - })) + rule = f"edge_defaults:{note_type}:{rel}" + for tgt in sorted(explicit_targets): + edges.append( + _mk_edge_payload( + kind=rel, + scope="note", + note_id=note_id, + source_id=note_id, + target_id=tgt, + rule_id=rule, + confidence=0.7, + ) + ) - refs_all.extend(refs) - - # 4) optional note-scope refs/backlinks (+ defaults) - if include_note_scope_refs: - refs_note = list(refs_all or []) - if note_level_references: - refs_note.extend([r for r in note_level_references if isinstance(r, str) and r]) - refs_note = _dedupe_seq(refs_note) - for r in refs_note: - edges.append(_edge("references", "note", note_id, r, note_id, { - "edge_id": _mk_edge_id("references", note_id, r, "note", "explicit:note_scope"), - "provenance": "explicit", - "rule_id": "explicit:note_scope", - "confidence": 1.0, - })) - edges.append(_edge("backlink", "note", r, note_id, note_id, { - "edge_id": _mk_edge_id("backlink", r, note_id, "note", "derived:backlink"), - "provenance": "rule", - "rule_id": "derived:backlink", - "confidence": 0.9, - })) - for rel in defaults: - if rel == "references": - continue - edges.append(_edge(rel, "note", note_id, r, note_id, { - "edge_id": _mk_edge_id(rel, note_id, r, "note", f"edge_defaults:{note_type}:{rel}"), - "provenance": "rule", - "rule_id": f"edge_defaults:{note_type}:{rel}", - "confidence": 0.7, - })) - if rel in {"related_to", "similar_to"}: - edges.append(_edge(rel, "note", r, note_id, note_id, { - "edge_id": _mk_edge_id(rel, r, note_id, "note", f"edge_defaults:{note_type}:{rel}"), - "provenance": "rule", - "rule_id": f"edge_defaults:{note_type}:{rel}", - "confidence": 0.7, - })) - - # 5) De-Dupe (source_id, target_id, relation, rule_id) - seen: Set[Tuple[str,str,str,str]] = set() - out: List[dict] = [] + # ------------------------------------------------------------------ + # 4) De-Duplizierung (idempotent): Schlüssel (kind, scope, source_id, target_id, rule_id) + # ------------------------------------------------------------------ + seen: Set[Tuple[str, str, str, str, str]] = set() + uniq: List[Dict] = [] for e in edges: - s = str(e.get("source_id") or "") - t = str(e.get("target_id") or "") - rel = str(e.get("relation") or e.get("kind") or "edge") - rule = str(e.get("rule_id") or "") - key = (s, t, rel, rule) + key = (e["kind"], e["scope"], e["source_id"], e["target_id"], e["rule_id"]) if key in seen: continue seen.add(key) - out.append(e) - return out + uniq.append(e) + + return uniq