"""
FILE: app/core/derive_edges.py
DESCRIPTION: Extracts graph edges from text. Supports wikilinks, inline
             relations ([[rel:type|target]]) and Obsidian callouts.
             WP-15b: integration of the candidate pool and provenance
             prioritisation. Protects graph integrity through
             confidence-based de-duplication.
VERSION: 2.1.0
STATUS: Active
DEPENDENCIES: re, os, yaml, typing, hashlib
EXTERNAL_CONFIG: config/types.yaml
LAST_ANALYSIS: 2025-12-26
"""
from __future__ import annotations

import os
import re
import hashlib
from typing import Iterable, List, Optional, Tuple, Set, Dict

try:
    import yaml  # optional, only needed for types.yaml
except Exception:  # pragma: no cover
    yaml = None


# --------------------------------------------------------------------------- #
# 1. Utilities & ID Generation
# --------------------------------------------------------------------------- #

def _get(d: dict, *keys, default=None):
    """Return the value of the first key in ``keys`` that is set in ``d``.

    The keys are tried in order as *alternatives* (not as a nested path).
    A key counts as set when it is present and its value is not ``None``.
    ``default`` is returned when ``d`` is not a dict or no key qualifies.
    """
    if not isinstance(d, dict):
        return default
    for k in keys:
        if k in d and d[k] is not None:
            return d[k]
    return default


def _chunk_text_for_refs(chunk: dict) -> str:
    """Return the chunk text used for reference extraction.

    The first truthy value among ``window``, ``text``, ``content`` and
    ``raw`` wins; an empty string is returned when none is available.
    """
    return (
        _get(chunk, "window")
        or _get(chunk, "text")
        or _get(chunk, "content")
        or _get(chunk, "raw")
        or ""
    )


def _dedupe_seq(seq: Iterable[str]) -> List[str]:
    """Remove duplicates from ``seq`` while keeping first-seen order."""
    # dict preserves insertion order, so this keeps the first occurrence.
    return list(dict.fromkeys(seq))


def _edge(
    kind: str,
    scope: str,
    source_id: str,
    target_id: str,
    note_id: str,
    extra: Optional[dict] = None,
) -> dict:
    """Build an edge payload dict.

    ``extra`` entries are merged last and may therefore override the base
    fields.
    """
    pl = {
        "kind": kind,
        "relation": kind,  # alias kept for backwards compatibility (v2)
        "scope": scope,  # "chunk" | "note"
        "source_id": source_id,
        "target_id": target_id,
        "note_id": note_id,  # note that carries the edge
    }
    if extra:
        pl.update(extra)
    return pl


def _mk_edge_id(
    kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None
) -> str:
    """Create a deterministic edge id (BLAKE2s, 12-byte digest as hex).

    The id is derived from kind, source, target, scope and, if given, the
    rule id. If hashing fails the unhashed base string is returned instead.
    """
    base = f"{kind}:{s}->{t}#{scope}"
    if rule_id:
        base += f"|{rule_id}"
    try:
        return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest()
    except Exception:  # pragma: no cover
        return base


# --------------------------------------------------------------------------- #
# 2. Configuration & Provenance Scale
# --------------------------------------------------------------------------- #

# WP-15b: priority ranking used by the de-duplication step
PROVENANCE_PRIORITY = {
    "explicit:wikilink": 1.00,
    "inline:rel": 0.95,
    "callout:edge": 0.90,
    "semantic_ai": 0.90,  # validated AI edges
    "structure:belongs_to": 1.00,
    "structure:order": 0.95,  # next/prev
    "explicit:note_scope": 1.00,
    "derived:backlink": 0.90,
    "edge_defaults": 0.70  # heuristic (types.yaml)
}


def _env(n: str, default: Optional[str] = None) -> str:
    """Return environment variable ``n``, else ``default``, else ``""``."""
    v = os.getenv(n)
    return v if v is not None else (default or "")


