diff --git a/app/core/chunk_payload.py b/app/core/chunk_payload.py index 3bcc0cc..5676190 100644 --- a/app/core/chunk_payload.py +++ b/app/core/chunk_payload.py @@ -2,46 +2,26 @@ # -*- coding: utf-8 -*- """ Modul: app/core/chunk_payload.py -Version: 2.1.0 -Datum: 2025-10-01 +Version: 2.2.0 +Datum: 2025-10-06 Zweck ----- -Erzeugt Chunk-Payloads für Qdrant. Voll abwärtskompatibel zu v2.0.1 und erweitert um: -- Übernahme vorhandener deterministischer Chunk-IDs (falls vom Chunker geliefert). -- Durchreichen von token_count, section_title, section_path (falls vorhanden). -- Exaktere start/end-Offsets, wenn der vollständige Note-Body (note_text) vorliegt. -- Berechnung von overlap_right (nicht mehr pauschal 0). +Erzeugt Qdrant-Payloads für Chunks. Voll abwärtskompatibel zu v2.0.1. +Neu: Wenn der Chunker KEIN Overlap im Fenster liefert (== window fehlt / identisch zur Kernpassage), +erzeugen wir FENSTER mit synthetischem Overlap auf Basis chunk_config.get_sizes(note_type)['overlap']. -Felder (wie 2.0.1, beibehalten): - - window : Fenstertext inkl. Overlap (für Embeddings & Link-Erkennung) - - text : effektiver Segmenttext ohne linkes Overlap (für Rekonstruktion) - - start, end : Offsets des Segments im Gesamtkorpus (0-basiert, [start, end)) - - overlap_left : Zeichen-Overlap zum VORHERIGEN Fenster - - overlap_right : Zeichen-Overlap zum NÄCHSTEN Fenster - - note_id, chunk_id, id (Alias), chunk_index, seq, path, type, title, tags +Felder (beibehalten aus 2.0.1): + - note_id, chunk_id, id (Alias), chunk_index, seq, path + - window (mit Overlap), text (ohne linkes Overlap) + - start, end (Offsets im gesamten Body) + - overlap_left, overlap_right + - token_count?, section_title?, section_path?, type?, title?, tags? -Abwärtskompatible Aliasse / Verhalten: - - 'id' == 'chunk_id' (Edges nutzen häufig 'id'). - - Fensterquelle wird robust aus ('window'|'text'|'content'|'raw') gelesen. - - Pfad bleibt relativ und nutzt forward slashes. - -Nutzung -------- - from app.core.chunk_payload import make_chunk_payloads - pls = make_chunk_payloads(frontmatter, rel_path, chunks, note_text=full_body) - -Parameter ---------- -- frontmatter : dict – erwartet mind. 'id' (Note-ID); optional 'title','type','tags' -- rel_path : str – relativer Pfad der Note im Vault (forward slashes, ohne führenden '/') -- chunks : Seq[object|dict] -- note_text : str|None – optional kompletter Body für exakte Offsets - -Hinweis ------- -Dein Chunker liefert bereits Fenster mit vorangestelltem Overlap. Wir ermitteln den -linken Overlap gegen den bisher rekonstruierten Text und entfernen ihn in 'text'. +Kompatibilität: + - 'id' == 'chunk_id' als Alias + - Pfade bleiben relativ (keine führenden '/'), Backslashes → Slashes + - Robust für Chunk-Objekte oder Dicts; Fensterquelle: 'window'|'text'|'content'|'raw' Lizenz: MIT (projektintern) """ @@ -49,11 +29,18 @@ from __future__ import annotations from typing import Any, Dict, Iterable, List, Optional, Tuple, Union +try: + # Typgerechtes Overlap aus deiner Konfiguration holen + from app.core.chunk_config import get_sizes as _get_sizes +except Exception: + def _get_sizes(_note_type: str): + # konservativer Default, falls Import fehlschlägt + return {"overlap": (40, 60), "target": (250, 350), "max": 500} + # ------------------------------- Utils ------------------------------- # def _get_attr_or_key(obj: Any, key: str, default=None): - """Liest Feld 'key' aus Objekt ODER Dict; sonst default.""" if obj is None: return default if isinstance(obj, dict): @@ -61,17 +48,11 @@ def _get_attr_or_key(obj: Any, key: str, default=None): return getattr(obj, key, default) def _as_window_text(chunk: Any) -> str: - """ - Extrahiert den Fenstertext robust aus Chunk (Objekt oder Dict). - Bevorzugt 'window', dann 'text', 'content', 'raw'. - """ + """Fenstertext robust lesen (bevorzugt echte Fenster, sonst Kern).""" for k in ("window", "text", "content", "raw"): v = _get_attr_or_key(chunk, k, None) if isinstance(v, str) and v: return v - # Fallback: falls der Chunk selbst ein String ist (exotisch) - if isinstance(chunk, str): - return chunk return "" def _to_int(x: Any, default: int = 0) -> int: @@ -82,7 +63,6 @@ def _to_int(x: Any, default: int = 0) -> int: def _normalize_rel_path(p: str) -> str: p = (p or "").replace("\\", "/") - # kein führender Slash (relativ bleiben) while p.startswith("/"): p = p[1:] return p @@ -90,12 +70,10 @@ def _normalize_rel_path(p: str) -> str: # ---------------------- Overlap & Offsets ---------------------------- # -def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int]]: +def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int], str]: """ - Ermittelt nicht-überlappende Segmente zu einer geordneten Folge von Fenster-Strings. - Gibt (segments, overlaps_left) zurück, wobei: - - segments[i] = Fenster[i] ohne das vorangestellte Overlap - - overlaps_left[i] = Länge des Overlaps von Fenster[i] zum bisher rekonstruierten Text + Entfernt linkes Overlap aus echten Fenster-Strings. + Rückgabe: (segments, overlaps_left, reconstructed_text) """ segments: List[str] = [] overlaps_left: List[int] = [] @@ -104,7 +82,6 @@ def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int w = w or "" max_k = min(len(w), len(reconstructed)) k = 0 - # Suche von groß nach klein (einfach, robust bei kurzen Fenstern) for cand in range(max_k, -1, -1): if reconstructed.endswith(w[:cand]): k = cand @@ -113,19 +90,17 @@ def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int segments.append(seg) overlaps_left.append(k) reconstructed += seg - return segments, overlaps_left + return segments, overlaps_left, reconstructed -def _overlap_len_suffix_prefix(a_suffix: str, b_prefix: str, max_probe: int = 4096) -> int: - """ - Länge des längsten Suffixes von a_suffix, der Prefix von b_prefix ist (bruteforce, begrenzt). - """ - if not a_suffix or not b_prefix: +def _overlap_len_suffix_prefix(a: str, b: str, max_probe: int = 4096) -> int: + """Länge längsten Suffix(a), der Prefix(b) ist.""" + if not a or not b: return 0 - a = a_suffix[-max_probe:] - b = b_prefix[:max_probe] - n = min(len(a), len(b)) + a1 = a[-max_probe:] + b1 = b[:max_probe] + n = min(len(a1), len(b1)) for k in range(n, 0, -1): - if a[-k:] == b[:k]: + if a1[-k:] == b1[:k]: return k return 0 @@ -139,53 +114,85 @@ def make_chunk_payloads( note_text: Optional[str] = None, ) -> List[Dict[str, Any]]: """ - Baut Payload-Dicts pro Chunk (kompatibel zu v2.0.1, erweitert). - - Rückgabe-Felder (wichtigste): - note_id, chunk_id, id, chunk_index, seq, path, - window, text, start, end, overlap_left, overlap_right, - token_count?, section_title?, section_path?, type?, title?, tags? + Baut Payloads pro Chunk. Falls Fenster ohne Overlap geliefert werden, + erzeugen wir synthetische 'window'-Texte mit typgerechtem Overlap. """ note_id = str(frontmatter.get("id") or "").strip() - note_type = frontmatter.get("type", None) + note_type = str(frontmatter.get("type", "")).lower() note_title = frontmatter.get("title", None) note_tags = frontmatter.get("tags", None) rel_path = _normalize_rel_path(rel_path) - # 1) Fenstertexte & Sequenzen & (falls vorhanden) vorgegebene IDs sammeln - windows: List[str] = [] + # 1) Rohdaten sammeln (so wie geliefert) + chunks_list = list(chunks) + raw_windows: List[str] = [] seqs: List[int] = [] ids_in: List[Optional[str]] = [] token_counts: List[Optional[int]] = [] section_titles: List[Optional[str]] = [] section_paths: List[Optional[str]] = [] + any_explicit_window = False - chunks_list = list(chunks) # falls Iterator for idx, c in enumerate(chunks_list): - windows.append(_as_window_text(c)) + # Fensterquelle + w = _get_attr_or_key(c, "window", None) + if isinstance(w, str) and w: + any_explicit_window = True + raw_windows.append(w) + else: + raw_windows.append(_as_window_text(c)) # 'text'|'content'|'raw' als Ersatz + # Ordnung seqs.append(_to_int(_get_attr_or_key(c, "seq", _get_attr_or_key(c, "chunk_index", idx)), idx)) - # übernehme deterministische ID (falls vom Chunker geliefert) + # IDs, Tokens, Sektionen cid = _get_attr_or_key(c, "chunk_id", _get_attr_or_key(c, "id", None)) ids_in.append(str(cid) if isinstance(cid, str) and cid else None) - token_counts.append(_to_int(_get_attr_or_key(c, "token_count", None), 0) if _get_attr_or_key(c, "token_count", None) is not None else None) + tc = _get_attr_or_key(c, "token_count", None) + token_counts.append(_to_int(tc, 0) if tc is not None else None) section_titles.append(_get_attr_or_key(c, "section_title", None)) section_paths.append(_get_attr_or_key(c, "section_path", None)) - # 2) Nicht-überlappende Segmente berechnen - segments, overlaps_left = _dedupe_windows_to_segments(windows) + # 2) Segmente & Overlaps bestimmen + if any_explicit_window: + # Es existieren echte Fenster → dedupe, um Kernsegmente zu finden + segments, overlaps_left, recon = _dedupe_windows_to_segments(raw_windows) + windows_final = raw_windows[:] # bereits mit Overlap geliefert + else: + # Keine echten Fenster → Segmente sind identisch zu "Fenstern" (bisher), + # wir erzeugen synthetische Fenster mit Overlap gemäß Typ + segments = [w or "" for w in raw_windows] + overlaps_left = [] + windows_final = [] + recon = "" + try: + overlap_low, overlap_high = tuple(_get_sizes(note_type).get("overlap", (40, 60))) + except Exception: + overlap_low, overlap_high = (40, 60) + overlap_target = int(overlap_low) - # 3) overlap_right berechnen (Blick auf nächstes Fenster) + for i, seg in enumerate(segments): + if i == 0: + # erstes Fenster: kein linker Kontext + windows_final.append(seg) + overlaps_left.append(0) + recon += seg + else: + # synthetischer linker Kontext = Suffix des bisher rekonstruierten Texts + k = min(overlap_target, len(recon)) + left_ctx = recon[-k:] if k > 0 else "" + windows_final.append(left_ctx + seg) + overlaps_left.append(k) + recon += seg # Rekonstruktion bleibt kerntreu + + # 3) overlap_right bestimmen overlaps_right: List[int] = [] - for i in range(len(windows)): - if i + 1 < len(windows): - ov = _overlap_len_suffix_prefix(windows[i], windows[i + 1], max_probe=4096) + for i in range(len(windows_final)): + if i + 1 < len(windows_final): + ov = _overlap_len_suffix_prefix(windows_final[i], windows_final[i + 1], max_probe=4096) else: ov = 0 overlaps_right.append(ov) - # 4) start/end Offsets bestimmen - # - wenn note_text vorhanden: vorwärtsgerichtetes Matching ab der letzten Endposition - # - sonst kumulativ (wie in v2.0.1) + # 4) start/end-Offsets (exakt via note_text, sonst kumulativ) starts: List[int] = [0] * len(segments) ends: List[int] = [0] * len(segments) pos = 0 @@ -211,67 +218,53 @@ def make_chunk_payloads( pos += len(seg) ends[i] = pos - # 5) Payload-Dicts zusammenstellen + # 5) Payload-Dicts payloads: List[Dict[str, Any]] = [] - for i, (win, seg) in enumerate(zip(windows, segments)): - # finale chunk_id: bevorzugt deterministische Vorgabe, sonst Fallback + for i, (win, seg) in enumerate(zip(windows_final, segments)): chunk_id = ids_in[i] or f"{note_id}#{i+1}" - pl: Dict[str, Any] = { - # Identität "note_id": note_id, "chunk_id": chunk_id, - "id": chunk_id, # Alias für Abwärtskompatibilität - - # Ordnung + "id": chunk_id, # Alias "chunk_index": i, "seq": seqs[i], - - # Pfad "path": rel_path, - - # Texte - "window": win, # mit Overlap - "text": seg, # Overlap entfernt - - # Offsets & Overlaps + "window": win, + "text": seg, "start": starts[i], "end": ends[i], "overlap_left": overlaps_left[i], "overlap_right": overlaps_right[i], } - - # optionale Metafelder aus Note / Chunk - if note_type is not None: + # optionale Metadaten + if note_type: pl["type"] = note_type if note_title is not None: pl["title"] = note_title if note_tags is not None: pl["tags"] = note_tags - if token_counts[i] is not None: pl["token_count"] = int(token_counts[i]) if section_titles[i] is not None: pl["section_title"] = section_titles[i] if section_paths[i] is not None: - # normalisiere section_path minimal sp = str(section_paths[i]).replace("\\", "/") pl["section_path"] = sp if sp else "/" - payloads.append(pl) return payloads -# __main__ (optionale Mini-Demo) +# __main__ Demo (optional) if __name__ == "__main__": # pragma: no cover - demo_fm = {"id": "demo", "title": "Demo", "type": "concept"} - demo_chunks = [ - {"id": "demo#1", "text": "Alpha Beta Gamma", "token_count": 3, "section_title": "Intro", "section_path": "/intro"}, - {"id": "demo#2", "text": "Gamma Delta", "token_count": 2, "section_title": "Teil 1", "section_path": "/teil-1"}, - {"id": "demo#3", "text": "Delta Epsilon Zeta", "token_count": 3, "section_title": "Teil 2", "section_path": "/teil-2"}, + fm = {"id": "demo", "title": "Demo", "type": "concept"} + # Beispiel ohne echte Fenster → erzeugt synthetische Overlaps + chunks = [ + {"id": "demo#1", "text": "Alpha Beta Gamma"}, + {"id": "demo#2", "text": "Gamma Delta"}, + {"id": "demo#3", "text": "Delta Epsilon Zeta"}, ] - pls = make_chunk_payloads(demo_fm, "x/demo.md", demo_chunks, note_text="Alpha Beta Gamma Delta Epsilon Zeta") + pls = make_chunk_payloads(fm, "path/demo.md", chunks, note_text="Alpha Beta Gamma Delta Epsilon Zeta") from pprint import pprint pprint(pls) recon = "".join(p["text"] for p in pls)