Dateien nach "app/core" hochladen
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 4s

This commit is contained in:
Lars 2025-11-11 16:30:07 +01:00
parent c67777a6b9
commit c01cf3b078

View File

@ -1,131 +1,330 @@
#!/usr/bin/env python3
# app/core/derive_edges.py
# -*- coding: utf-8 -*-
"""
Modul: app/core/derive_edges.py
Version: 1.4.0
Datum: 2025-10-01
Edge-Builder v2 (explicit + type-default "rule" edges)
-----------------------------------------------------
- Extrahiert reale Kanten aus Chunks (Wikilinks) und aus Note-Frontmatter (note_level_refs)
- Ergänzt konfigurierbare Ableitungs-Kanten gemäß config/types.yaml.edge_defaults
- Liefert *idempotente* Edge-Payloads ohne Duplikate
- Payload enthält sowohl v1-Felder (kompatibel zu qdrant_points._normalize_edge_payload)
als auch v2-Felder gem. Playbook (src_note_id, dst_note_id, relation, rule_id, provenance, confidence)
Zweck
-----
Robuste Kantenbildung für mindnet (Notes/Chunks):
- belongs_to (chunk -> note)
- next / prev (chunk-Kette)
- references (chunk-scope) aus Chunk.window/text
- optional references/backlink (note-scope)
Konfiguration
- Pfad zu der Registry via ENV: MINDNET_TYPES_FILE (Default: ./config/types.yaml)
- Struktur (Beispiel):
types:
concept:
retriever_weight: 1.0
chunk_profile: medium
edge_defaults: ["references","related_to"]
journal:
retriever_weight: 0.8
chunk_profile: long
edge_defaults: ["references"]
Wichtig: Wikilinks werden mit der Parser-Funktion `extract_wikilinks` extrahiert,
damit Varianten wie [[id#anchor]] oder [[id|label]] korrekt auf 'id' reduziert werden.
Erwartete Chunk-Payload-Felder:
{
"note_id": "...",
"chunk_id": "...", # Alias "id" ist zulässig
"id": "...",
"chunk_index": int,
"seq": int,
"window": str,
"text": str,
"path": "rel/path.md",
...
}
Siehe auch:
- mindnet_v2_implementation_playbook.md (edge.schema.json, default_edge.schema.json)
"""
from __future__ import annotations
from typing import Dict, List, Optional, Iterable
import os
import re
import json
from typing import Dict, Iterable, List, Optional, Tuple, Set
# WICHTIG: benutze die Parser-Extraktion für saubere Wikilinks
from app.core.parser import extract_wikilinks
try:
import yaml # type: ignore
except Exception:
yaml = None # pragma: no cover
def _get(d: dict, *keys, default=None):
for k in keys:
if k in d and d[k] is not None:
return d[k]
return default
# ---- Projekt-Utilities ----
try:
from app.core.parser import extract_wikilinks
except Exception:
# Fallback: Minimaler Wikilink-Parser [[some-id]] oder [[Title|some-id]]
WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:.]+)\]\]")
def extract_wikilinks(text: str) -> List[Tuple[str, str]]: # (link_text, target_id)
links = []
for m in WIKILINK_RE.finditer(text or ""):
raw = m.group(0)
target = m.group(1)
links.append((raw, target))
return links
def _chunk_text_for_refs(chunk: dict) -> str:
# bevorzugt 'window' → dann 'text' → 'content' → 'raw'
return (
_get(chunk, "window")
or _get(chunk, "text")
or _get(chunk, "content")
or _get(chunk, "raw")
or ""
)
# ---------------------------------------------------------------------------
# Registry-Lader
# ---------------------------------------------------------------------------
def _dedupe(seq: Iterable[str]) -> List[str]:
seen = set()
out: List[str] = []
for s in seq:
if s not in seen:
seen.add(s)
out.append(s)
def _types_path() -> str:
p = os.getenv("MINDNET_TYPES_FILE") or "./config/types.yaml"
return p
def _load_types() -> Dict[str, dict]:
path = _types_path()
if not path or not os.path.isfile(path):
return {}
if yaml is None:
return {}
try:
with open(path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
if isinstance(data, dict) and "types" in data and isinstance(data["types"], dict):
return data["types"]
return data if isinstance(data, dict) else {}
except Exception:
return {}
def _edge_defaults_for(note_type: Optional[str]) -> List[str]:
types = _load_types()
t = (note_type or "").strip().lower()
cfg = types.get(t) or types.get("concept") or {}
defaults = cfg.get("edge_defaults") or []
if isinstance(defaults, str):
defaults = [defaults]
return [str(x) for x in defaults if isinstance(x, (str, int, float))]
# ---------------------------------------------------------------------------
# Edge-Erzeugung
# ---------------------------------------------------------------------------
def _dedupe(edges: List[Dict]) -> List[Dict]:
"""De-dupliziere anhand (source_id, target_id, relation, rule_id)."""
seen: Set[Tuple[str, str, str, str]] = set()
out: List[Dict] = []
for e in edges:
s = str(e.get("source_id") or e.get("src_note_id") or "")
t = str(e.get("target_id") or e.get("dst_note_id") or "")
rel = str(e.get("relation") or e.get("kind") or "edge")
rule = str(e.get("rule_id") or "")
key = (s, t, rel, rule)
if key in seen:
continue
seen.add(key)
out.append(e)
return out
def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict:
pl = {
"kind": kind,
"scope": scope, # "chunk" | "note"
"source_id": source_id,
"target_id": target_id,
"note_id": note_id, # Träger/Quelle der Kante (aktuelle Note)
def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str:
base = f"{kind}:{s}->{t}#{scope}"
if rule_id:
base += f"|{rule_id}"
# kurze stabile ID (BLAKE2s 12 bytes hex) qdrant_points macht ohnehin UUIDv5,
# diese ID dient der Nachvollziehbarkeit im Payload
try:
import hashlib
return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest()
except Exception:
return base
def _structural_edges(note_id: str, chunks: List[Dict]) -> List[Dict]:
"""belongs_to + prev/next (scope=chunk)"""
edges: List[Dict] = []
# belongs_to
for ch in chunks:
cid = ch.get("chunk_id") or ch.get("id")
if not cid:
continue
e = {
"edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to:v1"),
"kind": "belongs_to",
"scope": "chunk",
"source_id": cid,
"target_id": note_id,
# v2-Felder
"src_note_id": note_id,
"src_chunk_id": cid,
"dst_note_id": note_id,
"relation": "belongs_to",
"provenance": "rule",
"rule_id": "structure:belongs_to:v1",
"confidence": 1.0,
}
if extra:
pl.update(extra)
return pl
edges.append(e)
# prev/next
ordered = sorted([c for c in chunks if c.get("chunk_id")], key=lambda c: c.get("ord") or c.get("chunk_index") or 0)
for a, b in zip(ordered, ordered[1:]):
a_id = a.get("chunk_id"); b_id = b.get("chunk_id")
if not a_id or not b_id:
continue
# next
e1 = {
"edge_id": _mk_edge_id("next", a_id, b_id, "chunk", "structure:order:v1"),
"kind": "next",
"scope": "chunk",
"source_id": a_id,
"target_id": b_id,
"src_note_id": note_id,
"src_chunk_id": a_id,
"dst_note_id": note_id,
"dst_chunk_id": b_id,
"relation": "next",
"provenance": "rule",
"rule_id": "structure:order:v1",
"confidence": 0.95,
}
# prev (Gegenkante)
e2 = {
"edge_id": _mk_edge_id("prev", b_id, a_id, "chunk", "structure:order:v1"),
"kind": "prev",
"scope": "chunk",
"source_id": b_id,
"target_id": a_id,
"src_note_id": note_id,
"src_chunk_id": b_id,
"dst_note_id": note_id,
"dst_chunk_id": a_id,
"relation": "prev",
"provenance": "rule",
"rule_id": "structure:order:v1",
"confidence": 0.95,
}
edges.extend([e1, e2])
return edges
def _explicit_edges_from_chunks(note_id: str, chunks: List[Dict]) -> List[Dict]:
edges: List[Dict] = []
for ch in chunks:
cid = ch.get("chunk_id") or ch.get("id")
window = ch.get("window") or ch.get("text") or ""
for link_text, target_id in extract_wikilinks(window):
# explizite Referenz (chunk-scope)
e = {
"edge_id": _mk_edge_id("references", cid, target_id, "chunk"),
"kind": "references",
"scope": "chunk",
"source_id": cid,
"target_id": target_id,
"note_id": note_id, # v1-Kompatibilität
# v2
"src_note_id": note_id,
"src_chunk_id": cid,
"dst_note_id": target_id,
"relation": "references",
"provenance": "explicit",
"rule_id": "",
"confidence": 1.0,
"link_text": link_text,
}
edges.append(e)
return edges
def _explicit_edges_from_note_level(note_id: str, refs: Iterable[str], include_note_scope_refs: bool) -> List[Dict]:
edges: List[Dict] = []
if not include_note_scope_refs:
return edges
for target_id in refs or []:
e = {
"edge_id": _mk_edge_id("references", note_id, target_id, "note"),
"kind": "references",
"scope": "note",
"source_id": note_id,
"target_id": target_id,
# v2
"src_note_id": note_id,
"dst_note_id": target_id,
"relation": "references",
"provenance": "explicit",
"rule_id": "",
"confidence": 1.0,
}
edges.append(e)
return edges
def _apply_type_defaults(note_type: Optional[str], base_edges: List[Dict]) -> List[Dict]:
"""
Ergänzt pro vorhandener (expliziter) Referenz zusätzliche Kanten gemäß
types.yaml.edge_defaults (relationen). Jede Relation wird als eigene Kante erzeugt.
"""
rels = [r for r in _edge_defaults_for(note_type) if r and r != "references"]
if not rels:
return []
out: List[Dict] = []
for e in base_edges:
if e.get("relation") != "references":
continue
s_note = e.get("src_note_id") or e.get("note_id")
s_chunk = e.get("src_chunk_id")
t_note = e.get("dst_note_id") or e.get("target_id")
scope = e.get("scope") or "chunk"
for rel in rels:
rule_id = f"type_default:{(note_type or 'unknown')}:{rel}:v1"
k = rel
src = e.get("source_id")
tgt = e.get("target_id")
edge_id = _mk_edge_id(k, src, tgt, scope, rule_id)
out.append({
"edge_id": edge_id,
"kind": k,
"scope": scope,
"source_id": src,
"target_id": tgt,
"note_id": s_note,
# v2
"src_note_id": s_note,
"src_chunk_id": s_chunk,
"dst_note_id": t_note,
"relation": k,
"provenance": "rule",
"rule_id": rule_id,
"confidence": 0.7,
})
return out
def build_edges_for_note(
note_id: str,
chunks: List[dict],
note_level_references: Optional[List[str]] = None,
chunk_payloads: List[Dict],
note_level_refs: Optional[List[str]] = None,
include_note_scope_refs: bool = False,
) -> List[dict]:
) -> List[Dict]:
"""
Erzeugt Kanten für eine Note.
- belongs_to: für jeden Chunk (chunk -> note)
- next / prev: zwischen aufeinanderfolgenden Chunks
- references: pro Chunk aus window/text
- optional note-scope references/backlinks: dedupliziert über alle Chunk-Funde + note_level_references
Liefert alle Kanten zu einer Note:
- Struktur: belongs_to, prev/next (scope=chunk, provenance=rule)
- Explizite Referenzen aus Chunks (scope=chunk, provenance=explicit)
- Explizite Referenzen aus Frontmatter (scope=note, wenn aktiviert)
- Type-Default-Regeln (pro expliziter Referenz zusätzliche Kanten, provenance=rule)
- Backlinks auf Note-Ebene (pro Referenz eine Rückkante, provenance=rule)
"""
edges: List[dict] = []
chunks = list(chunk_payloads or [])
note_type = None
if chunks:
note_type = chunks[0].get("type") or chunks[0].get("note_type")
# belongs_to
for ch in chunks:
cid = _get(ch, "chunk_id", "id")
if not cid:
edges: List[Dict] = []
edges.extend(_structural_edges(note_id, chunks))
# Explizite Referenzen
ref_chunk_edges = _explicit_edges_from_chunks(note_id, chunks)
edges.extend(ref_chunk_edges)
ref_note_edges = _explicit_edges_from_note_level(note_id, note_level_refs or [], include_note_scope_refs)
edges.extend(ref_note_edges)
# Type-Defaults (Regeln) basierend auf expliziten Referenzen
edges.extend(_apply_type_defaults(note_type, ref_chunk_edges + ref_note_edges))
# Backlinks (nur Note-Ebene) Gegenkanten für 'references'
for e in ref_chunk_edges + ref_note_edges:
t = e.get("target_id") or e.get("dst_note_id")
if not t:
continue
edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, {"chunk_id": cid}))
scope = "note"
rule_id = "derived:backlink:v1"
back = {
"edge_id": _mk_edge_id("backlink", t, note_id, scope, rule_id),
"kind": "backlink",
"scope": scope,
"source_id": t,
"target_id": note_id,
"note_id": note_id,
# v2
"src_note_id": t,
"dst_note_id": note_id,
"relation": "backlink",
"provenance": "rule",
"rule_id": rule_id,
"confidence": 0.9,
"original_relation": e.get("relation"),
}
edges.append(back)
# next/prev
for i in range(len(chunks) - 1):
a, b = chunks[i], chunks[i + 1]
a_id = _get(a, "chunk_id", "id")
b_id = _get(b, "chunk_id", "id")
if not a_id or not b_id:
continue
edges.append(_edge("next", "chunk", a_id, b_id, note_id, {"chunk_id": a_id}))
edges.append(_edge("prev", "chunk", b_id, a_id, note_id, {"chunk_id": b_id}))
# references (chunk-scope) Links aus window bevorzugen (Overlap-fest)
refs_all: List[str] = []
for ch in chunks:
cid = _get(ch, "chunk_id", "id")
if not cid:
continue
txt = _chunk_text_for_refs(ch)
refs = extract_wikilinks(txt) # <— Parser-Logik, kompatibel zu deinem System
for r in refs:
edges.append(_edge("references", "chunk", cid, r, note_id, {"chunk_id": cid, "ref_text": r}))
refs_all.extend(refs)
# optional: note-scope references/backlinks
if include_note_scope_refs:
refs_note = refs_all[:]
if note_level_references:
refs_note.extend([r for r in note_level_references if isinstance(r, str) and r])
refs_note = _dedupe(refs_note)
for r in refs_note:
edges.append(_edge("references", "note", note_id, r, note_id))
edges.append(_edge("backlink", "note", r, note_id, note_id))
return edges
# Final: de-dupe
return _dedupe(edges)