app/core/derive_edges.py aktualisiert
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
This commit is contained in:
parent
28c01caa1a
commit
6f0b463489
|
|
@ -2,39 +2,36 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Modul: app/core/derive_edges.py
|
||||
Version: 1.5.0 (Mindnet V2)
|
||||
Status: Stable
|
||||
Version: 2.1.0 (V2-superset mit "typed inline relations")
|
||||
|
||||
Ziele
|
||||
Zweck
|
||||
-----
|
||||
1) Beibehalten der bewährten Edge-Ableitung:
|
||||
- belongs_to (chunk -> note)
|
||||
- next / prev (Chunk-Kette)
|
||||
- references (chunk-scope) aus Chunk.window/text via `extract_wikilinks`
|
||||
Bewahrt die bestehende Edgelogik (belongs_to, prev/next, references, backlink)
|
||||
und ergänzt:
|
||||
- Typ-Default-Kanten gemäß config/types.yaml (edge_defaults je Notiztyp)
|
||||
- **Explizite, getypte Inline-Relationen** direkt im Chunk-Text:
|
||||
* [[rel:depends_on | Target Title]]
|
||||
* [[rel:related_to Target Title]]
|
||||
(beides wird erkannt; Groß-/Kleinschreibung egal)
|
||||
|
||||
2) Ergänzung: typenbasierte, abgeleitete Kanten aus `config/types.yaml`:
|
||||
- Für jede gefundene Referenz werden zusätzliche Relationen aus
|
||||
`edge_defaults` des Notiztyps erzeugt (z. B. "depends_on", "related_to").
|
||||
- Optional symmetrische Relationen (z. B. "related_to", "similar_to").
|
||||
- Dedupe bleibt kompatibel (Key: kind, source_id, target_id, scope).
|
||||
Konfiguration
|
||||
-------------
|
||||
- ENV MINDNET_TYPES_FILE (Default: ./config/types.yaml)
|
||||
|
||||
Hinweise
|
||||
--------
|
||||
- Es werden keine Markdown-Links neu geparst; wir bleiben bei der
|
||||
vorhandenen Parser-Logik (`extract_wikilinks`) zur Sicherung der Kompatibilität.
|
||||
- `edge_defaults` werden sowohl für Chunk-scope-Referenzen als auch – falls
|
||||
`include_note_scope_refs=True` – für Note-scope-Referenzen angewendet.
|
||||
Hinweis
|
||||
-------
|
||||
Diese Implementierung ist rückwärtskompatibel zur bisherigen Signatur.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
from typing import Dict, List, Optional, Iterable, Set
|
||||
|
||||
import os
|
||||
import yaml
|
||||
|
||||
# Wikilinks-Parser beibehalten (Kompatibilität!)
|
||||
from app.core.parser import extract_wikilinks
|
||||
import re
|
||||
from typing import Dict, Iterable, List, Optional, Tuple, Set
|
||||
|
||||
try:
|
||||
import yaml # optional, nur für types.yaml
|
||||
except Exception: # pragma: no cover
|
||||
yaml = None
|
||||
|
||||
# ---------------------------- Utilities ------------------------------------
|
||||
|
||||
|
|
@ -54,7 +51,7 @@ def _chunk_text_for_refs(chunk: dict) -> str:
|
|||
or ""
|
||||
)
|
||||
|
||||
def _dedupe(seq: Iterable[str]) -> List[str]:
|
||||
def _dedupe_seq(seq: Iterable[str]) -> List[str]:
|
||||
seen: Set[str] = set()
|
||||
out: List[str] = []
|
||||
for s in seq:
|
||||
|
|
@ -66,20 +63,28 @@ def _dedupe(seq: Iterable[str]) -> List[str]:
|
|||
def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict:
|
||||
pl = {
|
||||
"kind": kind,
|
||||
"scope": scope, # "chunk" | "note"
|
||||
"relation": kind, # v2 Feld
|
||||
"scope": scope, # "chunk" | "note"
|
||||
"source_id": source_id,
|
||||
"target_id": target_id,
|
||||
"note_id": note_id, # Träger/Quelle der Kante (aktuelle Note)
|
||||
"note_id": note_id, # Träger/Quelle der Kante (aktuelle Note)
|
||||
}
|
||||
if extra:
|
||||
pl.update(extra)
|
||||
return pl
|
||||
|
||||
def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str:
|
||||
base = f"{kind}:{s}->{t}#{scope}"
|
||||
if rule_id:
|
||||
base += f"|{rule_id}"
|
||||
try:
|
||||
import hashlib
|
||||
return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest()
|
||||
except Exception: # pragma: no cover
|
||||
return base
|
||||
|
||||
# ---------------------- Typen-Registry (types.yaml) ------------------------
|
||||
|
||||
SYM_REL = {"related_to", "similar_to"} # symmetrische Relationstypen
|
||||
|
||||
def _env(n: str, default: Optional[str] = None) -> str:
|
||||
v = os.getenv(n)
|
||||
return v if v is not None else (default or "")
|
||||
|
|
@ -87,6 +92,8 @@ def _env(n: str, default: Optional[str] = None) -> str:
|
|||
def _load_types_registry() -> dict:
|
||||
"""Lädt die YAML-Registry aus MINDNET_TYPES_FILE oder ./config/types.yaml"""
|
||||
p = _env("MINDNET_TYPES_FILE", "./config/types.yaml")
|
||||
if not os.path.isfile(p) or yaml is None:
|
||||
return {}
|
||||
try:
|
||||
with open(p, "r", encoding="utf-8") as f:
|
||||
data = yaml.safe_load(f) or {}
|
||||
|
|
@ -121,6 +128,38 @@ def _edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]:
|
|||
# 3) leer
|
||||
return []
|
||||
|
||||
# ------------------------ Parser für Links ---------------------------------
|
||||
|
||||
# Normale Wikilinks (Fallback)
|
||||
_WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:. ]+)\]\]")
|
||||
|
||||
# Getypte Inline-Relationen:
|
||||
# [[rel:depends_on | Target]]
|
||||
# [[rel:related_to Target]]
|
||||
_REL_PIPE = re.compile(r"\[\[\s*rel:(?P<kind>[a-z_]+)\s*\|\s*(?P<target>[^\]]+?)\s*\]\]", re.IGNORECASE)
|
||||
_REL_SPACE = re.compile(r"\[\[\s*rel:(?P<kind>[a-z_]+)\s+(?P<target>[^\]]+?)\s*\]\]", re.IGNORECASE)
|
||||
|
||||
def _extract_typed_relations(text: str) -> Tuple[List[Tuple[str,str]], str]:
|
||||
"""
|
||||
Gibt Liste (kind, target) zurück und den Text mit entfernten getypten Relation-Links,
|
||||
damit die generische Wikilink-Erkennung sie nicht doppelt zählt.
|
||||
"""
|
||||
pairs: List[Tuple[str,str]] = []
|
||||
def _collect(m):
|
||||
k = (m.group("kind") or "").strip().lower()
|
||||
t = (m.group("target") or "").strip()
|
||||
if k and t:
|
||||
pairs.append((k, t))
|
||||
return "" # löschen
|
||||
text = _REL_PIPE.sub(_collect, text)
|
||||
text = _REL_SPACE.sub(_collect, text)
|
||||
return pairs, text
|
||||
|
||||
def _extract_wikilinks(text: str) -> List[str]:
|
||||
ids: List[str] = []
|
||||
for m in _WIKILINK_RE.finditer(text or ""):
|
||||
ids.append(m.group(1).strip())
|
||||
return ids
|
||||
|
||||
# --------------------------- Hauptfunktion ---------------------------------
|
||||
|
||||
|
|
@ -135,9 +174,10 @@ def build_edges_for_note(
|
|||
|
||||
- belongs_to: für jeden Chunk (chunk -> note)
|
||||
- next / prev: zwischen aufeinanderfolgenden Chunks
|
||||
- references: pro Chunk aus window/text (via extract_wikilinks)
|
||||
- references: pro Chunk aus window/text (via Wikilinks)
|
||||
- typed inline relations: [[rel:KIND | Target]] oder [[rel:KIND Target]]
|
||||
- optional note-scope references/backlinks: dedupliziert über alle Chunk-Funde + note_level_references
|
||||
- NEU: typenbasierte, abgeleitete Kanten (edge_defaults) je gefundener Referenz
|
||||
- typenbasierte Default-Kanten (edge_defaults) je gefundener Referenz
|
||||
"""
|
||||
edges: List[dict] = []
|
||||
|
||||
|
|
@ -151,7 +191,13 @@ def build_edges_for_note(
|
|||
cid = _get(ch, "chunk_id", "id")
|
||||
if not cid:
|
||||
continue
|
||||
edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, {"chunk_id": cid}))
|
||||
edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, {
|
||||
"chunk_id": cid,
|
||||
"edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to:v1"),
|
||||
"provenance": "rule",
|
||||
"rule_id": "structure:belongs_to:v1",
|
||||
"confidence": 1.0,
|
||||
}))
|
||||
|
||||
# --- 2) next/prev ---
|
||||
for i in range(len(chunks) - 1):
|
||||
|
|
@ -160,10 +206,22 @@ def build_edges_for_note(
|
|||
b_id = _get(b, "chunk_id", "id")
|
||||
if not a_id or not b_id:
|
||||
continue
|
||||
edges.append(_edge("next", "chunk", a_id, b_id, note_id, {"chunk_id": a_id}))
|
||||
edges.append(_edge("prev", "chunk", b_id, a_id, note_id, {"chunk_id": b_id}))
|
||||
edges.append(_edge("next", "chunk", a_id, b_id, note_id, {
|
||||
"chunk_id": a_id,
|
||||
"edge_id": _mk_edge_id("next", a_id, b_id, "chunk", "structure:order:v1"),
|
||||
"provenance": "rule",
|
||||
"rule_id": "structure:order:v1",
|
||||
"confidence": 0.95,
|
||||
}))
|
||||
edges.append(_edge("prev", "chunk", b_id, a_id, note_id, {
|
||||
"chunk_id": b_id,
|
||||
"edge_id": _mk_edge_id("prev", b_id, a_id, "chunk", "structure:order:v1"),
|
||||
"provenance": "rule",
|
||||
"rule_id": "structure:order:v1",
|
||||
"confidence": 0.95,
|
||||
}))
|
||||
|
||||
# --- 3) references (chunk-scope) + abgeleitete Relationen je Ref ---
|
||||
# --- 3) references (chunk-scope) + typed inline relations + abgeleitete Relationen je Ref ---
|
||||
reg = _load_types_registry()
|
||||
defaults = _edge_defaults_for(note_type, reg)
|
||||
refs_all: List[str] = []
|
||||
|
|
@ -172,42 +230,111 @@ def build_edges_for_note(
|
|||
cid = _get(ch, "chunk_id", "id")
|
||||
if not cid:
|
||||
continue
|
||||
txt = _chunk_text_for_refs(ch)
|
||||
refs = extract_wikilinks(txt) # Parser-Logik nicht verändert
|
||||
raw = _chunk_text_for_refs(ch)
|
||||
|
||||
# a) typed inline relations zuerst extrahieren
|
||||
typed, remainder = _extract_typed_relations(raw)
|
||||
for kind, target in typed:
|
||||
edges.append(_edge(kind, "chunk", cid, target, note_id, {
|
||||
"chunk_id": cid,
|
||||
"edge_id": _mk_edge_id(kind, cid, target, "chunk", "inline:rel:v1"),
|
||||
"provenance": "explicit",
|
||||
"rule_id": "inline:rel:v1",
|
||||
"confidence": 0.95,
|
||||
}))
|
||||
# symmetrische Relationen zusätzlich rückwärts
|
||||
if kind in {"related_to", "similar_to"}:
|
||||
edges.append(_edge(kind, "chunk", target, cid, note_id, {
|
||||
"chunk_id": cid,
|
||||
"edge_id": _mk_edge_id(kind, target, cid, "chunk", "inline:rel:v1"),
|
||||
"provenance": "explicit",
|
||||
"rule_id": "inline:rel:v1",
|
||||
"confidence": 0.95,
|
||||
}))
|
||||
|
||||
# b) generische Wikilinks (remainder) → "references"
|
||||
refs = _extract_wikilinks(remainder)
|
||||
for r in refs:
|
||||
# reale Referenz (wie bisher)
|
||||
edges.append(_edge("references", "chunk", cid, r, note_id, {"chunk_id": cid, "ref_text": r}))
|
||||
edges.append(_edge("references", "chunk", cid, r, note_id, {
|
||||
"chunk_id": cid,
|
||||
"ref_text": r,
|
||||
"edge_id": _mk_edge_id("references", cid, r, "chunk", "explicit:wikilink:v1"),
|
||||
"provenance": "explicit",
|
||||
"rule_id": "explicit:wikilink:v1",
|
||||
"confidence": 1.0,
|
||||
}))
|
||||
# abgeleitete Kanten je default-Relation
|
||||
for rel in defaults:
|
||||
if rel == "references":
|
||||
continue # doppelt vermeiden
|
||||
edges.append(_edge(rel, "chunk", cid, r, note_id, {"chunk_id": cid, "rule_id": f"edge_defaults:{note_type}:{rel}", "confidence": 0.7}))
|
||||
edges.append(_edge(rel, "chunk", cid, r, note_id, {
|
||||
"chunk_id": cid,
|
||||
"edge_id": _mk_edge_id(rel, cid, r, "chunk", f"edge_defaults:{note_type}:{rel}:v1"),
|
||||
"provenance": "rule",
|
||||
"rule_id": f"edge_defaults:{note_type}:{rel}:v1",
|
||||
"confidence": 0.7,
|
||||
}))
|
||||
# symmetrisch?
|
||||
if rel in {"related_to", "similar_to"}:
|
||||
edges.append(_edge(rel, "chunk", r, cid, note_id, {"chunk_id": cid, "rule_id": f"edge_defaults:{note_type}:{rel}", "confidence": 0.7}))
|
||||
edges.append(_edge(rel, "chunk", r, cid, note_id, {
|
||||
"chunk_id": cid,
|
||||
"edge_id": _mk_edge_id(rel, r, cid, "chunk", f"edge_defaults:{note_type}:{rel}:v1"),
|
||||
"provenance": "rule",
|
||||
"rule_id": f"edge_defaults:{note_type}:{rel}:v1",
|
||||
"confidence": 0.7,
|
||||
}))
|
||||
refs_all.extend(refs)
|
||||
|
||||
# --- 4) optional: note-scope references/backlinks (+ defaults) ---
|
||||
if include_note_scope_refs:
|
||||
refs_note = refs_all[:]
|
||||
refs_note = list(refs_all or [])
|
||||
if note_level_references:
|
||||
refs_note.extend([r for r in note_level_references if isinstance(r, str) and r])
|
||||
refs_note = _dedupe(refs_note)
|
||||
refs_note = _dedupe_seq(refs_note)
|
||||
for r in refs_note:
|
||||
# echte note-scope Referenz & Backlink (wie bisher)
|
||||
edges.append(_edge("references", "note", note_id, r, note_id))
|
||||
edges.append(_edge("backlink", "note", r, note_id, note_id))
|
||||
edges.append(_edge("references", "note", note_id, r, note_id, {
|
||||
"edge_id": _mk_edge_id("references", note_id, r, "note", "explicit:note_scope:v1"),
|
||||
"provenance": "explicit",
|
||||
"rule_id": "explicit:note_scope:v1",
|
||||
"confidence": 1.0,
|
||||
}))
|
||||
edges.append(_edge("backlink", "note", r, note_id, note_id, {
|
||||
"edge_id": _mk_edge_id("backlink", r, note_id, "note", "derived:backlink:v1"),
|
||||
"provenance": "rule",
|
||||
"rule_id": "derived:backlink:v1",
|
||||
"confidence": 0.9,
|
||||
}))
|
||||
# und zusätzlich default-Relationen (note-scope)
|
||||
for rel in defaults:
|
||||
if rel == "references":
|
||||
continue
|
||||
edges.append(_edge(rel, "note", note_id, r, note_id, {"rule_id": f"edge_defaults:{note_type}:{rel}", "confidence": 0.7}))
|
||||
edges.append(_edge(rel, "note", note_id, r, note_id, {
|
||||
"edge_id": _mk_edge_id(rel, note_id, r, "note", f"edge_defaults:{note_type}:{rel}:v1"),
|
||||
"provenance": "rule",
|
||||
"rule_id": f"edge_defaults:{note_type}:{rel}:v1",
|
||||
"confidence": 0.7,
|
||||
}))
|
||||
if rel in {"related_to", "similar_to"}:
|
||||
edges.append(_edge(rel, "note", r, note_id, note_id, {"rule_id": f"edge_defaults:{note_type}:{rel}", "confidence": 0.7}))
|
||||
edges.append(_edge(rel, "note", r, note_id, note_id, {
|
||||
"edge_id": _mk_edge_id(rel, r, note_id, "note", f"edge_defaults:{note_type}:{rel}:v1"),
|
||||
"provenance": "rule",
|
||||
"rule_id": f"edge_defaults:{note_type}:{rel}:v1",
|
||||
"confidence": 0.7,
|
||||
}))
|
||||
|
||||
# --- 5) Dedupe (unverändert kompatibel) ---
|
||||
dedup = {}
|
||||
# --- 5) Dedupe (Schlüssel: source_id, target_id, relation, rule_id) ---
|
||||
seen: Set[Tuple[str,str,str,str]] = set()
|
||||
out: List[dict] = []
|
||||
for e in edges:
|
||||
k = (e["kind"], e["source_id"], e["target_id"], e.get("scope", ""))
|
||||
dedup[k] = e
|
||||
return list(dedup.values())
|
||||
s = str(e.get("source_id") or "")
|
||||
t = str(e.get("target_id") or "")
|
||||
rel = str(e.get("relation") or e.get("kind") or "edge")
|
||||
rule = str(e.get("rule_id") or "")
|
||||
key = (s, t, rel, rule)
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
out.append(e)
|
||||
return out
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user