# app/core/derive_edges.py
# -*- coding: utf-8 -*-
"""
Edge builder v2 (explicit + type-default "rule" edges)
------------------------------------------------------
- Extracts real edges from chunks (wikilinks) and from note frontmatter (note_level_refs)
- Adds configurable derived edges according to config/types.yaml ``edge_defaults``
- Returns *idempotent* edge payloads without duplicates
- Each payload carries the v1 fields (kind, scope, source_id, target_id; intended to stay
  compatible with qdrant_points._normalize_edge_payload) as well as the v2 fields from the
  playbook (src_note_id, dst_note_id, relation, rule_id, provenance, confidence)

Configuration
- Path to the registry via ENV: MINDNET_TYPES_FILE (default: ./config/types.yaml)
- Structure (example):
    types:
      concept:
        retriever_weight: 1.0
        chunk_profile: medium
        edge_defaults: ["references","related_to"]
      journal:
        retriever_weight: 0.8
        chunk_profile: long
        edge_defaults: ["references"]

See also:
- mindnet_v2_implementation_playbook.md (edge.schema.json, default_edge.schema.json)
"""

from __future__ import annotations

import hashlib
import json
import logging
import os
import re
from typing import Dict, Iterable, Iterator, List, Optional, Set, Tuple

try:
    import yaml  # type: ignore
except Exception:
    yaml = None  # pragma: no cover

logger = logging.getLogger(__name__)

# ---- Project utilities ----
try:
    from app.core.parser import extract_wikilinks
except Exception:
    # Fallback: minimal wikilink parser. Handles [[id]], [[id|label]] and
    # [[id#anchor]]; the target is always the part before '|' / '#'.
    WIKILINK_RE = re.compile(
        r"\[\[([^\[\]\|#\n]+)(?:#[^\[\]\|\n]*)?(?:\|[^\[\]\n]*)?\]\]"
    )

    def extract_wikilinks(text: str) -> List[str]:
        """Return the target ids of all wikilinks in *text*, in order of appearance."""
        targets: List[str] = []
        for match in WIKILINK_RE.finditer(text or ""):
            target = match.group(1).strip()
            if target:
                targets.append(target)
        return targets


# ---------------------------------------------------------------------------
# Registry loader
# ---------------------------------------------------------------------------

# path -> (mtime, parsed types); avoids re-parsing the YAML for every note.
_TYPES_CACHE: Dict[str, Tuple[float, Dict[str, dict]]] = {}


def _types_path() -> str:
    """Return the registry path from MINDNET_TYPES_FILE or the default location."""
    return os.getenv("MINDNET_TYPES_FILE") or "./config/types.yaml"


def _load_types() -> Dict[str, dict]:
    """Load the type registry; return ``{}`` when it is missing or unreadable.

    Accepts either a top-level ``types:`` mapping or a bare mapping of type
    names. The parsed result is cached per path and file modification time.
    """
    path = _types_path()
    if yaml is None or not os.path.isfile(path):
        return {}
    try:
        mtime = os.path.getmtime(path)
        cached = _TYPES_CACHE.get(path)
        if cached is not None and cached[0] == mtime:
            return cached[1]
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
    except Exception:
        # Best effort: a broken registry must not stop edge derivation,
        # but it should not go unnoticed either.
        logger.warning("Could not load type registry %r", path, exc_info=True)
        return {}

    if not isinstance(data, dict):
        types: Dict[str, dict] = {}
    elif isinstance(data.get("types"), dict):
        types = data["types"]
    else:
        types = data
    _TYPES_CACHE[path] = (mtime, types)
    return types


def _edge_defaults_for(note_type: Optional[str]) -> List[str]:
    """Return the ``edge_defaults`` relations configured for *note_type*.

    The type name is stripped and lower-cased for the lookup. Unknown types
    fall back to the ``concept`` entry; without any entry the result is ``[]``.
    """
    types = _load_types()
    t = (note_type or "").strip().lower()
    cfg = types.get(t) or types.get("concept") or {}
    if not isinstance(cfg, dict):
        return []
    defaults = cfg.get("edge_defaults") or []
    if isinstance(defaults, str):
        defaults = [defaults]
    if not isinstance(defaults, (list, tuple)):
        return []
    return [str(x) for x in defaults if isinstance(x, (str, int, float))]


# ---------------------------------------------------------------------------
# Edge construction
# ---------------------------------------------------------------------------

def _dedupe(edges: List[Dict]) -> List[Dict]:
    """Drop duplicates by (source_id, target_id, relation, rule_id); first one wins."""
    seen: Set[Tuple[str, str, str, str]] = set()
    out: List[Dict] = []
    for e in edges:
        s = str(e.get("source_id") or e.get("src_note_id") or "")
        t = str(e.get("target_id") or e.get("dst_note_id") or "")
        rel = str(e.get("relation") or e.get("kind") or "edge")
        rule = str(e.get("rule_id") or "")
        key = (s, t, rel, rule)
        if key in seen:
            continue
        seen.add(key)
        out.append(e)
    return out


