app/core/derive_edges.py aktualisiert
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
This commit is contained in:
parent
9a7bc32051
commit
aab010ff17
|
|
@ -1,435 +1,364 @@
|
||||||
#!/usr/bin/env python3
|
# app/core/derive_edges.py
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
"""
|
"""
|
||||||
Modul: app/core/derive_edges.py
|
Edge-Ableitung (V2)
|
||||||
Zweck:
|
Beibehaltung der bestehenden Funktionalität + Erweiterung:
|
||||||
- Bewahrt bestehende Edgelogik (belongs_to, prev/next, references, backlink)
|
- Mehrere Inline-Referenzen in einer Zeile: rel: <relation> [[A]] [[B]] ...
|
||||||
- Ergänzt typenbasierte Default-Kanten (edge_defaults aus config/types.yaml)
|
Kompatibel mit:
|
||||||
- Unterstützt "typed inline relations":
|
- Strukturkanten: belongs_to / next / prev
|
||||||
* [[rel:KIND | Target]]
|
- Explizite Wikilinks -> references
|
||||||
* [[rel:KIND Target]]
|
- Inline-Relationen -> inline:rel
|
||||||
* rel: KIND [[Target]]
|
- Callout-Kanten -> callout:edge
|
||||||
- Unterstützt Obsidian-Callouts:
|
- Typbasierte Default-Kanten (edge_defaults aus types.yaml)
|
||||||
* > [!edge] KIND: [[Target]] [[Target2]] ...
|
|
||||||
Kompatibilität:
|
|
||||||
- build_edges_for_note(...) Signatur unverändert
|
|
||||||
- rule_id Werte:
|
|
||||||
* structure:belongs_to
|
|
||||||
* structure:order
|
|
||||||
* explicit:wikilink
|
|
||||||
* inline:rel
|
|
||||||
* callout:edge
|
|
||||||
* edge_defaults:<type>:<relation>
|
|
||||||
* derived:backlink
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import os
|
|
||||||
import re
|
import re
|
||||||
from typing import Iterable, List, Optional, Tuple, Set, Dict
|
from typing import Dict, List, Iterable, Tuple, Set
|
||||||
|
|
||||||
try:
|
# ----------------------------------------------------------------------
|
||||||
import yaml # optional, nur für types.yaml
|
# Regex-Bausteine
|
||||||
except Exception: # pragma: no cover
|
# ----------------------------------------------------------------------
|
||||||
yaml = None
|
|
||||||
|
|
||||||
# --------------------------------------------------------------------------- #
|
# Wikilinks: [[Title]] oder [[Title|Alias]]
|
||||||
# Utilities
|
RE_WIKILINK = re.compile(r"\[\[([^\]|#]+)(?:#[^\]|]+)?(?:\|[^\]]+)?\]\]")
|
||||||
# --------------------------------------------------------------------------- #
|
|
||||||
|
|
||||||
def _get(d: dict, *keys, default=None):
|
# Inline-Relationen (Variante B – von dir im Einsatz):
|
||||||
for k in keys:
|
# rel: <relation> [[Target A]] [[Target B]] ...
|
||||||
if isinstance(d, dict) and k in d and d[k] is not None:
|
RE_INLINE_REL_LINE = re.compile(
|
||||||
return d[k]
|
r"(?i)\brel\s*:\s*(?P<rel>[a-z_][a-z0-9_]+)\s+(?P<body>.+)$"
|
||||||
return default
|
|
||||||
|
|
||||||
def _chunk_text_for_refs(chunk: dict) -> str:
|
|
||||||
# bevorzugt 'window' → dann 'text' → 'content' → 'raw'
|
|
||||||
return (
|
|
||||||
_get(chunk, "window")
|
|
||||||
or _get(chunk, "text")
|
|
||||||
or _get(chunk, "content")
|
|
||||||
or _get(chunk, "raw")
|
|
||||||
or ""
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def _dedupe_seq(seq: Iterable[str]) -> List[str]:
|
# Callout:
|
||||||
seen: Set[str] = set()
|
# > [!edge] <relation>: [[A]] [[B]]
|
||||||
out: List[str] = []
|
RE_CALLOUT_HEADER = re.compile(r"^\s{0,3}>\s*\[\!edge\]\s*(?P<rel>[a-z_][a-z0-9_]+)\s*:\s*(?P<body>.*)$", re.IGNORECASE)
|
||||||
for s in seq:
|
|
||||||
if s not in seen:
|
|
||||||
seen.add(s)
|
|
||||||
out.append(s)
|
|
||||||
return out
|
|
||||||
|
|
||||||
def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict:
|
# ----------------------------------------------------------------------
|
||||||
|
# Utilities
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _neighbors_chain(chunk_ids: List[str]) -> Iterable[Tuple[str, str]]:
|
||||||
|
"""Erzeugt (prev, next) Paare entlang der Chunk-Sequenz."""
|
||||||
|
for i in range(len(chunk_ids) - 1):
|
||||||
|
yield chunk_ids[i], chunk_ids[i + 1]
|
||||||
|
|
||||||
|
def _mk_edge_payload(
|
||||||
|
*,
|
||||||
|
kind: str,
|
||||||
|
scope: str,
|
||||||
|
note_id: str,
|
||||||
|
chunk_id: str | None = None,
|
||||||
|
source_id: str,
|
||||||
|
target_id: str,
|
||||||
|
rule_id: str,
|
||||||
|
confidence: float,
|
||||||
|
) -> Dict:
|
||||||
|
"""
|
||||||
|
Einheitliches Edge-Payload-Format.
|
||||||
|
"""
|
||||||
pl = {
|
pl = {
|
||||||
"kind": kind,
|
"kind": kind, # z.B. references, depends_on, related_to, similar_to
|
||||||
"relation": kind, # Alias (v2)
|
"scope": scope, # "chunk" oder "note"
|
||||||
"scope": scope, # "chunk" | "note"
|
"note_id": note_id, # Note-Kontext (Quelle)
|
||||||
"source_id": source_id,
|
"source_id": source_id, # id der Quelle (Chunk-ID oder Note-ID)
|
||||||
"target_id": target_id,
|
"target_id": target_id, # Ziel (Note-ID oder Titel, falls Auflösung extern erfolgt)
|
||||||
"note_id": note_id, # Träger-Note der Kante
|
"rule_id": rule_id,
|
||||||
|
"confidence": confidence,
|
||||||
}
|
}
|
||||||
if extra:
|
if chunk_id:
|
||||||
pl.update(extra)
|
pl["chunk_id"] = chunk_id
|
||||||
return pl
|
return pl
|
||||||
|
|
||||||
def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str:
|
|
||||||
base = f"{kind}:{s}->{t}#{scope}"
|
|
||||||
if rule_id:
|
|
||||||
base += f"|{rule_id}"
|
|
||||||
try:
|
|
||||||
import hashlib
|
|
||||||
return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest()
|
|
||||||
except Exception: # pragma: no cover
|
|
||||||
return base
|
|
||||||
|
|
||||||
# --------------------------------------------------------------------------- #
|
|
||||||
# Typen-Registry (types.yaml)
|
|
||||||
# --------------------------------------------------------------------------- #
|
|
||||||
|
|
||||||
def _env(n: str, default: Optional[str] = None) -> str:
|
|
||||||
v = os.getenv(n)
|
|
||||||
return v if v is not None else (default or "")
|
|
||||||
|
|
||||||
def _load_types_registry() -> dict:
|
|
||||||
"""Lädt die YAML-Registry aus MINDNET_TYPES_FILE oder ./config/types.yaml"""
|
|
||||||
p = _env("MINDNET_TYPES_FILE", "./config/types.yaml")
|
|
||||||
if not os.path.isfile(p) or yaml is None:
|
|
||||||
return {}
|
|
||||||
try:
|
|
||||||
with open(p, "r", encoding="utf-8") as f:
|
|
||||||
data = yaml.safe_load(f) or {}
|
|
||||||
return data
|
|
||||||
except Exception:
|
|
||||||
return {}
|
|
||||||
|
|
||||||
def _get_types_map(reg: dict) -> dict:
|
|
||||||
if isinstance(reg, dict) and isinstance(reg.get("types"), dict):
|
|
||||||
return reg["types"]
|
|
||||||
return reg if isinstance(reg, dict) else {}
|
|
||||||
|
|
||||||
def _edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]:
|
|
||||||
"""
|
|
||||||
Liefert die edge_defaults-Liste für den gegebenen Notiztyp.
|
|
||||||
Fallback-Reihenfolge:
|
|
||||||
1) reg['types'][note_type]['edge_defaults']
|
|
||||||
2) reg['defaults']['edge_defaults'] (oder 'default'/'global')
|
|
||||||
3) []
|
|
||||||
"""
|
|
||||||
types_map = _get_types_map(reg)
|
|
||||||
if note_type and isinstance(types_map, dict):
|
|
||||||
t = types_map.get(note_type)
|
|
||||||
if isinstance(t, dict) and isinstance(t.get("edge_defaults"), list):
|
|
||||||
return [str(x) for x in t["edge_defaults"] if isinstance(x, str)]
|
|
||||||
for key in ("defaults", "default", "global"):
|
|
||||||
v = reg.get(key)
|
|
||||||
if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list):
|
|
||||||
return [str(x) for x in v["edge_defaults"] if isinstance(x, str)]
|
|
||||||
return []
|
|
||||||
|
|
||||||
# --------------------------------------------------------------------------- #
|
|
||||||
# Parser für Links / Relationen
|
|
||||||
# --------------------------------------------------------------------------- #
|
|
||||||
|
|
||||||
# Normale Wikilinks (Fallback)
|
|
||||||
_WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:. ]+)\]\]")
|
|
||||||
|
|
||||||
# Getypte Inline-Relationen:
|
|
||||||
# [[rel:KIND | Target]]
|
|
||||||
# [[rel:KIND Target]]
|
|
||||||
_REL_PIPE = re.compile(r"\[\[\s*rel:(?P<kind>[a-z_]+)\s*\|\s*(?P<target>[^\]]+?)\s*\]\]", re.IGNORECASE)
|
|
||||||
_REL_SPACE = re.compile(r"\[\[\s*rel:(?P<kind>[a-z_]+)\s+(?P<target>[^\]]+?)\s*\]\]", re.IGNORECASE)
|
|
||||||
# rel: KIND [[Target]] (reines Textmuster)
|
|
||||||
_REL_TEXT = re.compile(r"rel\s*:\s*(?P<kind>[a-z_]+)\s*\[\[\s*(?P<target>[^\]]+?)\s*\]\]", re.IGNORECASE)
|
|
||||||
|
|
||||||
def _extract_typed_relations(text: str) -> Tuple[List[Tuple[str,str]], str]:
|
|
||||||
"""
|
|
||||||
Gibt Liste (kind, target) zurück und den Text mit entfernten getypten Relation-Links,
|
|
||||||
damit die generische Wikilink-Erkennung sie nicht doppelt zählt.
|
|
||||||
Unterstützt drei Varianten:
|
|
||||||
- [[rel:KIND | Target]]
|
|
||||||
- [[rel:KIND Target]]
|
|
||||||
- rel: KIND [[Target]]
|
|
||||||
"""
|
|
||||||
pairs: List[Tuple[str,str]] = []
|
|
||||||
def _collect(m):
|
|
||||||
k = (m.group("kind") or "").strip().lower()
|
|
||||||
t = (m.group("target") or "").strip()
|
|
||||||
if k and t:
|
|
||||||
pairs.append((k, t))
|
|
||||||
return "" # Link entfernen
|
|
||||||
|
|
||||||
text = _REL_PIPE.sub(_collect, text)
|
|
||||||
text = _REL_SPACE.sub(_collect, text)
|
|
||||||
text = _REL_TEXT.sub(_collect, text)
|
|
||||||
return pairs, text
|
|
||||||
|
|
||||||
# Obsidian Callout Parser
|
|
||||||
_CALLOUT_START = re.compile(r"^\s*>\s*\[!edge\]\s*(.*)$", re.IGNORECASE)
|
|
||||||
_REL_LINE = re.compile(r"^(?P<kind>[a-z_]+)\s*:\s*(?P<targets>.+?)\s*$", re.IGNORECASE)
|
|
||||||
_WIKILINKS_IN_LINE = re.compile(r"\[\[([^\]]+)\]\]")
|
|
||||||
|
|
||||||
def _extract_callout_relations(text: str) -> Tuple[List[Tuple[str,str]], str]:
|
|
||||||
"""
|
|
||||||
Findet [!edge]-Callouts und extrahiert (kind, target). Entfernt den gesamten
|
|
||||||
Callout-Block aus dem Text (damit Wikilinks daraus nicht zusätzlich als
|
|
||||||
"references" gezählt werden).
|
|
||||||
"""
|
|
||||||
if not text:
|
|
||||||
return [], text
|
|
||||||
|
|
||||||
lines = text.splitlines()
|
|
||||||
out_pairs: List[Tuple[str,str]] = []
|
|
||||||
keep_lines: List[str] = []
|
|
||||||
i = 0
|
|
||||||
|
|
||||||
while i < len(lines):
|
|
||||||
m = _CALLOUT_START.match(lines[i])
|
|
||||||
if not m:
|
|
||||||
keep_lines.append(lines[i])
|
|
||||||
i += 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
block_lines: List[str] = []
|
|
||||||
first_rest = m.group(1) or ""
|
|
||||||
if first_rest.strip():
|
|
||||||
block_lines.append(first_rest)
|
|
||||||
|
|
||||||
i += 1
|
|
||||||
while i < len(lines) and lines[i].lstrip().startswith('>'):
|
|
||||||
block_lines.append(lines[i].lstrip()[1:].lstrip())
|
|
||||||
i += 1
|
|
||||||
|
|
||||||
for bl in block_lines:
|
|
||||||
mrel = _REL_LINE.match(bl)
|
|
||||||
if not mrel:
|
|
||||||
continue
|
|
||||||
kind = (mrel.group("kind") or "").strip().lower()
|
|
||||||
targets = mrel.group("targets") or ""
|
|
||||||
found = _WIKILINKS_IN_LINE.findall(targets)
|
|
||||||
if found:
|
|
||||||
for t in found:
|
|
||||||
t = t.strip()
|
|
||||||
if t:
|
|
||||||
out_pairs.append((kind, t))
|
|
||||||
else:
|
|
||||||
for raw in re.split(r"[,;]", targets):
|
|
||||||
t = raw.strip()
|
|
||||||
if t:
|
|
||||||
out_pairs.append((kind, t))
|
|
||||||
|
|
||||||
# Callout wird NICHT in keep_lines übernommen
|
|
||||||
continue
|
|
||||||
|
|
||||||
remainder = "\n".join(keep_lines)
|
|
||||||
return out_pairs, remainder
|
|
||||||
|
|
||||||
def _extract_wikilinks(text: str) -> List[str]:
|
def _extract_wikilinks(text: str) -> List[str]:
|
||||||
ids: List[str] = []
|
|
||||||
for m in _WIKILINK_RE.finditer(text or ""):
|
|
||||||
ids.append(m.group(1).strip())
|
|
||||||
return ids
|
|
||||||
|
|
||||||
# --------------------------------------------------------------------------- #
|
|
||||||
# Hauptfunktion
|
|
||||||
# --------------------------------------------------------------------------- #
|
|
||||||
|
|
||||||
def build_edges_for_note(
|
|
||||||
note_id: str,
|
|
||||||
chunks: List[dict],
|
|
||||||
note_level_references: Optional[List[str]] = None,
|
|
||||||
include_note_scope_refs: bool = False,
|
|
||||||
) -> List[dict]:
|
|
||||||
"""
|
"""
|
||||||
Erzeugt Kanten für eine Note.
|
Extrahiert alle Wikilink-Ziele (als Titel-Strings).
|
||||||
|
|
||||||
- belongs_to: für jeden Chunk (chunk -> note)
|
|
||||||
- next / prev: zwischen aufeinanderfolgenden Chunks
|
|
||||||
- references: pro Chunk aus window/text (via Wikilinks)
|
|
||||||
- typed inline relations: [[rel:KIND | Target]] / [[rel:KIND Target]] / rel: KIND [[Target]]
|
|
||||||
- Obsidian Callouts: > [!edge] KIND: [[Target]] [[Target2]]
|
|
||||||
- optional note-scope references/backlinks: dedupliziert über alle Chunk-Funde + note_level_references
|
|
||||||
- typenbasierte Default-Kanten (edge_defaults) je gefundener Referenz
|
|
||||||
"""
|
"""
|
||||||
edges: List[dict] = []
|
return [m.group(1).strip() for m in RE_WIKILINK.finditer(text or "")]
|
||||||
|
|
||||||
# Note-Typ (aus erstem Chunk erwartet)
|
def _extract_inline_relations_lines(text: str) -> List[Tuple[str, List[str]]]:
|
||||||
note_type = None
|
"""
|
||||||
if chunks:
|
Findet Inline-Relationen in Zeilen wie:
|
||||||
note_type = _get(chunks[0], "type")
|
rel: <relation> [[Target A]] [[Target B]]
|
||||||
|
Liefert Liste von (relation, [targets...]).
|
||||||
|
"""
|
||||||
|
out: List[Tuple[str, List[str]]] = []
|
||||||
|
if not text:
|
||||||
|
return out
|
||||||
|
for line in text.splitlines():
|
||||||
|
m = RE_INLINE_REL_LINE.search(line)
|
||||||
|
if not m:
|
||||||
|
continue
|
||||||
|
rel = m.group("rel").strip().lower()
|
||||||
|
body = m.group("body")
|
||||||
|
# alle [[...]] Ziele aus body herausziehen:
|
||||||
|
targets = _extract_wikilinks(body)
|
||||||
|
# falls im Body keine [[...]] vorkommen, versuche verbleibenden Text als ein Ziel (robust):
|
||||||
|
if not targets:
|
||||||
|
cleaned = body.strip()
|
||||||
|
if cleaned:
|
||||||
|
targets = [cleaned]
|
||||||
|
if targets:
|
||||||
|
out.append((rel, targets))
|
||||||
|
return out
|
||||||
|
|
||||||
# 1) belongs_to
|
def _extract_callout_edges(text: str) -> List[Tuple[str, List[str]]]:
|
||||||
for ch in chunks:
|
"""
|
||||||
cid = _get(ch, "chunk_id", "id")
|
Callout-Edges:
|
||||||
|
> [!edge] <relation>: [[A]] [[B]]
|
||||||
|
pro Zeile eine Relation + 1..n Ziele
|
||||||
|
"""
|
||||||
|
out: List[Tuple[str, List[str]]] = []
|
||||||
|
if not text:
|
||||||
|
return out
|
||||||
|
for line in text.splitlines():
|
||||||
|
m = RE_CALLOUT_HEADER.match(line)
|
||||||
|
if not m:
|
||||||
|
continue
|
||||||
|
rel = m.group("rel").strip().lower()
|
||||||
|
body = m.group("body")
|
||||||
|
targets = _extract_wikilinks(body)
|
||||||
|
# Robustheit: wenn keine [[...]] vorhanden, restlicher body als ein Ziel
|
||||||
|
if not targets:
|
||||||
|
cleaned = body.strip()
|
||||||
|
if cleaned:
|
||||||
|
targets = [cleaned]
|
||||||
|
if targets:
|
||||||
|
out.append((rel, targets))
|
||||||
|
return out
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# Haupt-API
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
def derive_edges(
|
||||||
|
note: Dict,
|
||||||
|
chunks: List[Dict],
|
||||||
|
types_cfg: Dict | None = None,
|
||||||
|
) -> List[Dict]:
|
||||||
|
"""
|
||||||
|
Leitet Kanten für eine Note ab.
|
||||||
|
|
||||||
|
Erwartete Felder:
|
||||||
|
note: {
|
||||||
|
"note_id": str,
|
||||||
|
"title": str,
|
||||||
|
"type": str,
|
||||||
|
"text": str
|
||||||
|
}
|
||||||
|
chunks: [{
|
||||||
|
"chunk_id": str,
|
||||||
|
"index": int,
|
||||||
|
"text": str,
|
||||||
|
...
|
||||||
|
}, ...]
|
||||||
|
|
||||||
|
types_cfg (aus types.yaml geladen) mit:
|
||||||
|
types_cfg["types"][<type>]["edge_defaults"] = [relation, ...]
|
||||||
|
(optional)
|
||||||
|
"""
|
||||||
|
edges: List[Dict] = []
|
||||||
|
|
||||||
|
note_id = note.get("note_id") or note.get("id")
|
||||||
|
note_title = note.get("title") or ""
|
||||||
|
note_type = (note.get("type") or "").strip().lower()
|
||||||
|
note_text = note.get("text") or ""
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# 1) Strukturkanten je Chunk: belongs_to / next / prev
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
chunk_ids = [c.get("chunk_id") for c in chunks if c.get("chunk_id")]
|
||||||
|
# belongs_to
|
||||||
|
for c in chunks:
|
||||||
|
cid = c.get("chunk_id")
|
||||||
if not cid:
|
if not cid:
|
||||||
continue
|
continue
|
||||||
edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, {
|
edges.append(
|
||||||
"chunk_id": cid,
|
_mk_edge_payload(
|
||||||
"edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to"),
|
kind="belongs_to",
|
||||||
"provenance": "rule",
|
scope="chunk",
|
||||||
"rule_id": "structure:belongs_to",
|
note_id=note_id,
|
||||||
"confidence": 1.0,
|
chunk_id=cid,
|
||||||
}))
|
source_id=cid,
|
||||||
|
target_id=note_id,
|
||||||
|
rule_id="structure:belongs_to",
|
||||||
|
confidence=1.0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
# next/prev
|
||||||
|
for prev_id, next_id in _neighbors_chain(chunk_ids):
|
||||||
|
# next
|
||||||
|
edges.append(
|
||||||
|
_mk_edge_payload(
|
||||||
|
kind="next",
|
||||||
|
scope="chunk",
|
||||||
|
note_id=note_id,
|
||||||
|
chunk_id=prev_id,
|
||||||
|
source_id=prev_id,
|
||||||
|
target_id=next_id,
|
||||||
|
rule_id="structure:next",
|
||||||
|
confidence=1.0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
# prev
|
||||||
|
edges.append(
|
||||||
|
_mk_edge_payload(
|
||||||
|
kind="prev",
|
||||||
|
scope="chunk",
|
||||||
|
note_id=note_id,
|
||||||
|
chunk_id=next_id,
|
||||||
|
source_id=next_id,
|
||||||
|
target_id=prev_id,
|
||||||
|
rule_id="structure:prev",
|
||||||
|
confidence=1.0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
# 2) next / prev
|
# ------------------------------------------------------------------
|
||||||
for i in range(len(chunks) - 1):
|
# 2) Explizite Referenzen (Wikilinks) + Inline-Relationen + Callouts
|
||||||
a, b = chunks[i], chunks[i + 1]
|
# - Alles chunk-scope, Quelle = chunk_id (falls vorhanden),
|
||||||
a_id = _get(a, "chunk_id", "id")
|
# sonst Note-scope als Fallback.
|
||||||
b_id = _get(b, "chunk_id", "id")
|
# ------------------------------------------------------------------
|
||||||
if not a_id or not b_id:
|
# Sammle alle expliziten Ziele (für spätere edge_defaults)
|
||||||
continue
|
explicit_targets: Set[str] = set()
|
||||||
edges.append(_edge("next", "chunk", a_id, b_id, note_id, {
|
|
||||||
"chunk_id": a_id,
|
|
||||||
"edge_id": _mk_edge_id("next", a_id, b_id, "chunk", "structure:order"),
|
|
||||||
"provenance": "rule",
|
|
||||||
"rule_id": "structure:order",
|
|
||||||
"confidence": 0.95,
|
|
||||||
}))
|
|
||||||
edges.append(_edge("prev", "chunk", b_id, a_id, note_id, {
|
|
||||||
"chunk_id": b_id,
|
|
||||||
"edge_id": _mk_edge_id("prev", b_id, a_id, "chunk", "structure:order"),
|
|
||||||
"provenance": "rule",
|
|
||||||
"rule_id": "structure:order",
|
|
||||||
"confidence": 0.95,
|
|
||||||
}))
|
|
||||||
|
|
||||||
# 3) references + typed inline + callouts + defaults (chunk-scope)
|
# pro Chunk prüfen
|
||||||
reg = _load_types_registry()
|
for c in chunks:
|
||||||
defaults = _edge_defaults_for(note_type, reg)
|
cid = c.get("chunk_id")
|
||||||
refs_all: List[str] = []
|
ctxt = c.get("text") or ""
|
||||||
|
|
||||||
for ch in chunks:
|
# 2a) Wikilinks -> references
|
||||||
cid = _get(ch, "chunk_id", "id")
|
for tgt in _extract_wikilinks(ctxt):
|
||||||
if not cid:
|
explicit_targets.add(tgt)
|
||||||
continue
|
edges.append(
|
||||||
raw = _chunk_text_for_refs(ch)
|
_mk_edge_payload(
|
||||||
|
kind="references",
|
||||||
|
scope="chunk",
|
||||||
|
note_id=note_id,
|
||||||
|
chunk_id=cid,
|
||||||
|
source_id=cid,
|
||||||
|
target_id=tgt,
|
||||||
|
rule_id="explicit:wikilink",
|
||||||
|
confidence=1.0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
# 3a) typed inline relations
|
# 2b) Inline-Relationen (mehrere Ziele erlaubt)
|
||||||
typed, remainder = _extract_typed_relations(raw)
|
for rel, targets in _extract_inline_relations_lines(ctxt):
|
||||||
for kind, target in typed:
|
for tgt in targets:
|
||||||
kind = kind.strip().lower()
|
explicit_targets.add(tgt)
|
||||||
if not kind or not target:
|
edges.append(
|
||||||
continue
|
_mk_edge_payload(
|
||||||
edges.append(_edge(kind, "chunk", cid, target, note_id, {
|
kind=rel,
|
||||||
"chunk_id": cid,
|
scope="chunk",
|
||||||
"edge_id": _mk_edge_id(kind, cid, target, "chunk", "inline:rel"),
|
note_id=note_id,
|
||||||
"provenance": "explicit",
|
chunk_id=cid,
|
||||||
"rule_id": "inline:rel",
|
source_id=cid,
|
||||||
"confidence": 0.95,
|
target_id=tgt,
|
||||||
}))
|
rule_id="inline:rel",
|
||||||
if kind in {"related_to", "similar_to"}:
|
confidence=0.95,
|
||||||
edges.append(_edge(kind, "chunk", target, cid, note_id, {
|
)
|
||||||
"chunk_id": cid,
|
)
|
||||||
"edge_id": _mk_edge_id(kind, target, cid, "chunk", "inline:rel"),
|
|
||||||
"provenance": "explicit",
|
|
||||||
"rule_id": "inline:rel",
|
|
||||||
"confidence": 0.95,
|
|
||||||
}))
|
|
||||||
|
|
||||||
# 3b) callouts
|
# 2c) Callout-Edges (mehrere Ziele erlaubt)
|
||||||
call_pairs, remainder2 = _extract_callout_relations(remainder)
|
for rel, targets in _extract_callout_edges(ctxt):
|
||||||
for kind, target in call_pairs:
|
for tgt in targets:
|
||||||
k = (kind or "").strip().lower()
|
explicit_targets.add(tgt)
|
||||||
if not k or not target:
|
edges.append(
|
||||||
continue
|
_mk_edge_payload(
|
||||||
edges.append(_edge(k, "chunk", cid, target, note_id, {
|
kind=rel,
|
||||||
"chunk_id": cid,
|
scope="chunk",
|
||||||
"edge_id": _mk_edge_id(k, cid, target, "chunk", "callout:edge"),
|
note_id=note_id,
|
||||||
"provenance": "explicit",
|
chunk_id=cid,
|
||||||
"rule_id": "callout:edge",
|
source_id=cid,
|
||||||
"confidence": 0.95,
|
target_id=tgt,
|
||||||
}))
|
rule_id="callout:edge",
|
||||||
if k in {"related_to", "similar_to"}:
|
confidence=0.9,
|
||||||
edges.append(_edge(k, "chunk", target, cid, note_id, {
|
)
|
||||||
"chunk_id": cid,
|
)
|
||||||
"edge_id": _mk_edge_id(k, target, cid, "chunk", "callout:edge"),
|
|
||||||
"provenance": "explicit",
|
|
||||||
"rule_id": "callout:edge",
|
|
||||||
"confidence": 0.95,
|
|
||||||
}))
|
|
||||||
|
|
||||||
# 3c) generische Wikilinks → references (+ defaults je Ref)
|
# Fallback: Falls Note keinen Chunk-Text enthielt (theoretisch),
|
||||||
refs = _extract_wikilinks(remainder2)
|
# prüfe Note-Text einmal global (liefert note-scope Kanten).
|
||||||
for r in refs:
|
if not chunks and note_text:
|
||||||
edges.append(_edge("references", "chunk", cid, r, note_id, {
|
# Wikilinks
|
||||||
"chunk_id": cid,
|
for tgt in _extract_wikilinks(note_text):
|
||||||
"ref_text": r,
|
explicit_targets.add(tgt)
|
||||||
"edge_id": _mk_edge_id("references", cid, r, "chunk", "explicit:wikilink"),
|
edges.append(
|
||||||
"provenance": "explicit",
|
_mk_edge_payload(
|
||||||
"rule_id": "explicit:wikilink",
|
kind="references",
|
||||||
"confidence": 1.0,
|
scope="note",
|
||||||
}))
|
note_id=note_id,
|
||||||
|
source_id=note_id,
|
||||||
|
target_id=tgt,
|
||||||
|
rule_id="explicit:wikilink",
|
||||||
|
confidence=1.0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
# Inline
|
||||||
|
for rel, targets in _extract_inline_relations_lines(note_text):
|
||||||
|
for tgt in targets:
|
||||||
|
explicit_targets.add(tgt)
|
||||||
|
edges.append(
|
||||||
|
_mk_edge_payload(
|
||||||
|
kind=rel,
|
||||||
|
scope="note",
|
||||||
|
note_id=note_id,
|
||||||
|
source_id=note_id,
|
||||||
|
target_id=tgt,
|
||||||
|
rule_id="inline:rel",
|
||||||
|
confidence=0.95,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
# Callouts
|
||||||
|
for rel, targets in _extract_callout_edges(note_text):
|
||||||
|
for tgt in targets:
|
||||||
|
explicit_targets.add(tgt)
|
||||||
|
edges.append(
|
||||||
|
_mk_edge_payload(
|
||||||
|
kind=rel,
|
||||||
|
scope="note",
|
||||||
|
note_id=note_id,
|
||||||
|
source_id=note_id,
|
||||||
|
target_id=tgt,
|
||||||
|
rule_id="callout:edge",
|
||||||
|
confidence=0.9,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# 3) Typbasierte Default-Kanten (edge_defaults)
|
||||||
|
# - nur, wenn es explizite Ziele gibt (sonst kein Ableitungsanker)
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
if types_cfg and explicit_targets:
|
||||||
|
type_entry = (types_cfg.get("types") or {}).get(note_type) or {}
|
||||||
|
defaults: List[str] = type_entry.get("edge_defaults") or []
|
||||||
|
defaults = [str(d).strip().lower() for d in defaults if str(d).strip()]
|
||||||
|
if defaults:
|
||||||
|
# default-Kanten als "note"-Scope (Konzeption: vom Note-Kontext aus)
|
||||||
for rel in defaults:
|
for rel in defaults:
|
||||||
if rel == "references":
|
rule = f"edge_defaults:{note_type}:{rel}"
|
||||||
continue
|
for tgt in sorted(explicit_targets):
|
||||||
edges.append(_edge(rel, "chunk", cid, r, note_id, {
|
edges.append(
|
||||||
"chunk_id": cid,
|
_mk_edge_payload(
|
||||||
"edge_id": _mk_edge_id(rel, cid, r, "chunk", f"edge_defaults:{note_type}:{rel}"),
|
kind=rel,
|
||||||
"provenance": "rule",
|
scope="note",
|
||||||
"rule_id": f"edge_defaults:{note_type}:{rel}",
|
note_id=note_id,
|
||||||
"confidence": 0.7,
|
source_id=note_id,
|
||||||
}))
|
target_id=tgt,
|
||||||
if rel in {"related_to", "similar_to"}:
|
rule_id=rule,
|
||||||
edges.append(_edge(rel, "chunk", r, cid, note_id, {
|
confidence=0.7,
|
||||||
"chunk_id": cid,
|
)
|
||||||
"edge_id": _mk_edge_id(rel, r, cid, "chunk", f"edge_defaults:{note_type}:{rel}"),
|
)
|
||||||
"provenance": "rule",
|
|
||||||
"rule_id": f"edge_defaults:{note_type}:{rel}",
|
|
||||||
"confidence": 0.7,
|
|
||||||
}))
|
|
||||||
|
|
||||||
refs_all.extend(refs)
|
# ------------------------------------------------------------------
|
||||||
|
# 4) De-Duplizierung (idempotent): Schlüssel (kind, scope, source_id, target_id, rule_id)
|
||||||
# 4) optional note-scope refs/backlinks (+ defaults)
|
# ------------------------------------------------------------------
|
||||||
if include_note_scope_refs:
|
seen: Set[Tuple[str, str, str, str, str]] = set()
|
||||||
refs_note = list(refs_all or [])
|
uniq: List[Dict] = []
|
||||||
if note_level_references:
|
|
||||||
refs_note.extend([r for r in note_level_references if isinstance(r, str) and r])
|
|
||||||
refs_note = _dedupe_seq(refs_note)
|
|
||||||
for r in refs_note:
|
|
||||||
edges.append(_edge("references", "note", note_id, r, note_id, {
|
|
||||||
"edge_id": _mk_edge_id("references", note_id, r, "note", "explicit:note_scope"),
|
|
||||||
"provenance": "explicit",
|
|
||||||
"rule_id": "explicit:note_scope",
|
|
||||||
"confidence": 1.0,
|
|
||||||
}))
|
|
||||||
edges.append(_edge("backlink", "note", r, note_id, note_id, {
|
|
||||||
"edge_id": _mk_edge_id("backlink", r, note_id, "note", "derived:backlink"),
|
|
||||||
"provenance": "rule",
|
|
||||||
"rule_id": "derived:backlink",
|
|
||||||
"confidence": 0.9,
|
|
||||||
}))
|
|
||||||
for rel in defaults:
|
|
||||||
if rel == "references":
|
|
||||||
continue
|
|
||||||
edges.append(_edge(rel, "note", note_id, r, note_id, {
|
|
||||||
"edge_id": _mk_edge_id(rel, note_id, r, "note", f"edge_defaults:{note_type}:{rel}"),
|
|
||||||
"provenance": "rule",
|
|
||||||
"rule_id": f"edge_defaults:{note_type}:{rel}",
|
|
||||||
"confidence": 0.7,
|
|
||||||
}))
|
|
||||||
if rel in {"related_to", "similar_to"}:
|
|
||||||
edges.append(_edge(rel, "note", r, note_id, note_id, {
|
|
||||||
"edge_id": _mk_edge_id(rel, r, note_id, "note", f"edge_defaults:{note_type}:{rel}"),
|
|
||||||
"provenance": "rule",
|
|
||||||
"rule_id": f"edge_defaults:{note_type}:{rel}",
|
|
||||||
"confidence": 0.7,
|
|
||||||
}))
|
|
||||||
|
|
||||||
# 5) De-Dupe (source_id, target_id, relation, rule_id)
|
|
||||||
seen: Set[Tuple[str,str,str,str]] = set()
|
|
||||||
out: List[dict] = []
|
|
||||||
for e in edges:
|
for e in edges:
|
||||||
s = str(e.get("source_id") or "")
|
key = (e["kind"], e["scope"], e["source_id"], e["target_id"], e["rule_id"])
|
||||||
t = str(e.get("target_id") or "")
|
|
||||||
rel = str(e.get("relation") or e.get("kind") or "edge")
|
|
||||||
rule = str(e.get("rule_id") or "")
|
|
||||||
key = (s, t, rel, rule)
|
|
||||||
if key in seen:
|
if key in seen:
|
||||||
continue
|
continue
|
||||||
seen.add(key)
|
seen.add(key)
|
||||||
out.append(e)
|
uniq.append(e)
|
||||||
return out
|
|
||||||
|
return uniq
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user