app/core/derive_edges.py aktualisiert
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 2s
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 2s
This commit is contained in:
parent
bc967f1f6e
commit
95b59e9b0a
|
|
@ -1,42 +1,96 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
app/core/derive_edges.py
|
||||
Mindnet V2 — Edge-Ableitung (real + defaults), idempotent
|
||||
Modul: app/core/derive_edges.py
|
||||
Version: 1.5.0 (Mindnet V2)
|
||||
Status: Stable
|
||||
|
||||
Erzeugt Kanten für eine Note aus:
|
||||
1) Sequenzkanten pro Chunk: belongs_to, next, prev
|
||||
2) Reale Referenzen aus Chunk-Text (Markdown-Links, Wikilinks) + optional Frontmatter-Refs
|
||||
3) Abgeleitete Kanten je Typ-Regel (types.yaml.edge_defaults), z. B. additional relations wie "depends_on", "related_to"
|
||||
- Regel-Tagging via rule_id="edge_defaults:<type>:<relation>"
|
||||
- De-Dupe via Key: (source_id, target_id, relation, rule_id)
|
||||
Ziele
|
||||
-----
|
||||
1) Beibehalten der bewährten Edge-Ableitung:
|
||||
- belongs_to (chunk -> note)
|
||||
- next / prev (Chunk-Kette)
|
||||
- references (chunk-scope) aus Chunk.window/text via `extract_wikilinks`
|
||||
|
||||
Edge-Payload-Minimum:
|
||||
- relation (alias: kind)
|
||||
- note_id (Quelle; also die ID der Note, zu der die Chunks gehören)
|
||||
- source_id (Chunk-ID oder Note-ID, je nach scope)
|
||||
- target_id (Note-/Slug-/URL-ID; deterministisch normalisiert)
|
||||
- chunk_id (falls scope='chunk')
|
||||
- scope: 'chunk'|'note'
|
||||
- confidence: float (bei abgeleitet z. B. 0.7)
|
||||
- rule_id: str | None
|
||||
2) Ergänzung: typenbasierte, abgeleitete Kanten aus `config/types.yaml`:
|
||||
- Für jede gefundene Referenz werden zusätzliche Relationen aus
|
||||
`edge_defaults` des Notiztyps erzeugt (z. B. "depends_on", "related_to").
|
||||
- Optional symmetrische Relationen (z. B. "related_to", "similar_to").
|
||||
- Dedupe bleibt kompatibel (Key: kind, source_id, target_id, scope).
|
||||
|
||||
Hinweise
|
||||
--------
|
||||
- Es werden keine Markdown-Links neu geparst; wir bleiben bei der
|
||||
vorhandenen Parser-Logik (`extract_wikilinks`) zur Sicherung der Kompatibilität.
|
||||
- `edge_defaults` werden sowohl für Chunk-scope-Referenzen als auch – falls
|
||||
`include_note_scope_refs=True` – für Note-scope-Referenzen angewendet.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
||||
import os, re, yaml, hashlib
|
||||
from typing import Dict, List, Optional, Iterable, Set
|
||||
|
||||
# ---------------- Registry Laden ----------------
|
||||
import os
|
||||
import yaml
|
||||
|
||||
def _env(n: str, d: Optional[str]=None) -> str:
|
||||
# Wikilinks-Parser beibehalten (Kompatibilität!)
|
||||
from app.core.parser import extract_wikilinks
|
||||
|
||||
|
||||
# ---------------------------- Utilities ------------------------------------
|
||||
|
||||
def _get(d: dict, *keys, default=None):
|
||||
for k in keys:
|
||||
if k in d and d[k] is not None:
|
||||
return d[k]
|
||||
return default
|
||||
|
||||
def _chunk_text_for_refs(chunk: dict) -> str:
|
||||
# bevorzugt 'window' → dann 'text' → 'content' → 'raw'
|
||||
return (
|
||||
_get(chunk, "window")
|
||||
or _get(chunk, "text")
|
||||
or _get(chunk, "content")
|
||||
or _get(chunk, "raw")
|
||||
or ""
|
||||
)
|
||||
|
||||
def _dedupe(seq: Iterable[str]) -> List[str]:
|
||||
seen: Set[str] = set()
|
||||
out: List[str] = []
|
||||
for s in seq:
|
||||
if s not in seen:
|
||||
seen.add(s)
|
||||
out.append(s)
|
||||
return out
|
||||
|
||||
def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict:
|
||||
pl = {
|
||||
"kind": kind,
|
||||
"scope": scope, # "chunk" | "note"
|
||||
"source_id": source_id,
|
||||
"target_id": target_id,
|
||||
"note_id": note_id, # Träger/Quelle der Kante (aktuelle Note)
|
||||
}
|
||||
if extra:
|
||||
pl.update(extra)
|
||||
return pl
|
||||
|
||||
|
||||
# ---------------------- Typen-Registry (types.yaml) ------------------------
|
||||
|
||||
SYM_REL = {"related_to", "similar_to"} # symmetrische Relationstypen
|
||||
|
||||
def _env(n: str, default: Optional[str] = None) -> str:
|
||||
v = os.getenv(n)
|
||||
return v if v is not None else (d or "")
|
||||
return v if v is not None else (default or "")
|
||||
|
||||
def _load_types() -> dict:
|
||||
def _load_types_registry() -> dict:
|
||||
"""Lädt die YAML-Registry aus MINDNET_TYPES_FILE oder ./config/types.yaml"""
|
||||
p = _env("MINDNET_TYPES_FILE", "./config/types.yaml")
|
||||
try:
|
||||
with open(p, "r", encoding="utf-8") as f:
|
||||
return yaml.safe_load(f) or {}
|
||||
data = yaml.safe_load(f) or {}
|
||||
return data
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
|
|
@ -45,166 +99,115 @@ def _get_types_map(reg: dict) -> dict:
|
|||
return reg["types"]
|
||||
return reg if isinstance(reg, dict) else {}
|
||||
|
||||
def _edge_defaults_for(note_type: str, reg: dict) -> List[str]:
|
||||
m = _get_types_map(reg)
|
||||
if isinstance(m, dict):
|
||||
t = m.get(note_type) or {}
|
||||
if isinstance(t, dict):
|
||||
vals = t.get("edge_defaults")
|
||||
if isinstance(vals, list):
|
||||
return [str(x) for x in vals if isinstance(x, (str,))]
|
||||
def _edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]:
|
||||
"""
|
||||
Liefert die edge_defaults-Liste für den gegebenen Notiztyp.
|
||||
Fallback-Reihenfolge:
|
||||
1) reg['types'][note_type]['edge_defaults']
|
||||
2) reg['defaults']['edge_defaults'] (oder 'default'/'global')
|
||||
3) []
|
||||
"""
|
||||
types_map = _get_types_map(reg)
|
||||
# 1) exakter Typ
|
||||
if note_type and isinstance(types_map, dict):
|
||||
t = types_map.get(note_type)
|
||||
if isinstance(t, dict) and isinstance(t.get("edge_defaults"), list):
|
||||
return [str(x) for x in t["edge_defaults"] if isinstance(x, str)]
|
||||
# 2) Fallback
|
||||
for key in ("defaults", "default", "global"):
|
||||
v = reg.get(key)
|
||||
if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list):
|
||||
return [str(x) for x in v["edge_defaults"] if isinstance(x, str)]
|
||||
# 3) leer
|
||||
return []
|
||||
|
||||
# ---------------- Utils ----------------
|
||||
|
||||
SYM_REL = {"related_to", "similar_to"} # symmetrische Relationen
|
||||
# --------------------------- Hauptfunktion ---------------------------------
|
||||
|
||||
def _slug_id(s: str) -> str:
|
||||
s = (s or "").strip().lower()
|
||||
s = re.sub(r"\s+", "-", s)
|
||||
s = re.sub(r"[^\w\-:/#\.]", "", s) # lasse urls, hashes rudimentär zu
|
||||
if not s:
|
||||
s = "ref"
|
||||
return s
|
||||
|
||||
def _mk_edge_id(source_id: str, relation: str, target_id: str, rule_id: Optional[str]) -> str:
|
||||
base = f"{source_id}|{relation}|{target_id}|{rule_id or ''}"
|
||||
h = hashlib.sha1(base.encode("utf-8")).hexdigest()[:16]
|
||||
return f"e_{h}"
|
||||
|
||||
def _add(edge_list: List[Dict[str, Any]],
|
||||
dedupe: set,
|
||||
note_id: str,
|
||||
source_id: str,
|
||||
relation: str,
|
||||
target_id: str,
|
||||
*,
|
||||
chunk_id: Optional[str] = None,
|
||||
scope: str = "chunk",
|
||||
confidence: Optional[float] = None,
|
||||
rule_id: Optional[str] = None) -> None:
|
||||
key = (source_id, target_id, relation, rule_id or "")
|
||||
if key in dedupe:
|
||||
return
|
||||
dedupe.add(key)
|
||||
payload = {
|
||||
"edge_id": _mk_edge_id(source_id, relation, target_id, rule_id),
|
||||
"note_id": note_id,
|
||||
"kind": relation, # alias
|
||||
"relation": relation,
|
||||
"scope": scope,
|
||||
"source_id": source_id,
|
||||
"target_id": target_id,
|
||||
}
|
||||
if chunk_id:
|
||||
payload["chunk_id"] = chunk_id
|
||||
if confidence is not None:
|
||||
payload["confidence"] = float(confidence)
|
||||
if rule_id is not None:
|
||||
payload["rule_id"] = rule_id
|
||||
edge_list.append(payload)
|
||||
|
||||
# ---------------- Refs Parsen ----------------
|
||||
|
||||
MD_LINK = re.compile(r"\[([^\]]+)\]\(([^)]+)\)") # [text](target)
|
||||
WIKI_LINK = re.compile(r"\[\[([^|\]]+)(?:\|[^]]+)?\]\]") # [[Title]] oder [[Title|alias]]
|
||||
|
||||
def _extract_refs(text: str) -> List[Tuple[str, str]]:
|
||||
"""liefert Liste (label, target) – target kann URL, Title, etc. sein"""
|
||||
out: List[Tuple[str,str]] = []
|
||||
if not text:
|
||||
return out
|
||||
for m in MD_LINK.finditer(text):
|
||||
label = (m.group(1) or "").strip()
|
||||
tgt = (m.group(2) or "").strip()
|
||||
out.append((label, tgt))
|
||||
for m in WIKI_LINK.finditer(text):
|
||||
title = (m.group(1) or "").strip()
|
||||
out.append((title, title))
|
||||
return out
|
||||
|
||||
# ---------------- Haupt-API ----------------
|
||||
|
||||
def build_edges_for_note(*,
|
||||
note_id: str,
|
||||
chunk_payloads: List[Dict[str, Any]],
|
||||
note_level_refs: Optional[List[Dict[str, Any]]] = None,
|
||||
include_note_scope_refs: bool = False) -> List[Dict[str, Any]]:
|
||||
def build_edges_for_note(
|
||||
note_id: str,
|
||||
chunks: List[dict],
|
||||
note_level_references: Optional[List[str]] = None,
|
||||
include_note_scope_refs: bool = False,
|
||||
) -> List[dict]:
|
||||
"""
|
||||
Baut alle Kanten für eine Note.
|
||||
- Sequenzkanten (belongs_to, next, prev)
|
||||
- Referenzen aus Chunk-Text (scope=chunk)
|
||||
- Abgeleitete Kanten gemäß edge_defaults aus types.yaml (für jede gefundene Referenz)
|
||||
Erzeugt Kanten für eine Note.
|
||||
|
||||
- belongs_to: für jeden Chunk (chunk -> note)
|
||||
- next / prev: zwischen aufeinanderfolgenden Chunks
|
||||
- references: pro Chunk aus window/text (via extract_wikilinks)
|
||||
- optional note-scope references/backlinks: dedupliziert über alle Chunk-Funde + note_level_references
|
||||
- NEU: typenbasierte, abgeleitete Kanten (edge_defaults) je gefundener Referenz
|
||||
"""
|
||||
edges: List[dict] = []
|
||||
|
||||
# --- 0) Note-Typ ermitteln (aus erstem Chunk erwartet) ---
|
||||
note_type = None
|
||||
if chunk_payloads:
|
||||
note_type = chunk_payloads[0].get("type")
|
||||
reg = _load_types()
|
||||
defaults = _edge_defaults_for(note_type or "concept", reg)
|
||||
if chunks:
|
||||
note_type = _get(chunks[0], "type")
|
||||
|
||||
edges: List[Dict[str, Any]] = []
|
||||
seen = set()
|
||||
# --- 1) belongs_to ---
|
||||
for ch in chunks:
|
||||
cid = _get(ch, "chunk_id", "id")
|
||||
if not cid:
|
||||
continue
|
||||
edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, {"chunk_id": cid}))
|
||||
|
||||
# 1) Sequenzkanten
|
||||
for ch in chunk_payloads:
|
||||
cid = ch.get("chunk_id") or ch.get("id")
|
||||
nid = ch.get("note_id") or note_id
|
||||
idx = ch.get("index")
|
||||
# belongs_to
|
||||
_add(edges, seen, note_id=nid, source_id=cid, relation="belongs_to",
|
||||
target_id=nid, chunk_id=cid, scope="chunk")
|
||||
# next/prev
|
||||
for nb, rel in ((ch.get("neighbors_next"), "next"), (ch.get("neighbors_prev"), "prev")):
|
||||
if not nb:
|
||||
continue
|
||||
# neighbors sind Listen
|
||||
items = nb if isinstance(nb, list) else [nb]
|
||||
for tid in items:
|
||||
_add(edges, seen, note_id=nid, source_id=cid, relation=rel,
|
||||
target_id=tid, chunk_id=cid, scope="chunk")
|
||||
# --- 2) next/prev ---
|
||||
for i in range(len(chunks) - 1):
|
||||
a, b = chunks[i], chunks[i + 1]
|
||||
a_id = _get(a, "chunk_id", "id")
|
||||
b_id = _get(b, "chunk_id", "id")
|
||||
if not a_id or not b_id:
|
||||
continue
|
||||
edges.append(_edge("next", "chunk", a_id, b_id, note_id, {"chunk_id": a_id}))
|
||||
edges.append(_edge("prev", "chunk", b_id, a_id, note_id, {"chunk_id": b_id}))
|
||||
|
||||
# 2) Refs aus Chunk-Text (+ derived edges je ref)
|
||||
for ch in chunk_payloads:
|
||||
cid = ch.get("chunk_id") or ch.get("id")
|
||||
nid = ch.get("note_id") or note_id
|
||||
text = ch.get("text") or ""
|
||||
for (label, tgt) in _extract_refs(text):
|
||||
target_id = _slug_id(tgt)
|
||||
# real reference
|
||||
_add(edges, seen, note_id=nid, source_id=cid, relation="references",
|
||||
target_id=target_id, chunk_id=cid, scope="chunk")
|
||||
# defaults amplification
|
||||
# --- 3) references (chunk-scope) + abgeleitete Relationen je Ref ---
|
||||
reg = _load_types_registry()
|
||||
defaults = _edge_defaults_for(note_type, reg)
|
||||
refs_all: List[str] = []
|
||||
|
||||
for ch in chunks:
|
||||
cid = _get(ch, "chunk_id", "id")
|
||||
if not cid:
|
||||
continue
|
||||
txt = _chunk_text_for_refs(ch)
|
||||
refs = extract_wikilinks(txt) # Parser-Logik nicht verändert
|
||||
for r in refs:
|
||||
# reale Referenz (wie bisher)
|
||||
edges.append(_edge("references", "chunk", cid, r, note_id, {"chunk_id": cid, "ref_text": r}))
|
||||
# abgeleitete Kanten je default-Relation
|
||||
for rel in defaults:
|
||||
if rel == "references":
|
||||
continue
|
||||
rule = f"edge_defaults:{note_type}:{rel}"
|
||||
_add(edges, seen, note_id=nid, source_id=cid, relation=rel,
|
||||
target_id=target_id, chunk_id=cid, scope="chunk",
|
||||
confidence=0.7, rule_id=rule)
|
||||
continue # doppelt vermeiden
|
||||
edges.append(_edge(rel, "chunk", cid, r, note_id, {"chunk_id": cid, "rule_id": f"edge_defaults:{note_type}:{rel}", "confidence": 0.7}))
|
||||
# symmetrisch?
|
||||
if rel in SYM_REL:
|
||||
_add(edges, seen, note_id=nid, source_id=target_id, relation=rel,
|
||||
target_id=cid, chunk_id=cid, scope="chunk",
|
||||
confidence=0.7, rule_id=rule)
|
||||
if rel in {"related_to", "similar_to"}:
|
||||
edges.append(_edge(rel, "chunk", r, cid, note_id, {"chunk_id": cid, "rule_id": f"edge_defaults:{note_type}:{rel}", "confidence": 0.7}))
|
||||
refs_all.extend(refs)
|
||||
|
||||
# 3) optionale Note-Scope-Refs aus Frontmatter (falls geliefert)
|
||||
note_level_refs = note_level_refs or []
|
||||
if include_note_scope_refs and note_level_refs:
|
||||
nid = note_id
|
||||
for r in note_level_refs:
|
||||
tgt = (r or {}).get("target_id") or (r or {}).get("target") or ""
|
||||
if not tgt:
|
||||
continue
|
||||
target_id = _slug_id(str(tgt))
|
||||
_add(edges, seen, note_id=nid, source_id=nid, relation="references",
|
||||
target_id=target_id, chunk_id=None, scope="note")
|
||||
# --- 4) optional: note-scope references/backlinks (+ defaults) ---
|
||||
if include_note_scope_refs:
|
||||
refs_note = refs_all[:]
|
||||
if note_level_references:
|
||||
refs_note.extend([r for r in note_level_references if isinstance(r, str) and r])
|
||||
refs_note = _dedupe(refs_note)
|
||||
for r in refs_note:
|
||||
# echte note-scope Referenz & Backlink (wie bisher)
|
||||
edges.append(_edge("references", "note", note_id, r, note_id))
|
||||
edges.append(_edge("backlink", "note", r, note_id, note_id))
|
||||
# und zusätzlich default-Relationen (note-scope)
|
||||
for rel in defaults:
|
||||
if rel == "references":
|
||||
continue
|
||||
rule = f"edge_defaults:{note_type}:{rel}"
|
||||
_add(edges, seen, note_id=nid, source_id=nid, relation=rel,
|
||||
target_id=target_id, chunk_id=None, scope="note",
|
||||
confidence=0.7, rule_id=rule)
|
||||
|
||||
return edges
|
||||
edges.append(_edge(rel, "note", note_id, r, note_id, {"rule_id": f"edge_defaults:{note_type}:{rel}", "confidence": 0.7}))
|
||||
if rel in {"related_to", "similar_to"}:
|
||||
edges.append(_edge(rel, "note", r, note_id, note_id, {"rule_id": f"edge_defaults:{note_type}:{rel}", "confidence": 0.7}))
|
||||
|
||||
# --- 5) Dedupe (unverändert kompatibel) ---
|
||||
dedup = {}
|
||||
for e in edges:
|
||||
k = (e["kind"], e["source_id"], e["target_id"], e.get("scope", ""))
|
||||
dedup[k] = e
|
||||
return list(dedup.values())
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user