mindnet/app/core/derive_edges.py
2025-12-15 15:40:39 +01:00

421 lines
16 KiB
Python

"""
FILE: app/core/derive_edges.py
DESCRIPTION: Extrahiert Graph-Kanten aus Text. Unterstützt Wikilinks, Inline-Relations ([[rel:type|target]]) und Obsidian Callouts.
VERSION: 2.0.0
STATUS: Active
DEPENDENCIES: re, os, hashlib, typing, yaml (optional)
EXTERNAL_CONFIG: config/types.yaml
LAST_ANALYSIS: 2025-12-15
"""
from __future__ import annotations
import os
import re
from typing import Iterable, List, Optional, Tuple, Set, Dict
try:
import yaml # optional, nur für types.yaml
except Exception: # pragma: no cover
yaml = None
# --------------------------------------------------------------------------- #
# Utilities
# --------------------------------------------------------------------------- #
def _get(d: dict, *keys, default=None):
for k in keys:
if isinstance(d, dict) and k in d and d[k] is not None:
return d[k]
return default
def _chunk_text_for_refs(chunk: dict) -> str:
    """Pick the chunk text that link extraction should run on.

    The fields are tried in the order 'window', 'text', 'content', 'raw';
    the first truthy value wins. An empty string is returned when none of
    them is set.
    """
    for field in ("window", "text", "content", "raw"):
        value = _get(chunk, field)
        if value:
            return value
    return ""
def _dedupe_seq(seq: Iterable[str]) -> List[str]:
seen: Set[str] = set()
out: List[str] = []
for s in seq:
if s not in seen:
seen.add(s)
out.append(s)
return out
def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict:
pl = {
"kind": kind,
"relation": kind, # Alias (v2)
"scope": scope, # "chunk" | "note"
"source_id": source_id,
"target_id": target_id,
"note_id": note_id, # Träger-Note der Kante
}
if extra:
pl.update(extra)
return pl
def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str:
base = f"{kind}:{s}->{t}#{scope}"
if rule_id:
base += f"|{rule_id}"
try:
import hashlib
return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest()
except Exception: # pragma: no cover
return base
# --------------------------------------------------------------------------- #
# Typen-Registry (types.yaml)
# --------------------------------------------------------------------------- #
def _env(n: str, default: Optional[str] = None) -> str:
v = os.getenv(n)
return v if v is not None else (default or "")
def _load_types_registry() -> dict:
    """Load the type registry from MINDNET_TYPES_FILE or ./config/types.yaml.

    Returns:
        The top-level mapping of the YAML file. An empty dict is returned
        when PyYAML is not available, the file does not exist, reading or
        parsing fails, or the top-level YAML node is not a mapping.
    """
    path = _env("MINDNET_TYPES_FILE", "./config/types.yaml")
    if yaml is None or not os.path.isfile(path):
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except Exception:
        # Best effort: an unreadable or malformed registry is treated like
        # a missing one.
        return {}
    # safe_load returns whatever the top-level node is (list, str, None, ...);
    # callers use dict methods on the result, so only pass a mapping through.
    return data if isinstance(data, dict) else {}
def _get_types_map(reg: dict) -> dict:
if isinstance(reg, dict) and isinstance(reg.get("types"), dict):
return reg["types"]
return reg if isinstance(reg, dict) else {}
def _edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]:
    """Return the edge_defaults list that applies to *note_type*.

    Lookup order (the first entry that is a dict with a list under
    'edge_defaults' wins):
      1) the entry for *note_type* in the type map of *reg*
      2) reg['defaults'], then reg['default'], then reg['global']
      3) otherwise an empty list

    Only string items of the list are returned. A *reg* that is not a dict
    yields an empty list.
    """
    if not isinstance(reg, dict):
        # Without this guard the reg.get() lookups below would raise
        # AttributeError for e.g. a list-shaped registry.
        return []

    candidates = []
    if note_type:
        candidates.append(_get_types_map(reg).get(note_type))
    candidates.extend(reg.get(key) for key in ("defaults", "default", "global"))

    for entry in candidates:
        if isinstance(entry, dict) and isinstance(entry.get("edge_defaults"), list):
            return [x for x in entry["edge_defaults"] if isinstance(x, str)]
    return []