def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str:
    """Return a short, stable id (BLAKE2s, 12 bytes, hex) for an edge.

    The id is derived only from the arguments, so identical edges always get
    the same id. Per the original author's note it exists for traceability in
    the payload; the storage layer is said to derive its own point id.
    """
    base = f"{kind}:{s}->{t}#{scope}"
    if rule_id:
        base += f"|{rule_id}"
    return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest()


def _chunk_id(chunk: Dict) -> Optional[str]:
    """Return the chunk id; ``id`` is accepted as an alias for ``chunk_id``."""
    return chunk.get("chunk_id") or chunk.get("id")


def _chunk_order(chunk: Dict):
    """Return the sort position of a chunk: first non-None of ord/chunk_index/seq."""
    for key in ("ord", "chunk_index", "seq"):
        value = chunk.get(key)
        if value is not None:  # 0 is a valid position, so no truthiness test
            return value
    return 0


def _iter_links(text: str) -> Iterator[Tuple[str, str]]:
    """Yield ``(link_text, target_id)`` for every wikilink found in *text*.

    ``extract_wikilinks`` may return plain target ids or 2-item sequences,
    depending on which implementation is in use; both shapes are accepted.
    For plain ids the id doubles as the link text.
    """
    for item in extract_wikilinks(text) or []:
        if isinstance(item, str):
            link_text, target = item, item
        elif isinstance(item, (tuple, list)) and len(item) == 2:
            link_text, target = item
        else:
            continue
        if not target:
            continue
        yield str(link_text), str(target)


def _structural_edges(note_id: str, chunks: List[Dict]) -> List[Dict]:
    """Build belongs_to (chunk -> note) and next/prev (chunk chain) edges."""
    edges: List[Dict] = []
    # belongs_to
    for ch in chunks:
        cid = _chunk_id(ch)
        if not cid:
            continue
        edges.append({
            "edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to:v1"),
            "kind": "belongs_to",
            "scope": "chunk",
            "source_id": cid,
            "target_id": note_id,
            # v2 fields
            "src_note_id": note_id,
            "src_chunk_id": cid,
            "dst_note_id": note_id,
            "relation": "belongs_to",
            "provenance": "rule",
            "rule_id": "structure:belongs_to:v1",
            "confidence": 1.0,
        })

    # prev/next: sorted() is stable, so chunks without any position field
    # keep their input order.
    ordered = sorted((c for c in chunks if _chunk_id(c)), key=_chunk_order)
    for a, b in zip(ordered, ordered[1:]):
        a_id = _chunk_id(a)
        b_id = _chunk_id(b)
        # next
        e1 = {
            "edge_id": _mk_edge_id("next", a_id, b_id, "chunk", "structure:order:v1"),
            "kind": "next",
            "scope": "chunk",
            "source_id": a_id,
            "target_id": b_id,
            "src_note_id": note_id,
            "src_chunk_id": a_id,
            "dst_note_id": note_id,
            "dst_chunk_id": b_id,
            "relation": "next",
            "provenance": "rule",
            "rule_id": "structure:order:v1",
            "confidence": 0.95,
        }
        # prev (reverse edge)
        e2 = {
            "edge_id": _mk_edge_id("prev", b_id, a_id, "chunk", "structure:order:v1"),
            "kind": "prev",
            "scope": "chunk",
            "source_id": b_id,
            "target_id": a_id,
            "src_note_id": note_id,
            "src_chunk_id": b_id,
            "dst_note_id": note_id,
            "dst_chunk_id": a_id,
            "relation": "prev",
            "provenance": "rule",
            "rule_id": "structure:order:v1",
            "confidence": 0.95,
        }
        edges.extend([e1, e2])
    return edges


def _explicit_edges_from_chunks(note_id: str, chunks: List[Dict]) -> List[Dict]:
    """Build chunk-scope ``references`` edges from wikilinks in window/text."""
    edges: List[Dict] = []
    for ch in chunks:
        cid = _chunk_id(ch)
        if not cid:
            # An edge without a source id cannot be resolved later.
            continue
        window = ch.get("window") or ch.get("text") or ""
        for link_text, target_id in _iter_links(window):
            # explicit reference (chunk scope)
            edges.append({
                "edge_id": _mk_edge_id("references", cid, target_id, "chunk"),
                "kind": "references",
                "scope": "chunk",
                "source_id": cid,
                "target_id": target_id,
                "note_id": note_id,  # v1 compatibility
                # v2
                "src_note_id": note_id,
                "src_chunk_id": cid,
                "dst_note_id": target_id,
                "relation": "references",
                "provenance": "explicit",
                "rule_id": "",
                "confidence": 1.0,
                "link_text": link_text,
            })
    return edges


