mindnet/app/core/chunk_payload.py
Lars dd86a15555
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
app/core/chunk_payload.py aktualisiert
2025-10-01 15:20:10 +02:00

279 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Modul: app/core/chunk_payload.py
Version: 2.1.0
Datum: 2025-10-01
Zweck
-----
Erzeugt Chunk-Payloads für Qdrant. Voll abwärtskompatibel zu v2.0.1 und erweitert um:
- Übernahme vorhandener deterministischer Chunk-IDs (falls vom Chunker geliefert).
- Durchreichen von token_count, section_title, section_path (falls vorhanden).
- Exaktere start/end-Offsets, wenn der vollständige Note-Body (note_text) vorliegt.
- Berechnung von overlap_right (nicht mehr pauschal 0).
Felder (wie 2.0.1, beibehalten):
- window : Fenstertext inkl. Overlap (für Embeddings & Link-Erkennung)
- text : effektiver Segmenttext ohne linkes Overlap (für Rekonstruktion)
- start, end : Offsets des Segments im Gesamtkorpus (0-basiert, [start, end))
- overlap_left : Zeichen-Overlap zum VORHERIGEN Fenster
- overlap_right : Zeichen-Overlap zum NÄCHSTEN Fenster
- note_id, chunk_id, id (Alias), chunk_index, seq, path, type, title, tags
Abwärtskompatible Aliasse / Verhalten:
- 'id' == 'chunk_id' (Edges nutzen häufig 'id').
- Fensterquelle wird robust aus ('window'|'text'|'content'|'raw') gelesen.
- Pfad bleibt relativ und nutzt forward slashes.
Nutzung
-------
from app.core.chunk_payload import make_chunk_payloads
pls = make_chunk_payloads(frontmatter, rel_path, chunks, note_text=full_body)
Parameter
---------
- frontmatter : dict erwartet mind. 'id' (Note-ID); optional 'title','type','tags'
- rel_path : str relativer Pfad der Note im Vault (forward slashes, ohne führenden '/')
- chunks : Seq[object|dict]
- note_text : str|None optional kompletter Body für exakte Offsets
Hinweis
------
Dein Chunker liefert bereits Fenster mit vorangestelltem Overlap. Wir ermitteln den
linken Overlap gegen den bisher rekonstruierten Text und entfernen ihn in 'text'.
Lizenz: MIT (projektintern)
"""
from __future__ import annotations
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
# ------------------------------- Utils ------------------------------- #
def _get_attr_or_key(obj: Any, key: str, default=None):
"""Liest Feld 'key' aus Objekt ODER Dict; sonst default."""
if obj is None:
return default
if isinstance(obj, dict):
return obj.get(key, default)
return getattr(obj, key, default)
def _as_window_text(chunk: Any) -> str:
    """Extract the window text from a chunk (object or dict) as robustly as possible.

    Probes 'window', 'text', 'content', 'raw' in that order and returns the first
    non-empty string. A chunk that is itself a string (exotic case) is returned
    unchanged; anything else yields "".
    """
    for key in ("window", "text", "content", "raw"):
        value = _get_attr_or_key(chunk, key, None)
        if isinstance(value, str) and value:
            return value
    return chunk if isinstance(chunk, str) else ""
def _to_int(x: Any, default: int = 0) -> int:
try:
return int(x)
except Exception:
return default
def _normalize_rel_path(p: str) -> str:
p = (p or "").replace("\\", "/")
# kein führender Slash (relativ bleiben)
while p.startswith("/"):
p = p[1:]
return p
# ---------------------- Overlap & Offsets ---------------------------- #
def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int]]:
"""
Ermittelt nicht-überlappende Segmente zu einer geordneten Folge von Fenster-Strings.
Gibt (segments, overlaps_left) zurück, wobei:
- segments[i] = Fenster[i] ohne das vorangestellte Overlap
- overlaps_left[i] = Länge des Overlaps von Fenster[i] zum bisher rekonstruierten Text
"""
segments: List[str] = []
overlaps_left: List[int] = []
reconstructed = ""
for w in windows:
w = w or ""
max_k = min(len(w), len(reconstructed))
k = 0
# Suche von groß nach klein (einfach, robust bei kurzen Fenstern)
for cand in range(max_k, -1, -1):
if reconstructed.endswith(w[:cand]):
k = cand
break
seg = w[k:]
segments.append(seg)
overlaps_left.append(k)
reconstructed += seg
return segments, overlaps_left
def _overlap_len_suffix_prefix(a_suffix: str, b_prefix: str, max_probe: int = 4096) -> int:
"""
Länge des längsten Suffixes von a_suffix, der Prefix von b_prefix ist (bruteforce, begrenzt).
"""
if not a_suffix or not b_prefix:
return 0
a = a_suffix[-max_probe:]
b = b_prefix[:max_probe]
n = min(len(a), len(b))
for k in range(n, 0, -1):
if a[-k:] == b[:k]:
return k
return 0
# ----------------------------- Public API ---------------------------- #
def _compute_offsets(segments: List[str], note_text: Optional[str]) -> Tuple[List[int], List[int]]:
    """Compute [start, end) offsets for each segment.

    With *note_text*, each segment is located via forward search starting at the
    end of the previous match. A segment that cannot be found falls back to the
    current search cursor instead of a stale cumulative counter — the previous
    implementation restarted the fallback near 0, which could produce offsets
    overlapping already-assigned ranges. Without *note_text*, offsets are purely
    cumulative (v2.0.1 behavior).
    """
    starts = [0] * len(segments)
    ends = [0] * len(segments)
    if isinstance(note_text, str) and note_text:
        search_pos = 0
        for i, seg in enumerate(segments):
            if not seg:
                # Empty segment: zero-width span at the current cursor.
                starts[i] = ends[i] = search_pos
                continue
            j = note_text.find(seg, search_pos)
            if j < 0:
                # Fallback: append right after the last known position so
                # offsets stay monotonic and never jump back before earlier chunks.
                j = search_pos
            starts[i] = j
            ends[i] = j + len(seg)
            search_pos = ends[i]
    else:
        pos = 0
        for i, seg in enumerate(segments):
            starts[i] = pos
            pos += len(seg)
            ends[i] = pos
    return starts, ends


def make_chunk_payloads(
    frontmatter: Dict[str, Any],
    rel_path: str,
    chunks: Iterable[Union[Dict[str, Any], Any]],
    note_text: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Build one payload dict per chunk (compatible with v2.0.1, extended).

    Parameters
    ----------
    frontmatter : dict with at least 'id' (note id); optional 'title', 'type', 'tags'.
    rel_path    : relative vault path; normalized to forward slashes, no leading '/'.
    chunks      : sequence of chunk objects or dicts; window text is read from
                  'window' | 'text' | 'content' | 'raw' (first non-empty wins).
    note_text   : optional full note body used for exact start/end offsets.

    Returns
    -------
    List of dicts with note_id, chunk_id, id (alias), chunk_index, seq, path,
    window, text, start, end, overlap_left, overlap_right and, when present,
    token_count, section_title, section_path, type, title, tags.
    """
    note_id = str(frontmatter.get("id") or "").strip()
    note_type = frontmatter.get("type")
    note_title = frontmatter.get("title")
    note_tags = frontmatter.get("tags")
    rel_path = _normalize_rel_path(rel_path)

    # 1) Collect window texts, sequence numbers and optional per-chunk metadata.
    chunks_list = list(chunks)  # materialize in case an iterator was passed
    windows: List[str] = []
    seqs: List[int] = []
    ids_in: List[Optional[str]] = []
    token_counts: List[Optional[int]] = []
    section_titles: List[Optional[str]] = []
    section_paths: List[Optional[str]] = []
    for idx, c in enumerate(chunks_list):
        windows.append(_as_window_text(c))
        seqs.append(_to_int(_get_attr_or_key(c, "seq", _get_attr_or_key(c, "chunk_index", idx)), idx))
        # Adopt a deterministic chunk id when the chunker already supplied one.
        cid = _get_attr_or_key(c, "chunk_id", _get_attr_or_key(c, "id", None))
        ids_in.append(cid if isinstance(cid, str) and cid else None)
        tc = _get_attr_or_key(c, "token_count", None)  # single lookup (was fetched twice)
        token_counts.append(_to_int(tc, 0) if tc is not None else None)
        section_titles.append(_get_attr_or_key(c, "section_title", None))
        section_paths.append(_get_attr_or_key(c, "section_path", None))

    # 2) Derive non-overlapping segments (window text minus its left overlap).
    segments, overlaps_left = _dedupe_windows_to_segments(windows)

    # 3) overlap_right: longest suffix of window[i] that prefixes window[i+1].
    overlaps_right = [
        _overlap_len_suffix_prefix(windows[i], windows[i + 1], max_probe=4096)
        if i + 1 < len(windows)
        else 0
        for i in range(len(windows))
    ]

    # 4) start/end offsets — exact when note_text is available.
    starts, ends = _compute_offsets(segments, note_text)

    # 5) Assemble the payload dicts.
    payloads: List[Dict[str, Any]] = []
    for i, (win, seg) in enumerate(zip(windows, segments)):
        # Prefer the deterministic id from the chunker; otherwise synthesize one.
        chunk_id = ids_in[i] or f"{note_id}#{i+1}"
        pl: Dict[str, Any] = {
            # identity
            "note_id": note_id,
            "chunk_id": chunk_id,
            "id": chunk_id,  # alias kept for backward compatibility (edges often use 'id')
            # ordering
            "chunk_index": i,
            "seq": seqs[i],
            # location
            "path": rel_path,
            # texts
            "window": win,  # includes overlap
            "text": seg,    # left overlap removed
            # offsets & overlaps
            "start": starts[i],
            "end": ends[i],
            "overlap_left": overlaps_left[i],
            "overlap_right": overlaps_right[i],
        }
        # Optional metadata from note / chunk; keys are omitted when absent.
        if note_type is not None:
            pl["type"] = note_type
        if note_title is not None:
            pl["title"] = note_title
        if note_tags is not None:
            pl["tags"] = note_tags
        if token_counts[i] is not None:
            pl["token_count"] = int(token_counts[i])
        if section_titles[i] is not None:
            pl["section_title"] = section_titles[i]
        if section_paths[i] is not None:
            # Minimal normalization of the section path.
            sp = str(section_paths[i]).replace("\\", "/")
            pl["section_path"] = sp if sp else "/"
        payloads.append(pl)
    return payloads
# __main__ (optional mini demo)
if __name__ == "__main__":  # pragma: no cover
    from pprint import pprint

    demo_fm = {"id": "demo", "title": "Demo", "type": "concept"}
    demo_chunks = [
        {"id": "demo#1", "text": "Alpha Beta Gamma", "token_count": 3, "section_title": "Intro", "section_path": "/intro"},
        {"id": "demo#2", "text": "Gamma Delta", "token_count": 2, "section_title": "Teil 1", "section_path": "/teil-1"},
        {"id": "demo#3", "text": "Delta Epsilon Zeta", "token_count": 3, "section_title": "Teil 2", "section_path": "/teil-2"},
    ]
    pls = make_chunk_payloads(
        demo_fm,
        "x/demo.md",
        demo_chunks,
        note_text="Alpha Beta Gamma Delta Epsilon Zeta",
    )
    pprint(pls)
    print("RECON:", "".join(p["text"] for p in pls))