mindnet/app/core/edges.py
Lars 4ea62e6886
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
Dateien nach "app/core" hochladen
2025-11-11 16:45:35 +01:00

297 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Modul: app/core/edges.py
Version: 2.0.0 (V2-superset, rückwärtskompatibel zu v1 vom 2025-09-09)
Zweck
-----
Bewahrt die bestehende Edge-Logik (belongs_to, prev/next, references, backlink)
und ergänzt V2-Felder + Typ-Default-Kanten gemäß config/types.yaml (edge_defaults).
Die Funktion ist **idempotent** und **rückwärtskompatibel** zur bisherigen Signatur.
Kompatibilitätsgarantien (gegenüber v1):
- **Input**: akzeptiert identische Chunk-Payloads wie v1:
  * `id` (Chunk-ID), `note_id` (Owner), `neighbors.prev|next` (optional),
    `references: [{target_id: ...}]` (optional),
    alternativ: `chunk_id`, `chunk_index|ord`, `window|text`
- **Output (v1-Felder)**: `kind`, `source_id`, `target_id`, `scope`, `note_id`, `edge_id`
- **Neu (v2-Felder)**: `relation`, `src_note_id`, `src_chunk_id?`, `dst_note_id`, `dst_chunk_id?`,
  `provenance` (`explicit|rule`), `rule_id?`, `confidence?`
Regeln
------
- Deduplizierungsschlüssel: (source_id, target_id, relation, rule_id)
- Strukturkanten:
* belongs_to: 1× pro Chunk
* next/prev: Sequenz der Chunks; nutzt bevorzugt neighbors; sonst ord/chunk_index
- Explizite Referenzen:
* aus Chunk: `references[].target_id` (falls vorhanden)
* Fallback: Wikilinks in `window|text`: [[Some Title|some-id]] oder [[some-id]]
- Note-Scope:
  * backlink immer; references nur, wenn include_note_scope_refs=True
- Typ-Defaults (edge_defaults aus config/types.yaml des **Quell-Notiztyps**):
  * Für jede explizite Referenz wird je Default-Relation eine Regel-Kante erzeugt
  * rule_id: "type_default:{note_type}:{relation}:v1", provenance="rule"
