All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
284 lines
9.9 KiB
Python
284 lines
9.9 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Module: app/core/chunk_payload.py
Version: 2.3.0
Date: 2025-11-08

Changes since 2.2.0
-------------------
- Optional parameter `chunk_profile: str | None` (backward compatible).
- When no *real* windows are supplied (no overlap delivered), the synthetic
  overlap is chosen from the chunk profile (short|medium|long). Without a
  profile the previous behavior is kept (taken from get_sizes(note_type)).

Note
----
IDs, fields and vectors remain unchanged.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
|
||
|
||
try:
    # Prefer the per-note-type size configuration shipped with the project.
    from app.core.chunk_config import get_sizes as _get_sizes
except Exception:
    def _get_sizes(_note_type: str):
        """Return conservative default sizes, used when the config import fails.

        The argument is ignored; every note type gets the same defaults.
        """
        return dict(overlap=(40, 60), target=(250, 350), max=500)
|
||
|
||
# NEW: optional import – overlap recommendations from the type registry.
try:
    from app.core.type_registry import profile_overlap as _profile_overlap
except Exception:
    def _profile_overlap(_profile: Optional[str]) -> tuple[int, int]:
        """Return the fixed overlap range (40, 60) when the registry import fails.

        The profile argument is ignored by this fallback.
        """
        return (40, 60)
|
||
|
||
|
||
# ------------------------------- Utils ------------------------------- #
|
||
|
||
def _get_attr_or_key(obj: Any, key: str, default=None):
    """Look up ``key`` on a dict (as a key) or on any other object (as an attribute).

    Args:
        obj: A dict, an arbitrary object, or None.
        key: Dictionary key or attribute name to read.
        default: Value returned when ``obj`` is None or the key/attribute is missing.

    Returns:
        The stored value, or ``default``.
    """
    if isinstance(obj, dict):
        return obj.get(key, default)
    if obj is not None:
        return getattr(obj, key, default)
    # None never exposes attributes here, not even dunder ones.
    return default
|
||
|
||
def _as_window_text(chunk: Any) -> str:
    """Return the first non-empty string found among the chunk's text fields.

    The fields are probed in the order ``window``, ``text``, ``content``,
    ``raw``, so a real window wins over the core text. Returns an empty
    string when none of them holds a non-empty ``str``.
    """
    candidates = (
        _get_attr_or_key(chunk, field, None)
        for field in ("window", "text", "content", "raw")
    )
    # The generator is lazy, so probing stops at the first usable field.
    return next((value for value in candidates if isinstance(value, str) and value), "")
|
||
|
||
def _to_int(x: Any, default: int = 0) -> int:
    """Convert ``x`` with ``int()``; return ``default`` if the conversion raises."""
    try:
        converted = int(x)
    except Exception:
        return default
    return converted
|
||
|
||
def _normalize_rel_path(p: str) -> str:
    """Normalize a relative path: forward slashes only, no leading slash.

    A falsy ``p`` (``None`` or ``""``) yields an empty string.
    """
    forward = (p or "").replace("\\", "/")
    return forward.lstrip("/")
|
||
|
||
|
||
# ---------------------- Overlap & Offsets ---------------------------- #
|
||
|
||
def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int], str]:
    """Strip the left overlap from each window to recover the core segments.

    For every window, the longest prefix that is also a suffix of the text
    reconstructed so far is treated as overlap and removed.

    Args:
        windows: Window strings in document order; falsy entries count as "".

    Returns:
        A tuple ``(segments, overlaps_left, reconstructed_text)`` where
        ``segments[i]`` is window ``i`` without its left overlap,
        ``overlaps_left[i]`` is the number of characters removed, and
        ``reconstructed_text`` is the concatenation of all segments.
    """
    cores: List[str] = []
    left_overlaps: List[int] = []
    rebuilt = ""
    for window in windows:
        text = window or ""
        longest = min(len(text), len(rebuilt))
        # Try the longest candidate first; the empty prefix (size 0) always
        # matches, so the search never comes up empty.
        shared = next(
            (size for size in range(longest, -1, -1) if rebuilt.endswith(text[:size])),
            0,
        )
        core = text[shared:]
        cores.append(core)
        left_overlaps.append(shared)
        rebuilt += core
    return cores, left_overlaps, rebuilt
|
||
|
||
def _overlap_len_suffix_prefix(a: str, b: str, max_probe: int = 4096) -> int:
    """Return the length of the longest suffix of ``a`` that is a prefix of ``b``.

    Only the last ``max_probe`` characters of ``a`` and the first
    ``max_probe`` characters of ``b`` are compared. Returns 0 when either
    string is empty or no overlap exists.
    """
    if not (a and b):
        return 0
    tail = a[-max_probe:]
    head = b[:max_probe]
    # Longest candidate first, so the first hit is the answer.
    for size in range(min(len(tail), len(head)), 0, -1):
        if tail.endswith(head[:size]):
            return size
    return 0
