#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Modul: app/core/derive_edges.py Zweck: - Bewahrt bestehende Edgelogik (belongs_to, prev/next, references, backlink) - Ergänzt typenbasierte Default-Kanten (edge_defaults aus config/types.yaml) - Unterstützt "typed inline relations" ([[rel:KIND | Target]] / [[rel:KIND Target]]) - Unterstützt Obsidian-Callouts (> [!edge] KIND: [[Target]] [[Target2]] ...) Kompatibilität: - build_edges_for_note(...) Signatur unverändert - rule_id Werte exakt wie zuvor erwartet (ohne Versionssuffix): * structure:belongs_to * structure:order * explicit:wikilink * inline:rel * callout:edge * edge_defaults:: * derived:backlink """ from __future__ import annotations import os import re from typing import Iterable, List, Optional, Tuple, Set, Dict try: import yaml # optional, nur für types.yaml except Exception: # pragma: no cover yaml = None # --------------------------------------------------------------------------- # # Utilities # --------------------------------------------------------------------------- # def _get(d: dict, *keys, default=None): for k in keys: if k in d and d[k] is not None: return d[k] return default def _chunk_text_for_refs(chunk: dict) -> str: # bevorzugt 'window' → dann 'text' → 'content' → 'raw' return ( _get(chunk, "window") or _get(chunk, "text") or _get(chunk, "content") or _get(chunk, "raw") or "" ) def _dedupe_seq(seq: Iterable[str]) -> List[str]: seen: Set[str] = set() out: List[str] = [] for s in seq: if s not in seen: seen.add(s) out.append(s) return out def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict: pl = { "kind": kind, "relation": kind, # Alias (v2) "scope": scope, # "chunk" | "note" "source_id": source_id, "target_id": target_id, "note_id": note_id, # Träger-Note der Kante } if extra: pl.update(extra) return pl def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str: base = 
f"{kind}:{s}->{t}#{scope}" if rule_id: base += f"|{rule_id}" try: import hashlib return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest() except Exception: # pragma: no cover return base # --------------------------------------------------------------------------- # # Typen-Registry (types.yaml) # --------------------------------------------------------------------------- # def _env(n: str, default: Optional[str] = None) -> str: v = os.getenv(n) return v if v is not None else (default or "") def _load_types_registry() -> dict: """Lädt die YAML-Registry aus MINDNET_TYPES_FILE oder ./config/types.yaml""" p = _env("MINDNET_TYPES_FILE", "./config/types.yaml") if not os.path.isfile(p) or yaml is None: return {} try: with open(p, "r", encoding="utf-8") as f: data = yaml.safe_load(f) or {} return data except Exception: return {} def _get_types_map(reg: dict) -> dict: if isinstance(reg, dict) and isinstance(reg.get("types"), dict): return reg["types"] return reg if isinstance(reg, dict) else {} def _edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]: """ Liefert die edge_defaults-Liste für den gegebenen Notiztyp. 
Fallback-Reihenfolge: 1) reg['types'][note_type]['edge_defaults'] 2) reg['defaults']['edge_defaults'] (oder 'default'/'global') 3) [] """ types_map = _get_types_map(reg) if note_type and isinstance(types_map, dict): t = types_map.get(note_type) if isinstance(t, dict) and isinstance(t.get("edge_defaults"), list): return [str(x) for x in t["edge_defaults"] if isinstance(x, str)] for key in ("defaults", "default", "global"): v = reg.get(key) if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list): return [str(x) for x in v["edge_defaults"] if isinstance(x, str)] return [] # --------------------------------------------------------------------------- # # Parser für Links / Relationen # --------------------------------------------------------------------------- # # Normale Wikilinks (Fallback) _WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:. ]+)\]\]") # Getypte Inline-Relationen: # [[rel:depends_on | Target]] # [[rel:related_to Target]] _REL_PIPE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s*\|\s*(?P[^\]]+?)\s*\]\]", re.IGNORECASE) _REL_SPACE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s+(?P[^\]]+?)\s*\]\]", re.IGNORECASE) def _extract_typed_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: """ Gibt Liste (kind, target) zurück und den Text mit entfernten getypten Relation-Links, damit die generische Wikilink-Erkennung sie nicht doppelt zählt. 
""" pairs: List[Tuple[str,str]] = [] def _collect(m): k = (m.group("kind") or "").strip().lower() t = (m.group("target") or "").strip() if k and t: pairs.append((k, t)) return "" # Link entfernen text = _REL_PIPE.sub(_collect, text) text = _REL_SPACE.sub(_collect, text) return pairs, text # Obsidian Callout Parser _CALLOUT_START = re.compile(r"^\s*>\s*\[!edge\]\s*(.*)$", re.IGNORECASE) _REL_LINE = re.compile(r"^(?P[a-z_]+)\s*:\s*(?P.+?)\s*$", re.IGNORECASE) _WIKILINKS_IN_LINE = re.compile(r"\[\[([^\]]+)\]\]") def _extract_callout_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: """ Findet [!edge]-Callouts und extrahiert (kind, target). Entfernt den gesamten Callout-Block aus dem Text (damit Wikilinks daraus nicht zusätzlich als "references" gezählt werden). """ if not text: return [], text lines = text.splitlines() out_pairs: List[Tuple[str,str]] = [] keep_lines: List[str] = [] i = 0 while i < len(lines): m = _CALLOUT_START.match(lines[i]) if not m: keep_lines.append(lines[i]) i += 1 continue block_lines: List[str] = [] first_rest = m.group(1) or "" if first_rest.strip(): block_lines.append(first_rest) i += 1 while i < len(lines) and lines[i].lstrip().startswith('>'): block_lines.append(lines[i].lstrip()[1:].lstrip()) i += 1 for bl in block_lines: mrel = _REL_LINE.match(bl) if not mrel: continue kind = (mrel.group("kind") or "").strip().lower() targets = mrel.group("targets") or "" found = _WIKILINKS_IN_LINE.findall(targets) if found: for t in found: t = t.strip() if t: out_pairs.append((kind, t)) else: for raw in re.split(r"[,;]", targets): t = raw.strip() if t: out_pairs.append((kind, t)) # Callout wird NICHT in keep_lines übernommen continue remainder = "\n".join(keep_lines) return out_pairs, remainder def _extract_wikilinks(text: str) -> List[str]: ids: List[str] = [] for m in _WIKILINK_RE.finditer(text or ""): ids.append(m.group(1).strip()) return ids # --------------------------------------------------------------------------- # # Hauptfunktion # 
# --------------------------------------------------------------------------- #


def build_edges_for_note(
    note_id: str,
    chunks: List[dict],
    note_level_references: Optional[List[str]] = None,
    include_note_scope_refs: bool = False,
) -> List[dict]:
    """
    Derive the edges of one note.

    Emitted in this order:
      - belongs_to: one per chunk (chunk -> note)
      - next / prev: between consecutive chunks
      - per chunk, from its window/text:
          * typed inline relations ([[rel:KIND | Target]] / [[rel:KIND Target]])
          * Obsidian callouts (> [!edge] KIND: [[Target]] [[Target2]])
          * references from the remaining wikilinks, each followed by the
            type-based default edges (edge_defaults)
      - optionally (include_note_scope_refs) note-scope references/backlinks
        plus default edges, de-duplicated over all chunk findings and
        note_level_references

    "related_to" and "similar_to" relations additionally get the reverse edge.
    The result is de-duplicated on (source_id, target_id, relation, rule_id),
    keeping the first occurrence. The note type is read from the first chunk's
    "type" entry.
    """
    collected: List[dict] = []
    symmetric = {"related_to", "similar_to"}

    # Note type is expected on the first chunk.
    note_type = _get(chunks[0], "type") if chunks else None

    def emit(kind, scope, src, dst, rule_id, provenance, confidence,
             chunk_id=None, ref_text=None):
        # Assemble the extra payload in the established key order, then append.
        extra: dict = {}
        if chunk_id is not None:
            extra["chunk_id"] = chunk_id
        if ref_text is not None:
            extra["ref_text"] = ref_text
        extra["edge_id"] = _mk_edge_id(kind, src, dst, scope, rule_id)
        extra["provenance"] = provenance
        extra["rule_id"] = rule_id
        extra["confidence"] = confidence
        collected.append(_edge(kind, scope, src, dst, note_id, extra))

    def emit_explicit(kind, chunk_id, target, rule_id):
        # Explicit chunk-scope relation, mirrored for symmetric kinds.
        emit(kind, "chunk", chunk_id, target, rule_id, "explicit", 0.95, chunk_id=chunk_id)
        if kind in symmetric:
            emit(kind, "chunk", target, chunk_id, rule_id, "explicit", 0.95, chunk_id=chunk_id)

    def emit_defaults(scope, src, dst, chunk_id=None):
        # Type-based default edges for one reference ("references" itself is skipped).
        for default_rel in default_rels:
            if default_rel == "references":
                continue
            rule_id = f"edge_defaults:{note_type}:{default_rel}"
            emit(default_rel, scope, src, dst, rule_id, "rule", 0.7, chunk_id=chunk_id)
            if default_rel in symmetric:
                emit(default_rel, scope, dst, src, rule_id, "rule", 0.7, chunk_id=chunk_id)

    # 1) belongs_to
    for chunk in chunks:
        chunk_id = _get(chunk, "chunk_id", "id")
        if chunk_id:
            emit("belongs_to", "chunk", chunk_id, note_id,
                 "structure:belongs_to", "rule", 1.0, chunk_id=chunk_id)

    # 2) next / prev
    for left, right in zip(chunks, chunks[1:]):
        left_id = _get(left, "chunk_id", "id")
        right_id = _get(right, "chunk_id", "id")
        if not left_id or not right_id:
            continue
        emit("next", "chunk", left_id, right_id, "structure:order", "rule", 0.95,
             chunk_id=left_id)
        emit("prev", "chunk", right_id, left_id, "structure:order", "rule", 0.95,
             chunk_id=right_id)

    # 3) references + typed inline + callouts + defaults (chunk scope)
    registry = _load_types_registry()
    default_rels = _edge_defaults_for(note_type, registry)
    all_refs: List[str] = []

    for chunk in chunks:
        chunk_id = _get(chunk, "chunk_id", "id")
        if not chunk_id:
            continue
        source_text = _chunk_text_for_refs(chunk)

        # 3a) typed inline relations
        inline_pairs, without_inline = _extract_typed_relations(source_text)
        for kind, target in inline_pairs:
            kind = kind.strip().lower()
            if kind and target:
                emit_explicit(kind, chunk_id, target, "inline:rel")

        # 3b) callouts
        callout_pairs, without_callouts = _extract_callout_relations(without_inline)
        for kind, target in callout_pairs:
            normalized = (kind or "").strip().lower()
            if normalized and target:
                emit_explicit(normalized, chunk_id, target, "callout:edge")

        # 3c) generic wikilinks -> references (+ defaults per reference)
        chunk_refs = _extract_wikilinks(without_callouts)
        for ref in chunk_refs:
            emit("references", "chunk", chunk_id, ref, "explicit:wikilink", "explicit", 1.0,
                 chunk_id=chunk_id, ref_text=ref)
            emit_defaults("chunk", chunk_id, ref, chunk_id=chunk_id)

        all_refs.extend(chunk_refs)

    # 4) optional note-scope refs/backlinks (+ defaults)
    if include_note_scope_refs:
        note_refs = list(all_refs)
        if note_level_references:
            note_refs.extend(
                ref for ref in note_level_references if isinstance(ref, str) and ref
            )
        for ref in _dedupe_seq(note_refs):
            emit("references", "note", note_id, ref, "explicit:note_scope", "explicit", 1.0)
            emit("backlink", "note", ref, note_id, "derived:backlink", "rule", 0.9)
            emit_defaults("note", note_id, ref)

    # 5) de-dupe on (source_id, target_id, relation, rule_id); first occurrence wins
    seen_keys: Set[Tuple[str, str, str, str]] = set()
    unique: List[dict] = []
    for edge in collected:
        key = (
            str(edge.get("source_id") or ""),
            str(edge.get("target_id") or ""),
            str(edge.get("relation") or edge.get("kind") or "edge"),
            str(edge.get("rule_id") or ""),
        )
        if key not in seen_keys:
            seen_keys.add(key)
            unique.append(edge)
    return unique