From 4ea62e68863ad50c186b0cf0dda2103f4f80f091 Mon Sep 17 00:00:00 2001 From: Lars Date: Tue, 11 Nov 2025 16:45:35 +0100 Subject: [PATCH] Dateien nach "app/core" hochladen --- app/core/edges.py | 338 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 257 insertions(+), 81 deletions(-) diff --git a/app/core/edges.py b/app/core/edges.py index 4359cea..c8b7fd7 100644 --- a/app/core/edges.py +++ b/app/core/edges.py @@ -2,119 +2,295 @@ # -*- coding: utf-8 -*- """ Modul: app/core/edges.py -Version: 1.0.0 -Datum: 2025-09-09 +Version: 2.0.0 (V2‑superset, rückwärtskompatibel zu v1 vom 2025‑09‑09) Zweck ----- -Zentrale, konsistente Erzeugung von Edge-Payloads im **neuen Schema**: - - kind : "belongs_to" | "next" | "prev" | "references" | "backlink" - - source_id : ID des Quellknotens (Chunk- oder Note-ID) - - target_id : ID des Zielknotens - - scope : "chunk" | "note" - - note_id : Owner-Note (für performantes Filtern/Löschen) - - seq : optional (z. B. Reihenfolge von Vorkommen) +Bewahrt die bestehende Edgelogik (belongs_to, prev/next, references, backlink) +und ergänzt V2‑Felder + Typ‑Default‑Kanten gemäß config/types.yaml (edge_defaults). +Die Funktion ist **idempotent** und **rückwärtskompatibel** zur bisherigen Signatur. -Hinweise --------- -- Edges werden dedupliziert (key=(kind,source_id,target_id,scope)). -- Für Chunk-Edges wird `note_id` aus dem Chunk-Payload entnommen. -- Für Note-Scope-Edges ist `note_id` die Quell-Note-ID. +Kompatibilitätsgarantien (gegenüber v1): +- **Input**: akzeptiert identische Chunk‑Payloads wie v1: + * `id` (Chunk‑ID), `note_id` (Owner), `neighbors.prev|next` (optional), + `references: [{target_id: ...}]` (optional), + alternativ: `chunk_id`, `chunk_index|ord`, `window|text` +- **Output (v1‑Felder)**: `kind`, `source_id`, `target_id`, `scope`, `note_id`, `edge_id` +- **Neu (v2‑Felder)**: `relation`, `src_note_id`, `src_chunk_id?`, `dst_note_id`, `dst_chunk_id?`, + `provenance` (`explicit|rule`), `rule_id?`, `confidence?` + +Regeln +------ +- Deduplizierungsschlüssel: (source_id, target_id, relation, rule_id) +- Strukturkanten: + * belongs_to: 1× pro Chunk + * next/prev: Sequenz der Chunks; nutzt bevorzugt neighbors; sonst ord/chunk_index +- Explizite Referenzen: + * aus Chunk: `references[].target_id` (falls vorhanden) + * Fallback: Wikilinks in `window|text`: [[Some Title|some-id]] oder [[some-id]] +- Note‑Scope: + * backlink immer; references nur, wenn include_note_scope_refs=True +- Typ‑Defaults (edge_defaults aus config/types.yaml des **Quell‑Notiztyps**): + * Für jede explizite Referenz wird je default‑Relation eine Regel‑Kante erzeugt + * rule_id: "type_default:{note_type}:{relation}:v1", provenance="rule" + +Konfiguration +------------- +- ENV MINDNET_TYPES_FILE (Default: ./config/types.yaml) + +Lizenz/Autor +------------ +- Erstimplementierung v1 (2025‑09‑09) — Projekt Mindnet +- Erweiterung v2 (2025‑11‑11) — kompatible Superset‑Implementierung """ - from __future__ import annotations -from typing import Dict, List + +import os +import re +from typing import Dict, Iterable, List, Optional, Tuple, Set + +try: + import yaml # optional, nur für types.yaml +except Exception: # pragma: no cover + yaml = None + +# ------------------------------------------------------------ +# Hilfen: types.yaml laden (edge_defaults) +# ------------------------------------------------------------ + +def _types_path() -> str: + return os.getenv("MINDNET_TYPES_FILE") or "./config/types.yaml" + +def _load_types() -> Dict[str, dict]: + p = _types_path() + if not os.path.isfile(p) or yaml is None: + return {} + try: + with open(p, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + if isinstance(data, dict) and "types" in data and isinstance(data["types"], dict): + return data["types"] + return data if isinstance(data, dict) else {} + except Exception: + return {} + +def _edge_defaults_for(note_type: Optional[str]) -> List[str]: + types = _load_types() + t = (note_type or "").strip().lower() + cfg = types.get(t) or {} + defaults = cfg.get("edge_defaults") or [] + if isinstance(defaults, str): + defaults = [defaults] + return [str(x) for x in defaults if isinstance(x, (str, int, float))] + +# ------------------------------------------------------------ +# Wikilink‑Parser (Fallback, wenn ch["references"] fehlt) +# ------------------------------------------------------------ + +_WIKILINK_RE = re.compile(r"\[\[(?:[^\|\]]+\|)?([a-zA-Z0-9_\-#:. ]+)\]\]") + +def _extract_wikilinks(text: str) -> List[str]: + ids: List[str] = [] + for m in _WIKILINK_RE.finditer(text or ""): + ids.append(m.group(1).strip()) + return ids + +# ------------------------------------------------------------ +# Utility +# ------------------------------------------------------------ + +def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None) -> str: + base = f"{kind}:{s}->{t}#{scope}" + if rule_id: + base += f"|{rule_id}" + try: + import hashlib + return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest() + except Exception: # pragma: no cover + return base + +def _dedupe(edges: List[Dict]) -> List[Dict]: + seen: Set[Tuple[str,str,str,str]] = set() + out: List[Dict] = [] + for e in edges: + s = str(e.get("source_id") or "") + t = str(e.get("target_id") or "") + rel = str(e.get("relation") or e.get("kind") or "edge") + rule = str(e.get("rule_id") or "") + key = (s, t, rel, rule) + if key in seen: + continue + seen.add(key) + out.append(e) + return out + +def _first(v: dict, *keys, default=None): + for k in keys: + if k in v and v[k] is not None: + return v[k] + return default + +# ------------------------------------------------------------ +# Hauptfunktion +# ------------------------------------------------------------ def build_edges_for_note( note_id: str, chunk_payloads: List[Dict], - note_level_refs: List[str] | None, + note_level_refs: Optional[List[str]] = None, *, include_note_scope_refs: bool = False, ) -> List[Dict]: edges: List[Dict] = [] + chunks = list(chunk_payloads or []) + # Notiztyp aus erstem Chunk ableiten (kompatibel zu existierenden Payloads) + note_type = (chunks[0].get("type") if chunks else None) or (chunks[0].get("note_type") if chunks else None) - # Chunk-Scope: belongs_to / prev / next / references - for ch in chunk_payloads: - cid = ch["id"] + # --- Strukturkanten ------------------------------------------------------ + # belongs_to + for ch in chunks: + cid = _first(ch, "id", "chunk_id") + if not cid: + continue owner = ch.get("note_id") or note_id - # belongs_to - edges.append({ + e = { + "edge_id": _mk_edge_id("belongs_to", cid, note_id, "chunk", "structure:belongs_to:v1"), "kind": "belongs_to", + "relation": "belongs_to", + "scope": "chunk", "source_id": cid, "target_id": note_id, - "scope": "chunk", - "note_id": owner, - }) - # Nachbarn + "note_id": owner, # v1-Kompat + # v2 + "src_note_id": owner, + "src_chunk_id": cid, + "dst_note_id": note_id, + "provenance": "rule", + "rule_id": "structure:belongs_to:v1", + "confidence": 1.0, + } + edges.append(e) + + # next/prev — bevorzugt neighbors.prev/next; sonst via ord/chunk_index + # Map der Chunks nach Index + ordered = list(chunks) + def _idx(c): + return _first(c, "chunk_index", "ord", default=0) + ordered.sort(key=_idx) + + for i, ch in enumerate(ordered): + cid = _first(ch, "id", "chunk_id") + if not cid: + continue + owner = ch.get("note_id") or note_id nb = ch.get("neighbors") or {} prev_id = nb.get("prev") next_id = nb.get("next") + # Fallback-Reihenfolge + if prev_id is None and i > 0: + prev_id = _first(ordered[i-1], "id", "chunk_id") + if next_id is None and i+1 < len(ordered): + next_id = _first(ordered[i+1], "id", "chunk_id") + if prev_id: edges.append({ - "kind": "prev", - "source_id": cid, - "target_id": prev_id, - "scope": "chunk", - "note_id": owner, + "edge_id": _mk_edge_id("prev", cid, prev_id, "chunk", "structure:order:v1"), + "kind": "prev", "relation": "prev", "scope": "chunk", + "source_id": cid, "target_id": prev_id, "note_id": owner, + "src_note_id": owner, "src_chunk_id": cid, + "dst_note_id": owner, "dst_chunk_id": prev_id, + "provenance": "rule", "rule_id": "structure:order:v1", "confidence": 0.95, }) edges.append({ - "kind": "next", - "source_id": prev_id, - "target_id": cid, - "scope": "chunk", - "note_id": owner, - }) - if next_id: - edges.append({ - "kind": "next", - "source_id": cid, - "target_id": next_id, - "scope": "chunk", - "note_id": owner, - }) - edges.append({ - "kind": "prev", - "source_id": next_id, - "target_id": cid, - "scope": "chunk", - "note_id": owner, - }) - # references aus Chunk - for ref in (ch.get("references") or []): - tid = ref.get("target_id") - if not tid: - continue - edges.append({ - "kind": "references", - "source_id": cid, - "target_id": tid, - "scope": "chunk", - "note_id": owner, + "edge_id": _mk_edge_id("next", prev_id, cid, "chunk", "structure:order:v1"), + "kind": "next", "relation": "next", "scope": "chunk", + "source_id": prev_id, "target_id": cid, "note_id": owner, + "src_note_id": owner, "src_chunk_id": prev_id, + "dst_note_id": owner, "dst_chunk_id": cid, + "provenance": "rule", "rule_id": "structure:order:v1", "confidence": 0.95, }) - # Note-Scope: backlink (immer); references (optional) - unique_refs = list(dict.fromkeys(note_level_refs or [])) + # --- Explizite Referenzen (Chunk‑Scope) --------------------------------- + explicit_refs: List[Dict] = [] + for ch in chunks: + cid = _first(ch, "id", "chunk_id") + if not cid: + continue + owner = ch.get("note_id") or note_id + # 1) bevorzugt vorhandene ch["references"] + refs = ch.get("references") or [] + targets = [r.get("target_id") for r in refs if isinstance(r, dict) and r.get("target_id")] + # 2) Fallback: Wikilinks aus Text + if not targets: + text = _first(ch, "window", "text", default="") or "" + targets = _extract_wikilinks(text) + for tid in targets: + if not isinstance(tid, str) or not tid.strip(): + continue + e = { + "edge_id": _mk_edge_id("references", cid, tid, "chunk"), + "kind": "references", + "relation": "references", + "scope": "chunk", + "source_id": cid, + "target_id": tid, + "note_id": owner, + # v2 + "src_note_id": owner, + "src_chunk_id": cid, + "dst_note_id": tid, + "provenance": "explicit", + "rule_id": "", + "confidence": 1.0, + } + edges.append(e) + explicit_refs.append(e) + + # --- Note‑Scope: references (optional) + backlink (immer) ---------------- + unique_refs = [] + if note_level_refs: + seen = set() + for tid in note_level_refs: + if isinstance(tid, str) and tid.strip() and tid not in seen: + unique_refs.append(tid); seen.add(tid) + for tid in unique_refs: if include_note_scope_refs: edges.append({ - "kind": "references", - "source_id": note_id, - "target_id": tid, - "scope": "note", - "note_id": note_id, + "edge_id": _mk_edge_id("references", note_id, tid, "note"), + "kind": "references", "relation": "references", "scope": "note", + "source_id": note_id, "target_id": tid, "note_id": note_id, + "src_note_id": note_id, "dst_note_id": tid, + "provenance": "explicit", "rule_id": "", "confidence": 1.0, }) edges.append({ - "kind": "backlink", - "source_id": tid, - "target_id": note_id, - "scope": "note", - "note_id": note_id, + "edge_id": _mk_edge_id("backlink", tid, note_id, "note", "derived:backlink:v1"), + "kind": "backlink", "relation": "backlink", "scope": "note", + "source_id": tid, "target_id": note_id, "note_id": note_id, + "src_note_id": tid, "dst_note_id": note_id, + "provenance": "rule", "rule_id": "derived:backlink:v1", "confidence": 0.9, }) - # Dedupe - dedup = {} - for e in edges: - k = (e["kind"], e["source_id"], e["target_id"], e.get("scope", "")) - dedup[k] = e - return list(dedup.values()) + # --- Type‑Defaults je expliziter Referenz -------------------------------- + defaults = [d for d in _edge_defaults_for(note_type) if d and d != "references"] + if defaults: + for e in explicit_refs + ([ ] if not include_note_scope_refs else []): + # wir nutzen die bereits erzeugten explicit‑Edges als Vorlage + src = e["source_id"]; tgt = e["target_id"] + scope = e.get("scope", "chunk") + s_note = e.get("src_note_id") or note_id + s_chunk = e.get("src_chunk_id") + t_note = e.get("dst_note_id") or tgt + for rel in defaults: + rule_id = f"type_default:{(note_type or 'unknown')}:{rel}:v1" + edges.append({ + "edge_id": _mk_edge_id(rel, src, tgt, scope, rule_id), + "kind": rel, "relation": rel, "scope": scope, + "source_id": src, "target_id": tgt, "note_id": s_note, + "src_note_id": s_note, "src_chunk_id": s_chunk, + "dst_note_id": t_note, + "provenance": "rule", "rule_id": rule_id, "confidence": 0.7, + }) + + # --- Dedupe & Return ----------------------------------------------------- + return _dedupe(edges)