def _load_types_registry() -> dict:
    """Load the YAML types registry used to look up default edges.

    The path comes from ``MINDNET_TYPES_FILE`` (default
    ``./config/types.yaml``). Loading is best effort: a missing file, a
    missing ``yaml`` module, a read/parse error or a non-mapping top level
    all yield an empty dict.
    """
    p = _env("MINDNET_TYPES_FILE", "./config/types.yaml")
    if not os.path.isfile(p) or yaml is None:
        return {}
    try:
        with open(p, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
    except Exception:
        return {}
    # A YAML list/scalar at top level is not a usable registry.
    return data if isinstance(data, dict) else {}


def _get_types_map(reg: dict) -> dict:
    """Return ``reg["types"]`` if it is a dict, otherwise ``reg`` itself."""
    if isinstance(reg, dict) and isinstance(reg.get("types"), dict):
        return reg["types"]
    return reg if isinstance(reg, dict) else {}


def _edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]:
    """Return the ``edge_defaults`` list for ``note_type``.

    Falls back to the first of the registry sections ``defaults``,
    ``default`` or ``global`` that carries an ``edge_defaults`` list.
    Non-string entries are dropped. Returns ``[]`` when nothing applies.
    """
    if not isinstance(reg, dict):
        return []
    types_map = _get_types_map(reg)
    if note_type and isinstance(types_map, dict):
        t = types_map.get(note_type)
        if isinstance(t, dict) and isinstance(t.get("edge_defaults"), list):
            return [str(x) for x in t["edge_defaults"] if isinstance(x, str)]
    for key in ("defaults", "default", "global"):
        v = reg.get(key)
        if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list):
            return [str(x) for x in v["edge_defaults"] if isinstance(x, str)]
    return []


# --------------------------------------------------------------------------- #
# 3. Parsers for Links / Relations (core logic v2.0.0)
# --------------------------------------------------------------------------- #

# Plain wikilinks (fallback).
# NOTE(review): for piped links ([[a|b]]) the part after the pipe is captured,
# and the character class is ASCII-only - confirm both are intended.
_WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:. ]+)\]\]")

# Typed inline relations
_REL_PIPE = re.compile(
    r"\[\[\s*rel:(?P<kind>[a-z_]+)\s*\|\s*(?P<target>[^\]]+?)\s*\]\]",
    re.IGNORECASE,
)
_REL_SPACE = re.compile(
    r"\[\[\s*rel:(?P<kind>[a-z_]+)\s+(?P<target>[^\]]+?)\s*\]\]",
    re.IGNORECASE,
)
_REL_TEXT = re.compile(
    r"rel\s*:\s*(?P<kind>[a-z_]+)\s*\[\[\s*(?P<target>[^\]]+?)\s*\]\]",
    re.IGNORECASE,
)


def _extract_typed_relations(text: str) -> Tuple[List[Tuple[str, str]], str]:
    """Extract typed inline relations and strip them from ``text``.

    Recognised forms: ``[[rel:KIND|Target]]``, ``[[rel:KIND Target]]`` and
    ``rel: KIND [[Target]]``. Matches are removed so that the later wikilink
    pass does not report them a second time.

    Returns:
        ``(pairs, remainder)`` where ``pairs`` holds ``(kind, target)``
        tuples (kind lower-cased) and ``remainder`` is the text without the
        matched relations.
    """
    pairs: List[Tuple[str, str]] = []

    def _collect(m):
        k = (m.group("kind") or "").strip().lower()
        t = (m.group("target") or "").strip()
        if k and t:
            pairs.append((k, t))
        return ""  # remove the link from the text

    for pattern in (_REL_PIPE, _REL_SPACE, _REL_TEXT):
        text = pattern.sub(_collect, text)
    return pairs, text


