From 6f0b46348965cda89aa8c05423b3ee227a9726dc Mon Sep 17 00:00:00 2001 From: Lars Date: Mon, 17 Nov 2025 12:21:28 +0100 Subject: [PATCH] app/core/derive_edges.py aktualisiert --- app/core/derive_edges.py | 229 ++++++++++++++++++++++++++++++--------- 1 file changed, 178 insertions(+), 51 deletions(-) diff --git a/app/core/derive_edges.py b/app/core/derive_edges.py index 394d50c..e9126c8 100644 --- a/app/core/derive_edges.py +++ b/app/core/derive_edges.py @@ -2,39 +2,36 @@ # -*- coding: utf-8 -*- """ Modul: app/core/derive_edges.py -Version: 1.5.0 (Mindnet V2) -Status: Stable +Version: 2.1.0 (V2-superset mit "typed inline relations") -Ziele +Zweck ----- -1) Beibehalten der bewährten Edge-Ableitung: - - belongs_to (chunk -> note) - - next / prev (Chunk-Kette) - - references (chunk-scope) aus Chunk.window/text via `extract_wikilinks` +Bewahrt die bestehende Edgelogik (belongs_to, prev/next, references, backlink) +und ergänzt: +- Typ-Default-Kanten gemäß config/types.yaml (edge_defaults je Notiztyp) +- **Explizite, getypte Inline-Relationen** direkt im Chunk-Text: + * [[rel:depends_on | Target Title]] + * [[rel:related_to Target Title]] + (beides wird erkannt; Groß-/Kleinschreibung egal) -2) Ergänzung: typenbasierte, abgeleitete Kanten aus `config/types.yaml`: - - Für jede gefundene Referenz werden zusätzliche Relationen aus - `edge_defaults` des Notiztyps erzeugt (z. B. "depends_on", "related_to"). - - Optional symmetrische Relationen (z. B. "related_to", "similar_to"). - - Dedupe bleibt kompatibel (Key: kind, source_id, target_id, scope). +Konfiguration +------------- +- ENV MINDNET_TYPES_FILE (Default: ./config/types.yaml) -Hinweise --------- -- Es werden keine Markdown-Links neu geparst; wir bleiben bei der - vorhandenen Parser-Logik (`extract_wikilinks`) zur Sicherung der Kompatibilität. -- `edge_defaults` werden sowohl für Chunk-scope-Referenzen als auch – falls - `include_note_scope_refs=True` – für Note-scope-Referenzen angewendet. +Hinweis +------- +Diese Implementierung ist rückwärtskompatibel zur bisherigen Signatur. """ - from __future__ import annotations -from typing import Dict, List, Optional, Iterable, Set import os -import yaml - -# Wikilinks-Parser beibehalten (Kompatibilität!) -from app.core.parser import extract_wikilinks +import re +from typing import Dict, Iterable, List, Optional, Tuple, Set +try: + import yaml # optional, nur für types.yaml +except Exception: # pragma: no cover + yaml = None # ---------------------------- Utilities ------------------------------------ @@ -54,7 +51,7 @@ def _chunk_text_for_refs(chunk: dict) -> str: or "" ) -def _dedupe(seq: Iterable[str]) -> List[str]: +def _dedupe_seq(seq: Iterable[str]) -> List[str]: seen: Set[str] = set() out: List[str] = [] for s in seq: @@ -66,20 +63,28 @@ def _dedupe(seq: Iterable[str]) -> List[str]: def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict: pl = { "kind": kind, - "scope": scope, # "chunk" | "note" + "relation": kind, # v2 Feld + "scope": scope, # "chunk" | "note" "source_id": source_id, "target_id": target_id, - "note_id": note_id, # Träger/Quelle der Kante (aktuelle Note) + "note_id": note_id, # Träger/Quelle der Kante (aktuelle Note) } if extra: pl.update(extra) return pl +def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str: + base = f"{kind}:{s}->{t}#{scope}" + if rule_id: + base += f"|{rule_id}" + try: + import hashlib + return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest() + except Exception: # pragma: no cover + return base # ---------------------- Typen-Registry (types.yaml) ------------------------ -SYM_REL = {"related_to", "similar_to"} # symmetrische Relationstypen - def _env(n: str, default: Optional[str] = None) -> str: v = os.getenv(n) return v if v is not None else (default or "") @@ -87,6 +92,8 @@ def _env(n: str, default: Optional[str] = None) -> str: def _load_types_registry() -> dict: """Lädt die YAML-Registry aus MINDNET_TYPES_FILE oder ./config/types.yaml""" p = _env("MINDNET_TYPES_FILE", "./config/types.yaml") + if not os.path.isfile(p) or yaml is None: + return {} try: with open(p, "r", encoding="utf-8") as f: data = yaml.safe_load(f) or {} @@ -121,6 +128,38 @@ def _edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]: # 3) leer return [] +# ------------------------ Parser für Links --------------------------------- + +# Normale Wikilinks (Fallback) +_WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:. ]+)\]\]") + +# Getypte Inline-Relationen: +# [[rel:depends_on | Target]] +# [[rel:related_to Target]] +_REL_PIPE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s*\|\s*(?P[^\]]+?)\s*\]\]", re.IGNORECASE) +_REL_SPACE = re.compile(r"\[\[\s*rel:(?P[a-z_]+)\s+(?P[^\]]+?)\s*\]\]", re.IGNORECASE) + +def _extract_typed_relations(text: str) -> Tuple[List[Tuple[str,str]], str]: + """ + Gibt Liste (kind, target) zurück und den Text mit entfernten getypten Relation-Links, + damit die generische Wikilink-Erkennung sie nicht doppelt zählt. + """ + pairs: List[Tuple[str,str]] = [] + def _collect(m): + k = (m.group("kind") or "").strip().lower() + t = (m.group("target") or "").strip() + if k and t: + pairs.append((k, t)) + return "" # löschen + text = _REL_PIPE.sub(_collect, text) + text = _REL_SPACE.sub(_collect, text) + return pairs, text + +def _extract_wikilinks(text: str) -> List[str]: + ids: List[str] = [] + for m in _WIKILINK_RE.finditer(text or ""): + ids.append(m.group(1).strip()) + return ids # --------------------------- Hauptfunktion --------------------------------- @@ -135,9 +174,10 @@ def build_edges_for_note( - belongs_to: für jeden Chunk (chunk -> note) - next / prev: zwischen aufeinanderfolgenden Chunks - - references: pro Chunk aus window/text (via extract_wikilinks) + - references: pro Chunk aus window/text (via Wikilinks) + - typed inline relations: [[rel:KIND | Target]] oder [[rel:KIND Target]] - optional note-scope references/backlinks: dedupliziert über alle Chunk-Funde + note_level_references - - NEU: typenbasierte, abgeleitete Kanten (edge_defaults) je gefundener Referenz + - typenbasierte Default-Kanten (edge_defaults) je gefundener Referenz """ edges: List[dict] = [] @@ -151,7 +191,13 @@ def build_edges_for_note( cid = _get(ch, "chunk_id", "id") if not cid: continue - edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, {"chunk_id": cid})) + edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to:v1"), + "provenance": "rule", + "rule_id": "structure:belongs_to:v1", + "confidence": 1.0, + })) # --- 2) next/prev --- for i in range(len(chunks) - 1): @@ -160,10 +206,22 @@ def build_edges_for_note( b_id = _get(b, "chunk_id", "id") if not a_id or not b_id: continue - edges.append(_edge("next", "chunk", a_id, b_id, note_id, {"chunk_id": a_id})) - edges.append(_edge("prev", "chunk", b_id, a_id, note_id, {"chunk_id": b_id})) + edges.append(_edge("next", "chunk", a_id, b_id, note_id, { + "chunk_id": a_id, + "edge_id": _mk_edge_id("next", a_id, b_id, "chunk", "structure:order:v1"), + "provenance": "rule", + "rule_id": "structure:order:v1", + "confidence": 0.95, + })) + edges.append(_edge("prev", "chunk", b_id, a_id, note_id, { + "chunk_id": b_id, + "edge_id": _mk_edge_id("prev", b_id, a_id, "chunk", "structure:order:v1"), + "provenance": "rule", + "rule_id": "structure:order:v1", + "confidence": 0.95, + })) - # --- 3) references (chunk-scope) + abgeleitete Relationen je Ref --- + # --- 3) references (chunk-scope) + typed inline relations + abgeleitete Relationen je Ref --- reg = _load_types_registry() defaults = _edge_defaults_for(note_type, reg) refs_all: List[str] = [] @@ -172,42 +230,111 @@ def build_edges_for_note( cid = _get(ch, "chunk_id", "id") if not cid: continue - txt = _chunk_text_for_refs(ch) - refs = extract_wikilinks(txt) # Parser-Logik nicht verändert + raw = _chunk_text_for_refs(ch) + + # a) typed inline relations zuerst extrahieren + typed, remainder = _extract_typed_relations(raw) + for kind, target in typed: + edges.append(_edge(kind, "chunk", cid, target, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(kind, cid, target, "chunk", "inline:rel:v1"), + "provenance": "explicit", + "rule_id": "inline:rel:v1", + "confidence": 0.95, + })) + # symmetrische Relationen zusätzlich rückwärts + if kind in {"related_to", "similar_to"}: + edges.append(_edge(kind, "chunk", target, cid, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(kind, target, cid, "chunk", "inline:rel:v1"), + "provenance": "explicit", + "rule_id": "inline:rel:v1", + "confidence": 0.95, + })) + + # b) generische Wikilinks (remainder) → "references" + refs = _extract_wikilinks(remainder) for r in refs: # reale Referenz (wie bisher) - edges.append(_edge("references", "chunk", cid, r, note_id, {"chunk_id": cid, "ref_text": r})) + edges.append(_edge("references", "chunk", cid, r, note_id, { + "chunk_id": cid, + "ref_text": r, + "edge_id": _mk_edge_id("references", cid, r, "chunk", "explicit:wikilink:v1"), + "provenance": "explicit", + "rule_id": "explicit:wikilink:v1", + "confidence": 1.0, + })) # abgeleitete Kanten je default-Relation for rel in defaults: if rel == "references": continue # doppelt vermeiden - edges.append(_edge(rel, "chunk", cid, r, note_id, {"chunk_id": cid, "rule_id": f"edge_defaults:{note_type}:{rel}", "confidence": 0.7})) + edges.append(_edge(rel, "chunk", cid, r, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(rel, cid, r, "chunk", f"edge_defaults:{note_type}:{rel}:v1"), + "provenance": "rule", + "rule_id": f"edge_defaults:{note_type}:{rel}:v1", + "confidence": 0.7, + })) # symmetrisch? if rel in {"related_to", "similar_to"}: - edges.append(_edge(rel, "chunk", r, cid, note_id, {"chunk_id": cid, "rule_id": f"edge_defaults:{note_type}:{rel}", "confidence": 0.7})) + edges.append(_edge(rel, "chunk", r, cid, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(rel, r, cid, "chunk", f"edge_defaults:{note_type}:{rel}:v1"), + "provenance": "rule", + "rule_id": f"edge_defaults:{note_type}:{rel}:v1", + "confidence": 0.7, + })) refs_all.extend(refs) # --- 4) optional: note-scope references/backlinks (+ defaults) --- if include_note_scope_refs: - refs_note = refs_all[:] + refs_note = list(refs_all or []) if note_level_references: refs_note.extend([r for r in note_level_references if isinstance(r, str) and r]) - refs_note = _dedupe(refs_note) + refs_note = _dedupe_seq(refs_note) for r in refs_note: # echte note-scope Referenz & Backlink (wie bisher) - edges.append(_edge("references", "note", note_id, r, note_id)) - edges.append(_edge("backlink", "note", r, note_id, note_id)) + edges.append(_edge("references", "note", note_id, r, note_id, { + "edge_id": _mk_edge_id("references", note_id, r, "note", "explicit:note_scope:v1"), + "provenance": "explicit", + "rule_id": "explicit:note_scope:v1", + "confidence": 1.0, + })) + edges.append(_edge("backlink", "note", r, note_id, note_id, { + "edge_id": _mk_edge_id("backlink", r, note_id, "note", "derived:backlink:v1"), + "provenance": "rule", + "rule_id": "derived:backlink:v1", + "confidence": 0.9, + })) # und zusätzlich default-Relationen (note-scope) for rel in defaults: if rel == "references": continue - edges.append(_edge(rel, "note", note_id, r, note_id, {"rule_id": f"edge_defaults:{note_type}:{rel}", "confidence": 0.7})) + edges.append(_edge(rel, "note", note_id, r, note_id, { + "edge_id": _mk_edge_id(rel, note_id, r, "note", f"edge_defaults:{note_type}:{rel}:v1"), + "provenance": "rule", + "rule_id": f"edge_defaults:{note_type}:{rel}:v1", + "confidence": 0.7, + })) if rel in {"related_to", "similar_to"}: - edges.append(_edge(rel, "note", r, note_id, note_id, {"rule_id": f"edge_defaults:{note_type}:{rel}", "confidence": 0.7})) + edges.append(_edge(rel, "note", r, note_id, note_id, { + "edge_id": _mk_edge_id(rel, r, note_id, "note", f"edge_defaults:{note_type}:{rel}:v1"), + "provenance": "rule", + "rule_id": f"edge_defaults:{note_type}:{rel}:v1", + "confidence": 0.7, + })) - # --- 5) Dedupe (unverändert kompatibel) --- - dedup = {} + # --- 5) Dedupe (Schlüssel: source_id, target_id, relation, rule_id) --- + seen: Set[Tuple[str,str,str,str]] = set() + out: List[dict] = [] for e in edges: - k = (e["kind"], e["source_id"], e["target_id"], e.get("scope", "")) - dedup[k] = e - return list(dedup.values()) + s = str(e.get("source_id") or "") + t = str(e.get("target_id") or "") + rel = str(e.get("relation") or e.get("kind") or "edge") + rule = str(e.get("rule_id") or "") + key = (s, t, rel, rule) + if key in seen: + continue + seen.add(key) + out.append(e) + return out