mindnet/app/core/chunk_payload.py
Lars 7b56f696d6
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
Dateien nach "app/core" hochladen
2025-11-08 16:58:47 +01:00

272 lines
9.4 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Modul: app/core/chunk_payload.py
Version: 2.2.0
Datum: 2025-10-06
Zweck
-----
Erzeugt Qdrant-Payloads für Chunks. Voll abwärtskompatibel zu v2.0.1.
Neu: Wenn der Chunker KEIN Overlap im Fenster liefert (== window fehlt / identisch zur Kernpassage),
erzeugen wir FENSTER mit synthetischem Overlap auf Basis chunk_config.get_sizes(note_type)['overlap'].
Felder (beibehalten aus 2.0.1):
- note_id, chunk_id, id (Alias), chunk_index, seq, path
- window (mit Overlap), text (ohne linkes Overlap)
- start, end (Offsets im gesamten Body)
- overlap_left, overlap_right
- token_count?, section_title?, section_path?, type?, title?, tags?
Kompatibilität:
- 'id' == 'chunk_id' als Alias
- Pfade bleiben relativ (keine führenden '/'), Backslashes → Slashes
- Robust für Chunk-Objekte oder Dicts; Fensterquelle: 'window'|'text'|'content'|'raw'
Lizenz: MIT (projektintern)
"""
from __future__ import annotations
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
try:
# Typgerechtes Overlap aus deiner Konfiguration holen
from app.core.chunk_config import get_sizes as _get_sizes
except Exception:
def _get_sizes(_note_type: str):
# konservativer Default, falls Import fehlschlägt
return {"overlap": (40, 60), "target": (250, 350), "max": 500}
# ------------------------------- Utils ------------------------------- #
def _get_attr_or_key(obj: Any, key: str, default=None):
if obj is None:
return default
if isinstance(obj, dict):
return obj.get(key, default)
return getattr(obj, key, default)
def _as_window_text(chunk: Any) -> str:
    """Return the first non-empty string among the chunk's text fields.

    Fields are tried in the order 'window', 'text', 'content', 'raw', so a real
    window is preferred over the core text. Returns "" when none of them holds
    a non-empty string.
    """
    field_values = (
        _get_attr_or_key(chunk, field, None)
        for field in ("window", "text", "content", "raw")
    )
    # The generator is lazy, so later fields are only read if earlier ones fail.
    return next(
        (value for value in field_values if isinstance(value, str) and value),
        "",
    )
def _to_int(x: Any, default: int = 0) -> int:
try:
return int(x)
except Exception:
return default
def _normalize_rel_path(p: str) -> str:
p = (p or "").replace("\\", "/")
while p.startswith("/"):
p = p[1:]
return p
# ---------------------- Overlap & Offsets ---------------------------- #
def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int], str]:
"""
Entfernt linkes Overlap aus echten Fenster-Strings.
Rückgabe: (segments, overlaps_left, reconstructed_text)
"""
segments: List[str] = []
overlaps_left: List[int] = []
reconstructed = ""
for w in windows:
w = w or ""
max_k = min(len(w), len(reconstructed))
k = 0
for cand in range(max_k, -1, -1):
if reconstructed.endswith(w[:cand]):
k = cand
break
seg = w[k:]
segments.append(seg)
overlaps_left.append(k)
reconstructed += seg
return segments, overlaps_left, reconstructed
def _overlap_len_suffix_prefix(a: str, b: str, max_probe: int = 4096) -> int:
"""Länge längsten Suffix(a), der Prefix(b) ist."""
if not a or not b:
return 0
a1 = a[-max_probe:]
b1 = b[:max_probe]
n = min(len(a1), len(b1))
for k in range(n, 0, -1):
if a1[-k:] == b1[:k]:
return k
return 0
# ----------------------------- Public API ---------------------------- #
def _synthesize_windows(segments: List[str], overlap_target: int) -> Tuple[List[str], List[int]]:
    """Build windows by prefixing each segment with the tail of the preceding text.

    Args:
        segments: Core texts in order.
        overlap_target: Maximum number of characters of left context (>= 0).

    Returns:
        (windows, overlaps_left), where overlaps_left[i] is the number of
        context characters actually prepended to segments[i] (0 for the first).
    """
    windows: List[str] = []
    overlaps_left: List[int] = []
    # Only the last `overlap_target` characters of the text seen so far can ever
    # be used as context, so keep just that tail instead of the whole text.
    tail = ""
    for seg in segments:
        windows.append(tail + seg)
        overlaps_left.append(len(tail))
        # Guard the zero case: x[-0:] would be the whole string, not "".
        tail = (tail + seg)[-overlap_target:] if overlap_target > 0 else ""
    return windows, overlaps_left


def _segment_offsets(segments: List[str], note_text: Optional[str]) -> Tuple[List[int], List[int]]:
    """Compute start/end offsets for each segment.

    With a non-empty *note_text*, each non-empty segment is located by an exact
    forward search starting at the end of the previous exact match. Segments
    that cannot be found (and all segments when no note text is given) are
    placed cumulatively, directly after the previously placed segment.

    Returns:
        (starts, ends) with ends[i] == starts[i] + len(segments[i]).
    """
    starts: List[int] = []
    ends: List[int] = []
    can_search = isinstance(note_text, str) and bool(note_text)
    search_pos = 0  # end of the last exact match; next search begins here
    pos = 0  # end of the last placed segment; base for the cumulative fallback
    for seg in segments:
        found = note_text.find(seg, search_pos) if (can_search and seg) else -1
        if found >= 0:
            start = found
            search_pos = found + len(seg)
        else:
            # Continue after the previous segment rather than restarting at 0,
            # so fallback offsets stay consistent with earlier exact matches.
            start = pos
        pos = start + len(seg)
        starts.append(start)
        ends.append(pos)
    return starts, ends


def make_chunk_payloads(
    frontmatter: Dict[str, Any],
    rel_path: str,
    chunks: Iterable[Union[Dict[str, Any], Any]],
    note_text: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """
    Build one payload dict per chunk.

    If at least one chunk carries a non-empty 'window' string, the delivered
    windows are kept and the core segments are derived by removing the left
    overlap. Otherwise synthetic windows are generated by prefixing each core
    text with trailing context from the preceding text; the context length is
    the lower bound of the 'overlap' entry returned by _get_sizes(note_type),
    applied here as a character count.

    Args:
        frontmatter: Note metadata; 'id', 'type', 'title' and 'tags' are read.
        rel_path: Note path; normalised to a relative forward-slash path.
        chunks: Chunk dicts or objects. Read fields: window/text/content/raw,
            seq or chunk_index, chunk_id or id, token_count, section_title,
            section_path.
        note_text: Optional full note body used to locate exact offsets.

    Returns:
        A list of payload dicts with the keys note_id, chunk_id, id (alias of
        chunk_id), chunk_index, seq, path, window, text, start, end,
        overlap_left, overlap_right, plus type, title, tags, token_count,
        section_title and section_path when available.
    """
    note_id = str(frontmatter.get("id") or "").strip()
    # `or ""` keeps an explicit None (e.g. an empty frontmatter value) from
    # being stringified into the bogus type "none".
    note_type = str(frontmatter.get("type") or "").lower()
    note_title = frontmatter.get("title", None)
    note_tags = frontmatter.get("tags", None)
    rel_path = _normalize_rel_path(rel_path)

    # 1) Collect the raw per-chunk data as delivered.
    raw_windows: List[str] = []
    seqs: List[int] = []
    ids_in: List[Optional[str]] = []
    token_counts: List[Optional[int]] = []
    section_titles: List[Optional[str]] = []
    section_paths: List[Optional[str]] = []
    any_explicit_window = False
    for idx, c in enumerate(chunks):
        # Window source: a real window if present, else 'text'|'content'|'raw'.
        w = _get_attr_or_key(c, "window", None)
        if isinstance(w, str) and w:
            any_explicit_window = True
            raw_windows.append(w)
        else:
            raw_windows.append(_as_window_text(c))
        # Ordering: 'seq', then 'chunk_index', then the enumeration index.
        seqs.append(_to_int(_get_attr_or_key(c, "seq", _get_attr_or_key(c, "chunk_index", idx)), idx))
        # Only non-empty string ids are accepted; anything else gets a generated id.
        cid = _get_attr_or_key(c, "chunk_id", _get_attr_or_key(c, "id", None))
        ids_in.append(str(cid) if isinstance(cid, str) and cid else None)
        tc = _get_attr_or_key(c, "token_count", None)
        token_counts.append(_to_int(tc, 0) if tc is not None else None)
        section_titles.append(_get_attr_or_key(c, "section_title", None))
        section_paths.append(_get_attr_or_key(c, "section_path", None))

    # 2) Determine core segments, final windows and left overlaps.
    if any_explicit_window:
        # Real windows exist: strip the left overlap to find the core segments.
        segments, overlaps_left, _ = _dedupe_windows_to_segments(raw_windows)
        windows_final = list(raw_windows)  # already delivered with overlap
    else:
        # No real windows: the delivered texts are the cores.
        segments = [w or "" for w in raw_windows]
        try:
            overlap_low, _ = tuple(_get_sizes(note_type).get("overlap", (40, 60)))
            # Clamp so a negative config value cannot yield negative overlaps.
            overlap_target = max(0, int(overlap_low))
        except Exception:
            # Best effort: any malformed or failing size config falls back to
            # the default lower bound.
            overlap_target = 40
        windows_final, overlaps_left = _synthesize_windows(segments, overlap_target)

    # 3) Right overlap: suffix of a window that is a prefix of the next window.
    overlaps_right: List[int] = [
        _overlap_len_suffix_prefix(current, following, max_probe=4096)
        for current, following in zip(windows_final, windows_final[1:])
    ]
    if windows_final:
        overlaps_right.append(0)  # the last window has no right neighbour

    # 4) Start/end offsets (exact via note_text where possible, else cumulative).
    starts, ends = _segment_offsets(segments, note_text)

    # 5) Assemble the payload dicts.
    payloads: List[Dict[str, Any]] = []
    for i, (win, seg) in enumerate(zip(windows_final, segments)):
        chunk_id = ids_in[i] or f"{note_id}#{i+1}"
        pl: Dict[str, Any] = {
            "note_id": note_id,
            "chunk_id": chunk_id,
            "id": chunk_id,  # alias
            "chunk_index": i,
            "seq": seqs[i],
            "path": rel_path,
            "window": win,
            "text": seg,
            "start": starts[i],
            "end": ends[i],
            "overlap_left": overlaps_left[i],
            "overlap_right": overlaps_right[i],
        }
        # Optional metadata is only emitted when present.
        if note_type:
            pl["type"] = note_type
        if note_title is not None:
            pl["title"] = note_title
        if note_tags is not None:
            pl["tags"] = note_tags
        if token_counts[i] is not None:
            pl["token_count"] = int(token_counts[i])
        if section_titles[i] is not None:
            pl["section_title"] = section_titles[i]
        if section_paths[i] is not None:
            sp = str(section_paths[i]).replace("\\", "/")
            pl["section_path"] = sp if sp else "/"
        payloads.append(pl)
    return payloads
# __main__ Demo (optional)
if __name__ == "__main__":  # pragma: no cover
    from pprint import pprint

    demo_frontmatter = {"id": "demo", "title": "Demo", "type": "concept"}
    # Example without real windows -> synthetic overlaps are generated.
    demo_chunks = [
        {"id": "demo#1", "text": "Alpha Beta Gamma"},
        {"id": "demo#2", "text": "Gamma Delta"},
        {"id": "demo#3", "text": "Delta Epsilon Zeta"},
    ]
    demo_payloads = make_chunk_payloads(
        demo_frontmatter,
        "path/demo.md",
        demo_chunks,
        note_text="Alpha Beta Gamma Delta Epsilon Zeta",
    )
    pprint(demo_payloads)
    # Joining the core texts shows what the payloads reconstruct to.
    rebuilt_text = "".join(entry["text"] for entry in demo_payloads)
    print("RECON:", rebuilt_text)