# Obsidian callout parser for multi-line blocks
_CALLOUT_START = re.compile(r"^\s*>\s*\[!edge\]\s*(.*)$", re.IGNORECASE)
_REL_LINE = re.compile(
    r"^(?P<kind>[a-z_]+)\s*:\s*(?P<targets>.+?)\s*$", re.IGNORECASE
)
_WIKILINKS_IN_LINE = re.compile(r"\[\[([^\]]+)\]\]")


def _extract_callout_relations(text: str) -> Tuple[List[Tuple[str, str]], str]:
    """Parse ``[!edge]`` callouts and remove them from the text flow.

    A callout starts at a ``> [!edge]`` line and continues over the
    following lines that begin with ``>``. Each line of the form
    ``kind: targets`` yields one pair per target; targets are taken from
    ``[[...]]`` links if the line has any, otherwise from a ``,``/``;``
    separated list.

    Returns:
        ``(pairs, remainder)`` where ``remainder`` is the non-callout lines
        joined with ``"\\n"``.
    """
    if not text:
        return [], text

    lines = text.splitlines()
    total = len(lines)
    out_pairs: List[Tuple[str, str]] = []
    keep_lines: List[str] = []

    i = 0
    while i < total:
        start = _CALLOUT_START.match(lines[i])
        if not start:
            keep_lines.append(lines[i])
            i += 1
            continue

        # Collect the callout body: rest of the header line + quoted lines.
        block_lines: List[str] = []
        first_rest = start.group(1) or ""
        if first_rest.strip():
            block_lines.append(first_rest)
        i += 1
        while i < total and lines[i].lstrip().startswith(">"):
            block_lines.append(lines[i].lstrip()[1:].lstrip())
            i += 1

        for bl in block_lines:
            mrel = _REL_LINE.match(bl)
            if not mrel:
                continue
            kind = (mrel.group("kind") or "").strip().lower()
            targets = mrel.group("targets") or ""
            found = _WIKILINKS_IN_LINE.findall(targets)
            raw_targets = found if found else re.split(r"[,;]", targets)
            for raw in raw_targets:
                t = raw.strip()
                if t:
                    out_pairs.append((kind, t))

    return out_pairs, "\n".join(keep_lines)


def _extract_wikilinks(text: str) -> List[str]:
    """Return the stripped capture of every ``_WIKILINK_RE`` match in ``text``."""
    return [m.group(1).strip() for m in _WIKILINK_RE.finditer(text or "")]


# --------------------------------------------------------------------------- #
# 4. Main function (build_edges_for_note)
# --------------------------------------------------------------------------- #

def _structure_edges(note_id: str, chunks: List[dict]) -> List[dict]:
    """Build belongs_to (chunk -> note) and next/prev (sequence) edges."""
    edges: List[dict] = []

    for ch in chunks:
        cid = _get(ch, "chunk_id", "id")
        if not cid:
            continue
        edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, {
            "chunk_id": cid,
            "edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to"),
            "provenance": "structure",
            "rule_id": "structure:belongs_to",
            "confidence": PROVENANCE_PRIORITY["structure:belongs_to"],
        }))

    # Adjacent pairs; a pair is skipped when either chunk has no id.
    for a, b in zip(chunks, chunks[1:]):
        a_id = _get(a, "chunk_id", "id")
        b_id = _get(b, "chunk_id", "id")
        if not a_id or not b_id:
            continue
        edges.append(_edge("next", "chunk", a_id, b_id, note_id, {
            "chunk_id": a_id,
            "edge_id": _mk_edge_id("next", a_id, b_id, "chunk", "structure:order"),
            "provenance": "structure",
            "rule_id": "structure:order",
            "confidence": PROVENANCE_PRIORITY["structure:order"],
        }))
        edges.append(_edge("prev", "chunk", b_id, a_id, note_id, {
            "chunk_id": b_id,
            "edge_id": _mk_edge_id("prev", b_id, a_id, "chunk", "structure:order"),
            "provenance": "structure",
            "rule_id": "structure:order",
            "confidence": PROVENANCE_PRIORITY["structure:order"],
        }))

    return edges