# --------------------------------------------------------------------------- #
# Parser für Links / Relationen
# --------------------------------------------------------------------------- #
# Plain wikilinks (fallback for links that carry no relation type).
# Matches [[X]] and [[prefix|X]]; the capture group is X, i.e. the part after
# the optional "prefix|". X may only consist of ASCII letters, digits, "_",
# "-", "#", ":", "." and spaces; a link whose X contains any other character
# (e.g. an umlaut) is not matched.
# NOTE(review): Obsidian writes aliased links as [[Target|Alias]], whereas the
# text AFTER the pipe is captured here — confirm this is the intended part.
_WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:. ]+)\]\]")
# Getypte Inline-Relationen:
# [[rel:KIND | Target]]
# [[rel:KIND Target]]
_REL_PIPE = re.compile(r"\[\[\s*rel:(?P<kind>[a-z_]+)\s*\|\s*(?P<target>[^\]]+?)\s*\]\]", re.IGNORECASE)
_REL_SPACE = re.compile(r"\[\[\s*rel:(?P<kind>[a-z_]+)\s+(?P<target>[^\]]+?)\s*\]\]", re.IGNORECASE)
# rel: KIND [[Target]] (reines Textmuster)
_REL_TEXT = re.compile(r"rel\s*:\s*(?P<kind>[a-z_]+)\s*\[\[\s*(?P<target>[^\]]+?)\s*\]\]", re.IGNORECASE)
def _extract_typed_relations(text: str) -> Tuple[List[Tuple[str,str]], str]:
"""
Gibt Liste (kind, target) zurück und den Text mit entfernten getypten Relation-Links,
damit die generische Wikilink-Erkennung sie nicht doppelt zählt.
Unterstützt drei Varianten:
- [[rel:KIND | Target]]
- [[rel:KIND Target]]
- rel: KIND [[Target]]
"""
pairs: List[Tuple[str,str]] = []
def _collect(m):
k = (m.group("kind") or "").strip().lower()
t = (m.group("target") or "").strip()
if k and t:
pairs.append((k, t))
return "" # Link entfernen
text = _REL_PIPE.sub(_collect, text)
text = _REL_SPACE.sub(_collect, text)
text = _REL_TEXT.sub(_collect, text)
return pairs, text
# Obsidian Callout Parser
_CALLOUT_START = re.compile(r"^\s*>\s*\[!edge\]\s*(.*)$", re.IGNORECASE)
_REL_LINE = re.compile(r"^(?P<kind>[a-z_]+)\s*:\s*(?P<targets>.+?)\s*$", re.IGNORECASE)
_WIKILINKS_IN_LINE = re.compile(r"\[\[([^\]]+)\]\]")
def _extract_callout_relations(text: str) -> Tuple[List[Tuple[str,str]], str]:
"""
Findet [!edge]-Callouts und extrahiert (kind, target). Entfernt den gesamten
Callout-Block aus dem Text (damit Wikilinks daraus nicht zusätzlich als
"references" gezählt werden).
"""
if not text:
return [], text
lines = text.splitlines()
out_pairs: List[Tuple[str,str]] = []
keep_lines: List[str] = []
i = 0
while i < len(lines):
m = _CALLOUT_START.match(lines[i])
if not m:
keep_lines.append(lines[i])
i += 1
continue
block_lines: List[str] = []
first_rest = m.group(1) or ""
if first_rest.strip():
block_lines.append(first_rest)
i += 1
while i < len(lines) and lines[i].lstrip().startswith('>'):
block_lines.append(lines[i].lstrip()[1:].lstrip())
i += 1
for bl in block_lines:
mrel = _REL_LINE.match(bl)
if not mrel:
continue
kind = (mrel.group("kind") or "").strip().lower()
targets = mrel.group("targets") or ""
found = _WIKILINKS_IN_LINE.findall(targets)
if found:
for t in found:
t = t.strip()
if t:
out_pairs.append((kind, t))
else:
for raw in re.split(r"[,;]", targets):
t = raw.strip()
if t:
out_pairs.append((kind, t))
# Callout wird NICHT in keep_lines übernommen
continue
remainder = "\n".join(keep_lines)
return out_pairs, remainder
def _extract_wikilinks(text: str) -> List[str]:
    """Return the targets of all plain wikilinks in *text*, in text order.

    Each target is the stripped capture group of _WIKILINK_RE. Duplicates
    are kept. Links whose target is empty after stripping (e.g. "[[   ]]")
    are skipped, so no edge with an empty target is derived from them.
    None or empty input yields an empty list.
    """
    targets = (m.group(1).strip() for m in _WIKILINK_RE.finditer(text or ""))
    return [t for t in targets if t]
