#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Module: app/core/edges.py
Version: 2.0.0 (V2 superset, backward compatible with v1 of 2025-09-09)

Purpose
-------
Keeps the existing edge logic (belongs_to, prev/next, references, backlink)
and adds V2 fields plus type-default edges according to config/types.yaml
(edge_defaults). The function is **idempotent** and **backward compatible**
with the previous signature.

Compatibility guarantees (relative to v1):
- **Input**: accepts the same chunk payloads as v1:
    * `id` (chunk ID), `note_id` (owner), `neighbors.prev|next` (optional),
      `references: [{target_id: ...}]` (optional),
      alternatively: `chunk_id`, `chunk_index|ord`, `window|text`
- **Output (v1 fields)**: `kind`, `source_id`, `target_id`, `scope`,
  `note_id`, `edge_id`
- **New (v2 fields)**: `relation`, `src_note_id`, `src_chunk_id?`,
  `dst_note_id`, `dst_chunk_id?`, `provenance` (`explicit|rule`),
  `rule_id?`, `confidence?`

Rules
-----
- Deduplication key: (source_id, target_id, relation, rule_id)
- Structural edges:
    * belongs_to: one per chunk
    * next/prev: chunk sequence; prefers `neighbors.prev`, otherwise the
      predecessor in `chunk_index`/`ord` order
- Explicit references:
    * from the chunk: `references[].target_id` (if present)
    * fallback: wikilinks in `window|text`:
      [[Some Title|some-id]] or [[some-id]]
- Note scope:
    * backlink always; references only if include_note_scope_refs=True
- Type defaults (edge_defaults from config/types.yaml of the
  **source note type**):
    * for every explicit chunk-scope reference one rule edge is created per
      default relation
    * rule_id: "type_default:{note_type}:{relation}:v1", provenance="rule"

Configuration
-------------
- ENV MINDNET_TYPES_FILE (default: ./config/types.yaml)

