#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Module: app/core/chunk_payload.py
Version: 2.3.0
Date: 2025-11-08

Changes since 2.2.0
-------------------
- Optional keyword-only parameter ``chunk_profile: str | None``.
- When no *real* windows are delivered (no chunk carries a non-empty
  ``window``), the synthetic left overlap is derived from the chunk profile
  via ``type_registry.profile_overlap``. Without a profile the overlap is
  taken from ``chunk_config.get_sizes(note_type)`` as before.

Note
----
IDs, payload fields and vectors remain unchanged.
"""

from __future__ import annotations

import logging
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

try:
    # Type-specific overlap settings from the project configuration.
    from app.core.chunk_config import get_sizes as _get_sizes
except Exception:
    def _get_sizes(_note_type: str):
        # Conservative default used when the configuration module is unavailable.
        return {"overlap": (40, 60), "target": (250, 350), "max": 500}

# Optional import: overlap recommendations from the type registry.
try:
    from app.core.type_registry import profile_overlap as _profile_overlap
except Exception:
    def _profile_overlap(_profile: Optional[str]) -> Tuple[int, int]:
        return (40, 60)


_log = logging.getLogger(__name__)

# Overlap range used whenever configuration/registry values are unusable.
_DEFAULT_OVERLAP: Tuple[int, int] = (40, 60)


# ------------------------------- Utils ------------------------------- #

def _get_attr_or_key(obj: Any, key: str, default: Any = None) -> Any:
    """Read ``key`` from a dict (item access) or any other object (attribute).

    Returns ``default`` when ``obj`` is ``None`` or the key/attribute is missing.
    """
    if obj is None:
        return default
    if isinstance(obj, dict):
        return obj.get(key, default)
    return getattr(obj, key, default)


def _first_not_none(obj: Any, *keys: str) -> Any:
    """Return the first value among ``keys`` that is not ``None`` (else ``None``).

    Unlike a nested default lookup, a key that exists with value ``None`` does
    not block the fallback to the next key.
    """
    for key in keys:
        value = _get_attr_or_key(obj, key, None)
        if value is not None:
            return value
    return None


def _as_window_text(chunk: Any) -> str:
    """Return the first non-empty string among window, text, content, raw."""
    for key in ("window", "text", "content", "raw"):
        value = _get_attr_or_key(chunk, key, None)
        if isinstance(value, str) and value:
            return value
    return ""


def _to_int(x: Any, default: int = 0) -> int:
    """Convert ``x`` to ``int``; return ``default`` if it cannot be converted."""
    try:
        return int(x)
    except (TypeError, ValueError, OverflowError):
        return default


def _normalize_rel_path(p: Any) -> str:
    """Use forward slashes and drop leading slashes; falsy input yields ''."""
    if not p:
        return ""
    return str(p).replace("\\", "/").lstrip("/")


# ---------------------- Overlap & Offsets ---------------------------- #

def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int], str]:
    """Strip the left overlap from delivered window strings.

    For each window, the longest prefix that is also a suffix of the text
    reconstructed so far is treated as overlap and removed.

    Returns:
        ``(segments, overlaps_left, reconstructed_text)`` where
        ``reconstructed_text`` is the concatenation of all segments.
    """
    segments: List[str] = []
    overlaps_left: List[int] = []
    reconstructed = ""
    for window in windows:
        window = window or ""
        overlap = 0
        # Probe from the longest possible overlap downwards; the first hit wins.
        for cand in range(min(len(window), len(reconstructed)), 0, -1):
            if reconstructed.endswith(window[:cand]):
                overlap = cand
                break
        segment = window[overlap:]
        segments.append(segment)
        overlaps_left.append(overlap)
        reconstructed += segment
    return segments, overlaps_left, reconstructed


def _overlap_len_suffix_prefix(a: str, b: str, max_probe: int = 4096) -> int:
    """Length of the longest suffix of ``a`` that is a prefix of ``b``.

    Only the last/first ``max_probe`` characters are inspected.
    """
    if not a or not b:
        return 0
    a_tail = a[-max_probe:]
    b_head = b[:max_probe]
    for k in range(min(len(a_tail), len(b_head)), 0, -1):
        if a_tail[-k:] == b_head[:k]:
            return k
    return 0


def _resolve_overlap_target(note_type: str, chunk_profile: Optional[str]) -> int:
    """Determine the synthetic left overlap (in characters, never negative).

    The lower bound of ``get_sizes(note_type)["overlap"]`` is the base value.
    A non-blank ``chunk_profile`` overrides it with the lower bound from
    ``profile_overlap``, clamped to at most three times the base value.
    Any failure in either lookup falls back to the previous/default value.
    """
    try:
        overlap_low, _ = tuple(_get_sizes(note_type).get("overlap", _DEFAULT_OVERLAP))
    except Exception:
        # Best effort: a broken or unexpected configuration must not break payloads.
        _log.debug("get_sizes(%r) unusable; using default overlap", note_type, exc_info=True)
        overlap_low = _DEFAULT_OVERLAP[0]

    if isinstance(chunk_profile, str) and chunk_profile.strip():
        try:
            o_low, _ = _profile_overlap(chunk_profile.strip().lower())
            # Defensive clamp: never larger than 3x the configured default.
            overlap_low = max(0, min(o_low, overlap_low * 3))
        except Exception:
            # Best effort: keep the configuration-based overlap.
            _log.debug("profile_overlap(%r) unusable; keeping overlap", chunk_profile, exc_info=True)

    # Guard against non-numeric or negative configuration values.
    return max(0, _to_int(overlap_low, _DEFAULT_OVERLAP[0]))


def _synthesize_windows(segments: List[str], overlap_target: int) -> Tuple[List[str], List[int]]:
    """Build windows by prepending a suffix of the preceding text to each segment.

    The left context of a segment is the last ``overlap_target`` characters of
    the concatenation of all previous segments (fewer if less text exists, so
    the first segment gets none).

    Returns:
        ``(windows, overlaps_left)``; ``overlaps_left[i]`` is the length of the
        context prepended to ``segments[i]``.
    """
    windows: List[str] = []
    overlaps_left: List[int] = []
    tail = ""  # last `overlap_target` characters of the text seen so far
    for seg in segments:
        windows.append(tail + seg)
        overlaps_left.append(len(tail))
        if overlap_target > 0:  # guard: s[-0:] would be the whole string
            tail = (tail + seg)[-overlap_target:]
    return windows, overlaps_left


def _compute_offsets(segments: List[str], note_text: Optional[str]) -> Tuple[List[int], List[int]]:
    """Compute start/end character offsets for each segment.

    With a non-empty ``note_text`` each non-empty segment is searched for,
    starting at the end of the last segment that was found. Segments that are
    empty or not found are placed cumulatively, directly after the previously
    placed segment. Without ``note_text`` all offsets are cumulative.
    """
    starts: List[int] = []
    ends: List[int] = []
    exact = isinstance(note_text, str) and bool(note_text)
    pos = 0         # end offset of the previously placed segment
    search_pos = 0  # where the next exact search in note_text begins
    for seg in segments:
        found = note_text.find(seg, search_pos) if (exact and seg) else -1
        if found >= 0:
            start, end = found, found + len(seg)
            search_pos = end
        else:
            # Fallback continues after the previous segment instead of
            # restarting from a counter that exact matches never advanced.
            start, end = pos, pos + len(seg)
        pos = end
        starts.append(start)
        ends.append(end)
    return starts, ends


# ----------------------------- Public API ---------------------------- #

def make_chunk_payloads(
    frontmatter: Dict[str, Any],
    rel_path: str,
    chunks: Iterable[Union[Dict[str, Any], Any]],
    note_text: Optional[str] = None,
    *,
    chunk_profile: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Build one payload dict per chunk.

    If at least one chunk carries a non-empty ``window`` string, the delivered
    windows are kept and the core ``text`` of each chunk is obtained by
    removing the left overlap. Otherwise synthetic windows are created by
    prepending left context to each chunk text.

    Args:
        frontmatter: Note metadata; ``id``, ``type``, ``title`` and ``tags``
            are read. ``None`` is treated as empty.
        rel_path: Path of the note; normalised to forward slashes without
            leading slashes.
        chunks: Dicts or objects; ``window``/``text``/``content``/``raw``,
            ``seq``/``chunk_index``, ``chunk_id``/``id``, ``token_count``,
            ``section_title`` and ``section_path`` are read. ``None`` is
            treated as no chunks.
        note_text: Optional full note text used to locate exact offsets.
        chunk_profile: Optional profile name passed (stripped, lower-cased)
            to the registry's ``profile_overlap``; only affects synthetic
            windows.

    Returns:
        A list of payload dicts with the keys ``note_id``, ``chunk_id``,
        ``id``, ``chunk_index``, ``seq``, ``path``, ``window``, ``text``,
        ``start``, ``end``, ``overlap_left``, ``overlap_right`` plus the
        optional ``type``, ``title``, ``tags``, ``token_count``,
        ``section_title`` and ``section_path``.
    """
    frontmatter = frontmatter or {}
    note_id = str(frontmatter.get("id") or "").strip()
    # `or ""` keeps an explicit `type: null` from turning into the string "none".
    note_type = str(frontmatter.get("type") or "").strip().lower()
    note_title = frontmatter.get("title", None)
    note_tags = frontmatter.get("tags", None)
    rel_path = _normalize_rel_path(rel_path)

    # 1) Collect raw per-chunk data as delivered.
    chunks_list = [] if chunks is None else list(chunks)
    raw_windows: List[str] = []
    seqs: List[int] = []
    ids_in: List[Optional[str]] = []
    token_counts: List[Optional[int]] = []
    section_titles: List[Any] = []
    section_paths: List[Any] = []
    any_explicit_window = False

    for idx, chunk in enumerate(chunks_list):
        window = _get_attr_or_key(chunk, "window", None)
        if isinstance(window, str) and window:
            any_explicit_window = True
            raw_windows.append(window)
        else:
            # No real window: fall back to text / content / raw.
            raw_windows.append(_as_window_text(chunk))

        # Ordering: seq, else chunk_index, else enumeration position.
        seqs.append(_to_int(_first_not_none(chunk, "seq", "chunk_index"), idx))

        # Only non-empty string IDs are taken over from the chunk.
        cid = _first_not_none(chunk, "chunk_id", "id")
        ids_in.append(cid if isinstance(cid, str) and cid else None)

        tc = _get_attr_or_key(chunk, "token_count", None)
        token_counts.append(_to_int(tc, 0) if tc is not None else None)
        section_titles.append(_get_attr_or_key(chunk, "section_title", None))
        section_paths.append(_get_attr_or_key(chunk, "section_path", None))

    # 2) Determine core segments, final windows and left overlaps.
    if any_explicit_window:
        # Real windows exist: dedupe them to find the core segments.
        segments, overlaps_left, _ = _dedupe_windows_to_segments(raw_windows)
        windows_final = list(raw_windows)
    else:
        # No real windows: the delivered texts are the segments and the
        # windows are synthesised with a left overlap.
        segments = list(raw_windows)
        overlap_target = _resolve_overlap_target(note_type, chunk_profile)
        windows_final, overlaps_left = _synthesize_windows(segments, overlap_target)

    # 3) Right overlap: shared text between a window and its successor.
    overlaps_right: List[int] = [
        _overlap_len_suffix_prefix(cur, nxt, max_probe=4096)
        for cur, nxt in zip(windows_final, windows_final[1:])
    ]
    if windows_final:
        overlaps_right.append(0)  # the last window has no successor

    # 4) Start/end offsets (exact via note_text where possible, else cumulative).
    starts, ends = _compute_offsets(segments, note_text)

    # 5) Payload dicts.
    payloads: List[Dict[str, Any]] = []
    for i, (win, seg) in enumerate(zip(windows_final, segments)):
        chunk_id = ids_in[i] or f"{note_id}#{i+1}"
        pl: Dict[str, Any] = {
            "note_id": note_id,
            "chunk_id": chunk_id,
            "id": chunk_id,  # alias of chunk_id
            "chunk_index": i,
            "seq": seqs[i],
            "path": rel_path,
            "window": win,
            "text": seg,
            "start": starts[i],
            "end": ends[i],
            "overlap_left": overlaps_left[i],
            "overlap_right": overlaps_right[i],
        }
        # Optional metadata is only added when present.
        if note_type:
            pl["type"] = note_type
        if note_title is not None:
            pl["title"] = note_title
        if note_tags is not None:
            pl["tags"] = note_tags
        if token_counts[i] is not None:
            pl["token_count"] = int(token_counts[i])
        if section_titles[i] is not None:
            pl["section_title"] = section_titles[i]
        if section_paths[i] is not None:
            sp = str(section_paths[i]).replace("\\", "/")
            pl["section_path"] = sp if sp else "/"
        payloads.append(pl)

    return payloads


def _demo() -> None:  # pragma: no cover
    """Print payloads for a small example without real windows."""
    from pprint import pprint

    fm = {"id": "demo", "title": "Demo", "type": "concept"}
    # Example without real windows: synthetic overlaps are generated.
    chunks = [
        {"id": "demo#1", "text": "Alpha Beta Gamma"},
        {"id": "demo#2", "text": "Gamma Delta"},
        {"id": "demo#3", "text": "Delta Epsilon Zeta"},
    ]
    pls = make_chunk_payloads(
        fm,
        "path/demo.md",
        chunks,
        note_text="Alpha Beta Gamma Delta Epsilon Zeta",
        chunk_profile="long",
    )
    pprint(pls)
    recon = "".join(p["text"] for p in pls)
    print("RECON:", recon)


if __name__ == "__main__":  # pragma: no cover
    _demo()