153 lines
7.3 KiB
Python
153 lines
7.3 KiB
Python
"""
|
|
FILE: app/core/graph/graph_derive_edges.py
|
|
DESCRIPTION: Hauptlogik zur Kanten-Aggregation und De-Duplizierung.
|
|
AUDIT:
|
|
- Nutzt parse_link_target
|
|
- Übergibt Section als 'variant' an ID-Gen
|
|
- Dedup basiert jetzt auf Edge-ID (erlaubt Multigraph für Sections)
|
|
"""
|
|
from typing import List, Optional, Dict, Tuple
|
|
from .graph_utils import (
|
|
_get, _edge, _mk_edge_id, _dedupe_seq, parse_link_target,
|
|
PROVENANCE_PRIORITY, load_types_registry, get_edge_defaults_for
|
|
)
|
|
from .graph_extractors import (
|
|
extract_typed_relations, extract_callout_relations, extract_wikilinks
|
|
)
|
|
|
|
def build_edges_for_note(
|
|
note_id: str,
|
|
chunks: List[dict],
|
|
note_level_references: Optional[List[str]] = None,
|
|
include_note_scope_refs: bool = False,
|
|
) -> List[dict]:
|
|
"""Erzeugt und aggregiert alle Kanten für eine Note."""
|
|
edges: List[dict] = []
|
|
note_type = _get(chunks[0], "type") if chunks else "concept"
|
|
|
|
# 1) Struktur-Kanten
|
|
for idx, ch in enumerate(chunks):
|
|
cid = _get(ch, "chunk_id", "id")
|
|
if not cid: continue
|
|
edges.append(_edge("belongs_to", "chunk", cid, note_id, note_id, {
|
|
"chunk_id": cid, "edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to"),
|
|
"provenance": "structure", "rule_id": "structure:belongs_to", "confidence": PROVENANCE_PRIORITY["structure:belongs_to"]
|
|
}))
|
|
if idx < len(chunks) - 1:
|
|
next_id = _get(chunks[idx+1], "chunk_id", "id")
|
|
if next_id:
|
|
edges.append(_edge("next", "chunk", cid, next_id, note_id, {
|
|
"chunk_id": cid, "edge_id": _mk_edge_id("next", cid, next_id, "chunk", "structure:order"),
|
|
"provenance": "structure", "rule_id": "structure:order", "confidence": PROVENANCE_PRIORITY["structure:order"]
|
|
}))
|
|
edges.append(_edge("prev", "chunk", next_id, cid, note_id, {
|
|
"chunk_id": next_id, "edge_id": _mk_edge_id("prev", next_id, cid, "chunk", "structure:order"),
|
|
"provenance": "structure", "rule_id": "structure:order", "confidence": PROVENANCE_PRIORITY["structure:order"]
|
|
}))
|
|
|
|
# 2) Inhaltliche Kanten
|
|
reg = load_types_registry()
|
|
defaults = get_edge_defaults_for(note_type, reg)
|
|
refs_all: List[str] = []
|
|
|
|
for ch in chunks:
|
|
cid = _get(ch, "chunk_id", "id")
|
|
if not cid: continue
|
|
raw = _get(ch, "window") or _get(ch, "text") or ""
|
|
|
|
# Typed
|
|
typed, rem = extract_typed_relations(raw)
|
|
for k, raw_t in typed:
|
|
t, sec = parse_link_target(raw_t, note_id)
|
|
if not t: continue
|
|
payload = {
|
|
"chunk_id": cid,
|
|
# Variant=sec sorgt für eindeutige ID pro Abschnitt
|
|
"edge_id": _mk_edge_id(k, cid, t, "chunk", "inline:rel", variant=sec),
|
|
"provenance": "explicit", "rule_id": "inline:rel", "confidence": PROVENANCE_PRIORITY["inline:rel"]
|
|
}
|
|
if sec: payload["target_section"] = sec
|
|
edges.append(_edge(k, "chunk", cid, t, note_id, payload))
|
|
|
|
# Semantic AI Candidates
|
|
pool = ch.get("candidate_pool") or ch.get("candidate_edges") or []
|
|
for cand in pool:
|
|
raw_t, k, p = cand.get("to"), cand.get("kind", "related_to"), cand.get("provenance", "semantic_ai")
|
|
t, sec = parse_link_target(raw_t, note_id)
|
|
if t:
|
|
payload = {
|
|
"chunk_id": cid,
|
|
"edge_id": _mk_edge_id(k, cid, t, "chunk", f"candidate:{p}", variant=sec),
|
|
"provenance": p, "rule_id": f"candidate:{p}", "confidence": PROVENANCE_PRIORITY.get(p, 0.90)
|
|
}
|
|
if sec: payload["target_section"] = sec
|
|
edges.append(_edge(k, "chunk", cid, t, note_id, payload))
|
|
|
|
# Callouts
|
|
call_pairs, rem2 = extract_callout_relations(rem)
|
|
for k, raw_t in call_pairs:
|
|
t, sec = parse_link_target(raw_t, note_id)
|
|
if not t: continue
|
|
payload = {
|
|
"chunk_id": cid,
|
|
"edge_id": _mk_edge_id(k, cid, t, "chunk", "callout:edge", variant=sec),
|
|
"provenance": "explicit", "rule_id": "callout:edge", "confidence": PROVENANCE_PRIORITY["callout:edge"]
|
|
}
|
|
if sec: payload["target_section"] = sec
|
|
edges.append(_edge(k, "chunk", cid, t, note_id, payload))
|
|
|
|
# Wikilinks & Defaults
|
|
refs = extract_wikilinks(rem2)
|
|
for raw_r in refs:
|
|
r, sec = parse_link_target(raw_r, note_id)
|
|
if not r: continue
|
|
|
|
# Explicit Reference
|
|
payload = {
|
|
"chunk_id": cid, "ref_text": raw_r,
|
|
"edge_id": _mk_edge_id("references", cid, r, "chunk", "explicit:wikilink", variant=sec),
|
|
"provenance": "explicit", "rule_id": "explicit:wikilink", "confidence": PROVENANCE_PRIORITY["explicit:wikilink"]
|
|
}
|
|
if sec: payload["target_section"] = sec
|
|
edges.append(_edge("references", "chunk", cid, r, note_id, payload))
|
|
|
|
# Defaults (nur einmal pro Target, Section hier irrelevant für Typ-Logik, oder?)
|
|
# Wir erzeugen Defaults auch pro Section, um Konsistenz zu wahren.
|
|
for rel in defaults:
|
|
if rel != "references":
|
|
def_payload = {
|
|
"chunk_id": cid,
|
|
"edge_id": _mk_edge_id(rel, cid, r, "chunk", f"edge_defaults:{rel}", variant=sec),
|
|
"provenance": "rule", "rule_id": f"edge_defaults:{rel}", "confidence": PROVENANCE_PRIORITY["edge_defaults"]
|
|
}
|
|
if sec: def_payload["target_section"] = sec
|
|
edges.append(_edge(rel, "chunk", cid, r, note_id, def_payload))
|
|
|
|
refs_all.extend([parse_link_target(r, note_id)[0] for r in refs])
|
|
|
|
# 3) Note-Scope & De-Duplizierung
|
|
if include_note_scope_refs:
|
|
cleaned_note_refs = [parse_link_target(r, note_id)[0] for r in (note_level_references or [])]
|
|
refs_note = _dedupe_seq((refs_all or []) + cleaned_note_refs)
|
|
|
|
for r in refs_note:
|
|
if not r: continue
|
|
edges.append(_edge("references", "note", note_id, r, note_id, {
|
|
"edge_id": _mk_edge_id("references", note_id, r, "note", "explicit:note_scope"),
|
|
"provenance": "explicit", "confidence": PROVENANCE_PRIORITY["explicit:note_scope"]
|
|
}))
|
|
edges.append(_edge("backlink", "note", r, note_id, note_id, {
|
|
"edge_id": _mk_edge_id("backlink", r, note_id, "note", "derived:backlink"),
|
|
"provenance": "rule", "confidence": PROVENANCE_PRIORITY["derived:backlink"]
|
|
}))
|
|
|
|
# Deduplizierung: Wir nutzen jetzt die EDGE-ID als Schlüssel.
|
|
# Da die Edge-ID nun 'variant' (Section) enthält, bleiben unterschiedliche Sections erhalten.
|
|
unique_map: Dict[str, dict] = {}
|
|
for e in edges:
|
|
eid = e["edge_id"]
|
|
# Bei Konflikt (gleiche ID = exakt gleiche Kante und Section) gewinnt die höhere Confidence
|
|
if eid not in unique_map or e.get("confidence", 0) > unique_map[eid].get("confidence", 0):
|
|
unique_map[eid] = e
|
|
|
|
return list(unique_map.values()) |