From 95b59e9b0a72037ce1895dc3f43434ad2289282e Mon Sep 17 00:00:00 2001 From: Lars Date: Mon, 17 Nov 2025 11:09:05 +0100 Subject: [PATCH] app/core/derive_edges.py aktualisiert --- app/core/derive_edges.py | 341 ++++++++++++++++++++------------------- 1 file changed, 172 insertions(+), 169 deletions(-) diff --git a/app/core/derive_edges.py b/app/core/derive_edges.py index e992ee5..394d50c 100644 --- a/app/core/derive_edges.py +++ b/app/core/derive_edges.py @@ -1,42 +1,96 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -app/core/derive_edges.py -Mindnet V2 — Edge-Ableitung (real + defaults), idempotent +Modul: app/core/derive_edges.py +Version: 1.5.0 (Mindnet V2) +Status: Stable -Erzeugt Kanten für eine Note aus: -1) Sequenzkanten pro Chunk: belongs_to, next, prev -2) Reale Referenzen aus Chunk-Text (Markdown-Links, Wikilinks) + optional Frontmatter-Refs -3) Abgeleitete Kanten je Typ-Regel (types.yaml.edge_defaults), z. B. additional relations wie "depends_on", "related_to" - - Regel-Tagging via rule_id="edge_defaults::" - - De-Dupe via Key: (source_id, target_id, relation, rule_id) +Ziele +----- +1) Beibehalten der bewährten Edge-Ableitung: + - belongs_to (chunk -> note) + - next / prev (Chunk-Kette) + - references (chunk-scope) aus Chunk.window/text via `extract_wikilinks` -Edge-Payload-Minimum: -- relation (alias: kind) -- note_id (Quelle; also die ID der Note, zu der die Chunks gehören) -- source_id (Chunk-ID oder Note-ID, je nach scope) -- target_id (Note-/Slug-/URL-ID; deterministisch normalisiert) -- chunk_id (falls scope='chunk') -- scope: 'chunk'|'note' -- confidence: float (bei abgeleitet z. B. 0.7) -- rule_id: str | None +2) Ergänzung: typenbasierte, abgeleitete Kanten aus `config/types.yaml`: + - Für jede gefundene Referenz werden zusätzliche Relationen aus + `edge_defaults` des Notiztyps erzeugt (z. B. "depends_on", "related_to"). + - Optional symmetrische Relationen (z. B. "related_to", "similar_to"). + - Dedupe bleibt kompatibel (Key: kind, source_id, target_id, scope). + +Hinweise +-------- +- Es werden keine Markdown-Links neu geparst; wir bleiben bei der + vorhandenen Parser-Logik (`extract_wikilinks`) zur Sicherung der Kompatibilität. +- `edge_defaults` werden sowohl für Chunk-scope-Referenzen als auch – falls + `include_note_scope_refs=True` – für Note-scope-Referenzen angewendet. """ from __future__ import annotations -from typing import Any, Dict, Iterable, List, Optional, Tuple -import os, re, yaml, hashlib +from typing import Dict, List, Optional, Iterable, Set -# ---------------- Registry Laden ---------------- +import os +import yaml -def _env(n: str, d: Optional[str]=None) -> str: +# Wikilinks-Parser beibehalten (Kompatibilität!) +from app.core.parser import extract_wikilinks + + +# ---------------------------- Utilities ------------------------------------ + +def _get(d: dict, *keys, default=None): + for k in keys: + if k in d and d[k] is not None: + return d[k] + return default + +def _chunk_text_for_refs(chunk: dict) -> str: + # bevorzugt 'window' → dann 'text' → 'content' → 'raw' + return ( + _get(chunk, "window") + or _get(chunk, "text") + or _get(chunk, "content") + or _get(chunk, "raw") + or "" + ) + +def _dedupe(seq: Iterable[str]) -> List[str]: + seen: Set[str] = set() + out: List[str] = [] + for s in seq: + if s not in seen: + seen.add(s) + out.append(s) + return out + +def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict: + pl = { + "kind": kind, + "scope": scope, # "chunk" | "note" + "source_id": source_id, + "target_id": target_id, + "note_id": note_id, # Träger/Quelle der Kante (aktuelle Note) + } + if extra: + pl.update(extra) + return pl + + +# ---------------------- Typen-Registry (types.yaml) ------------------------ + +SYM_REL = {"related_to", "similar_to"} # symmetrische Relationstypen + +def _env(n: str, default: Optional[str] = None) -> str: v = os.getenv(n) - return v if v is not None else (d or "") + return v if v is not None else (default or "") -def _load_types() -> dict: +def _load_types_registry() -> dict: + """Lädt die YAML-Registry aus MINDNET_TYPES_FILE oder ./config/types.yaml""" p = _env("MINDNET_TYPES_FILE", "./config/types.yaml") try: with open(p, "r", encoding="utf-8") as f: - return yaml.safe_load(f) or {} + data = yaml.safe_load(f) or {} + return data except Exception: return {} @@ -45,166 +99,115 @@ def _get_types_map(reg: dict) -> dict: return reg["types"] return reg if isinstance(reg, dict) else {} -def _edge_defaults_for(note_type: str, reg: dict) -> List[str]: - m = _get_types_map(reg) - if isinstance(m, dict): - t = m.get(note_type) or {} - if isinstance(t, dict): - vals = t.get("edge_defaults") - if isinstance(vals, list): - return [str(x) for x in vals if isinstance(x, (str,))] +def _edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]: + """ + Liefert die edge_defaults-Liste für den gegebenen Notiztyp. + Fallback-Reihenfolge: + 1) reg['types'][note_type]['edge_defaults'] + 2) reg['defaults']['edge_defaults'] (oder 'default'/'global') + 3) [] + """ + types_map = _get_types_map(reg) + # 1) exakter Typ + if note_type and isinstance(types_map, dict): + t = types_map.get(note_type) + if isinstance(t, dict) and isinstance(t.get("edge_defaults"), list): + return [str(x) for x in t["edge_defaults"] if isinstance(x, str)] + # 2) Fallback + for key in ("defaults", "default", "global"): + v = reg.get(key) + if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list): + return [str(x) for x in v["edge_defaults"] if isinstance(x, str)] + # 3) leer return [] -# ---------------- Utils ---------------- -SYM_REL = {"related_to", "similar_to"} # symmetrische Relationen +# --------------------------- Hauptfunktion --------------------------------- -def _slug_id(s: str) -> str: - s = (s or "").strip().lower() - s = re.sub(r"\s+", "-", s) - s = re.sub(r"[^\w\-:/#\.]", "", s) # lasse urls, hashes rudimentär zu - if not s: - s = "ref" - return s - -def _mk_edge_id(source_id: str, relation: str, target_id: str, rule_id: Optional[str]) -> str: - base = f"{source_id}|{relation}|{target_id}|{rule_id or ''}" - h = hashlib.sha1(base.encode("utf-8")).hexdigest()[:16] - return f"e_{h}" - -def _add(edge_list: List[Dict[str, Any]], - dedupe: set, - note_id: str, - source_id: str, - relation: str, - target_id: str, - *, - chunk_id: Optional[str] = None, - scope: str = "chunk", - confidence: Optional[float] = None, - rule_id: Optional[str] = None) -> None: - key = (source_id, target_id, relation, rule_id or "") - if key in dedupe: - return - dedupe.add(key) - payload = { - "edge_id": _mk_edge_id(source_id, relation, target_id, rule_id), - "note_id": note_id, - "kind": relation, # alias - "relation": relation, - "scope": scope, - "source_id": source_id, - "target_id": target_id, - } - if chunk_id: - payload["chunk_id"] = chunk_id - if confidence is not None: - payload["confidence"] = float(confidence) - if rule_id is not None: - payload["rule_id"] = rule_id - edge_list.append(payload) - -# ---------------- Refs Parsen ---------------- - -MD_LINK = re.compile(r"\[([^\]]+)\]\(([^)]+)\)") # [text](target) -WIKI_LINK = re.compile(r"\[\[([^|\]]+)(?:\|[^]]+)?\]\]") # [[Title]] oder [[Title|alias]] - -def _extract_refs(text: str) -> List[Tuple[str, str]]: - """liefert Liste (label, target) – target kann URL, Title, etc. sein""" - out: List[Tuple[str,str]] = [] - if not text: - return out - for m in MD_LINK.finditer(text): - label = (m.group(1) or "").strip() - tgt = (m.group(2) or "").strip() - out.append((label, tgt)) - for m in WIKI_LINK.finditer(text): - title = (m.group(1) or "").strip() - out.append((title, title)) - return out - -# ---------------- Haupt-API ---------------- - -def build_edges_for_note(*, - note_id: str, - chunk_payloads: List[Dict[str, Any]], - note_level_refs: Optional[List[Dict[str, Any]]] = None, - include_note_scope_refs: bool = False) -> List[Dict[str, Any]]: +def build_edges_for_note( + note_id: str, + chunks: List[dict], + note_level_references: Optional[List[str]] = None, + include_note_scope_refs: bool = False, +) -> List[dict]: """ - Baut alle Kanten für eine Note. - - Sequenzkanten (belongs_to, next, prev) - - Referenzen aus Chunk-Text (scope=chunk) - - Abgeleitete Kanten gemäß edge_defaults aus types.yaml (für jede gefundene Referenz) + Erzeugt Kanten für eine Note. + + - belongs_to: für jeden Chunk (chunk -> note) + - next / prev: zwischen aufeinanderfolgenden Chunks + - references: pro Chunk aus window/text (via extract_wikilinks) + - optional note-scope references/backlinks: dedupliziert über alle Chunk-Funde + note_level_references + - NEU: typenbasierte, abgeleitete Kanten (edge_defaults) je gefundener Referenz """ + edges: List[dict] = [] + + # --- 0) Note-Typ ermitteln (aus erstem Chunk erwartet) --- note_type = None - if chunk_payloads: - note_type = chunk_payloads[0].get("type") - reg = _load_types() - defaults = _edge_defaults_for(note_type or "concept", reg) + if chunks: + note_type = _get(chunks[0], "type") - edges: List[Dict[str, Any]] = [] - seen = set() + # --- 1) belongs_to --- + for ch in chunks: + cid = _get(ch, "chunk_id", "id") + if not cid: + continue + edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, {"chunk_id": cid})) - # 1) Sequenzkanten - for ch in chunk_payloads: - cid = ch.get("chunk_id") or ch.get("id") - nid = ch.get("note_id") or note_id - idx = ch.get("index") - # belongs_to - _add(edges, seen, note_id=nid, source_id=cid, relation="belongs_to", - target_id=nid, chunk_id=cid, scope="chunk") - # next/prev - for nb, rel in ((ch.get("neighbors_next"), "next"), (ch.get("neighbors_prev"), "prev")): - if not nb: - continue - # neighbors sind Listen - items = nb if isinstance(nb, list) else [nb] - for tid in items: - _add(edges, seen, note_id=nid, source_id=cid, relation=rel, - target_id=tid, chunk_id=cid, scope="chunk") + # --- 2) next/prev --- + for i in range(len(chunks) - 1): + a, b = chunks[i], chunks[i + 1] + a_id = _get(a, "chunk_id", "id") + b_id = _get(b, "chunk_id", "id") + if not a_id or not b_id: + continue + edges.append(_edge("next", "chunk", a_id, b_id, note_id, {"chunk_id": a_id})) + edges.append(_edge("prev", "chunk", b_id, a_id, note_id, {"chunk_id": b_id})) - # 2) Refs aus Chunk-Text (+ derived edges je ref) - for ch in chunk_payloads: - cid = ch.get("chunk_id") or ch.get("id") - nid = ch.get("note_id") or note_id - text = ch.get("text") or "" - for (label, tgt) in _extract_refs(text): - target_id = _slug_id(tgt) - # real reference - _add(edges, seen, note_id=nid, source_id=cid, relation="references", - target_id=target_id, chunk_id=cid, scope="chunk") - # defaults amplification + # --- 3) references (chunk-scope) + abgeleitete Relationen je Ref --- + reg = _load_types_registry() + defaults = _edge_defaults_for(note_type, reg) + refs_all: List[str] = [] + + for ch in chunks: + cid = _get(ch, "chunk_id", "id") + if not cid: + continue + txt = _chunk_text_for_refs(ch) + refs = extract_wikilinks(txt) # Parser-Logik nicht verändert + for r in refs: + # reale Referenz (wie bisher) + edges.append(_edge("references", "chunk", cid, r, note_id, {"chunk_id": cid, "ref_text": r})) + # abgeleitete Kanten je default-Relation for rel in defaults: if rel == "references": - continue - rule = f"edge_defaults:{note_type}:{rel}" - _add(edges, seen, note_id=nid, source_id=cid, relation=rel, - target_id=target_id, chunk_id=cid, scope="chunk", - confidence=0.7, rule_id=rule) + continue # doppelt vermeiden + edges.append(_edge(rel, "chunk", cid, r, note_id, {"chunk_id": cid, "rule_id": f"edge_defaults:{note_type}:{rel}", "confidence": 0.7})) # symmetrisch? - if rel in SYM_REL: - _add(edges, seen, note_id=nid, source_id=target_id, relation=rel, - target_id=cid, chunk_id=cid, scope="chunk", - confidence=0.7, rule_id=rule) + if rel in {"related_to", "similar_to"}: + edges.append(_edge(rel, "chunk", r, cid, note_id, {"chunk_id": cid, "rule_id": f"edge_defaults:{note_type}:{rel}", "confidence": 0.7})) + refs_all.extend(refs) - # 3) optionale Note-Scope-Refs aus Frontmatter (falls geliefert) - note_level_refs = note_level_refs or [] - if include_note_scope_refs and note_level_refs: - nid = note_id - for r in note_level_refs: - tgt = (r or {}).get("target_id") or (r or {}).get("target") or "" - if not tgt: - continue - target_id = _slug_id(str(tgt)) - _add(edges, seen, note_id=nid, source_id=nid, relation="references", - target_id=target_id, chunk_id=None, scope="note") + # --- 4) optional: note-scope references/backlinks (+ defaults) --- + if include_note_scope_refs: + refs_note = refs_all[:] + if note_level_references: + refs_note.extend([r for r in note_level_references if isinstance(r, str) and r]) + refs_note = _dedupe(refs_note) + for r in refs_note: + # echte note-scope Referenz & Backlink (wie bisher) + edges.append(_edge("references", "note", note_id, r, note_id)) + edges.append(_edge("backlink", "note", r, note_id, note_id)) + # und zusätzlich default-Relationen (note-scope) for rel in defaults: if rel == "references": continue - rule = f"edge_defaults:{note_type}:{rel}" - _add(edges, seen, note_id=nid, source_id=nid, relation=rel, - target_id=target_id, chunk_id=None, scope="note", - confidence=0.7, rule_id=rule) - - return edges + edges.append(_edge(rel, "note", note_id, r, note_id, {"rule_id": f"edge_defaults:{note_type}:{rel}", "confidence": 0.7})) + if rel in {"related_to", "similar_to"}: + edges.append(_edge(rel, "note", r, note_id, note_id, {"rule_id": f"edge_defaults:{note_type}:{rel}", "confidence": 0.7})) + # --- 5) Dedupe (unverändert kompatibel) --- + dedup = {} + for e in edges: + k = (e["kind"], e["source_id"], e["target_id"], e.get("scope", "")) + dedup[k] = e + return list(dedup.values())