mindnet/app/core/edges.py
Lars 6f32c60c47
Some checks failed
Deploy mindnet to llm-node / deploy (push) Failing after 2s
app/core/edges.py aktualisiert
2025-09-09 11:51:28 +02:00

121 lines
3.5 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Modul: app/core/edges.py
Version: 1.0.0
Datum: 2025-09-09
Zweck
-----
Zentrale, konsistente Erzeugung von Edge-Payloads im **neuen Schema**:
- kind : "belongs_to" | "next" | "prev" | "references" | "backlink"
- source_id : ID des Quellknotens (Chunk- oder Note-ID)
- target_id : ID des Zielknotens
- scope : "chunk" | "note"
- note_id : Owner-Note (für performantes Filtern/Löschen)
- seq : optional (z. B. Reihenfolge von Vorkommen)
Hinweise
--------
- Edges werden dedupliziert (key=(kind,source_id,target_id,scope)).
- Für Chunk-Edges wird `note_id` aus dem Chunk-Payload entnommen.
- Für Note-Scope-Edges ist `note_id` die Quell-Note-ID.
"""
from __future__ import annotations
from typing import Dict, List
def build_edges_for_note(
note_id: str,
chunk_payloads: List[Dict],
note_level_refs: List[str] | None,
*,
include_note_scope_refs: bool = False,
) -> List[Dict]:
edges: List[Dict] = []
# Chunk-Scope: belongs_to / prev / next / references
for ch in chunk_payloads:
cid = ch["id"]
owner = ch.get("note_id") or note_id
# belongs_to
edges.append({
"kind": "belongs_to",
"source_id": cid,
"target_id": note_id,
"scope": "chunk",
"note_id": owner,
})
# Nachbarn
nb = ch.get("neighbors") or {}
prev_id = nb.get("prev")
next_id = nb.get("next")
if prev_id:
edges.append({
"kind": "prev",
"source_id": cid,
"target_id": prev_id,
"scope": "chunk",
"note_id": owner,
})
edges.append({
"kind": "next",
"source_id": prev_id,
"target_id": cid,
"scope": "chunk",
"note_id": owner,
})
if next_id:
edges.append({
"kind": "next",
"source_id": cid,
"target_id": next_id,
"scope": "chunk",
"note_id": owner,
})
edges.append({
"kind": "prev",
"source_id": next_id,
"target_id": cid,
"scope": "chunk",
"note_id": owner,
})
# references aus Chunk
for ref in (ch.get("references") or []):
tid = ref.get("target_id")
if not tid:
continue
edges.append({
"kind": "references",
"source_id": cid,
"target_id": tid,
"scope": "chunk",
"note_id": owner,
})
# Note-Scope: backlink (immer); references (optional)
unique_refs = list(dict.fromkeys(note_level_refs or []))
for tid in unique_refs:
if include_note_scope_refs:
edges.append({
"kind": "references",
"source_id": note_id,
"target_id": tid,
"scope": "note",
"note_id": note_id,
})
edges.append({
"kind": "backlink",
"source_id": tid,
"target_id": note_id,
"scope": "note",
"note_id": note_id,
})
# Dedupe
dedup = {}
for e in edges:
k = (e["kind"], e["source_id"], e["target_id"], e.get("scope", ""))
dedup[k] = e
return list(dedup.values())