# app/core/derive_edges.py
# -*- coding: utf-8 -*-
"""
Edge builder v2 (explicit + type-default "rule" edges)
------------------------------------------------------
- Extracts real edges from chunks (wikilinks) and from note frontmatter
  (note_level_refs)
- Adds configurable derived edges according to config/types.yaml.edge_defaults
- Returns *idempotent* edge payloads without duplicates
- The payload contains both the v1 fields (compatible with
  qdrant_points._normalize_edge_payload) and the v2 fields per the playbook
  (src_note_id, dst_note_id, relation, rule_id, provenance, confidence)

Configuration
- Path to the registry via ENV: MINDNET_TYPES_FILE
  (default: ./config/types.yaml)
- Structure (example):

    types:
      concept:
        retriever_weight: 1.0
        chunk_profile: medium
        edge_defaults: ["references","related_to"]
      journal:
        retriever_weight: 0.8
        chunk_profile: long
        edge_defaults: ["references"]

See also:
- mindnet_v2_implementation_playbook.md
  (edge.schema.json, default_edge.schema.json)
"""

from __future__ import annotations

import os
import re
import json
from typing import Dict, Iterable, List, Optional, Tuple, Set

try:
    import yaml  # type: ignore
except Exception:
    yaml = None  # pragma: no cover

# ---- Project utilities ----
try:
    from app.core.parser import extract_wikilinks
except Exception:
    # Fallback: minimal wikilink parser for [[some-id]] or [[Title|some-id]]
    WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:.]+)\]\]")

    def extract_wikilinks(text: str) -> List[Tuple[str, str]]:
        """Return ``(link_text, target_id)`` pairs for every wikilink in *text*.

        ``link_text`` is the complete matched ``[[...]]`` token and
        ``target_id`` is the identifier captured by ``WIKILINK_RE``.
        ``None`` or empty input yields an empty list.
        """
        return [(m.group(0), m.group(1)) for m in WIKILINK_RE.finditer(text or "")]


# ---------------------------------------------------------------------------
# Registry loader
# ---------------------------------------------------------------------------

def _types_path() -> str:
    """Return the registry path from MINDNET_TYPES_FILE or the default."""
    return os.getenv("MINDNET_TYPES_FILE") or "./config/types.yaml"


def _load_types() -> Dict[str, dict]:
    """Load the type registry and return its ``types`` mapping.

    Returns an empty dict when the file does not exist, PyYAML is not
    available, the document is not a mapping, or reading/parsing fails.
    If the document has no ``types`` mapping, the top-level mapping itself
    is returned.
    """
    path = _types_path()
    if not path or not os.path.isfile(path):
        return {}
    if yaml is None:
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
        if isinstance(data, dict) and "types" in data and isinstance(data["types"], dict):
            return data["types"]
        return data if isinstance(data, dict) else {}
    except Exception:
        # Best effort: a broken registry must not break edge derivation.
        return {}


def _edge_defaults_for(note_type: Optional[str]) -> List[str]:
    """Return the configured ``edge_defaults`` relations for *note_type*.

    The type name is stripped and lower-cased before the lookup; when the
    type has no (truthy) entry, the ``concept`` entry is used instead.
    A single string is accepted as a one-element list. Malformed
    configuration (non-mapping type entry, non-iterable ``edge_defaults``)
    yields an empty list instead of raising.
    """
    types = _load_types()
    t = str(note_type or "").strip().lower()
    cfg = types.get(t) or types.get("concept") or {}
    if not isinstance(cfg, dict):
        return []
    defaults = cfg.get("edge_defaults") or []
    if isinstance(defaults, str):
        defaults = [defaults]
    try:
        items = list(defaults)
    except TypeError:
        # e.g. ``edge_defaults: 5`` - not a usable list of relations
        return []
    # bool is a subclass of int; "True"/"False" are never valid relation names
    return [
        str(x)
        for x in items
        if isinstance(x, (str, int, float)) and not isinstance(x, bool)
    ]


# ---------------------------------------------------------------------------
# Edge creation
# ---------------------------------------------------------------------------

def _chunk_id(ch: Dict) -> Optional[str]:
    """Return the chunk identifier: ``chunk_id`` if truthy, otherwise ``id``."""
    return ch.get("chunk_id") or ch.get("id")


def _dedupe(edges: List[Dict]) -> List[Dict]:
    """De-duplicate by (source_id, target_id, relation, rule_id).

    The first edge seen for a key wins; the input order is preserved.
    """
    seen: Set[Tuple[str, str, str, str]] = set()
    out: List[Dict] = []
    for e in edges:
        s = str(e.get("source_id") or e.get("src_note_id") or "")
        t = str(e.get("target_id") or e.get("dst_note_id") or "")
        rel = str(e.get("relation") or e.get("kind") or "edge")
        rule = str(e.get("rule_id") or "")
        key = (s, t, rel, rule)
        if key in seen:
            continue
        seen.add(key)
        out.append(e)
    return out