def _explicit_edges_from_note_level(
    note_id: str,
    refs: Iterable[str],
    include_note_scope_refs: bool,
) -> List[Dict]:
    """Build note-scope ``references`` edges from frontmatter references.

    Returns ``[]`` unless *include_note_scope_refs* is true. Entries that are
    empty or not strings are ignored.
    """
    edges: List[Dict] = []
    if not include_note_scope_refs:
        return edges
    for target_id in refs or []:
        if not isinstance(target_id, str) or not target_id:
            continue
        edges.append({
            "edge_id": _mk_edge_id("references", note_id, target_id, "note"),
            "kind": "references",
            "scope": "note",
            "source_id": note_id,
            "target_id": target_id,
            # v2
            "src_note_id": note_id,
            "dst_note_id": target_id,
            "relation": "references",
            "provenance": "explicit",
            "rule_id": "",
            "confidence": 1.0,
        })
    return edges


def _apply_type_defaults(note_type: Optional[str], base_edges: List[Dict]) -> List[Dict]:
    """Derive extra edges per explicit reference from the type's ``edge_defaults``.

    Every configured relation other than ``references`` yields one additional
    edge with the same endpoints and scope as the reference it is based on.
    """
    rels = [r for r in _edge_defaults_for(note_type) if r and r != "references"]
    if not rels:
        return []
    out: List[Dict] = []
    for e in base_edges:
        if e.get("relation") != "references":
            continue
        s_note = e.get("src_note_id") or e.get("note_id")
        s_chunk = e.get("src_chunk_id")
        t_note = e.get("dst_note_id") or e.get("target_id")
        scope = e.get("scope") or "chunk"
        src = e.get("source_id")
        tgt = e.get("target_id")
        for rel in rels:
            rule_id = f"type_default:{(note_type or 'unknown')}:{rel}:v1"
            out.append({
                "edge_id": _mk_edge_id(rel, src, tgt, scope, rule_id),
                "kind": rel,
                "scope": scope,
                "source_id": src,
                "target_id": tgt,
                "note_id": s_note,
                # v2
                "src_note_id": s_note,
                "src_chunk_id": s_chunk,
                "dst_note_id": t_note,
                "relation": rel,
                "provenance": "rule",
                "rule_id": rule_id,
                "confidence": 0.7,
            })
    return out


def _backlink_edges(note_id: str, ref_edges: List[Dict]) -> List[Dict]:
    """Build one note-scope ``backlink`` edge (target -> note) per reference edge."""
    scope = "note"
    rule_id = "derived:backlink:v1"
    edges: List[Dict] = []
    for e in ref_edges:
        t = e.get("target_id") or e.get("dst_note_id")
        if not t:
            continue
        edges.append({
            "edge_id": _mk_edge_id("backlink", t, note_id, scope, rule_id),
            "kind": "backlink",
            "scope": scope,
            "source_id": t,
            "target_id": note_id,
            "note_id": note_id,
            # v2
            "src_note_id": t,
            "dst_note_id": note_id,
            "relation": "backlink",
            "provenance": "rule",
            "rule_id": rule_id,
            "confidence": 0.9,
            "original_relation": e.get("relation"),
        })
    return edges


def build_edges_for_note(
    note_id: str,
    chunk_payloads: List[Dict],
    note_level_refs: Optional[List[str]] = None,
    include_note_scope_refs: bool = False,
) -> List[Dict]:
    """
    Return all edges for one note:
    - Structure: belongs_to, prev/next (scope=chunk, provenance=rule)
    - Explicit references from chunks (scope=chunk, provenance=explicit)
    - Explicit references from frontmatter (scope=note, only when
      include_note_scope_refs is true)
    - Type-default rules (extra edges per explicit reference, provenance=rule)
    - Note-level backlinks (one reverse edge per reference, provenance=rule)

    The note type is read from the first chunk (``type`` or ``note_type``).
    The result is de-duplicated by (source, target, relation, rule_id).
    """
    chunks = list(chunk_payloads or [])
    note_type = None
    if chunks:
        note_type = chunks[0].get("type") or chunks[0].get("note_type")

    edges: List[Dict] = []
    edges.extend(_structural_edges(note_id, chunks))

    # Explicit references
    ref_chunk_edges = _explicit_edges_from_chunks(note_id, chunks)
    edges.extend(ref_chunk_edges)
    ref_note_edges = _explicit_edges_from_note_level(
        note_id, note_level_refs or [], include_note_scope_refs
    )
    edges.extend(ref_note_edges)

    explicit_refs = ref_chunk_edges + ref_note_edges

    # Type defaults (rules), based on the explicit references
    edges.extend(_apply_type_defaults(note_type, explicit_refs))

    # Backlinks (note level only): reverse edges for 'references'
    edges.extend(_backlink_edges(note_id, explicit_refs))

    # Final: de-dupe
    return _dedupe(edges)