"""
FILE: app/core/graph/graph_derive_edges.py
DESCRIPTION: Core logic for edge aggregation and de-duplication.
"""
from typing import List, Optional, Dict, Tuple

from .graph_utils import (
    _get,
    _edge,
    _mk_edge_id,
    _dedupe_seq,
    PROVENANCE_PRIORITY,
    load_types_registry,
    get_edge_defaults_for,
)
from .graph_extractors import (
    extract_typed_relations,
    extract_callout_relations,
    extract_wikilinks,
)


def build_edges_for_note(
    note_id: str,
    chunks: List[dict],
    note_level_references: Optional[List[str]] = None,
    include_note_scope_refs: bool = False,
) -> List[dict]:
    """Build, aggregate and de-duplicate all edges for one note (WP-15b).

    Edges are produced in three passes and then reduced:

    1. Structural edges: ``belongs_to`` (chunk -> note) for every chunk that
       has an id, plus ``next``/``prev`` between a chunk and its direct
       successor when the successor has an id as well.
    2. Content edges per chunk: inline typed relations, entries of the
       chunk's candidate pool, callout relations, wikilink ``references`` and,
       for every wikilink, one extra edge per default relation of the note
       type (``references`` itself is skipped there).
    3. Optional note-scope ``references``/``backlink`` edges.

    Finally only one edge per ``(source_id, target_id, kind)`` is kept: the
    one with the strictly highest ``confidence``; on equal confidence the
    edge that was produced first wins.

    Args:
        note_id: Id of the note the chunks belong to.
        chunks: Chunk records of the note. The note type is read from the
            first chunk's ``"type"``; ``"concept"`` is used when there are
            no chunks at all.
        note_level_references: Extra reference targets that are merged with
            the wikilinks found in the chunks. Only used when
            ``include_note_scope_refs`` is true.
        include_note_scope_refs: When true, also emit note-scoped
            ``references`` and ``backlink`` edges.

    Returns:
        The de-duplicated list of edge records.
    """
    collected: List[dict] = []

    def emit(kind, scope, source, target, payload):
        # Single place where an edge record is created and stored.
        collected.append(_edge(kind, scope, source, target, note_id, payload))

    def emit_ruled(kind, source, target, rule_id, provenance):
        # Chunk-scoped edge whose confidence is looked up by its rule id.
        emit(kind, "chunk", source, target, {
            "chunk_id": source,
            "edge_id": _mk_edge_id(kind, source, target, "chunk", rule_id),
            "provenance": provenance,
            "rule_id": rule_id,
            "confidence": PROVENANCE_PRIORITY[rule_id],
        })

    if chunks:
        note_type = _get(chunks[0], "type")
    else:
        note_type = "concept"

    # 1) Structural edges (belongs_to, next/prev)
    for pos, chunk in enumerate(chunks):
        chunk_id = _get(chunk, "chunk_id", "id")
        if not chunk_id:
            continue
        emit_ruled(
            "belongs_to", chunk_id, note_id, "structure:belongs_to", "structure"
        )

        if pos + 1 >= len(chunks):
            continue  # last chunk has no successor
        successor_id = _get(chunks[pos + 1], "chunk_id", "id")
        if not successor_id:
            continue
        emit_ruled("next", chunk_id, successor_id, "structure:order", "structure")
        emit_ruled("prev", successor_id, chunk_id, "structure:order", "structure")

    # 2) Content edges
    registry = load_types_registry()
    default_kinds = get_edge_defaults_for(note_type, registry)
    chunk_refs: List[str] = []

    for chunk in chunks:
        chunk_id = _get(chunk, "chunk_id", "id")
        if not chunk_id:
            continue
        # Prefer the chunk's "window" text, fall back to its plain "text".
        text = _get(chunk, "window") or _get(chunk, "text") or ""

        # Typed relations & candidate pool (WP-15b integration)
        typed_pairs, after_typed = extract_typed_relations(text)
        for kind, target in typed_pairs:
            emit_ruled(kind, chunk_id, target, "inline:rel", "explicit")

        candidates = (
            chunk.get("candidate_pool") or chunk.get("candidate_edges") or []
        )
        for candidate in candidates:
            target = candidate.get("to")
            kind = candidate.get("kind", "related_to")
            provenance = candidate.get("provenance", "semantic_ai")
            if not target:
                continue
            rule_id = f"candidate:{provenance}"
            emit(kind, "chunk", chunk_id, target, {
                "chunk_id": chunk_id,
                "edge_id": _mk_edge_id(kind, chunk_id, target, "chunk", rule_id),
                "provenance": provenance,
                "rule_id": rule_id,
                # Provenances without a configured priority get 0.90.
                "confidence": PROVENANCE_PRIORITY.get(provenance, 0.90),
            })

        # Callouts & wikilinks; each extractor works on the text the previous
        # one returned as its second result.
        callout_pairs, after_callouts = extract_callout_relations(after_typed)
        for kind, target in callout_pairs:
            emit_ruled(kind, chunk_id, target, "callout:edge", "explicit")

        wikilinks = extract_wikilinks(after_callouts)
        for link in wikilinks:
            emit("references", "chunk", chunk_id, link, {
                "chunk_id": chunk_id,
                "ref_text": link,
                "edge_id": _mk_edge_id(
                    "references", chunk_id, link, "chunk", "explicit:wikilink"
                ),
                "provenance": "explicit",
                "rule_id": "explicit:wikilink",
                "confidence": PROVENANCE_PRIORITY["explicit:wikilink"],
            })
            # Every wikilink additionally yields the note type's default
            # relations; "references" was already emitted just above.
            for default_kind in default_kinds:
                if default_kind == "references":
                    continue
                rule_id = f"edge_defaults:{default_kind}"
                emit(default_kind, "chunk", chunk_id, link, {
                    "chunk_id": chunk_id,
                    "edge_id": _mk_edge_id(
                        default_kind, chunk_id, link, "chunk", rule_id
                    ),
                    "provenance": "rule",
                    "rule_id": rule_id,
                    "confidence": PROVENANCE_PRIORITY["edge_defaults"],
                })
        chunk_refs.extend(wikilinks)

    # 3) Note scope & de-duplication
    if include_note_scope_refs:
        note_targets = _dedupe_seq(
            (chunk_refs or []) + (note_level_references or [])
        )
        for target in note_targets:
            emit("references", "note", note_id, target, {
                "edge_id": _mk_edge_id(
                    "references", note_id, target, "note", "explicit:note_scope"
                ),
                "provenance": "explicit",
                "confidence": PROVENANCE_PRIORITY["explicit:note_scope"],
            })
            emit("backlink", "note", target, note_id, {
                "edge_id": _mk_edge_id(
                    "backlink", target, note_id, "note", "derived:backlink"
                ),
                "provenance": "rule",
                "confidence": PROVENANCE_PRIORITY["derived:backlink"],
            })

    # Keep one edge per (source, target, kind). A later edge replaces the
    # stored one only when its confidence is strictly higher, so ties go to
    # the edge that was produced first.
    best_by_key: Dict[Tuple[str, str, str], dict] = {}
    for edge in collected:
        key = (
            str(edge.get("source_id")),
            str(edge.get("target_id")),
            str(edge.get("kind")),
        )
        current = best_by_key.get(key)
        if current is None or (
            edge.get("confidence", 0) > current.get("confidence", 0)
        ):
            best_by_key[key] = edge
    return list(best_by_key.values())