# --------------------------------------------------------------------------- #
# Hauptfunktion
# --------------------------------------------------------------------------- #
def build_edges_for_note(
    note_id: str,
    chunks: List[dict],
    note_level_references: Optional[List[str]] = None,
    include_note_scope_refs: bool = False,
) -> List[dict]:
    """
    Build the edge payloads for one note.

    Edges are produced in this order:
    - belongs_to: one per chunk (chunk -> note)
    - next / prev: between consecutive chunks
    - per chunk, from its window/text:
        - typed inline relations: [[rel:KIND | Target]] / [[rel:KIND Target]] / rel: KIND [[Target]]
        - Obsidian callouts: > [!edge] KIND: [[Target]] [[Target2]]
        - references: one per remaining plain wikilink, each followed by the
          type-based default edges (edge_defaults) for that reference
    - if include_note_scope_refs is set: note-scope references and backlinks
      (plus default edges), de-duplicated over all chunk references and
      note_level_references

    Relations of kind related_to / similar_to additionally get the reverse
    edge. Chunks without 'chunk_id' / 'id' are skipped. The note type is
    read from the 'type' field of the first chunk. The result is
    de-duplicated on (source_id, target_id, relation, rule_id), keeping the
    first occurrence.
    """
    symmetric = {"related_to", "similar_to"}
    edges: List[dict] = []

    def emit(kind, scope, src, dst, rule_id, provenance, confidence, chunk_id=None, ref_text=None):
        # Append one edge; the key order of the extra payload is fixed.
        extra = {}
        if chunk_id is not None:
            extra["chunk_id"] = chunk_id
        if ref_text is not None:
            extra["ref_text"] = ref_text
        extra["edge_id"] = _mk_edge_id(kind, src, dst, scope, rule_id)
        extra["provenance"] = provenance
        extra["rule_id"] = rule_id
        extra["confidence"] = confidence
        edges.append(_edge(kind, scope, src, dst, note_id, extra))

    def emit_relation(kind, scope, src, dst, rule_id, provenance, confidence, chunk_id=None):
        # Forward edge, plus the reverse edge for symmetric kinds.
        emit(kind, scope, src, dst, rule_id, provenance, confidence, chunk_id)
        if kind in symmetric:
            emit(kind, scope, dst, src, rule_id, provenance, confidence, chunk_id)

    def emit_defaults(rels, scope, src, dst, chunk_id=None):
        # Type-based default edges for one reference ("references" itself is
        # already emitted explicitly by the caller).
        for rel in rels:
            if rel != "references":
                emit_relation(rel, scope, src, dst, f"edge_defaults:{note_type}:{rel}", "rule", 0.7, chunk_id)

    # Note type (expected on the first chunk)
    note_type = _get(chunks[0], "type") if chunks else None
    chunk_ids = [_get(ch, "chunk_id", "id") for ch in chunks]

    # 1) belongs_to
    for cid in chunk_ids:
        if cid:
            emit("belongs_to", "chunk", cid, note_id, "structure:belongs_to", "rule", 1.0, chunk_id=cid)

    # 2) next / prev
    for a_id, b_id in zip(chunk_ids, chunk_ids[1:]):
        if a_id and b_id:
            emit("next", "chunk", a_id, b_id, "structure:order", "rule", 0.95, chunk_id=a_id)
            emit("prev", "chunk", b_id, a_id, "structure:order", "rule", 0.95, chunk_id=b_id)

    # 3) typed inline + callouts + references + defaults (chunk scope)
    defaults = _edge_defaults_for(note_type, _load_types_registry())
    refs_all: List[str] = []
    for cid, ch in zip(chunk_ids, chunks):
        if not cid:
            continue

        # 3a) typed inline relations
        typed, rest = _extract_typed_relations(_chunk_text_for_refs(ch))
        for kind, target in typed:
            kind = kind.strip().lower()
            if kind and target:
                emit_relation(kind, "chunk", cid, target, "inline:rel", "explicit", 0.95, chunk_id=cid)

        # 3b) callouts
        callout_pairs, rest = _extract_callout_relations(rest)
        for kind, target in callout_pairs:
            kind = (kind or "").strip().lower()
            if kind and target:
                emit_relation(kind, "chunk", cid, target, "callout:edge", "explicit", 0.95, chunk_id=cid)

        # 3c) plain wikilinks -> references (+ defaults per reference)
        refs = _extract_wikilinks(rest)
        for ref in refs:
            emit("references", "chunk", cid, ref, "explicit:wikilink", "explicit", 1.0, chunk_id=cid, ref_text=ref)
            emit_defaults(defaults, "chunk", cid, ref, chunk_id=cid)
        refs_all.extend(refs)

    # 4) optional note-scope references / backlinks (+ defaults)
    if include_note_scope_refs:
        extra_refs = [r for r in (note_level_references or []) if isinstance(r, str) and r]
        for ref in _dedupe_seq(refs_all + extra_refs):
            emit("references", "note", note_id, ref, "explicit:note_scope", "explicit", 1.0)
            emit("backlink", "note", ref, note_id, "derived:backlink", "rule", 0.9)
            emit_defaults(defaults, "note", note_id, ref)

    # 5) de-dupe on (source_id, target_id, relation, rule_id); first one wins
    unique: Dict[Tuple[str, str, str, str], dict] = {}
    for e in edges:
        key = (
            str(e.get("source_id") or ""),
            str(e.get("target_id") or ""),
            str(e.get("relation") or e.get("kind") or "edge"),
            str(e.get("rule_id") or ""),
        )
        unique.setdefault(key, e)
    return list(unique.values())