From a4b81535e87fde930cfbbbedce57eb4451266722 Mon Sep 17 00:00:00 2001 From: Lars Date: Mon, 17 Nov 2025 15:53:38 +0100 Subject: [PATCH] Dateien nach "app/core" hochladen --- app/core/derive_edges.py | 617 ++++++++++++++++++++++++++------------- 1 file changed, 407 insertions(+), 210 deletions(-) diff --git a/app/core/derive_edges.py b/app/core/derive_edges.py index c1cbba9..0465cb4 100644 --- a/app/core/derive_edges.py +++ b/app/core/derive_edges.py @@ -1,241 +1,438 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +""" +Modul: app/core/derive_edges.py +Version: 2.2.0 (V2-superset mit "typed inline relations" + Obsidian-Callouts) + +Zweck +----- +Bewahrt die bestehende Edgelogik (belongs_to, prev/next, references, backlink) +und ergänzt: +- Typ-Default-Kanten gemäß config/types.yaml (edge_defaults je Notiztyp) +- **Explizite, getypte Inline-Relationen** direkt im Chunk-Text: + * [[rel:depends_on | Target Title]] + * [[rel:related_to Target Title]] +- **Obsidian-Callouts** zur Pflege von Kanten im Markdown: + * > [!edge] related_to: [[Vector DB Basics]] + * Mehrere Zeilen im Callout werden unterstützt (alle Zeilen beginnen mit '>'). + +Konfiguration +------------- +- ENV MINDNET_TYPES_FILE (Default: ./config/types.yaml) + +Hinweis +------- +Diese Implementierung ist rückwärtskompatibel zur bisherigen Signatur. +""" from __future__ import annotations +import os import re -from typing import Dict, Iterable, List, Optional, Tuple +from typing import Iterable, List, Optional, Tuple, Set +try: + import yaml # optional, nur für types.yaml +except Exception: # pragma: no cover + yaml = None -# ------------------------------ -# Edge payload helper -# ------------------------------ -def _edge_payload( - *, - note_id: str, - chunk_id: Optional[str], - kind: str, - source_id: str, - target_id: str, - rule_id: str, - scope: str = "chunk", - confidence: Optional[float] = None, -) -> Dict: - p = { - "note_id": note_id, - "chunk_id": chunk_id, +# ---------------------------- Utilities ------------------------------------ + +def _get(d: dict, *keys, default=None): + for k in keys: + if k in d and d[k] is not None: + return d[k] + return default + +def _chunk_text_for_refs(chunk: dict) -> str: + # bevorzugt 'window' → dann 'text' → 'content' → 'raw' + return ( + _get(chunk, "window") + or _get(chunk, "text") + or _get(chunk, "content") + or _get(chunk, "raw") + or "" + ) + +def _dedupe_seq(seq: Iterable[str]) -> List[str]: + seen: Set[str] = set() + out: List[str] = [] + for s in seq: + if s not in seen: + seen.add(s) + out.append(s) + return out + +def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict: + pl = { "kind": kind, - "scope": scope, + "relation": kind, # v2 Feld (alias) + "scope": scope, # "chunk" | "note" "source_id": source_id, "target_id": target_id, - "rule_id": rule_id, + "note_id": note_id, # Träger/Quelle der Kante (aktuelle Note) } - if confidence is not None: - p["confidence"] = float(confidence) - return p + if extra: + pl.update(extra) + return pl +def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str: + base = f"{kind}:{s}->{t}#{scope}" + if rule_id: + base += f"|{rule_id}" + try: + import hashlib + return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest() + except Exception: # pragma: no cover + return base -# ------------------------------ -# Inline [[wikilink]] parser -# ------------------------------ -_WIKILINK_RE = re.compile(r"\[\[([^\]]+)\]\]") +# ---------------------- Typen-Registry (types.yaml) ------------------------ -def _iter_wikilinks(text: str) -> Iterable[str]: - for m in _WIKILINK_RE.finditer(text): - yield m.group(1).strip() +def _env(n: str, default: Optional[str] = None) -> str: + v = os.getenv(n) + return v if v is not None else (default or "" ) +def _load_types_registry() -> dict: + """Lädt die YAML-Registry aus MINDNET_TYPES_FILE oder ./config/types.yaml""" + p = _env("MINDNET_TYPES_FILE", "./config/types.yaml") + if not os.path.isfile(p) or yaml is None: + return {} + try: + with open(p, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + return data + except Exception: + return {} -# ------------------------------ -# Callout parser -# Syntax: -# > [!edge] related_to: [[Vector DB Basics]] [[Embeddings 101]] -# Mehrere Ziele pro Zeile erlaubt. -# ------------------------------ -_CALLOUT_RE = re.compile( - r"^\s*>\s*\[!edge\]\s*([a-z_]+)\s*:\s*(.+)$", - flags=re.IGNORECASE, -) +def _get_types_map(reg: dict) -> dict: + if isinstance(reg, dict) and isinstance(reg.get("types"), dict): + return reg["types"] + return reg if isinstance(reg, dict) else {} -def _parse_callout_line(line: str) -> Optional[Tuple[str, List[str]]]: - m = _CALLOUT_RE.match(line) - if not m: - return None - relation = m.group(1).strip().lower() - rhs = m.group(2) - targets = [t.strip() for t in _WIKILINK_RE.findall(rhs) if t.strip()] - if not targets: - return None - return (relation, targets) +def _edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]: + """ + Liefert die edge_defaults-Liste für den gegebenen Notiztyp. + Fallback-Reihenfolge: + 1) reg['types'][note_type]['edge_defaults'] + 2) reg['defaults']['edge_defaults'] (oder 'default'/'global') + 3) [] + """ + types_map = _get_types_map(reg) + # 1) exakter Typ + if note_type and isinstance(types_map, dict): + t = types_map.get(note_type) + if isinstance(t, dict) and isinstance(t.get("edge_defaults"), list): + return [str(x) for x in t["edge_defaults"] if isinstance(x, str)] + # 2) Fallback + for key in ("defaults", "default", "global"): + v = reg.get(key) + if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list): + return [str(x) for x in v["edge_defaults"] if isinstance(x, str)] + # 3) leer + return [] +# ------------------------ Parser für Links --------------------------------- -# ------------------------------ -# Defaults aus types.yaml anwenden (wenn konfiguriert) -# types_cfg Beispiel: -# { "types": { "project": { "edge_defaults": ["references","depends_on"] }, ... } } -# ------------------------------ -def _edge_defaults_for_type(types_cfg: Dict, note_type: str) -> List[str]: - tdef = (types_cfg or {}).get("types", {}).get(note_type, {}) - vals = tdef.get("edge_defaults") or [] - return [str(v).strip().lower() for v in vals if str(v).strip()] +# Normale Wikilinks (Fallback) +_WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:. ]+)\]\]") +# Getypte Inline-Relationen: +# [[rel:depends_on | Target]] +# [[rel:related_to Target]] +_REL_PIPE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s*\|\s*(?P[^\]]+?)\s*\]\]", re.IGNORECASE) +_REL_SPACE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s+(?P[^\]]+?)\s*\]\]", re.IGNORECASE) -# ------------------------------ -# Hauptfunktion: Edges ableiten -# Erwartete Inputs: -# note: { "note_id","title","type","text", ... } -# chunks: [ { "chunk_id","note_id","index","ord","text","window", ... }, ... ] -# types_cfg: geladene types.yaml als Dict -# ------------------------------ -def derive_edges( - note: Dict, - chunks: List[Dict], - types_cfg: Optional[Dict] = None, -) -> List[Dict]: - note_id = note.get("note_id") or note.get("id") - note_title = note.get("title") or "" - note_type = (note.get("type") or "").strip().lower() - text = note.get("text") or "" +def _extract_typed_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: + """ + Gibt Liste (kind, target) zurück und den Text mit entfernten getypten Relation-Links, + damit die generische Wikilink-Erkennung sie nicht doppelt zählt. + """ + pairs: List[Tuple[str,str]] = [] + def _collect(m): + k = (m.group("kind") or "").strip().lower() + t = (m.group("target") or "").strip() + if k and t: + pairs.append((k, t)) + return "" # löschen + text = _REL_PIPE.sub(_collect, text) + text = _REL_SPACE.sub(_collect, text) + return pairs, text - edges: List[Dict] = [] +# ---- Obsidian Callout Parser ---------------------------------------------- +# Callout-Start erkennt Zeilen wie: > [!edge] ... (case-insensitive) +_CALLOUT_START = re.compile(r"^\s*>\s*\[!edge\]\s*(.*)$", re.IGNORECASE) - # 1) Sequenz-Edges je Note: belongs_to / next / prev - for i, ch in enumerate(chunks): - cid = ch.get("chunk_id") - # belongs_to - edges.append( - _edge_payload( - note_id=note_id, - chunk_id=cid, - kind="belongs_to", - source_id=cid, - target_id=note_id, - rule_id="structure:v1:belongs_to", - scope="chunk", - ) - ) - # next/prev - if i + 1 < len(chunks): - nxt = chunks[i + 1]["chunk_id"] - edges.append( - _edge_payload( - note_id=note_id, - chunk_id=cid, - kind="next", - source_id=cid, - target_id=nxt, - rule_id="structure:v1:next", - scope="chunk", - ) - ) - if i - 1 >= 0: - prv = chunks[i - 1]["chunk_id"] - edges.append( - _edge_payload( - note_id=note_id, - chunk_id=cid, - kind="prev", - source_id=cid, - target_id=prv, - rule_id="structure:v1:prev", - scope="chunk", - ) - ) +# Innerhalb von Callouts erwarten wir je Zeile Muster wie: +# related_to: [[Vector DB Basics]] +# depends_on: [[A]], [[B]] +# similar_to: Qdrant Vektordatenbank +_REL_LINE = re.compile(r"^(?P[a-z_]+)\s*:\s*(?P.+?)\s*$", re.IGNORECASE) - # 2) Inline-Wikilinks ([[Title]]) => references (note-scope + chunk-scope) - # - chunk-scope: pro Chunk in dessen Text/Window - # - note-scope: Gesamttext der Note - # Hinweis: target_id wird hier als Titel gespeichert; später kann ein Resolver auf note_id mappen. - # chunk-scope - for ch in chunks: - cid = ch.get("chunk_id") - body = (ch.get("window") or ch.get("text") or "") - touched = False - for tgt in _iter_wikilinks(body): - touched = True - edges.append( - _edge_payload( - note_id=note_id, - chunk_id=cid, - kind="references", - source_id=cid, - target_id=tgt, # Titel - rule_id="inline:rel:v1:references", - scope="chunk", - confidence=0.8, - ) - ) - # Optional: wenn in einem Chunk Wikilinks vorkamen, kannst du (später) einen counter o. ä. setzen. - _ = touched +_WIKILINKS_IN_LINE = re.compile(r"\[\[([^\]]+)\]\]") - # note-scope (Gesamttext) - for tgt in _iter_wikilinks(text): - edges.append( - _edge_payload( - note_id=note_id, - chunk_id=None, - kind="references", - source_id=note_id, - target_id=tgt, # Titel - rule_id="explicit:ref:v1:wikilink", - scope="note", - confidence=0.8, - ) - ) +def _extract_callout_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: + """ + Findet Obsidian-Callouts vom Typ [!edge] und extrahiert (kind, target). + Entfernt den gesamten Callout-Block aus dem Text, damit Wikilinks daraus + nicht zusätzlich als "references" gezählt werden. + """ + if not text: + return [], text - # 3) Callouts: - # > [!edge] related_to: [[A]] [[B]] - # ⇒ pro Ziel A/B je ein Edge mit rule_id="callout:edge:v1:" - for ch in chunks: - cid = ch.get("chunk_id") - body = (ch.get("window") or ch.get("text") or "") - for line in body.splitlines(): - parsed = _parse_callout_line(line) - if not parsed: + lines = text.splitlines() + out_pairs: List[Tuple[str,str]] = [] + keep_lines: List[str] = [] + + i = 0 + while i < len(lines): + m = _CALLOUT_START.match(lines[i]) + if not m: + keep_lines.append(lines[i]) + i += 1 + continue + + # Wir sind in einem Callout-Block; erste Zeile nach dem Marker: + # Rest dieser Zeile nach [!edge] mitnehmen + block_lines: List[str] = [] + first_rest = m.group(1) or "" + if first_rest.strip(): + block_lines.append(first_rest) + + # Folgezeilen sind Teil des Callouts, solange sie weiterhin mit '>' beginnen + i += 1 + while i < len(lines) and lines[i].lstrip().startswith('>'): + # Entferne führendes '>' und evtl. Leerzeichen + block_lines.append(lines[i].lstrip()[1:].lstrip()) + i += 1 + + # Parse jede Blockzeile eigenständig + for bl in block_lines: + mrel = _REL_LINE.match(bl) + if not mrel: continue - relation, targets = parsed - # normalize relation name - relation = relation.lower() - # einheitliches Rule-Tagging für Callouts: - rule_tag = f"callout:edge:v1:{relation}" - for tgt in targets: - edges.append( - _edge_payload( - note_id=note_id, - chunk_id=cid, - kind=relation, - source_id=cid, - target_id=tgt, # Titel - rule_id=rule_tag, - scope="chunk", - confidence=0.7, - ) - ) + kind = (mrel.group("kind") or "").strip().lower() + targets = mrel.group("targets") or "" + # Wikilinks bevorzugt + found = _WIKILINKS_IN_LINE.findall(targets) + if found: + for t in found: + t = t.strip() + if t: + out_pairs.append((kind, t)) + else: + # Fallback: Split per ',' oder ';' + for raw in re.split(r"[,;]", targets): + t = raw.strip() + if t: + out_pairs.append((kind, t)) + # Wichtig: Callout wird NICHT in keep_lines übernommen (entfernt) + continue - # 4) Ableitungs-Edges (edge_defaults) aus types.yaml - # Beispiel: project -> ["references","depends_on"] - # Für jede Chunk-Einheit eine schwach gewichtete Default-Beziehung gegen den Note-Titel, - # damit es als Navigationskanten funktioniert, bis ein Resolver Titeleindeutigkeit herstellt. - defaults = _edge_defaults_for_type(types_cfg or {}, note_type) - if defaults: - rule_prefix = f"edge_defaults:{note_type}" - for ch in chunks: - cid = ch.get("chunk_id") + remainder = "\n".join(keep_lines) + return out_pairs, remainder + +def _extract_wikilinks(text: str) -> List[str]: + ids: List[str] = [] + for m in _WIKILINK_RE.finditer(text or ""): + ids.append(m.group(1).strip()) + return ids + +# --------------------------- Hauptfunktion --------------------------------- + +def build_edges_for_note( + note_id: str, + chunks: List[dict], + note_level_references: Optional[List[str]] = None, + include_note_scope_refs: bool = False, +) -> List[dict]: + """ + Erzeugt Kanten für eine Note. + + - belongs_to: für jeden Chunk (chunk -> note) + - next / prev: zwischen aufeinanderfolgenden Chunks + - references: pro Chunk aus window/text (via Wikilinks) + - typed inline relations: [[rel:KIND | Target]] oder [[rel:KIND Target]] + - Obsidian Callouts: > [!edge] KIND: [[Target]] + - optional note-scope references/backlinks: dedupliziert über alle Chunk-Funde + note_level_references + - typenbasierte Default-Kanten (edge_defaults) je gefundener Referenz + """ + edges: List[dict] = [] + + # --- 0) Note-Typ ermitteln (aus erstem Chunk erwartet) --- + note_type = None + if chunks: + note_type = _get(chunks[0], "type") + + # --- 1) belongs_to --- + for ch in chunks: + cid = _get(ch, "chunk_id", "id") + if not cid: + continue + edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to:v1"), + "provenance": "rule", + "rule_id": "structure:belongs_to:v1", + "confidence": 1.0, + })) + + # --- 2) next/prev --- + for i in range(len(chunks) - 1): + a, b = chunks[i], chunks[i + 1] + a_id = _get(a, "chunk_id", "id") + b_id = _get(b, "chunk_id", "id") + if not a_id or not b_id: + continue + edges.append(_edge("next", "chunk", a_id, b_id, note_id, { + "chunk_id": a_id, + "edge_id": _mk_edge_id("next", a_id, b_id, "chunk", "structure:order:v1"), + "provenance": "rule", + "rule_id": "structure:order:v1", + "confidence": 0.95, + })) + edges.append(_edge("prev", "chunk", b_id, a_id, note_id, { + "chunk_id": b_id, + "edge_id": _mk_edge_id("prev", b_id, a_id, "chunk", "structure:order:v1"), + "provenance": "rule", + "rule_id": "structure:order:v1", + "confidence": 0.95, + })) + + # --- 3) references (chunk-scope) + inline relations + callouts + abgeleitete Relationen je Ref --- + reg = _load_types_registry() + defaults = _edge_defaults_for(note_type, reg) + refs_all: List[str] = [] + + for ch in chunks: + cid = _get(ch, "chunk_id", "id") + if not cid: + continue + raw = _chunk_text_for_refs(ch) + + # a) typed inline relations zuerst extrahieren + typed, remainder = _extract_typed_relations(raw) + for kind, target in typed: + edges.append(_edge(kind, "chunk", cid, target, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(kind, cid, target, "chunk", "inline:rel:v1"), + "provenance": "explicit", + "rule_id": "inline:rel:v1", + "confidence": 0.95, + })) + # symmetrische Relationen zusätzlich rückwärts + if kind in {"related_to", "similar_to"}: + edges.append(_edge(kind, "chunk", target, cid, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(kind, target, cid, "chunk", "inline:rel:v1"), + "provenance": "explicit", + "rule_id": "inline:rel:v1", + "confidence": 0.95, + })) + + # b) Obsidian Callouts extrahieren (und aus remainder entfernen) + call_pairs, remainder2 = _extract_callout_relations(remainder) + for kind, target in call_pairs: + k = (kind or "").strip().lower() + if not k or not target: + continue + edges.append(_edge(k, "chunk", cid, target, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(k, cid, target, "chunk", "callout:edge:v1"), + "provenance": "explicit", + "rule_id": "callout:edge:v1", + "confidence": 0.95, + })) + if k in {"related_to", "similar_to"}: + edges.append(_edge(k, "chunk", target, cid, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(k, target, cid, "chunk", "callout:edge:v1"), + "provenance": "explicit", + "rule_id": "callout:edge:v1", + "confidence": 0.95, + })) + + # c) generische Wikilinks (remainder2) → "references" + refs = _extract_wikilinks(remainder2) + for r in refs: + # reale Referenz (wie bisher) + edges.append(_edge("references", "chunk", cid, r, note_id, { + "chunk_id": cid, + "ref_text": r, + "edge_id": _mk_edge_id("references", cid, r, "chunk", "explicit:wikilink:v1"), + "provenance": "explicit", + "rule_id": "explicit:wikilink:v1", + "confidence": 1.0, + })) + # abgeleitete Kanten je default-Relation for rel in defaults: - edges.append( - _edge_payload( - note_id=note_id, - chunk_id=cid, - kind=rel, - source_id=cid, - target_id=note_title or note_id, # weiche Zielmarke - rule_id=f"{rule_prefix}:{rel}", - scope="chunk", - confidence=0.7, - ) - ) + if rel == "references": + continue # doppelt vermeiden + edges.append(_edge(rel, "chunk", cid, r, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(rel, cid, r, "chunk", f"edge_defaults:{note_type}:{rel}:v1"), + "provenance": "rule", + "rule_id": f"edge_defaults:{note_type}:{rel}:v1", + "confidence": 0.7, + })) + # symmetrisch? + if rel in {"related_to", "similar_to"}: + edges.append(_edge(rel, "chunk", r, cid, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(rel, r, cid, "chunk", f"edge_defaults:{note_type}:{rel}:v1"), + "provenance": "rule", + "rule_id": f"edge_defaults:{note_type}:{rel}:v1", + "confidence": 0.7, + })) + refs_all.extend(refs) - # 5) De-Duplizierung (idempotent): key = (source_id, target_id, kind, rule_id) - unique: Dict[Tuple[str, str, str, str], Dict] = {} + # --- 4) optional: note-scope references/backlinks (+ defaults) --- + if include_note_scope_refs: + refs_note = list(refs_all or []) + if note_level_references: + refs_note.extend([r for r in note_level_references if isinstance(r, str) and r]) + refs_note = _dedupe_seq(refs_note) + for r in refs_note: + # echte note-scope Referenz & Backlink (wie bisher) + edges.append(_edge("references", "note", note_id, r, note_id, { + "edge_id": _mk_edge_id("references", note_id, r, "note", "explicit:note_scope:v1"), + "provenance": "explicit", + "rule_id": "explicit:note_scope:v1", + "confidence": 1.0, + })) + edges.append(_edge("backlink", "note", r, note_id, note_id, { + "edge_id": _mk_edge_id("backlink", r, note_id, "note", "derived:backlink:v1"), + "provenance": "rule", + "rule_id": "derived:backlink:v1", + "confidence": 0.9, + })) + # und zusätzlich default-Relationen (note-scope) + for rel in defaults: + if rel == "references": + continue + edges.append(_edge(rel, "note", note_id, r, note_id, { + "edge_id": _mk_edge_id(rel, note_id, r, "note", f"edge_defaults:{note_type}:{rel}:v1"), + "provenance": "rule", + "rule_id": f"edge_defaults:{note_type}:{rel}:v1", + "confidence": 0.7, + })) + if rel in {"related_to", "similar_to"}: + edges.append(_edge(rel, "note", r, note_id, note_id, { + "edge_id": _mk_edge_id(rel, r, note_id, "note", f"edge_defaults:{note_type}:{rel}:v1"), + "provenance": "rule", + "rule_id": f"edge_defaults:{note_type}:{rel}:v1", + "confidence": 0.7, + })) + + # --- 5) Dedupe (Schlüssel: source_id, target_id, relation, rule_id) --- + seen: Set[Tuple[str,str,str,str]] = set() + out: List[dict] = [] for e in edges: - k = (e["source_id"], e["target_id"], e["kind"], e["rule_id"]) - unique[k] = e - return list(unique.values()) + s = str(e.get("source_id") or "") + t = str(e.get("target_id") or "") + rel = str(e.get("relation") or e.get("kind") or "edge") + rule = str(e.get("rule_id") or "") + key = (s, t, rel, rule) + if key in seen: + continue + seen.add(key) + out.append(e) + return out