mindnet/app/core/edges.py
Lars 057c12c2d9
Some checks failed
Deploy mindnet to llm-node / deploy (push) Failing after 1s
app/core/edges.py aktualisiert
2025-09-05 07:32:13 +02:00

45 lines
2.2 KiB
Python

from __future__ import annotations
from typing import List, Dict
def deriv_edges_for_note(note_meta: Dict, chunk_payloads: List[Dict]) -> List[Dict]:
edges: List[Dict] = []
# Chunk → Note (belongs_to) + prev/next
for idx, ch in enumerate(chunk_payloads):
edges.append({
"src_id": ch["id"], "dst_id": note_meta["id"],
"edge_type": "belongs_to", "scope": "chunk"
})
prev_id = ch.get("neighbors",{}).get("prev")
next_id = ch.get("neighbors",{}).get("next")
if prev_id:
edges.append({"src_id": ch["id"], "dst_id": prev_id, "edge_type": "next", "scope": "chunk"})
edges.append({"src_id": prev_id, "dst_id": ch["id"], "edge_type": "prev", "scope": "chunk"})
if next_id:
edges.append({"src_id": ch["id"], "dst_id": next_id, "edge_type": "next", "scope": "chunk"})
edges.append({"src_id": next_id, "dst_id": ch["id"], "edge_type": "prev", "scope": "chunk"})
# references aus wikilinks (Chunk-Scope)
for ref in ch.get("references", []):
tid = ref.get("target_id")
if not tid: continue
edges.append({"src_id": ch["id"], "dst_id": tid, "edge_type": "references", "scope": "chunk"})
edges.append({"src_id": tid, "dst_id": ch["id"], "edge_type": "backlink", "scope": "chunk"})
# depends_on / assigned_to (Note-Scope, aus Frontmatter)
for dep in note_meta.get("depends_on", []) or []:
edges.append({"src_id": note_meta["id"], "dst_id": dep, "edge_type": "depends_on", "scope": "note"})
for ass in note_meta.get("assigned_to", []) or []:
edges.append({"src_id": note_meta["id"], "dst_id": ass, "edge_type": "assigned_to", "scope": "note"})
# Note-Level references (optional: falls du im Note-Payload `references` sammelst)
for tid in note_meta.get("references", []) or []:
edges.append({"src_id": note_meta["id"], "dst_id": tid, "edge_type": "references", "scope": "note"})
edges.append({"src_id": tid, "dst_id": note_meta["id"], "edge_type": "backlink", "scope": "note"})
# Dedupe
uniq = {}
for e in edges:
k = (e["src_id"], e["edge_type"], e["dst_id"], e.get("scope","note"))
uniq[k] = e
return list(uniq.values())