def _content_edges_for_chunk(
    note_id: str,
    note_type: Optional[str],
    ch: dict,
    cid: str,
    defaults: List[str],
) -> Tuple[List[dict], List[str]]:
    """Build the content edges of one chunk.

    Order: inline relations, candidate pool, callouts, plain wikilinks
    (each wikilink followed by its types.yaml default relations).
    Returns ``(edges, refs)`` where ``refs`` are the plain wikilink targets.
    """
    edges: List[dict] = []
    raw = _chunk_text_for_refs(ch)

    # 3a) Typed inline relations
    typed, remainder = _extract_typed_relations(raw)
    for kind, target in typed:
        k = kind.strip().lower()
        if not k or not target:
            continue
        edges.append(_edge(k, "chunk", cid, target, note_id, {
            "chunk_id": cid,
            "edge_id": _mk_edge_id(k, cid, target, "chunk", "inline:rel"),
            "provenance": "explicit",
            "rule_id": "inline:rel",
            "confidence": PROVENANCE_PRIORITY["inline:rel"],
        }))

    # 3b) WP-15b candidate pool integration (AI-validated edges).
    # Handles edges that were already checked semantically during ingestion.
    pool = ch.get("candidate_pool") or ch.get("candidate_edges") or []
    for cand in pool:
        if not isinstance(cand, dict):
            continue  # malformed entry: skip instead of crashing the note
        target = cand.get("to")
        if not target:
            continue
        # `or` also covers keys that are present but set to None.
        kind = cand.get("kind") or "related_to"
        prov = cand.get("provenance") or "semantic_ai"
        edges.append(_edge(kind, "chunk", cid, target, note_id, {
            "chunk_id": cid,
            "edge_id": _mk_edge_id(kind, cid, target, "chunk", f"candidate:{prov}"),
            "provenance": prov,
            "rule_id": f"candidate:{prov}",
            "confidence": PROVENANCE_PRIORITY.get(prov, 0.90),
        }))

    # 3c) Obsidian callouts
    call_pairs, remainder2 = _extract_callout_relations(remainder)
    for kind, target in call_pairs:
        k = (kind or "").strip().lower()
        if not k or not target:
            continue
        edges.append(_edge(k, "chunk", cid, target, note_id, {
            "chunk_id": cid,
            "edge_id": _mk_edge_id(k, cid, target, "chunk", "callout:edge"),
            "provenance": "explicit",
            "rule_id": "callout:edge",
            "confidence": PROVENANCE_PRIORITY["callout:edge"],
        }))

    # 3d) Plain wikilinks -> references (+ defaults)
    refs = _extract_wikilinks(remainder2)
    for r in refs:
        edges.append(_edge("references", "chunk", cid, r, note_id, {
            "chunk_id": cid,
            "ref_text": r,
            "edge_id": _mk_edge_id("references", cid, r, "chunk", "explicit:wikilink"),
            "provenance": "explicit",
            "rule_id": "explicit:wikilink",
            "confidence": PROVENANCE_PRIORITY["explicit:wikilink"],
        }))
        # Append rule-based edges from types.yaml
        for rel in defaults:
            if rel == "references":
                continue
            edges.append(_edge(rel, "chunk", cid, r, note_id, {
                "chunk_id": cid,
                "edge_id": _mk_edge_id(rel, cid, r, "chunk", f"edge_defaults:{note_type}:{rel}"),
                "provenance": "rule",
                "rule_id": f"edge_defaults:{note_type}:{rel}",
                "confidence": PROVENANCE_PRIORITY["edge_defaults"],
            }))

    return edges, refs