def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str:
    """Build a short, stable edge id from kind, endpoints, scope and rule.

    The id is the hex BLAKE2s digest (12 bytes) of
    ``"{kind}:{s}->{t}#{scope}"`` plus ``"|{rule_id}"`` when a rule id is
    given. If hashing fails, the unhashed base string is returned.
    """
    base = f"{kind}:{s}->{t}#{scope}"
    if rule_id:
        base += f"|{rule_id}"
    # Short stable id (BLAKE2s, 12 bytes hex). Per the original author's note,
    # qdrant_points derives a UUIDv5 anyway; this id is for traceability in
    # the payload.
    try:
        import hashlib
        return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest()
    except Exception:
        return base


def _structural_edges(note_id: str, chunks: List[Dict]) -> List[Dict]:
    """Return belongs_to + prev/next edges (scope=chunk) for *chunks*.

    Chunks without an identifier (neither ``chunk_id`` nor ``id``) are
    skipped. The prev/next chain follows ``ord``, falling back to
    ``chunk_index`` and then ``0``.
    """
    edges: List[Dict] = []

    # belongs_to
    for ch in chunks:
        cid = _chunk_id(ch)
        if not cid:
            continue
        edges.append({
            "edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to:v1"),
            "kind": "belongs_to",
            "scope": "chunk",
            "source_id": cid,
            "target_id": note_id,
            # v2 fields
            "src_note_id": note_id,
            "src_chunk_id": cid,
            "dst_note_id": note_id,
            "relation": "belongs_to",
            "provenance": "rule",
            "rule_id": "structure:belongs_to:v1",
            "confidence": 1.0,
        })

    # prev/next - uses the same id resolution as belongs_to, so chunks that
    # only carry "id" are part of the ordering chain as well.
    ordered = sorted(
        [c for c in chunks if _chunk_id(c)],
        key=lambda c: c.get("ord") or c.get("chunk_index") or 0,
    )
    for a, b in zip(ordered, ordered[1:]):
        a_id = _chunk_id(a)
        b_id = _chunk_id(b)
        if not a_id or not b_id:
            continue
        # next
        e1 = {
            "edge_id": _mk_edge_id("next", a_id, b_id, "chunk", "structure:order:v1"),
            "kind": "next",
            "scope": "chunk",
            "source_id": a_id,
            "target_id": b_id,
            "src_note_id": note_id,
            "src_chunk_id": a_id,
            "dst_note_id": note_id,
            "dst_chunk_id": b_id,
            "relation": "next",
            "provenance": "rule",
            "rule_id": "structure:order:v1",
            "confidence": 0.95,
        }
        # prev (reverse edge)
        e2 = {
            "edge_id": _mk_edge_id("prev", b_id, a_id, "chunk", "structure:order:v1"),
            "kind": "prev",
            "scope": "chunk",
            "source_id": b_id,
            "target_id": a_id,
            "src_note_id": note_id,
            "src_chunk_id": b_id,
            "dst_note_id": note_id,
            "dst_chunk_id": a_id,
            "relation": "prev",
            "provenance": "rule",
            "rule_id": "structure:order:v1",
            "confidence": 0.95,
        }
        edges.extend([e1, e2])
    return edges


def _explicit_edges_from_chunks(note_id: str, chunks: List[Dict]) -> List[Dict]:
    """Return one explicit ``references`` edge per wikilink found in a chunk.

    The text scanned is the chunk's ``window``, falling back to ``text``.
    """
    edges: List[Dict] = []
    for ch in chunks:
        # NOTE(review): unlike _structural_edges, a chunk without any id is
        # not skipped here, so its edges carry source_id=None - confirm
        # whether downstream consumers rely on that.
        cid = _chunk_id(ch)
        window = ch.get("window") or ch.get("text") or ""
        for link_text, target_id in extract_wikilinks(window):
            # explicit reference (chunk scope)
            edges.append({
                "edge_id": _mk_edge_id("references", cid, target_id, "chunk"),
                "kind": "references",
                "scope": "chunk",
                "source_id": cid,
                "target_id": target_id,
                "note_id": note_id,  # v1 compatibility
                # v2
                "src_note_id": note_id,
                "src_chunk_id": cid,
                "dst_note_id": target_id,
                "relation": "references",
                "provenance": "explicit",
                "rule_id": "",
                "confidence": 1.0,
                "link_text": link_text,
            })
    return edges


