#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Modul: app/core/chunk_payload.py Version: 2.2.0 Datum: 2025-10-06 Zweck ----- Erzeugt Qdrant-Payloads für Chunks. Voll abwärtskompatibel zu v2.0.1. Neu: Wenn der Chunker KEIN Overlap im Fenster liefert (== window fehlt / identisch zur Kernpassage), erzeugen wir FENSTER mit synthetischem Overlap auf Basis chunk_config.get_sizes(note_type)['overlap']. Felder (beibehalten aus 2.0.1): - note_id, chunk_id, id (Alias), chunk_index, seq, path - window (mit Overlap), text (ohne linkes Overlap) - start, end (Offsets im gesamten Body) - overlap_left, overlap_right - token_count?, section_title?, section_path?, type?, title?, tags? Kompatibilität: - 'id' == 'chunk_id' als Alias - Pfade bleiben relativ (keine führenden '/'), Backslashes → Slashes - Robust für Chunk-Objekte oder Dicts; Fensterquelle: 'window'|'text'|'content'|'raw' Lizenz: MIT (projektintern) """ from __future__ import annotations def _overlap_from_frontmatter(frontmatter: Dict[str, Any], fallback: Tuple[int,int]) -> Tuple[int,int]: prof = str(frontmatter.get("chunk_profile") or "").strip().lower() if prof: try: return profile_overlap(prof) except Exception: return fallback return fallback from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from app.core.type_registry import profile_overlap try: # Typgerechtes Overlap aus deiner Konfiguration holen from app.core.chunk_config import get_sizes as _get_sizes except Exception: def _get_sizes(_note_type: str): # konservativer Default, falls Import fehlschlägt return {"overlap": (40, 60), "target": (250, 350), "max": 500} # ------------------------------- Utils ------------------------------- # def _get_attr_or_key(obj: Any, key: str, default=None): if obj is None: return default if isinstance(obj, dict): return obj.get(key, default) return getattr(obj, key, default) def _as_window_text(chunk: Any) -> str: """Fenstertext robust lesen (bevorzugt echte Fenster, sonst Kern).""" for k in ("window", "text", "content", "raw"): v = _get_attr_or_key(chunk, k, None) if isinstance(v, str) and v: return v return "" def _to_int(x: Any, default: int = 0) -> int: try: return int(x) except Exception: return default def _normalize_rel_path(p: str) -> str: p = (p or "").replace("\\", "/") while p.startswith("/"): p = p[1:] return p # ---------------------- Overlap & Offsets ---------------------------- # def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int], str]: """ Entfernt linkes Overlap aus echten Fenster-Strings. Rückgabe: (segments, overlaps_left, reconstructed_text) """ segments: List[str] = [] overlaps_left: List[int] = [] reconstructed = "" for w in windows: w = w or "" max_k = min(len(w), len(reconstructed)) k = 0 for cand in range(max_k, -1, -1): if reconstructed.endswith(w[:cand]): k = cand break seg = w[k:] segments.append(seg) overlaps_left.append(k) reconstructed += seg return segments, overlaps_left, reconstructed def _overlap_len_suffix_prefix(a: str, b: str, max_probe: int = 4096) -> int: """Länge längsten Suffix(a), der Prefix(b) ist.""" if not a or not b: return 0 a1 = a[-max_probe:] b1 = b[:max_probe] n = min(len(a1), len(b1)) for k in range(n, 0, -1): if a1[-k:] == b1[:k]: return k return 0 # ----------------------------- Public API ---------------------------- # def make_chunk_payloads( frontmatter: Dict[str, Any], rel_path: str, chunks: Iterable[Union[Dict[str, Any], Any]], note_text: Optional[str] = None, ) -> List[Dict[str, Any]]: """ Baut Payloads pro Chunk. Falls Fenster ohne Overlap geliefert werden, erzeugen wir synthetische 'window'-Texte mit typgerechtem Overlap. """ note_id = str(frontmatter.get("id") or "").strip() note_type = str(frontmatter.get("type", "")).lower() note_title = frontmatter.get("title", None) note_tags = frontmatter.get("tags", None) rel_path = _normalize_rel_path(rel_path) # 1) Rohdaten sammeln (so wie geliefert) chunks_list = list(chunks) raw_windows: List[str] = [] seqs: List[int] = [] ids_in: List[Optional[str]] = [] token_counts: List[Optional[int]] = [] section_titles: List[Optional[str]] = [] section_paths: List[Optional[str]] = [] any_explicit_window = False for idx, c in enumerate(chunks_list): # Fensterquelle w = _get_attr_or_key(c, "window", None) if isinstance(w, str) and w: any_explicit_window = True raw_windows.append(w) else: raw_windows.append(_as_window_text(c)) # 'text'|'content'|'raw' als Ersatz # Ordnung seqs.append(_to_int(_get_attr_or_key(c, "seq", _get_attr_or_key(c, "chunk_index", idx)), idx)) # IDs, Tokens, Sektionen cid = _get_attr_or_key(c, "chunk_id", _get_attr_or_key(c, "id", None)) ids_in.append(str(cid) if isinstance(cid, str) and cid else None) tc = _get_attr_or_key(c, "token_count", None) token_counts.append(_to_int(tc, 0) if tc is not None else None) section_titles.append(_get_attr_or_key(c, "section_title", None)) section_paths.append(_get_attr_or_key(c, "section_path", None)) # 2) Segmente & Overlaps bestimmen if any_explicit_window: # Es existieren echte Fenster → dedupe, um Kernsegmente zu finden segments, overlaps_left, recon = _dedupe_windows_to_segments(raw_windows) windows_final = raw_windows[:] # bereits mit Overlap geliefert else: # Keine echten Fenster → Segmente sind identisch zu "Fenstern" (bisher), # wir erzeugen synthetische Fenster mit Overlap gemäß Typ segments = [w or "" for w in raw_windows] overlaps_left = [] windows_final = [] recon = "" try: overlap_low, overlap_high = tuple(_get_sizes(note_type).get("overlap", (40, 60))) except Exception: overlap_low, overlap_high = (40, 60) overlap_target = int(overlap_low) for i, seg in enumerate(segments): if i == 0: # erstes Fenster: kein linker Kontext windows_final.append(seg) overlaps_left.append(0) recon += seg else: # synthetischer linker Kontext = Suffix des bisher rekonstruierten Texts k = min(overlap_target, len(recon)) left_ctx = recon[-k:] if k > 0 else "" windows_final.append(left_ctx + seg) overlaps_left.append(k) recon += seg # Rekonstruktion bleibt kerntreu # 3) overlap_right bestimmen overlaps_right: List[int] = [] for i in range(len(windows_final)): if i + 1 < len(windows_final): ov = _overlap_len_suffix_prefix(windows_final[i], windows_final[i + 1], max_probe=4096) else: ov = 0 overlaps_right.append(ov) # 4) start/end-Offsets (exakt via note_text, sonst kumulativ) starts: List[int] = [0] * len(segments) ends: List[int] = [0] * len(segments) pos = 0 if isinstance(note_text, str) and note_text: search_pos = 0 for i, seg in enumerate(segments): if not seg: starts[i] = ends[i] = search_pos continue j = note_text.find(seg, search_pos) if j >= 0: starts[i] = j ends[i] = j + len(seg) search_pos = ends[i] else: # Fallback: kumulativ starts[i] = pos pos += len(seg) ends[i] = pos else: for i, seg in enumerate(segments): starts[i] = pos pos += len(seg) ends[i] = pos # 5) Payload-Dicts payloads: List[Dict[str, Any]] = [] for i, (win, seg) in enumerate(zip(windows_final, segments)): chunk_id = ids_in[i] or f"{note_id}#{i+1}" pl: Dict[str, Any] = { "note_id": note_id, "chunk_id": chunk_id, "id": chunk_id, # Alias "chunk_index": i, "seq": seqs[i], "path": rel_path, "window": win, "text": seg, "start": starts[i], "end": ends[i], "overlap_left": overlaps_left[i], "overlap_right": overlaps_right[i], } # optionale Metadaten if note_type: pl["type"] = note_type if note_title is not None: pl["title"] = note_title if note_tags is not None: pl["tags"] = note_tags if token_counts[i] is not None: pl["token_count"] = int(token_counts[i]) if section_titles[i] is not None: pl["section_title"] = section_titles[i] if section_paths[i] is not None: sp = str(section_paths[i]).replace("\\", "/") pl["section_path"] = sp if sp else "/" payloads.append(pl) return payloads # __main__ Demo (optional) if __name__ == "__main__": # pragma: no cover fm = {"id": "demo", "title": "Demo", "type": "concept"} # Beispiel ohne echte Fenster → erzeugt synthetische Overlaps chunks = [ {"id": "demo#1", "text": "Alpha Beta Gamma"}, {"id": "demo#2", "text": "Gamma Delta"}, {"id": "demo#3", "text": "Delta Epsilon Zeta"}, ] pls = make_chunk_payloads(fm, "path/demo.md", chunks, note_text="Alpha Beta Gamma Delta Epsilon Zeta") from pprint import pprint pprint(pls) recon = "".join(p["text"] for p in pls) print("RECON:", recon)