diff --git a/app/core/derive_edges.py b/app/core/derive_edges.py index 5ee9da3..bf4565b 100644 --- a/app/core/derive_edges.py +++ b/app/core/derive_edges.py @@ -1,330 +1,120 @@ -# app/core/derive_edges.py +#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -Edge-Builder v2 (explicit + type-default "rule" edges) ------------------------------------------------------ -- Extrahiert reale Kanten aus Chunks (Wikilinks) und aus Note-Frontmatter (note_level_refs) -- Ergänzt konfigurierbare Ableitungs-Kanten gemäß config/types.yaml.edge_defaults -- Liefert *idempotente* Edge-Payloads ohne Duplikate -- Payload enthält sowohl v1-Felder (kompatibel zu qdrant_points._normalize_edge_payload) - als auch v2-Felder gem. Playbook (src_note_id, dst_note_id, relation, rule_id, provenance, confidence) +Modul: app/core/derive_edges.py +Version: 1.4.0 -Konfiguration -- Pfad zu der Registry via ENV: MINDNET_TYPES_FILE (Default: ./config/types.yaml) -- Struktur (Beispiel): - types: - concept: - retriever_weight: 1.0 - chunk_profile: medium - edge_defaults: ["references","related_to"] - journal: - retriever_weight: 0.8 - chunk_profile: long - edge_defaults: ["references"] +Robuste Kantenbildung für mindnet (Notes/Chunks): +- belongs_to (chunk -> note) +- next / prev (chunk-Kette) +- references (chunk-scope) aus Chunk.window/text +- optional references/backlink (note-scope) -Siehe auch: -- mindnet_v2_implementation_playbook.md (edge.schema.json, default_edge.schema.json) +Wichtig: Wikilinks werden mit der Parser-Funktion `extract_wikilinks` extrahiert, +damit Varianten wie [[id#anchor]] oder [[id|label]] korrekt auf 'id' reduziert werden. """ from __future__ import annotations -import os -import re -import json -from typing import Dict, Iterable, List, Optional, Tuple, Set +from typing import Dict, List, Optional, Iterable -try: - import yaml # type: ignore -except Exception: - yaml = None # pragma: no cover +# Parser-Extraktion für saubere Wikilinks +from app.core.parser import extract_wikilinks -# ---- Projekt-Utilities ---- -try: - from app.core.parser import extract_wikilinks -except Exception: - # Fallback: Minimaler Wikilink-Parser [[some-id]] oder [[Title|some-id]] - WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:.]+)\]\]") - def extract_wikilinks(text: str) -> List[Tuple[str, str]]: # (link_text, target_id) - links = [] - for m in WIKILINK_RE.finditer(text or ""): - raw = m.group(0) - target = m.group(1) - links.append((raw, target)) - return links +def _get(d: dict, *keys, default=None): + for k in keys: + if k in d and d[k] is not None: + return d[k] + return default -# --------------------------------------------------------------------------- -# Registry-Lader -# --------------------------------------------------------------------------- +def _chunk_text_for_refs(chunk: dict) -> str: + # bevorzugt 'window' → dann 'text' → 'content' → 'raw' + return ( + _get(chunk, "window") + or _get(chunk, "text") + or _get(chunk, "content") + or _get(chunk, "raw") + or "" + ) -def _types_path() -> str: - p = os.getenv("MINDNET_TYPES_FILE") or "./config/types.yaml" - return p - -def _load_types() -> Dict[str, dict]: - path = _types_path() - if not path or not os.path.isfile(path): - return {} - if yaml is None: - return {} - try: - with open(path, "r", encoding="utf-8") as f: - data = yaml.safe_load(f) or {} - if isinstance(data, dict) and "types" in data and isinstance(data["types"], dict): - return data["types"] - return data if isinstance(data, dict) else {} - except Exception: - return {} - -def _edge_defaults_for(note_type: Optional[str]) -> List[str]: - types = _load_types() - t = (note_type or "").strip().lower() - cfg = types.get(t) or types.get("concept") or {} - defaults = cfg.get("edge_defaults") or [] - if isinstance(defaults, str): - defaults = [defaults] - return [str(x) for x in defaults if isinstance(x, (str, int, float))] - -# --------------------------------------------------------------------------- -# Edge-Erzeugung -# --------------------------------------------------------------------------- - -def _dedupe(edges: List[Dict]) -> List[Dict]: - """De-dupliziere anhand (source_id, target_id, relation, rule_id).""" - seen: Set[Tuple[str, str, str, str]] = set() - out: List[Dict] = [] - for e in edges: - s = str(e.get("source_id") or e.get("src_note_id") or "") - t = str(e.get("target_id") or e.get("dst_note_id") or "") - rel = str(e.get("relation") or e.get("kind") or "edge") - rule = str(e.get("rule_id") or "") - key = (s, t, rel, rule) - if key in seen: - continue - seen.add(key) - out.append(e) +def _dedupe(seq: Iterable[str]) -> List[str]: + seen = set() + out: List[str] = [] + for s in seq: + if s not in seen: + seen.add(s) + out.append(s) return out -def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str: - base = f"{kind}:{s}->{t}#{scope}" - if rule_id: - base += f"|{rule_id}" - # kurze stabile ID (BLAKE2s 12 bytes hex) – qdrant_points macht ohnehin UUIDv5, - # diese ID dient der Nachvollziehbarkeit im Payload - try: - import hashlib - return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest() - except Exception: - return base - -def _structural_edges(note_id: str, chunks: List[Dict]) -> List[Dict]: - """belongs_to + prev/next (scope=chunk)""" - edges: List[Dict] = [] - # belongs_to - for ch in chunks: - cid = ch.get("chunk_id") or ch.get("id") - if not cid: - continue - e = { - "edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to:v1"), - "kind": "belongs_to", - "scope": "chunk", - "source_id": cid, - "target_id": note_id, - # v2-Felder - "src_note_id": note_id, - "src_chunk_id": cid, - "dst_note_id": note_id, - "relation": "belongs_to", - "provenance": "rule", - "rule_id": "structure:belongs_to:v1", - "confidence": 1.0, - } - edges.append(e) - - # prev/next - ordered = sorted([c for c in chunks if c.get("chunk_id")], key=lambda c: c.get("ord") or c.get("chunk_index") or 0) - for a, b in zip(ordered, ordered[1:]): - a_id = a.get("chunk_id"); b_id = b.get("chunk_id") - if not a_id or not b_id: - continue - # next - e1 = { - "edge_id": _mk_edge_id("next", a_id, b_id, "chunk", "structure:order:v1"), - "kind": "next", - "scope": "chunk", - "source_id": a_id, - "target_id": b_id, - "src_note_id": note_id, - "src_chunk_id": a_id, - "dst_note_id": note_id, - "dst_chunk_id": b_id, - "relation": "next", - "provenance": "rule", - "rule_id": "structure:order:v1", - "confidence": 0.95, - } - # prev (Gegenkante) - e2 = { - "edge_id": _mk_edge_id("prev", b_id, a_id, "chunk", "structure:order:v1"), - "kind": "prev", - "scope": "chunk", - "source_id": b_id, - "target_id": a_id, - "src_note_id": note_id, - "src_chunk_id": b_id, - "dst_note_id": note_id, - "dst_chunk_id": a_id, - "relation": "prev", - "provenance": "rule", - "rule_id": "structure:order:v1", - "confidence": 0.95, - } - edges.extend([e1, e2]) - return edges - -def _explicit_edges_from_chunks(note_id: str, chunks: List[Dict]) -> List[Dict]: - edges: List[Dict] = [] - for ch in chunks: - cid = ch.get("chunk_id") or ch.get("id") - window = ch.get("window") or ch.get("text") or "" - for link_text, target_id in extract_wikilinks(window): - # explizite Referenz (chunk-scope) - e = { - "edge_id": _mk_edge_id("references", cid, target_id, "chunk"), - "kind": "references", - "scope": "chunk", - "source_id": cid, - "target_id": target_id, - "note_id": note_id, # v1-Kompatibilität - # v2 - "src_note_id": note_id, - "src_chunk_id": cid, - "dst_note_id": target_id, - "relation": "references", - "provenance": "explicit", - "rule_id": "", - "confidence": 1.0, - "link_text": link_text, - } - edges.append(e) - return edges - -def _explicit_edges_from_note_level(note_id: str, refs: Iterable[str], include_note_scope_refs: bool) -> List[Dict]: - edges: List[Dict] = [] - if not include_note_scope_refs: - return edges - for target_id in refs or []: - e = { - "edge_id": _mk_edge_id("references", note_id, target_id, "note"), - "kind": "references", - "scope": "note", - "source_id": note_id, - "target_id": target_id, - # v2 - "src_note_id": note_id, - "dst_note_id": target_id, - "relation": "references", - "provenance": "explicit", - "rule_id": "", - "confidence": 1.0, - } - edges.append(e) - return edges - -def _apply_type_defaults(note_type: Optional[str], base_edges: List[Dict]) -> List[Dict]: - """ - Ergänzt pro vorhandener (expliziter) Referenz zusätzliche Kanten gemäß - types.yaml.edge_defaults (relationen). Jede Relation wird als eigene Kante erzeugt. - """ - rels = [r for r in _edge_defaults_for(note_type) if r and r != "references"] - if not rels: - return [] - out: List[Dict] = [] - for e in base_edges: - if e.get("relation") != "references": - continue - s_note = e.get("src_note_id") or e.get("note_id") - s_chunk = e.get("src_chunk_id") - t_note = e.get("dst_note_id") or e.get("target_id") - scope = e.get("scope") or "chunk" - for rel in rels: - rule_id = f"type_default:{(note_type or 'unknown')}:{rel}:v1" - k = rel - src = e.get("source_id") - tgt = e.get("target_id") - edge_id = _mk_edge_id(k, src, tgt, scope, rule_id) - out.append({ - "edge_id": edge_id, - "kind": k, - "scope": scope, - "source_id": src, - "target_id": tgt, - "note_id": s_note, - # v2 - "src_note_id": s_note, - "src_chunk_id": s_chunk, - "dst_note_id": t_note, - "relation": k, - "provenance": "rule", - "rule_id": rule_id, - "confidence": 0.7, - }) - return out +def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict: + pl = { + "kind": kind, + "scope": scope, # "chunk" | "note" + "source_id": source_id, + "target_id": target_id, + "note_id": note_id, # Träger/Quelle der Kante (aktuelle Note) + } + if extra: + pl.update(extra) + return pl def build_edges_for_note( note_id: str, - chunk_payloads: List[Dict], - note_level_refs: Optional[List[str]] = None, + chunks: List[dict], + note_level_references: Optional[List[str]] = None, include_note_scope_refs: bool = False, -) -> List[Dict]: +) -> List[dict]: """ - Liefert alle Kanten zu einer Note: - - Struktur: belongs_to, prev/next (scope=chunk, provenance=rule) - - Explizite Referenzen aus Chunks (scope=chunk, provenance=explicit) - - Explizite Referenzen aus Frontmatter (scope=note, wenn aktiviert) - - Type-Default-Regeln (pro expliziter Referenz zusätzliche Kanten, provenance=rule) - - Backlinks auf Note-Ebene (pro Referenz eine Rückkante, provenance=rule) + Erzeugt Kanten für eine Note. + + - belongs_to: für jeden Chunk (chunk -> note) + - next / prev: zwischen aufeinanderfolgenden Chunks + - references: pro Chunk aus window/text + - optional note-scope references/backlinks: dedupliziert über alle Chunk-Funde + note_level_references """ - chunks = list(chunk_payloads or []) - note_type = None - if chunks: - note_type = chunks[0].get("type") or chunks[0].get("note_type") + edges: List[dict] = [] - edges: List[Dict] = [] - edges.extend(_structural_edges(note_id, chunks)) - - # Explizite Referenzen - ref_chunk_edges = _explicit_edges_from_chunks(note_id, chunks) - edges.extend(ref_chunk_edges) - ref_note_edges = _explicit_edges_from_note_level(note_id, note_level_refs or [], include_note_scope_refs) - edges.extend(ref_note_edges) - - # Type-Defaults (Regeln) – basierend auf expliziten Referenzen - edges.extend(_apply_type_defaults(note_type, ref_chunk_edges + ref_note_edges)) - - # Backlinks (nur Note-Ebene) – Gegenkanten für 'references' - for e in ref_chunk_edges + ref_note_edges: - t = e.get("target_id") or e.get("dst_note_id") - if not t: + # belongs_to + for ch in chunks: + cid = _get(ch, "chunk_id", "id") + if not cid: continue - scope = "note" - rule_id = "derived:backlink:v1" - back = { - "edge_id": _mk_edge_id("backlink", t, note_id, scope, rule_id), - "kind": "backlink", - "scope": scope, - "source_id": t, - "target_id": note_id, - "note_id": note_id, - # v2 - "src_note_id": t, - "dst_note_id": note_id, - "relation": "backlink", - "provenance": "rule", - "rule_id": rule_id, - "confidence": 0.9, - "original_relation": e.get("relation"), - } - edges.append(back) + edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, {"chunk_id": cid})) - # Final: de-dupe - return _dedupe(edges) + # next/prev + for i in range(len(chunks) - 1): + a, b = chunks[i], chunks[i + 1] + a_id = _get(a, "chunk_id", "id") + b_id = _get(b, "chunk_id", "id") + if not a_id or not b_id: + continue + edges.append(_edge("next", "chunk", a_id, b_id, note_id, {"chunk_id": a_id})) + edges.append(_edge("prev", "chunk", b_id, a_id, note_id, {"chunk_id": b_id})) + + # references (chunk-scope) – Links aus window bevorzugen (Overlap-fest) + refs_all: List[str] = [] + for ch in chunks: + cid = _get(ch, "chunk_id", "id") + if not cid: + continue + txt = _chunk_text_for_refs(ch) + refs = extract_wikilinks(txt) # Parser-Logik + for r in refs: + edges.append(_edge("references", "chunk", cid, r, note_id, {"chunk_id": cid, "ref_text": r})) + refs_all.extend(refs) + + # optional: note-scope references/backlinks + if include_note_scope_refs: + refs_note = refs_all[:] + if note_level_references: + refs_note.extend([r for r in note_level_references if isinstance(r, str) and r]) + refs_note = _dedupe(refs_note) + for r in refs_note: + edges.append(_edge("references", "note", note_id, r, note_id)) + edges.append(_edge("backlink", "note", r, note_id, note_id)) + + # Dedupe (Schlüssel: kind,source_id,target_id,scope) + dedup = {} + for e in edges: + k = (e["kind"], e["source_id"], e["target_id"], e.get("scope", "")) + dedup[k] = e + return list(dedup.values())