License/Author
--------------
- Initial implementation v1 (2025-09-09) - project Mindnet
- Extension v2 (2025-11-11) - compatible superset implementation
"""

from __future__ import annotations

import hashlib
import os
import re
from typing import Dict, Iterable, List, Optional, Tuple, Set

try:
    import yaml  # optional, only needed for types.yaml
except Exception:  # pragma: no cover
    yaml = None


# ------------------------------------------------------------
# Helpers: load types.yaml (edge_defaults)
# ------------------------------------------------------------

def _types_path() -> str:
    """Return the types config path: $MINDNET_TYPES_FILE or the default."""
    return os.getenv("MINDNET_TYPES_FILE") or "./config/types.yaml"


def _load_types() -> Dict[str, dict]:
    """Load the type table from the types config file.

    Returns the mapping under the top-level ``types`` key when that key holds
    a mapping, otherwise the top-level mapping itself. Returns ``{}`` when the
    file is missing, PyYAML is not installed, the document is not a mapping,
    or reading/parsing fails (deliberately best-effort).
    """
    path = _types_path()
    if not os.path.isfile(path) or yaml is None:
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
    except Exception:
        # Best-effort: a broken config must not break edge building.
        return {}
    if not isinstance(data, dict):
        return {}
    nested = data.get("types")
    return nested if isinstance(nested, dict) else data


def _edge_defaults_for(note_type: Optional[str]) -> List[str]:
    """Return the ``edge_defaults`` relations configured for *note_type*.

    The type name is looked up stripped and lower-cased. A single string is
    accepted as a one-element list. Malformed configuration (non-mapping type
    entry, non-iterable ``edge_defaults``) and non-string type names yield an
    empty list instead of raising.
    """
    types = _load_types()
    # str() guards against payloads that carry a non-string "type" value.
    key = str(note_type or "").strip().lower()
    cfg = types.get(key)
    if not isinstance(cfg, dict):
        return []
    defaults = cfg.get("edge_defaults") or []
    if isinstance(defaults, str):
        defaults = [defaults]
    try:
        candidates = list(defaults)
    except TypeError:
        # e.g. ``edge_defaults: 5`` - not iterable, treat as "no defaults".
        return []
    return [str(x) for x in candidates if isinstance(x, (str, int, float))]


# ------------------------------------------------------------
# Wikilink parser (fallback when ch["references"] is missing)
# ------------------------------------------------------------

# Captures the id part of [[Title|id]] or [[id]]; the id character class
# includes the space character.
_WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:. ]+)\]\]")


def _extract_wikilinks(text: str) -> List[str]:
    """Return the stripped target ids of all wikilinks found in *text*."""
    return [m.group(1).strip() for m in _WIKILINK_RE.finditer(text or "")]


# ------------------------------------------------------------
# Utility
# ------------------------------------------------------------

def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str:
    """Build a deterministic edge id from kind, endpoints, scope and rule.

    The id is the 12-byte BLAKE2s hex digest of
    ``"{kind}:{s}->{t}#{scope}"`` (plus ``"|{rule_id}"`` when a rule id is
    given). If hashing fails the unhashed base string is returned.
    """
    base = f"{kind}:{s}->{t}#{scope}"
    if rule_id:
        base += f"|{rule_id}"
    try:
        return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest()
    except Exception:  # pragma: no cover
        return base


def _dedupe(edges: List[Dict]) -> List[Dict]:
    """Drop duplicate edges, keeping the first occurrence and input order.

    Two edges are duplicates when (source_id, target_id, relation, rule_id)
    match; ``kind`` is used when ``relation`` is missing.
    """
    seen: Set[Tuple[str, str, str, str]] = set()
    out: List[Dict] = []
    for e in edges:
        key = (
            str(e.get("source_id") or ""),
            str(e.get("target_id") or ""),
            str(e.get("relation") or e.get("kind") or "edge"),
            str(e.get("rule_id") or ""),
        )
        if key in seen:
            continue
        seen.add(key)
        out.append(e)
    return out


def _first(v: dict, *keys, default=None):
    """Return the value of the first key in *keys* that is present and not None."""
    for k in keys:
        if k in v and v[k] is not None:
            return v[k]
    return default


def _chunk_id(ch: Dict):
    """Return the chunk id from ``id`` or, alternatively, ``chunk_id``."""
    return _first(ch, "id", "chunk_id")


def _belongs_to_edges(note_id: str, chunks: List[Dict]) -> List[Dict]:
    """Create one ``belongs_to`` edge (chunk -> note) per chunk that has an id."""
    out: List[Dict] = []
    for ch in chunks:
        cid = _chunk_id(ch)
        if not cid:
            continue
        owner = ch.get("note_id") or note_id
        out.append({
            "edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to:v1"),
            "kind": "belongs_to",
            "relation": "belongs_to",
            "scope": "chunk",
            "source_id": cid,
            "target_id": note_id,
            "note_id": owner,  # v1 compatibility
            # v2
            "src_note_id": owner,
            "src_chunk_id": cid,
            "dst_note_id": note_id,
            "provenance": "rule",
            "rule_id": "structure:belongs_to:v1",
            "confidence": 1.0,
        })
    return out


def _order_edges(note_id: str, chunks: List[Dict]) -> List[Dict]:
    """Create ``prev``/``next`` edge pairs describing the chunk sequence.

    For each chunk the predecessor is ``neighbors.prev`` when given, otherwise
    the preceding chunk in ``chunk_index``/``ord`` order. Each predecessor
    yields a ``prev`` edge (chunk -> predecessor) and the mirrored ``next``
    edge (predecessor -> chunk).
    """
    ordered = sorted(chunks, key=lambda c: _first(c, "chunk_index", "ord", default=0))
    out: List[Dict] = []
    for i, ch in enumerate(ordered):
        cid = _chunk_id(ch)
        if not cid:
            continue
        owner = ch.get("note_id") or note_id
        neighbors = ch.get("neighbors") or {}
        prev_id = neighbors.get("prev")
        # Fallback: derive the predecessor from the sorted order.
        if prev_id is None and i > 0:
            prev_id = _chunk_id(ordered[i - 1])
        if not prev_id:
            continue
        out.append({
            "edge_id": _mk_edge_id("prev", cid, prev_id, "chunk", "structure:order:v1"),
            "kind": "prev",
            "relation": "prev",
            "scope": "chunk",
            "source_id": cid,
            "target_id": prev_id,
            "note_id": owner,
            "src_note_id": owner,
            "src_chunk_id": cid,
            "dst_note_id": owner,
            "dst_chunk_id": prev_id,
            "provenance": "rule",
            "rule_id": "structure:order:v1",
            "confidence": 0.95,
        })
        out.append({
            "edge_id": _mk_edge_id("next", prev_id, cid, "chunk", "structure:order:v1"),
            "kind": "next",
            "relation": "next",
            "scope": "chunk",
            "source_id": prev_id,
            "target_id": cid,
            "note_id": owner,
            "src_note_id": owner,
            "src_chunk_id": prev_id,
            "dst_note_id": owner,
            "dst_chunk_id": cid,
            "provenance": "rule",
            "rule_id": "structure:order:v1",
            "confidence": 0.95,
        })
    return out


def _chunk_reference_edges(note_id: str, chunks: List[Dict]) -> List[Dict]:
    """Create explicit chunk-scope ``references`` edges.

    Targets come from ``references[].target_id``; only when that yields
    nothing are wikilinks parsed from ``window`` (or ``text``).
    """
    out: List[Dict] = []
    for ch in chunks:
        cid = _chunk_id(ch)
        if not cid:
            continue
        owner = ch.get("note_id") or note_id
        # 1) prefer an existing ch["references"]
        refs = ch.get("references") or []
        targets = [r.get("target_id") for r in refs if isinstance(r, dict) and r.get("target_id")]
        # 2) fallback: wikilinks from the chunk text
        if not targets:
            targets = _extract_wikilinks(_first(ch, "window", "text", default="") or "")
        for tid in targets:
            if not isinstance(tid, str) or not tid.strip():
                continue
            out.append({
                "edge_id": _mk_edge_id("references", cid, tid, "chunk"),
                "kind": "references",
                "relation": "references",
                "scope": "chunk",
                "source_id": cid,
                "target_id": tid,
                "note_id": owner,
                # v2
                "src_note_id": owner,
                "src_chunk_id": cid,
                "dst_note_id": tid,
                "provenance": "explicit",
                "rule_id": "",
                "confidence": 1.0,
            })
    return out


def _note_scope_edges(
    note_id: str,
    note_level_refs: Optional[List[str]],
    include_note_scope_refs: bool,
) -> List[Dict]:
    """Create note-scope edges: optional ``references`` plus ``backlink``.

    Targets are the distinct non-blank strings of *note_level_refs* in first
    occurrence order. A ``backlink`` (target -> note) is always created; the
    forward ``references`` edge only when *include_note_scope_refs* is true.
    """
    unique_refs: List[str] = []
    seen: Set[str] = set()
    for tid in note_level_refs or []:
        if isinstance(tid, str) and tid.strip() and tid not in seen:
            unique_refs.append(tid)
            seen.add(tid)

    out: List[Dict] = []
    for tid in unique_refs:
        if include_note_scope_refs:
            out.append({
                "edge_id": _mk_edge_id("references", note_id, tid, "note"),
                "kind": "references",
                "relation": "references",
                "scope": "note",
                "source_id": note_id,
                "target_id": tid,
                "note_id": note_id,
                "src_note_id": note_id,
                "dst_note_id": tid,
                "provenance": "explicit",
                "rule_id": "",
                "confidence": 1.0,
            })
        out.append({
            "edge_id": _mk_edge_id("backlink", tid, note_id, "note", "derived:backlink:v1"),
            "kind": "backlink",
            "relation": "backlink",
            "scope": "note",
            "source_id": tid,
            "target_id": note_id,
            "note_id": note_id,
            "src_note_id": tid,
            "dst_note_id": note_id,
            "provenance": "rule",
            "rule_id": "derived:backlink:v1",
            "confidence": 0.9,
        })
    return out


def _type_default_edges(note_id: str, note_type, explicit_refs: List[Dict]) -> List[Dict]:
    """Create one rule edge per configured default relation and explicit reference.

    The explicit chunk-scope reference edges serve as templates (same
    endpoints and scope). The ``references`` relation itself is skipped.
    """
    relations = [d for d in _edge_defaults_for(note_type) if d and d != "references"]
    out: List[Dict] = []
    if not relations:
        return out
    # NOTE(review): only chunk-scope references are used as templates here;
    # note-scope references currently receive no type-default edges.
    for template in explicit_refs:
        src = template["source_id"]
        tgt = template["target_id"]
        scope = template.get("scope", "chunk")
        s_note = template.get("src_note_id") or note_id
        s_chunk = template.get("src_chunk_id")
        t_note = template.get("dst_note_id") or tgt
        for rel in relations:
            rule_id = f"type_default:{(note_type or 'unknown')}:{rel}:v1"
            out.append({
                "edge_id": _mk_edge_id(rel, src, tgt, scope, rule_id),
                "kind": rel,
                "relation": rel,
                "scope": scope,
                "source_id": src,
                "target_id": tgt,
                "note_id": s_note,
                "src_note_id": s_note,
                "src_chunk_id": s_chunk,
                "dst_note_id": t_note,
                "provenance": "rule",
                "rule_id": rule_id,
                "confidence": 0.7,
            })
    return out


# ------------------------------------------------------------
# Main function
# ------------------------------------------------------------

def build_edges_for_note(
    note_id: str,
    chunk_payloads: List[Dict],
    note_level_refs: Optional[List[str]] = None,
    *,
    include_note_scope_refs: bool = False,
) -> List[Dict]:
    """Build all edges for one note from its chunk payloads.

    Edges are produced in this order and then de-duplicated on
    (source_id, target_id, relation, rule_id), first occurrence winning:

    1. ``belongs_to`` (chunk -> note), one per chunk with an id
    2. ``prev`` / ``next`` pairs for the chunk sequence
    3. explicit chunk-scope ``references``
    4. note-scope ``references`` (only if *include_note_scope_refs*) and
       ``backlink`` for every distinct entry of *note_level_refs*
    5. type-default rule edges for each explicit chunk-scope reference

    Args:
        note_id: Id of the note the chunks belong to.
        chunk_payloads: Chunk dicts; ``None`` or empty yields no chunk edges.
        note_level_refs: Optional target ids referenced at note level.
        include_note_scope_refs: Also emit note-scope ``references`` edges.

    Returns:
        List of edge dicts carrying the v1 fields (``kind``, ``source_id``,
        ``target_id``, ``scope``, ``note_id``, ``edge_id``) and the v2 fields
        (``relation``, ``src_note_id``, ``dst_note_id``, ``provenance``,
        ``rule_id``, ``confidence``, and chunk ids where applicable).
    """
    chunks = list(chunk_payloads or [])

    # The note type is taken from the first chunk ("type", else "note_type").
    head = chunks[0] if chunks else {}
    note_type = head.get("type") or head.get("note_type")

    explicit_refs = _chunk_reference_edges(note_id, chunks)

    edges: List[Dict] = []
    edges.extend(_belongs_to_edges(note_id, chunks))
    edges.extend(_order_edges(note_id, chunks))
    edges.extend(explicit_refs)
    edges.extend(_note_scope_edges(note_id, note_level_refs, include_note_scope_refs))
    edges.extend(_type_default_edges(note_id, note_type, explicit_refs))

    return _dedupe(edges)