app/core/edges.py hinzugefügt
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s

This commit is contained in:
Lars 2025-09-03 07:57:51 +02:00
parent f122b2442e
commit 309462dfa9

45
app/core/edges.py Normal file
View File

@ -0,0 +1,45 @@
from __future__ import annotations
from typing import List, Dict
def deriv_edges_for_note(note_meta: Dict, chunk_payloads: List[Dict]) -> List[Dict]:
edges: List[Dict] = []
# Chunk → Note (belongs_to) + prev/next
for idx, ch in enumerate(chunk_payloads):
edges.append({
"src_id": ch["id"], "dst_id": note_meta["id"],
"edge_type": "belongs_to", "scope": "chunk"
})
prev_id = ch.get("neighbors",{}).get("prev")
next_id = ch.get("neighbors",{}).get("next")
if prev_id:
edges.append({"src_id": ch["id"], "dst_id": prev_id, "edge_type": "next", "scope": "chunk"})
edges.append({"src_id": prev_id, "dst_id": ch["id"], "edge_type": "prev", "scope": "chunk"})
if next_id:
edges.append({"src_id": ch["id"], "dst_id": next_id, "edge_type": "next", "scope": "chunk"})
edges.append({"src_id": next_id, "dst_id": ch["id"], "edge_type": "prev", "scope": "chunk"})
# references aus wikilinks (Chunk-Scope)
for ref in ch.get("references", []):
tid = ref.get("target_id")
if not tid: continue
edges.append({"src_id": ch["id"], "dst_id": tid, "edge_type": "references", "scope": "chunk"})
edges.append({"src_id": tid, "dst_id": ch["id"], "edge_type": "backlink", "scope": "chunk"})
# depends_on / assigned_to (Note-Scope, aus Frontmatter)
for dep in note_meta.get("depends_on", []) or []:
edges.append({"src_id": note_meta["id"], "dst_id": dep, "edge_type": "depends_on", "scope": "note"})
for ass in note_meta.get("assigned_to", []) or []:
edges.append({"src_id": note_meta["id"], "dst_id": ass, "edge_type": "assigned_to", "scope": "note"})
# Note-Level references (optional: falls du im Note-Payload `references` sammelst)
for tid in note_meta.get("references", []) or []:
edges.append({"src_id": note_meta["id"], "dst_id": tid, "edge_type": "references", "scope": "note"})
edges.append({"src_id": tid, "dst_id": note_meta["id"], "edge_type": "backlink", "scope": "note"})
# Dedupe
uniq = {}
for e in edges:
k = (e["src_id"], e["edge_type"], e["dst_id"], e.get("scope","note"))
uniq[k] = e
return list(uniq.values())