app/core/derive_edges.py aktualisiert
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s

This commit is contained in:
Lars 2025-11-17 15:39:18 +01:00
parent 12c600edbe
commit 23d0670126

View File

@@ -1,438 +1,241 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""
Modul: app/core/derive_edges.py
Version: 2.2.0 (V2-superset mit "typed inline relations" + Obsidian-Callouts)
Zweck
-----
Bewahrt die bestehende Edgelogik (belongs_to, prev/next, references, backlink)
und ergänzt:
- Typ-Default-Kanten gemäß config/types.yaml (edge_defaults je Notiztyp)
- **Explizite, getypte Inline-Relationen** direkt im Chunk-Text:
* [[rel:depends_on | Target Title]]
* [[rel:related_to Target Title]]
- **Obsidian-Callouts** zur Pflege von Kanten im Markdown:
* > [!edge] related_to: [[Vector DB Basics]]
* Mehrere Zeilen im Callout werden unterstützt (alle Zeilen beginnen mit '>').
Konfiguration
-------------
- ENV MINDNET_TYPES_FILE (Default: ./config/types.yaml)
Hinweis
-------
Diese Implementierung ist rückwärtskompatibel zur bisherigen Signatur.
"""
from __future__ import annotations from __future__ import annotations
import os
import re import re
from typing import Iterable, List, Optional, Tuple, Set from typing import Dict, Iterable, List, Optional, Tuple
try:
import yaml # optional, nur für types.yaml
except Exception: # pragma: no cover
yaml = None
# ---------------------------- Utilities ------------------------------------ # ------------------------------
# Edge payload helper
def _get(d: dict, *keys, default=None): # ------------------------------
for k in keys: def _edge_payload(
if k in d and d[k] is not None: *,
return d[k] note_id: str,
return default chunk_id: Optional[str],
kind: str,
def _chunk_text_for_refs(chunk: dict) -> str: source_id: str,
# bevorzugt 'window' → dann 'text' → 'content' → 'raw' target_id: str,
return ( rule_id: str,
_get(chunk, "window") scope: str = "chunk",
or _get(chunk, "text") confidence: Optional[float] = None,
or _get(chunk, "content") ) -> Dict:
or _get(chunk, "raw") p = {
or "" "note_id": note_id,
) "chunk_id": chunk_id,
def _dedupe_seq(seq: Iterable[str]) -> List[str]:
seen: Set[str] = set()
out: List[str] = []
for s in seq:
if s not in seen:
seen.add(s)
out.append(s)
return out
def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict:
pl = {
"kind": kind, "kind": kind,
"relation": kind, # v2 Feld (alias) "scope": scope,
"scope": scope, # "chunk" | "note"
"source_id": source_id, "source_id": source_id,
"target_id": target_id, "target_id": target_id,
"note_id": note_id, # Träger/Quelle der Kante (aktuelle Note) "rule_id": rule_id,
} }
if extra: if confidence is not None:
pl.update(extra) p["confidence"] = float(confidence)
return pl return p
def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str:
base = f"{kind}:{s}->{t}#{scope}"
if rule_id:
base += f"|{rule_id}"
try:
import hashlib
return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest()
except Exception: # pragma: no cover
return base
# ---------------------- Typen-Registry (types.yaml) ------------------------ # ------------------------------
# Inline [[wikilink]] parser
# ------------------------------
_WIKILINK_RE = re.compile(r"\[\[([^\]]+)\]\]")
def _env(n: str, default: Optional[str] = None) -> str: def _iter_wikilinks(text: str) -> Iterable[str]:
v = os.getenv(n) for m in _WIKILINK_RE.finditer(text):
return v if v is not None else (default or "" ) yield m.group(1).strip()
def _load_types_registry() -> dict:
"""Lädt die YAML-Registry aus MINDNET_TYPES_FILE oder ./config/types.yaml"""
p = _env("MINDNET_TYPES_FILE", "./config/types.yaml")
if not os.path.isfile(p) or yaml is None:
return {}
try:
with open(p, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
return data
except Exception:
return {}
def _get_types_map(reg: dict) -> dict: # ------------------------------
if isinstance(reg, dict) and isinstance(reg.get("types"), dict): # Callout parser
return reg["types"] # Syntax:
return reg if isinstance(reg, dict) else {} # > [!edge] related_to: [[Vector DB Basics]] [[Embeddings 101]]
# Mehrere Ziele pro Zeile erlaubt.
# ------------------------------
_CALLOUT_RE = re.compile(
r"^\s*>\s*\[!edge\]\s*([a-z_]+)\s*:\s*(.+)$",
flags=re.IGNORECASE,
)
def _edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]: def _parse_callout_line(line: str) -> Optional[Tuple[str, List[str]]]:
""" m = _CALLOUT_RE.match(line)
Liefert die edge_defaults-Liste für den gegebenen Notiztyp. if not m:
Fallback-Reihenfolge: return None
1) reg['types'][note_type]['edge_defaults'] relation = m.group(1).strip().lower()
2) reg['defaults']['edge_defaults'] (oder 'default'/'global') rhs = m.group(2)
3) [] targets = [t.strip() for t in _WIKILINK_RE.findall(rhs) if t.strip()]
""" if not targets:
types_map = _get_types_map(reg) return None
# 1) exakter Typ return (relation, targets)
if note_type and isinstance(types_map, dict):
t = types_map.get(note_type)
if isinstance(t, dict) and isinstance(t.get("edge_defaults"), list):
return [str(x) for x in t["edge_defaults"] if isinstance(x, str)]
# 2) Fallback
for key in ("defaults", "default", "global"):
v = reg.get(key)
if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list):
return [str(x) for x in v["edge_defaults"] if isinstance(x, str)]
# 3) leer
return []
# ------------------------ Parser für Links ---------------------------------
# Normale Wikilinks (Fallback) # ------------------------------
_WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:. ]+)\]\]") # Defaults aus types.yaml anwenden (wenn konfiguriert)
# types_cfg Beispiel:
# { "types": { "project": { "edge_defaults": ["references","depends_on"] }, ... } }
# ------------------------------
def _edge_defaults_for_type(types_cfg: Dict, note_type: str) -> List[str]:
tdef = (types_cfg or {}).get("types", {}).get(note_type, {})
vals = tdef.get("edge_defaults") or []
return [str(v).strip().lower() for v in vals if str(v).strip()]
# Getypte Inline-Relationen:
# [[rel:depends_on | Target]]
# [[rel:related_to Target]]
_REL_PIPE = re.compile(r"\[\[\s*rel:(?P<kind>[a-z_]+)\s*\|\s*(?P<target>[^\]]+?)\s*\]\]", re.IGNORECASE)
_REL_SPACE = re.compile(r"\[\[\s*rel:(?P<kind>[a-z_]+)\s+(?P<target>[^\]]+?)\s*\]\]", re.IGNORECASE)
# ------------------------------
# Main entry point: derive edges
# Expected inputs:
#   note:      { "note_id", "title", "type", "text", ... }
#   chunks:    [ { "chunk_id", "note_id", "index", "ord", "text", "window", ... }, ... ]
#   types_cfg: loaded types.yaml as a dict
# ------------------------------
def derive_edges(
    note: Dict,
    chunks: List[Dict],
    types_cfg: Optional[Dict] = None,
) -> List[Dict]:
    """Derive graph edges for one note and its chunks.

    Produces, in order:
      1. structural edges per chunk (``belongs_to`` / ``next`` / ``prev``),
      2. ``[[Title]]`` wikilink references (chunk scope and note scope),
      3. ``> [!edge] relation: [[Target]]`` callout edges (chunk scope),
      4. weakly weighted default edges per ``edge_defaults`` of the note type,
      5. de-duplication keyed on (source_id, target_id, kind, rule_id) —
         later edges win, which makes the function idempotent.

    Reference/callout targets are stored as *titles*; a later resolver may
    map them to note ids.

    Returns:
        List of edge payload dicts (see ``_edge_payload``).
    """
    note_id = note.get("note_id") or note.get("id")
    note_title = note.get("title") or ""
    note_type = (note.get("type") or "").strip().lower()
    text = note.get("text") or ""

    edges: List[Dict] = []

    # 1) Structural edges per chunk: belongs_to / next / prev.
    for i, ch in enumerate(chunks):
        cid = ch.get("chunk_id")
        edges.append(
            _edge_payload(
                note_id=note_id,
                chunk_id=cid,
                kind="belongs_to",
                source_id=cid,
                target_id=note_id,
                rule_id="structure:v1:belongs_to",
                scope="chunk",
            )
        )
        if i + 1 < len(chunks):
            edges.append(
                _edge_payload(
                    note_id=note_id,
                    chunk_id=cid,
                    kind="next",
                    source_id=cid,
                    target_id=chunks[i + 1]["chunk_id"],
                    rule_id="structure:v1:next",
                    scope="chunk",
                )
            )
        if i - 1 >= 0:
            edges.append(
                _edge_payload(
                    note_id=note_id,
                    chunk_id=cid,
                    kind="prev",
                    source_id=cid,
                    target_id=chunks[i - 1]["chunk_id"],
                    rule_id="structure:v1:prev",
                    scope="chunk",
                )
            )

    # 2) Inline wikilinks ([[Title]]) => "references".
    #    - chunk scope: per chunk over its window/text
    #    - note scope: over the full note text
    #    target_id is the raw title; a resolver may map it to a note_id later.
    for ch in chunks:
        cid = ch.get("chunk_id")
        body = ch.get("window") or ch.get("text") or ""
        for tgt in _iter_wikilinks(body):
            edges.append(
                _edge_payload(
                    note_id=note_id,
                    chunk_id=cid,
                    kind="references",
                    source_id=cid,
                    target_id=tgt,  # title
                    rule_id="inline:rel:v1:references",
                    scope="chunk",
                    confidence=0.8,
                )
            )
    for tgt in _iter_wikilinks(text):
        edges.append(
            _edge_payload(
                note_id=note_id,
                chunk_id=None,
                kind="references",
                source_id=note_id,
                target_id=tgt,  # title
                rule_id="explicit:ref:v1:wikilink",
                scope="note",
                confidence=0.8,
            )
        )

    # 3) Callouts:
    #    > [!edge] related_to: [[A]] [[B]]
    #    => one edge per target A/B, rule_id = "callout:edge:v1:<relation>".
    for ch in chunks:
        cid = ch.get("chunk_id")
        body = ch.get("window") or ch.get("text") or ""
        for line in body.splitlines():
            parsed = _parse_callout_line(line)
            if not parsed:
                continue
            # relation is already lower-cased by _parse_callout_line.
            relation, targets = parsed
            rule_tag = f"callout:edge:v1:{relation}"
            for tgt in targets:
                edges.append(
                    _edge_payload(
                        note_id=note_id,
                        chunk_id=cid,
                        kind=relation,
                        source_id=cid,
                        target_id=tgt,  # title
                        rule_id=rule_tag,
                        scope="chunk",
                        confidence=0.7,
                    )
                )

    # 4) Derived default edges (edge_defaults) from types.yaml,
    #    e.g. project -> ["references", "depends_on"]: one weakly weighted
    #    default relation per chunk against the note title, serving as
    #    navigation edges until a resolver makes titles unambiguous.
    defaults = _edge_defaults_for_type(types_cfg or {}, note_type)
    if defaults:
        rule_prefix = f"edge_defaults:{note_type}"
        for ch in chunks:
            cid = ch.get("chunk_id")
            for rel in defaults:
                edges.append(
                    _edge_payload(
                        note_id=note_id,
                        chunk_id=cid,
                        kind=rel,
                        source_id=cid,
                        target_id=note_title or note_id,  # soft target marker
                        rule_id=f"{rule_prefix}:{rel}",
                        scope="chunk",
                        confidence=0.7,
                    )
                )

    # 5) De-duplication (idempotent): key = (source_id, target_id, kind, rule_id).
    unique: Dict[Tuple[str, str, str, str], Dict] = {}
    for e in edges:
        unique[(e["source_id"], e["target_id"], e["kind"], e["rule_id"])] = e
    return list(unique.values())