def _explicit_edges_from_note_level(note_id: str, refs: Iterable[str], include_note_scope_refs: bool) -> List[Dict]:
    """Return explicit note-scope ``references`` edges for *refs*.

    Returns an empty list when *include_note_scope_refs* is false. A single
    string is treated as one reference (not iterated character by
    character), and empty/``None`` targets are skipped.
    """
    edges: List[Dict] = []
    if not include_note_scope_refs:
        return edges
    if isinstance(refs, str):
        refs = [refs]
    for target_id in refs or []:
        if not target_id:
            continue
        edges.append({
            "edge_id": _mk_edge_id("references", note_id, target_id, "note"),
            "kind": "references",
            "scope": "note",
            "source_id": note_id,
            "target_id": target_id,
            # v2
            "src_note_id": note_id,
            "dst_note_id": target_id,
            "relation": "references",
            "provenance": "explicit",
            "rule_id": "",
            "confidence": 1.0,
        })
    return edges


def _apply_type_defaults(note_type: Optional[str], base_edges: List[Dict]) -> List[Dict]:
    """
    Add, for every existing (explicit) reference, additional edges according
    to types.yaml.edge_defaults (relations). Each relation is emitted as its
    own edge. The ``references`` relation itself is never duplicated.
    """
    rels = [r for r in _edge_defaults_for(note_type) if r and r != "references"]
    if not rels:
        return []

    out: List[Dict] = []
    for e in base_edges:
        if e.get("relation") != "references":
            continue
        s_note = e.get("src_note_id") or e.get("note_id")
        s_chunk = e.get("src_chunk_id")
        t_note = e.get("dst_note_id") or e.get("target_id")
        scope = e.get("scope") or "chunk"
        src = e.get("source_id")
        tgt = e.get("target_id")
        for rel in rels:
            rule_id = f"type_default:{(note_type or 'unknown')}:{rel}:v1"
            out.append({
                "edge_id": _mk_edge_id(rel, src, tgt, scope, rule_id),
                "kind": rel,
                "scope": scope,
                "source_id": src,
                "target_id": tgt,
                "note_id": s_note,
                # v2
                "src_note_id": s_note,
                "src_chunk_id": s_chunk,
                "dst_note_id": t_note,
                "relation": rel,
                "provenance": "rule",
                "rule_id": rule_id,
                "confidence": 0.7,
            })
    return out


def _backlink_edges(note_id: str, ref_edges: List[Dict]) -> List[Dict]:
    """Return one note-scope ``backlink`` edge (target -> note) per reference."""
    scope = "note"
    rule_id = "derived:backlink:v1"
    out: List[Dict] = []
    for e in ref_edges:
        t = e.get("target_id") or e.get("dst_note_id")
        if not t:
            continue
        out.append({
            "edge_id": _mk_edge_id("backlink", t, note_id, scope, rule_id),
            "kind": "backlink",
            "scope": scope,
            "source_id": t,
            "target_id": note_id,
            "note_id": note_id,
            # v2
            "src_note_id": t,
            "dst_note_id": note_id,
            "relation": "backlink",
            "provenance": "rule",
            "rule_id": rule_id,
            "confidence": 0.9,
            "original_relation": e.get("relation"),
        })
    return out


def build_edges_for_note(
    note_id: str,
    chunk_payloads: List[Dict],
    note_level_refs: Optional[List[str]] = None,
    include_note_scope_refs: bool = False,
) -> List[Dict]:
    """
    Return all edges for a note:
    - Structure: belongs_to, prev/next (scope=chunk, provenance=rule)
    - Explicit references from chunks (scope=chunk, provenance=explicit)
    - Explicit references from frontmatter (scope=note, when enabled)
    - Type-default rules (additional edges per explicit reference,
      provenance=rule)
    - Note-level backlinks (one reverse edge per reference, provenance=rule)

    The note type is read from the first chunk (``type``, falling back to
    ``note_type``). The result is de-duplicated by
    (source_id, target_id, relation, rule_id).
    """
    chunks = list(chunk_payloads or [])
    note_type = None
    if chunks:
        note_type = chunks[0].get("type") or chunks[0].get("note_type")

    edges: List[Dict] = []
    edges.extend(_structural_edges(note_id, chunks))

    # Explicit references
    ref_chunk_edges = _explicit_edges_from_chunks(note_id, chunks)
    edges.extend(ref_chunk_edges)

    ref_note_edges = _explicit_edges_from_note_level(note_id, note_level_refs or [], include_note_scope_refs)
    edges.extend(ref_note_edges)

    ref_edges = ref_chunk_edges + ref_note_edges

    # Type defaults (rules) - based on the explicit references
    edges.extend(_apply_type_defaults(note_type, ref_edges))

    # Backlinks (note level only) - reverse edges for 'references'
    edges.extend(_backlink_edges(note_id, ref_edges))

    # Final: de-dupe
    return _dedupe(edges)