#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Modul: app/core/chunk_payload.py Version: 2.3.0 Datum: 2025-11-08 Änderungen ggü. 2.2.0 ---------------------- - Optionaler Parameter `chunk_profile: str | None` (abwärtskompatibel). - Bei fehlenden *echten* Fenstern (kein Overlap geliefert) wird das synthetische Overlap anhand des Chunk-Profils (short|medium|long) gewählt. Ohne Profil bleibt das bisherige Verhalten bestehen (Übernahme aus get_sizes(note_type)). Hinweis ------ IDs, Felder und Vektoren bleiben unverändert. """ from __future__ import annotations from typing import Any, Dict, Iterable, List, Optional, Tuple, Union try: # Typgerechtes Overlap aus deiner Konfiguration holen from app.core.chunk_config import get_sizes as _get_sizes except Exception: def _get_sizes(_note_type: str): # konservativer Default, falls Import fehlschlägt return {"overlap": (40, 60), "target": (250, 350), "max": 500} # NEU: optionaler Import – Overlap-Empfehlungen aus der Type-Registry try: from app.core.type_registry import profile_overlap as _profile_overlap except Exception: def _profile_overlap(_profile: Optional[str]) -> tuple[int,int]: return (40, 60) # ------------------------------- Utils ------------------------------- # def _get_attr_or_key(obj: Any, key: str, default=None): if obj is None: return default if isinstance(obj, dict): return obj.get(key, default) return getattr(obj, key, default) def _as_window_text(chunk: Any) -> str: """Fenstertext robust lesen (bevorzugt echte Fenster, sonst Kern).""" for k in ("window", "text", "content", "raw"): v = _get_attr_or_key(chunk, k, None) if isinstance(v, str) and v: return v return "" def _to_int(x: Any, default: int = 0) -> int: try: return int(x) except Exception: return default def _normalize_rel_path(p: str) -> str: p = (p or "").replace("\\", "/") while p.startswith("/"): p = p[1:] return p # ---------------------- Overlap & Offsets ---------------------------- # def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int], str]: """ Entfernt linkes Overlap aus echten Fenster-Strings. Rückgabe: (segments, overlaps_left, reconstructed_text) """ segments: List[str] = [] overlaps_left: List[int] = [] reconstructed = "" for w in windows: w = w or "" max_k = min(len(w), len(reconstructed)) k = 0 for cand in range(max_k, -1, -1): if reconstructed.endswith(w[:cand]): k = cand break seg = w[k:] segments.append(seg) overlaps_left.append(k) reconstructed += seg return segments, overlaps_left, reconstructed def _overlap_len_suffix_prefix(a: str, b: str, max_probe: int = 4096) -> int: """Länge längsten Suffix(a), der Prefix(b) ist.""" if not a or not b: return 0 a1 = a[-max_probe:] b1 = b[:max_probe] n = min(len(a1), len(b1)) for k in range(n, 0, -1): if a1[-k:] == b1[:k]: return k return 0 # ----------------------------- Public API ---------------------------- # def make_chunk_payloads( frontmatter: Dict[str, Any], rel_path: str, chunks: Iterable[Union[Dict[str, Any], Any]], note_text: Optional[str] = None, *, # neue, nur-keyword Parameter bleiben abwärtskompatibel chunk_profile: Optional[str] = None, ) -> List[Dict[str, Any]]: """ Baut Payloads pro Chunk. Falls Fenster ohne Overlap geliefert werden, erzeugen wir synthetische 'window'-Texte mit typgerechtem Overlap. """ note_id = str(frontmatter.get("id") or "").strip() note_type = str(frontmatter.get("type", "")).lower() note_title = frontmatter.get("title", None) note_tags = frontmatter.get("tags", None) rel_path = _normalize_rel_path(rel_path) # 1) Rohdaten sammeln (so wie geliefert) chunks_list = list(chunks) raw_windows: List[str] = [] seqs: List[int] = [] ids_in: List[Optional[str]] = [] token_counts: List[Optional[int]] = [] section_titles: List[Optional[str]] = [] section_paths: List[Optional[str]] = [] any_explicit_window = False for idx, c in enumerate(chunks_list): # Fensterquelle w = _get_attr_or_key(c, "window", None) if isinstance(w, str) and w: any_explicit_window = True raw_windows.append(w) else: raw_windows.append(_as_window_text(c)) # 'text'|'content'|'raw' als Ersatz # Ordnung seqs.append(_to_int(_get_attr_or_key(c, "seq", _get_attr_or_key(c, "chunk_index", idx)), idx)) # IDs, Tokens, Sektionen cid = _get_attr_or_key(c, "chunk_id", _get_attr_or_key(c, "id", None)) ids_in.append(str(cid) if isinstance(cid, str) and cid else None) tc = _get_attr_or_key(c, "token_count", None) token_counts.append(_to_int(tc, 0) if tc is not None else None) section_titles.append(_get_attr_or_key(c, "section_title", None)) section_paths.append(_get_attr_or_key(c, "section_path", None)) # 2) Segmente & Overlaps bestimmen if any_explicit_window: # Es existieren echte Fenster → dedupe, um Kernsegmente zu finden segments, overlaps_left, recon = _dedupe_windows_to_segments(raw_windows) windows_final = raw_windows[:] # bereits mit Overlap geliefert else: # Keine echten Fenster → Segmente sind identisch zu "Fenstern" (bisher), # wir erzeugen synthetische Fenster mit Overlap. segments = [w or "" for w in raw_windows] overlaps_left = [] windows_final = [] recon = "" try: # Bisheriges Verhalten: aus get_sizes(note_type) overlap_low, overlap_high = tuple(_get_sizes(note_type).get("overlap", (40, 60))) except Exception: overlap_low, overlap_high = (40, 60) # Registry-Profil (falls vorhanden) übersteuert *nur* den Overlap defensiv if isinstance(chunk_profile, str) and chunk_profile.strip(): try: o_low, o_high = _profile_overlap(chunk_profile.strip().lower()) # defensiver Clamp: niemals größer als 3x Default overlap_low = max(0, min(o_low, overlap_low * 3)) overlap_high = max(overlap_low, min(o_high, overlap_high * 3)) except Exception: pass overlap_target = int(overlap_low) for i, seg in enumerate(segments): if i == 0: # erstes Fenster: kein linker Kontext windows_final.append(seg) overlaps_left.append(0) recon += seg else: # synthetischer linker Kontext = Suffix des bisher rekonstruierten Texts k = min(overlap_target, len(recon)) left_ctx = recon[-k:] if k > 0 else "" windows_final.append(left_ctx + seg) overlaps_left.append(k) recon += seg # Rekonstruktion bleibt kerntreu # 3) overlap_right bestimmen overlaps_right: List[int] = [] for i in range(len(windows_final)): if i + 1 < len(windows_final): ov = _overlap_len_suffix_prefix(windows_final[i], windows_final[i + 1], max_probe=4096) else: ov = 0 overlaps_right.append(ov) # 4) start/end-Offsets (exakt via note_text, sonst kumulativ) starts: List[int] = [0] * len(segments) ends: List[int] = [0] * len(segments) pos = 0 if isinstance(note_text, str) and note_text: search_pos = 0 for i, seg in enumerate(segments): if not seg: starts[i] = ends[i] = search_pos continue j = note_text.find(seg, search_pos) if j >= 0: starts[i] = j ends[i] = j + len(seg) search_pos = ends[i] else: # Fallback: kumulativ starts[i] = pos pos += len(seg) ends[i] = pos else: for i, seg in enumerate(segments): starts[i] = pos pos += len(seg) ends[i] = pos # 5) Payload-Dicts payloads: List[Dict[str, Any]] = [] for i, (win, seg) in enumerate(zip(windows_final, segments)): chunk_id = ids_in[i] or f"{note_id}#{i+1}" pl: Dict[str, Any] = { "note_id": note_id, "chunk_id": chunk_id, "id": chunk_id, # Alias "chunk_index": i, "seq": seqs[i], "path": rel_path, "window": win, "text": seg, "start": starts[i], "end": ends[i], "overlap_left": overlaps_left[i], "overlap_right": overlaps_right[i], } # optionale Metadaten if note_type: pl["type"] = note_type if note_title is not None: pl["title"] = note_title if note_tags is not None: pl["tags"] = note_tags if token_counts[i] is not None: pl["token_count"] = int(token_counts[i]) if section_titles[i] is not None: pl["section_title"] = section_titles[i] if section_paths[i] is not None: sp = str(section_paths[i]).replace("\\", "/") pl["section_path"] = sp if sp else "/" payloads.append(pl) return payloads if __name__ == "__main__": # pragma: no cover fm = {"id": "demo", "title": "Demo", "type": "concept"} # Beispiel ohne echte Fenster → erzeugt synthetische Overlaps chunks = [ {"id": "demo#1", "text": "Alpha Beta Gamma"}, {"id": "demo#2", "text": "Gamma Delta"}, {"id": "demo#3", "text": "Delta Epsilon Zeta"}, ] pls = make_chunk_payloads(fm, "path/demo.md", chunks, note_text="Alpha Beta Gamma Delta Epsilon Zeta", chunk_profile="long") from pprint import pprint pprint(pls) recon = "".join(p["text"] for p in pls) print("RECON:", recon)