#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ app/core/derive_edges.py Mindnet V2 — Edge-Ableitung (real + defaults), idempotent Erzeugt Kanten für eine Note aus: 1) Sequenzkanten pro Chunk: belongs_to, next, prev 2) Reale Referenzen aus Chunk-Text (Markdown-Links, Wikilinks) + optional Frontmatter-Refs 3) Abgeleitete Kanten je Typ-Regel (types.yaml.edge_defaults), z. B. additional relations wie "depends_on", "related_to" - Regel-Tagging via rule_id="edge_defaults::" - De-Dupe via Key: (source_id, target_id, relation, rule_id) Edge-Payload-Minimum: - relation (alias: kind) - note_id (Quelle; also die ID der Note, zu der die Chunks gehören) - source_id (Chunk-ID oder Note-ID, je nach scope) - target_id (Note-/Slug-/URL-ID; deterministisch normalisiert) - chunk_id (falls scope='chunk') - scope: 'chunk'|'note' - confidence: float (bei abgeleitet z. B. 0.7) - rule_id: str | None """ from __future__ import annotations from typing import Any, Dict, Iterable, List, Optional, Tuple import os, re, yaml, hashlib # ---------------- Registry Laden ---------------- def _env(n: str, d: Optional[str]=None) -> str: v = os.getenv(n) return v if v is not None else (d or "") def _load_types() -> dict: p = _env("MINDNET_TYPES_FILE", "./config/types.yaml") try: with open(p, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} except Exception: return {} def _get_types_map(reg: dict) -> dict: if isinstance(reg, dict) and isinstance(reg.get("types"), dict): return reg["types"] return reg if isinstance(reg, dict) else {} def _edge_defaults_for(note_type: str, reg: dict) -> List[str]: m = _get_types_map(reg) if isinstance(m, dict): t = m.get(note_type) or {} if isinstance(t, dict): vals = t.get("edge_defaults") if isinstance(vals, list): return [str(x) for x in vals if isinstance(x, (str,))] return [] # ---------------- Utils ---------------- SYM_REL = {"related_to", "similar_to"} # symmetrische Relationen def _slug_id(s: str) -> str: s = (s or "").strip().lower() s = re.sub(r"\s+", "-", s) s = re.sub(r"[^\w\-:/#\.]", "", s) # lasse urls, hashes rudimentär zu if not s: s = "ref" return s def _mk_edge_id(source_id: str, relation: str, target_id: str, rule_id: Optional[str]) -> str: base = f"{source_id}|{relation}|{target_id}|{rule_id or ''}" h = hashlib.sha1(base.encode("utf-8")).hexdigest()[:16] return f"e_{h}" def _add(edge_list: List[Dict[str, Any]], dedupe: set, note_id: str, source_id: str, relation: str, target_id: str, *, chunk_id: Optional[str] = None, scope: str = "chunk", confidence: Optional[float] = None, rule_id: Optional[str] = None) -> None: key = (source_id, target_id, relation, rule_id or "") if key in dedupe: return dedupe.add(key) payload = { "edge_id": _mk_edge_id(source_id, relation, target_id, rule_id), "note_id": note_id, "kind": relation, # alias "relation": relation, "scope": scope, "source_id": source_id, "target_id": target_id, } if chunk_id: payload["chunk_id"] = chunk_id if confidence is not None: payload["confidence"] = float(confidence) if rule_id is not None: payload["rule_id"] = rule_id edge_list.append(payload) # ---------------- Refs Parsen ---------------- MD_LINK = re.compile(r"\[([^\]]+)\]\(([^)]+)\)") # [text](target) WIKI_LINK = re.compile(r"\[\[([^|\]]+)(?:\|[^]]+)?\]\]") # [[Title]] oder [[Title|alias]] def _extract_refs(text: str) -> List[Tuple[str, str]]: """liefert Liste (label, target) – target kann URL, Title, etc. sein""" out: List[Tuple[str,str]] = [] if not text: return out for m in MD_LINK.finditer(text): label = (m.group(1) or "").strip() tgt = (m.group(2) or "").strip() out.append((label, tgt)) for m in WIKI_LINK.finditer(text): title = (m.group(1) or "").strip() out.append((title, title)) return out # ---------------- Haupt-API ---------------- def build_edges_for_note(*, note_id: str, chunk_payloads: List[Dict[str, Any]], note_level_refs: Optional[List[Dict[str, Any]]] = None, include_note_scope_refs: bool = False) -> List[Dict[str, Any]]: """ Baut alle Kanten für eine Note. - Sequenzkanten (belongs_to, next, prev) - Referenzen aus Chunk-Text (scope=chunk) - Abgeleitete Kanten gemäß edge_defaults aus types.yaml (für jede gefundene Referenz) """ note_type = None if chunk_payloads: note_type = chunk_payloads[0].get("type") reg = _load_types() defaults = _edge_defaults_for(note_type or "concept", reg) edges: List[Dict[str, Any]] = [] seen = set() # 1) Sequenzkanten for ch in chunk_payloads: cid = ch.get("chunk_id") or ch.get("id") nid = ch.get("note_id") or note_id idx = ch.get("index") # belongs_to _add(edges, seen, note_id=nid, source_id=cid, relation="belongs_to", target_id=nid, chunk_id=cid, scope="chunk") # next/prev for nb, rel in ((ch.get("neighbors_next"), "next"), (ch.get("neighbors_prev"), "prev")): if not nb: continue # neighbors sind Listen items = nb if isinstance(nb, list) else [nb] for tid in items: _add(edges, seen, note_id=nid, source_id=cid, relation=rel, target_id=tid, chunk_id=cid, scope="chunk") # 2) Refs aus Chunk-Text (+ derived edges je ref) for ch in chunk_payloads: cid = ch.get("chunk_id") or ch.get("id") nid = ch.get("note_id") or note_id text = ch.get("text") or "" for (label, tgt) in _extract_refs(text): target_id = _slug_id(tgt) # real reference _add(edges, seen, note_id=nid, source_id=cid, relation="references", target_id=target_id, chunk_id=cid, scope="chunk") # defaults amplification for rel in defaults: if rel == "references": continue rule = f"edge_defaults:{note_type}:{rel}" _add(edges, seen, note_id=nid, source_id=cid, relation=rel, target_id=target_id, chunk_id=cid, scope="chunk", confidence=0.7, rule_id=rule) # symmetrisch? if rel in SYM_REL: _add(edges, seen, note_id=nid, source_id=target_id, relation=rel, target_id=cid, chunk_id=cid, scope="chunk", confidence=0.7, rule_id=rule) # 3) optionale Note-Scope-Refs aus Frontmatter (falls geliefert) note_level_refs = note_level_refs or [] if include_note_scope_refs and note_level_refs: nid = note_id for r in note_level_refs: tgt = (r or {}).get("target_id") or (r or {}).get("target") or "" if not tgt: continue target_id = _slug_id(str(tgt)) _add(edges, seen, note_id=nid, source_id=nid, relation="references", target_id=target_id, chunk_id=None, scope="note") for rel in defaults: if rel == "references": continue rule = f"edge_defaults:{note_type}:{rel}" _add(edges, seen, note_id=nid, source_id=nid, relation=rel, target_id=target_id, chunk_id=None, scope="note", confidence=0.7, rule_id=rule) return edges