|
||
|
||
|
||
# ----------------------------- Public API ---------------------------- #
|
||
|
||
def _resolve_synthetic_overlap(note_type: str, chunk_profile: Optional[str]) -> int:
    """Return how many characters of left context a synthetic window receives.

    The lower bound of ``_get_sizes(note_type)["overlap"]`` is the baseline.
    A non-blank ``chunk_profile`` replaces it with the lower bound from
    ``_profile_overlap``, clamped to the range 0 .. 3x the baseline.
    The result is never negative.
    """
    try:
        # Previous behavior: overlap range from get_sizes(note_type).
        overlap_low, _overlap_high = tuple(_get_sizes(note_type).get("overlap", (40, 60)))
    except Exception:
        overlap_low = 40

    if isinstance(chunk_profile, str) and chunk_profile.strip():
        try:
            o_low, _o_high = _profile_overlap(chunk_profile.strip().lower())
            # Defensive clamp: never more than 3x the default.
            overlap_low = max(0, min(o_low, overlap_low * 3))
        except Exception:
            # Deliberate best effort: a broken registry entry must not stop
            # payload creation, so the baseline stays in effect.
            pass

    try:
        return max(0, int(overlap_low))
    except (TypeError, ValueError, OverflowError):
        return 40


def _segment_offsets(segments: List[str], note_text: Optional[str]) -> Tuple[List[int], List[int]]:
    """Compute start/end character offsets for each core segment.

    With a non-empty ``note_text`` every segment is searched for at or after
    the end of the last segment that was actually found. A segment that
    cannot be found (or is empty) is placed directly after the previous
    segment's end. Without ``note_text`` the offsets are purely cumulative.
    """
    starts: List[int] = []
    ends: List[int] = []
    exact = isinstance(note_text, str) and bool(note_text)
    search_pos = 0  # end of the last segment confirmed inside note_text
    cursor = 0      # end of the previous segment, confirmed or estimated
    for seg in segments:
        begin = cursor
        if exact and seg:
            found = note_text.find(seg, search_pos)
            if found >= 0:
                begin = found
                search_pos = found + len(seg)
        cursor = begin + len(seg)
        starts.append(begin)
        ends.append(cursor)
    return starts, ends