Konfiguration
-------------
- ENV MINDNET_TYPES_FILE (Default: ./config/types.yaml)
Lizenz/Autor
------------
- Erstimplementierung v1 (2025-09-09) — Projekt Mindnet
- Erweiterung v2 (2025-11-11) — kompatible Superset-Implementierung
"""
from __future__ import annotations
import os
import re
from typing import Dict, Iterable, List, Optional, Tuple, Set
try:
import yaml # optional, nur für types.yaml
except Exception: # pragma: no cover
yaml = None
# ------------------------------------------------------------
# Hilfen: types.yaml laden (edge_defaults)
# ------------------------------------------------------------
def _types_path() -> str:
return os.getenv("MINDNET_TYPES_FILE") or "./config/types.yaml"
def _load_types() -> Dict[str, dict]:
    """Read the type configuration file and return its type mapping.

    Returns:
        The value stored under the top-level ``types`` key when that value
        is a dict; otherwise the parsed document itself when it is a dict;
        otherwise ``{}``. An empty dict is also returned when the configured
        path is not a regular file, when the ``yaml`` module is unavailable,
        or when reading/parsing raises any exception.
    """
    path = _types_path()
    if not (os.path.isfile(path) and yaml is not None):
        return {}
    try:
        with open(path, "r", encoding="utf-8") as handle:
            parsed = yaml.safe_load(handle) or {}
    except Exception:
        # Any read or parse failure yields an empty mapping instead of
        # propagating to the caller.
        return {}
    if not isinstance(parsed, dict):
        return {}
    nested = parsed.get("types")
    if isinstance(nested, dict):
        return nested
    return parsed
def _edge_defaults_for(note_type: Optional[str]) -> List[str]:
    """Return the ``edge_defaults`` entries configured for a note type.

    The lookup key is ``note_type`` stripped and lower-cased. The function is
    tolerant of malformed configuration, matching the best-effort behavior
    of ``_load_types``: anything that cannot be interpreted yields an empty
    list instead of raising.

    Args:
        note_type: Type name of the source note; ``None`` is looked up as
            the empty string.

    Returns:
        The configured entries as strings. A single scalar value is treated
        as a one-element list; entries that are not ``str``/``int``/``float``
        are dropped.
    """
    types = _load_types()
    key = str(note_type or "").strip().lower()

    cfg = types.get(key)
    if not isinstance(cfg, dict):
        # A type entry that is not a mapping (e.g. a string or a list) has no
        # "edge_defaults" key to read; previously this raised AttributeError.
        return []

    defaults = cfg.get("edge_defaults")
    if not defaults:
        return []
    if isinstance(defaults, (str, int, float)):
        # A bare scalar is shorthand for a single entry; a bare number used
        # to raise TypeError when iterated.
        defaults = [defaults]
    elif not isinstance(defaults, (list, tuple, set, frozenset, dict)):
        return []
    return [str(item) for item in defaults if isinstance(item, (str, int, float))]
# ------------------------------------------------------------
# Wikilink-Parser (Fallback, wenn ch["references"] fehlt)
# ------------------------------------------------------------
_WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:. ]+)\]\]")
def _extract_wikilinks(text: str) -> List[str]:
ids: List[str] = []
for m in _WIKILINK_RE.finditer(text or ""):
ids.append(m.group(1).strip())
return ids
# ------------------------------------------------------------
# Utility
# ------------------------------------------------------------
def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str:
base = f"{kind}:{s}->{t}#{scope}"
if rule_id:
base += f"|{rule_id}"
try:
import hashlib
return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest()
except Exception: # pragma: no cover
return base
def _dedupe(edges: List[Dict]) -> List[Dict]:
seen: Set[Tuple[str,str,str,str]] = set()
out: List[Dict] = []
for e in edges:
s = str(e.get("source_id") or "")
t = str(e.get("target_id") or "")
rel = str(e.get("relation") or e.get("kind") or "edge")
rule = str(e.get("rule_id") or "")
key = (s, t, rel, rule)
if key in seen:
continue
seen.add(key)
out.append(e)
return out
def _first(v: dict, *keys, default=None):
for k in keys:
if k in v and v[k] is not None:
return v[k]
return default
# ------------------------------------------------------------
# Hauptfunktion
# ------------------------------------------------------------
def build_edges_for_note(
    note_id: str,
    chunk_payloads: List[Dict],
    note_level_refs: Optional[List[str]] = None,
    *,
    include_note_scope_refs: bool = False,
) -> List[Dict]:
    """Build the de-duplicated edge list for one note.

    Edge groups, in the order they are emitted:

    1. ``belongs_to`` (scope ``chunk``): one edge from every chunk that has
       an id to ``note_id``.
    2. ``prev`` / ``next`` (scope ``chunk``): one pair per adjacent chunk
       couple. A chunk's predecessor is ``neighbors.prev`` when present,
       otherwise the preceding chunk after sorting by ``chunk_index``/``ord``.
       An explicit ``neighbors.next`` also yields a pair, so a successor
       that is not part of ``chunk_payloads`` is still linked.
    3. ``references`` (scope ``chunk``): one edge per ``references[].target_id``
       of a chunk; when a chunk has none, wikilinks found in its
       ``window``/``text`` are used instead.
    4. Note scope: for every distinct entry of ``note_level_refs`` a
       ``backlink`` edge, preceded by a ``references`` edge when
       ``include_note_scope_refs`` is true.
    5. Type defaults: for every explicit ``references`` edge created above
       (chunk scope, plus note scope when enabled) one rule edge per relation
       returned by ``_edge_defaults_for`` for the note type, excluding
       ``"references"`` itself.

    Args:
        note_id: Id of the note the chunks belong to.
        chunk_payloads: Chunk dicts. Keys read here: ``id``/``chunk_id``,
            ``note_id``, ``type``/``note_type`` (first chunk only),
            ``chunk_index``/``ord``, ``neighbors``, ``references``,
            ``window``/``text``.
        note_level_refs: Target ids referenced by the note as a whole.
        include_note_scope_refs: Emit note-scope ``references`` edges (and
            their type-default edges) in addition to the backlinks.

    Returns:
        Edge dicts after ``_dedupe``; each carries the v1 fields
        (``kind``, ``source_id``, ``target_id``, ``scope``, ``note_id``,
        ``edge_id``) and the v2 fields (``relation``, ``src_*``/``dst_*``,
        ``provenance``, ``rule_id``, ``confidence``).
    """
    edges: List[Dict] = []
    chunks = list(chunk_payloads or [])

    # The note type is taken from the first chunk ("type", then "note_type").
    head = chunks[0] if chunks else {}
    note_type = head.get("type") or head.get("note_type")

    # --- Structural edges: belongs_to ---------------------------------------
    for ch in chunks:
        cid = _first(ch, "id", "chunk_id")
        if not cid:
            continue
        owner = ch.get("note_id") or note_id
        edges.append({
            "edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to:v1"),
            "kind": "belongs_to",
            "relation": "belongs_to",
            "scope": "chunk",
            "source_id": cid,
            "target_id": note_id,
            "note_id": owner,  # v1 compatibility
            # v2
            "src_note_id": owner,
            "src_chunk_id": cid,
            "dst_note_id": note_id,
            "provenance": "rule",
            "rule_id": "structure:belongs_to:v1",
            "confidence": 1.0,
        })

    # --- Structural edges: prev / next --------------------------------------
    def _order_pair(earlier_id, later_id, owner):
        """Return the prev and next edge linking two adjacent chunks."""
        return [
            {
                "edge_id": _mk_edge_id("prev", later_id, earlier_id, "chunk", "structure:order:v1"),
                "kind": "prev", "relation": "prev", "scope": "chunk",
                "source_id": later_id, "target_id": earlier_id, "note_id": owner,
                "src_note_id": owner, "src_chunk_id": later_id,
                "dst_note_id": owner, "dst_chunk_id": earlier_id,
                "provenance": "rule", "rule_id": "structure:order:v1", "confidence": 0.95,
            },
            {
                "edge_id": _mk_edge_id("next", earlier_id, later_id, "chunk", "structure:order:v1"),
                "kind": "next", "relation": "next", "scope": "chunk",
                "source_id": earlier_id, "target_id": later_id, "note_id": owner,
                "src_note_id": owner, "src_chunk_id": earlier_id,
                "dst_note_id": owner, "dst_chunk_id": later_id,
                "provenance": "rule", "rule_id": "structure:order:v1", "confidence": 0.95,
            },
        ]

    # Stable sort: chunks without an index keep their input order.
    ordered = sorted(chunks, key=lambda c: _first(c, "chunk_index", "ord", default=0))
    from_next: List[Dict] = []
    for i, ch in enumerate(ordered):
        cid = _first(ch, "id", "chunk_id")
        if not cid:
            continue
        owner = ch.get("note_id") or note_id
        nb = ch.get("neighbors") or {}
        prev_id = nb.get("prev")
        next_id = nb.get("next")
        # Positional fallback for the predecessor only. A positional successor
        # is always covered when the following chunk handles its predecessor,
        # so only an explicit neighbors.next adds information.
        if prev_id is None and i > 0:
            prev_id = _first(ordered[i - 1], "id", "chunk_id")
        if prev_id:
            edges.extend(_order_pair(prev_id, cid, owner))
        if next_id:
            from_next.extend(_order_pair(cid, next_id, owner))
    # Successor-derived pairs go last so that, after de-duplication, the
    # predecessor-derived edges keep their position and content; only pairs
    # not already known survive from this list.
    edges.extend(from_next)

    # --- Explicit references (chunk scope) ----------------------------------
    explicit_refs: List[Dict] = []
    for ch in chunks:
        cid = _first(ch, "id", "chunk_id")
        if not cid:
            continue
        owner = ch.get("note_id") or note_id
        # 1) Prefer the references already present on the chunk.
        refs = ch.get("references") or []
        targets = [r.get("target_id") for r in refs if isinstance(r, dict) and r.get("target_id")]
        # 2) Fallback: wikilinks found in the chunk text.
        if not targets:
            text = _first(ch, "window", "text", default="") or ""
            targets = _extract_wikilinks(text)
        for tid in targets:
            if not isinstance(tid, str) or not tid.strip():
                continue
            e = {
                "edge_id": _mk_edge_id("references", cid, tid, "chunk"),
                "kind": "references",
                "relation": "references",
                "scope": "chunk",
                "source_id": cid,
                "target_id": tid,
                "note_id": owner,
                # v2
                "src_note_id": owner,
                "src_chunk_id": cid,
                "dst_note_id": tid,
                "provenance": "explicit",
                "rule_id": "",
                "confidence": 1.0,
            }
            edges.append(e)
            explicit_refs.append(e)

    # --- Note scope: references (optional) + backlink (always) --------------
    unique_refs: List[str] = []
    seen_refs = set()
    for tid in note_level_refs or []:
        if isinstance(tid, str) and tid.strip() and tid not in seen_refs:
            unique_refs.append(tid)
            seen_refs.add(tid)

    note_scope_refs: List[Dict] = []
    for tid in unique_refs:
        if include_note_scope_refs:
            ref_edge = {
                "edge_id": _mk_edge_id("references", note_id, tid, "note"),
                "kind": "references", "relation": "references", "scope": "note",
                "source_id": note_id, "target_id": tid, "note_id": note_id,
                "src_note_id": note_id, "dst_note_id": tid,
                "provenance": "explicit", "rule_id": "", "confidence": 1.0,
            }
            edges.append(ref_edge)
            note_scope_refs.append(ref_edge)
        edges.append({
            "edge_id": _mk_edge_id("backlink", tid, note_id, "note", "derived:backlink:v1"),
            "kind": "backlink", "relation": "backlink", "scope": "note",
            "source_id": tid, "target_id": note_id, "note_id": note_id,
            "src_note_id": tid, "dst_note_id": note_id,
            "provenance": "rule", "rule_id": "derived:backlink:v1", "confidence": 0.9,
        })

    # --- Type defaults per explicit reference -------------------------------
    # note_scope_refs is only populated when include_note_scope_refs is set, so
    # note-scope references take part exactly when they were emitted above.
    templates = explicit_refs + note_scope_refs
    if templates:
        # Only consult the type configuration when there is something to expand.
        defaults = [d for d in _edge_defaults_for(note_type) if d and d != "references"]
        for e in templates:
            # The explicit edge serves as the template for its rule edges.
            src = e["source_id"]
            tgt = e["target_id"]
            scope = e.get("scope", "chunk")
            s_note = e.get("src_note_id") or note_id
            s_chunk = e.get("src_chunk_id")
            t_note = e.get("dst_note_id") or tgt
            for rel in defaults:
                rule_id = f"type_default:{(note_type or 'unknown')}:{rel}:v1"
                edges.append({
                    "edge_id": _mk_edge_id(rel, src, tgt, scope, rule_id),
                    "kind": rel, "relation": rel, "scope": scope,
                    "source_id": src, "target_id": tgt, "note_id": s_note,
                    "src_note_id": s_note, "src_chunk_id": s_chunk,
                    "dst_note_id": t_note,
                    "provenance": "rule", "rule_id": rule_id, "confidence": 0.7,
                })

    # --- Dedupe & return ----------------------------------------------------
    return _dedupe(edges)