def _note_scope_edges(
    note_id: str,
    note_type: Optional[str],
    refs_note: List[str],
    defaults: List[str],
) -> List[dict]:
    """Build note-scope references, backlinks and default relations."""
    edges: List[dict] = []
    for r in refs_note:
        edges.append(_edge("references", "note", note_id, r, note_id, {
            "edge_id": _mk_edge_id("references", note_id, r, "note", "explicit:note_scope"),
            "provenance": "explicit",
            "rule_id": "explicit:note_scope",
            "confidence": PROVENANCE_PRIORITY["explicit:note_scope"],
        }))
        # Backlink (target -> note) to strengthen the graph
        edges.append(_edge("backlink", "note", r, note_id, note_id, {
            "edge_id": _mk_edge_id("backlink", r, note_id, "note", "derived:backlink"),
            "provenance": "rule",
            "rule_id": "derived:backlink",
            "confidence": PROVENANCE_PRIORITY["derived:backlink"],
        }))
        for rel in defaults:
            if rel == "references":
                continue
            edges.append(_edge(rel, "note", note_id, r, note_id, {
                "edge_id": _mk_edge_id(rel, note_id, r, "note", f"edge_defaults:{note_type}:{rel}"),
                "provenance": "rule",
                "rule_id": f"edge_defaults:{note_type}:{rel}",
                "confidence": PROVENANCE_PRIORITY["edge_defaults"],
            }))
    return edges


def _dedupe_edges(edges: List[dict]) -> List[dict]:
    """Keep one edge per (source, target, relation): the most confident one.

    On equal confidence the edge seen first is kept. Output order follows
    the first occurrence of each key.
    """
    unique_map: Dict[Tuple[str, str, str], dict] = {}
    for e in edges:
        s, t = str(e.get("source_id")), str(e.get("target_id"))
        rel = str(e.get("relation") or e.get("kind") or "edge")
        key = (s, t, rel)
        current = unique_map.get(key)
        if current is None or e.get("confidence", 0) > current.get("confidence", 0):
            unique_map[key] = e
    return list(unique_map.values())


def build_edges_for_note(
    note_id: str,
    chunks: List[dict],
    note_level_references: Optional[List[str]] = None,
    include_note_scope_refs: bool = False,
) -> List[dict]:
    """Create and aggregate all edges of a note (incl. WP-15b candidates).

    Args:
        note_id: id of the note that carries the edges.
        chunks: chunk payload dicts; the chunk id is read from ``chunk_id``
            or ``id``, the note type from ``type`` of the first chunk.
            Chunks without an id are ignored.
        note_level_references: extra reference targets that are added at
            note scope (only used with ``include_note_scope_refs``).
        include_note_scope_refs: also emit note-scope ``references``,
            ``backlink`` and default-relation edges.

    Returns:
        Edge payload dicts, de-duplicated per (source, target, relation)
        so that the edge with the highest confidence wins.
    """
    note_type = _get(chunks[0], "type") if chunks else "concept"

    # 1) + 2) Structural edges: belongs_to, next / prev
    edges: List[dict] = _structure_edges(note_id, chunks)

    # 3) Content edges (refs, inlines, callouts, candidates)
    reg = _load_types_registry()
    defaults = _edge_defaults_for(note_type, reg)
    refs_all: List[str] = []

    for ch in chunks:
        cid = _get(ch, "chunk_id", "id")
        if not cid:
            continue
        chunk_edges, refs = _content_edges_for_chunk(note_id, note_type, ch, cid, defaults)
        edges.extend(chunk_edges)
        refs_all.extend(refs)

    # 4) Optional note-scope references & backlinks
    if include_note_scope_refs:
        refs_note = list(refs_all)
        if note_level_references:
            refs_note.extend(
                r for r in note_level_references if isinstance(r, str) and r
            )
        refs_note = _dedupe_seq(refs_note)
        edges.extend(_note_scope_edges(note_id, note_type, refs_note, defaults))

    # 5) WP-15b: confidence-based de-duplication
    return _dedupe_edges(edges)