def make_chunk_payloads(
    frontmatter: Dict[str, Any],
    rel_path: str,
    chunks: Iterable[Union[Dict[str, Any], Any]],
    note_text: Optional[str] = None,
    *,  # new keyword-only parameters stay backward compatible
    chunk_profile: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Build one payload dict per chunk.

    If at least one chunk carries a non-empty ``window`` string, the supplied
    windows are kept and the core segments are derived by removing each
    window's left overlap. Otherwise synthetic windows are created by
    prefixing each segment with the tail of the text reconstructed so far.

    Args:
        frontmatter: Note metadata; ``id``, ``type``, ``title`` and ``tags``
            are read. A missing or ``None`` type is treated as no type.
        rel_path: Path of the note; backslashes and leading slashes are
            normalized away.
        chunks: Dicts or objects exposing ``window``/``text``/``content``/
            ``raw``, and optionally ``seq``/``chunk_index``,
            ``chunk_id``/``id``, ``token_count``, ``section_title``,
            ``section_path``.
        note_text: Optional full note text used to locate exact offsets.
        chunk_profile: Optional profile name that overrides the synthetic
            overlap size; ignored when real windows are supplied.

    Returns:
        A list of payload dicts with the keys ``note_id``, ``chunk_id``,
        ``id``, ``chunk_index``, ``seq``, ``path``, ``window``, ``text``,
        ``start``, ``end``, ``overlap_left``, ``overlap_right`` plus the
        optional ``type``, ``title``, ``tags``, ``token_count``,
        ``section_title`` and ``section_path``.
    """
    frontmatter = frontmatter or {}
    note_id = str(frontmatter.get("id") or "").strip()
    # `or ""` keeps an explicit None from becoming the literal type "none".
    note_type = str(frontmatter.get("type") or "").lower()
    note_title = frontmatter.get("title", None)
    note_tags = frontmatter.get("tags", None)
    rel_path = _normalize_rel_path(rel_path)

    # 1) Collect the raw data exactly as delivered.
    chunks_list = list(chunks)
    raw_windows: List[str] = []
    seqs: List[int] = []
    ids_in: List[Optional[str]] = []
    token_counts: List[Optional[int]] = []
    section_titles: List[Optional[str]] = []
    section_paths: List[Optional[str]] = []
    any_explicit_window = False

    for idx, c in enumerate(chunks_list):
        # Window source: a real window if present, else 'text'|'content'|'raw'.
        w = _get_attr_or_key(c, "window", None)
        if isinstance(w, str) and w:
            any_explicit_window = True
            raw_windows.append(w)
        else:
            raw_windows.append(_as_window_text(c))
        # Ordering: 'seq', then 'chunk_index', then the enumeration index.
        seqs.append(_to_int(_get_attr_or_key(c, "seq", _get_attr_or_key(c, "chunk_index", idx)), idx))
        # IDs, token counts, sections. Only non-empty string ids are kept.
        cid = _get_attr_or_key(c, "chunk_id", _get_attr_or_key(c, "id", None))
        ids_in.append(str(cid) if isinstance(cid, str) and cid else None)
        tc = _get_attr_or_key(c, "token_count", None)
        token_counts.append(_to_int(tc, 0) if tc is not None else None)
        section_titles.append(_get_attr_or_key(c, "section_title", None))
        section_paths.append(_get_attr_or_key(c, "section_path", None))

    # 2) Determine segments and left overlaps.
    if any_explicit_window:
        # Real windows exist: dedupe them to find the core segments.
        segments, overlaps_left, _ = _dedupe_windows_to_segments(raw_windows)
        windows_final = list(raw_windows)  # already delivered with overlap
    else:
        # No real windows: the delivered texts are the segments, and the
        # windows are synthesized with a left context.
        segments = [w or "" for w in raw_windows]
        overlap_target = _resolve_synthetic_overlap(note_type, chunk_profile)
        windows_final = []
        overlaps_left = []
        recon = ""
        for seg in segments:
            # Left context is the tail of the text reconstructed so far; the
            # first segment has none because recon is still empty.
            k = min(overlap_target, len(recon))
            windows_final.append(recon[-k:] + seg if k > 0 else seg)
            overlaps_left.append(k)
            recon += seg  # reconstruction uses core text only

    # 3) Right overlap: suffix of a window that is a prefix of the next one.
    overlaps_right: List[int] = [
        _overlap_len_suffix_prefix(cur, nxt, max_probe=4096)
        for cur, nxt in zip(windows_final, windows_final[1:])
    ]
    if windows_final:
        overlaps_right.append(0)  # the last window has no right neighbor

    # 4) Start/end offsets (exact via note_text, otherwise cumulative).
    starts, ends = _segment_offsets(segments, note_text)

    # 5) Payload dicts.
    payloads: List[Dict[str, Any]] = []
    for i, (win, seg) in enumerate(zip(windows_final, segments)):
        chunk_id = ids_in[i] or f"{note_id}#{i+1}"
        pl: Dict[str, Any] = {
            "note_id": note_id,
            "chunk_id": chunk_id,
            "id": chunk_id,  # alias
            "chunk_index": i,
            "seq": seqs[i],
            "path": rel_path,
            "window": win,
            "text": seg,
            "start": starts[i],
            "end": ends[i],
            "overlap_left": overlaps_left[i],
            "overlap_right": overlaps_right[i],
        }
        # Optional metadata is only emitted when present.
        if note_type:
            pl["type"] = note_type
        if note_title is not None:
            pl["title"] = note_title
        if note_tags is not None:
            pl["tags"] = note_tags
        if token_counts[i] is not None:
            pl["token_count"] = int(token_counts[i])
        if section_titles[i] is not None:
            pl["section_title"] = section_titles[i]
        if section_paths[i] is not None:
            sp = str(section_paths[i]).replace("\\", "/")
            pl["section_path"] = sp if sp else "/"
        payloads.append(pl)

    return payloads
|
||
|
||
|
||
if __name__ == "__main__":  # pragma: no cover
    from pprint import pprint

    # Demo without real windows → synthetic overlaps are generated.
    demo_frontmatter = {"id": "demo", "title": "Demo", "type": "concept"}
    demo_chunks = [
        {"id": "demo#1", "text": "Alpha Beta Gamma"},
        {"id": "demo#2", "text": "Gamma Delta"},
        {"id": "demo#3", "text": "Delta Epsilon Zeta"},
    ]
    demo_payloads = make_chunk_payloads(
        demo_frontmatter,
        "path/demo.md",
        demo_chunks,
        note_text="Alpha Beta Gamma Delta Epsilon Zeta",
        chunk_profile="long",
    )
    pprint(demo_payloads)
    # Joining the core texts shows what the payloads reconstruct to.
    print("RECON:", "".join(payload["text"] for payload in demo_payloads))
|