diff --git a/app/core/chunk_payload.py b/app/core/chunk_payload.py index b0c2a76..3bcc0cc 100644 --- a/app/core/chunk_payload.py +++ b/app/core/chunk_payload.py @@ -2,66 +2,93 @@ # -*- coding: utf-8 -*- """ Modul: app/core/chunk_payload.py -Version: 2.0.1 -Datum: 2025-09-30 +Version: 2.1.0 +Datum: 2025-10-01 Zweck ----- -Erzeugt Chunk-Payloads für Qdrant. Unterstützt abwärtskompatibel bisherige Felder und -ergänzt Felder für **verlustfreie Rekonstruktion** bei überlappenden Fenstern: +Erzeugt Chunk-Payloads für Qdrant. Voll abwärtskompatibel zu v2.0.1 und erweitert um: +- Übernahme vorhandener deterministischer Chunk-IDs (falls vom Chunker geliefert). +- Durchreichen von token_count, section_title, section_path (falls vorhanden). +- Exaktere start/end-Offsets, wenn der vollständige Note-Body (note_text) vorliegt. +- Berechnung von overlap_right (nicht mehr pauschal 0). - - text : effektiver, nicht-überlappender Segmenttext (für Rekonstruktion) - - window : Fenstertext inkl. Overlap (für Embeddings) - - start, end : absolute Offsets (0-basiert) des effektiven Segments im Gesamtkorpus - - overlap_left : Anzahl überlappender Zeichen zum **vorigen** Fenster - - overlap_right : Anzahl überlappender Zeichen zum **nächsten** Fenster +Felder (wie 2.0.1, beibehalten): + - window : Fenstertext inkl. Overlap (für Embeddings & Link-Erkennung) + - text : effektiver Segmenttext ohne linkes Overlap (für Rekonstruktion) + - start, end : Offsets des Segments im Gesamtkorpus (0-basiert, [start, end)) + - overlap_left : Zeichen-Overlap zum VORHERIGEN Fenster + - overlap_right : Zeichen-Overlap zum NÄCHSTEN Fenster + - note_id, chunk_id, id (Alias), chunk_index, seq, path, type, title, tags -Abwärtskompatible Aliasse: - - id : == chunk_id (wird u. a. von build_edges_for_note erwartet) - - content/raw : bleiben leer; Primärfelder sind window/text +Abwärtskompatible Aliasse / Verhalten: + - 'id' == 'chunk_id' (Edges nutzen häufig 'id'). + - Fensterquelle wird robust aus ('window'|'text'|'content'|'raw') gelesen. + - Pfad bleibt relativ und nutzt forward slashes. -Typische Nutzung ----------------- +Nutzung +------- from app.core.chunk_payload import make_chunk_payloads - payloads = make_chunk_payloads(frontmatter, rel_path, chunks, note_text=full_body) + pls = make_chunk_payloads(frontmatter, rel_path, chunks, note_text=full_body) -`chunks` ist eine Sequenz von Objekten oder Dicts, die mindestens ein Fenster enthalten: - c.text ODER c.content ODER c.raw (falls Objekt) - bzw. c["text"] ODER c["content"] ODER c["raw"] (falls Dict) +Parameter +--------- +- frontmatter : dict – erwartet mind. 'id' (Note-ID); optional 'title','type','tags' +- rel_path : str – relativer Pfad der Note im Vault (forward slashes, ohne führenden '/') +- chunks : Seq[object|dict] +- note_text : str|None – optional kompletter Body für exakte Offsets + +Hinweis +------ +Dein Chunker liefert bereits Fenster mit vorangestelltem Overlap. Wir ermitteln den +linken Overlap gegen den bisher rekonstruierten Text und entfernen ihn in 'text'. + +Lizenz: MIT (projektintern) """ from __future__ import annotations from typing import Any, Dict, Iterable, List, Optional, Tuple, Union + # ------------------------------- Utils ------------------------------- # -def _as_text(window_candidate: Any) -> str: - """Extrahiert Fenstertext aus beliebigem Chunk-Objekt/Dikt.""" - if window_candidate is None: - return "" - # Objekt mit Attributen - if not isinstance(window_candidate, dict): - for k in ("window", "text", "content", "raw"): - v = getattr(window_candidate, k, None) - if isinstance(v, str) and v: - return v - else: - for k in ("window", "text", "content", "raw"): - v = window_candidate.get(k) - if isinstance(v, str) and v: - return v - # Fallback: string-repr - if isinstance(window_candidate, str): - return window_candidate +def _get_attr_or_key(obj: Any, key: str, default=None): + """Liest Feld 'key' aus Objekt ODER Dict; sonst default.""" + if obj is None: + return default + if isinstance(obj, dict): + return obj.get(key, default) + return getattr(obj, key, default) + +def _as_window_text(chunk: Any) -> str: + """ + Extrahiert den Fenstertext robust aus Chunk (Objekt oder Dict). + Bevorzugt 'window', dann 'text', 'content', 'raw'. + """ + for k in ("window", "text", "content", "raw"): + v = _get_attr_or_key(chunk, k, None) + if isinstance(v, str) and v: + return v + # Fallback: falls der Chunk selbst ein String ist (exotisch) + if isinstance(chunk, str): + return chunk return "" -def _get_int(x: Any, default: int = 0) -> int: +def _to_int(x: Any, default: int = 0) -> int: try: return int(x) except Exception: return default -# ---------------------- Overlap-Dedupe Algorithmus ------------------- # +def _normalize_rel_path(p: str) -> str: + p = (p or "").replace("\\", "/") + # kein führender Slash (relativ bleiben) + while p.startswith("/"): + p = p[1:] + return p + + +# ---------------------- Overlap & Offsets ---------------------------- # def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int]]: """ @@ -77,7 +104,7 @@ def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int w = w or "" max_k = min(len(w), len(reconstructed)) k = 0 - # Suche von groß nach klein (einfach, ausreichend bei kurzen Fenstern) + # Suche von groß nach klein (einfach, robust bei kurzen Fenstern) for cand in range(max_k, -1, -1): if reconstructed.endswith(w[:cand]): k = cand @@ -88,6 +115,21 @@ def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int reconstructed += seg return segments, overlaps_left +def _overlap_len_suffix_prefix(a_suffix: str, b_prefix: str, max_probe: int = 4096) -> int: + """ + Länge des längsten Suffixes von a_suffix, der Prefix von b_prefix ist (bruteforce, begrenzt). + """ + if not a_suffix or not b_prefix: + return 0 + a = a_suffix[-max_probe:] + b = b_prefix[:max_probe] + n = min(len(a), len(b)) + for k in range(n, 0, -1): + if a[-k:] == b[:k]: + return k + return 0 + + # ----------------------------- Public API ---------------------------- # def make_chunk_payloads( @@ -97,68 +139,100 @@ def make_chunk_payloads( note_text: Optional[str] = None, ) -> List[Dict[str, Any]]: """ - Baut Payload-Dicts pro Chunk. + Baut Payload-Dicts pro Chunk (kompatibel zu v2.0.1, erweitert). - Parameter - --------- - frontmatter : dict – erwartete Keys: id (note_id), title, type, tags (optional) - rel_path : str – relativer Pfad der Note im Vault - chunks : iter – Sequenz von Chunk-Objekten/-Dicts mit Fenstertext - note_text : str? – optionaler Gesamtkorpus (Body) für exakte Offsets - - Rückgabe - -------- - Liste von Payload-Dicts. Wichtige Felder: - note_id, id, chunk_id, chunk_index, seq, path, text, window, - start, end, overlap_left, overlap_right, type, title, tags + Rückgabe-Felder (wichtigste): + note_id, chunk_id, id, chunk_index, seq, path, + window, text, start, end, overlap_left, overlap_right, + token_count?, section_title?, section_path?, type?, title?, tags? """ note_id = str(frontmatter.get("id") or "").strip() note_type = frontmatter.get("type", None) note_title = frontmatter.get("title", None) note_tags = frontmatter.get("tags", None) + rel_path = _normalize_rel_path(rel_path) - # 1) Fenstertexte + Sequenzen extrahieren + # 1) Fenstertexte & Sequenzen & (falls vorhanden) vorgegebene IDs sammeln windows: List[str] = [] seqs: List[int] = [] - for idx, c in enumerate(chunks): - windows.append(_as_text(c)) - if isinstance(c, dict): - s = c.get("seq", c.get("chunk_index", idx)) - else: - s = getattr(c, "seq", getattr(c, "chunk_index", idx)) - seqs.append(_get_int(s, idx)) + ids_in: List[Optional[str]] = [] + token_counts: List[Optional[int]] = [] + section_titles: List[Optional[str]] = [] + section_paths: List[Optional[str]] = [] + + chunks_list = list(chunks) # falls Iterator + for idx, c in enumerate(chunks_list): + windows.append(_as_window_text(c)) + seqs.append(_to_int(_get_attr_or_key(c, "seq", _get_attr_or_key(c, "chunk_index", idx)), idx)) + # übernehme deterministische ID (falls vom Chunker geliefert) + cid = _get_attr_or_key(c, "chunk_id", _get_attr_or_key(c, "id", None)) + ids_in.append(str(cid) if isinstance(cid, str) and cid else None) + token_counts.append(_to_int(_get_attr_or_key(c, "token_count", None), 0) if _get_attr_or_key(c, "token_count", None) is not None else None) + section_titles.append(_get_attr_or_key(c, "section_title", None)) + section_paths.append(_get_attr_or_key(c, "section_path", None)) # 2) Nicht-überlappende Segmente berechnen segments, overlaps_left = _dedupe_windows_to_segments(windows) - overlaps_right = [0] * len(segments) # optional: später präzisieren - # 3) Offsets bestimmen (ohne/mit note_text gleich: kumulativ) + # 3) overlap_right berechnen (Blick auf nächstes Fenster) + overlaps_right: List[int] = [] + for i in range(len(windows)): + if i + 1 < len(windows): + ov = _overlap_len_suffix_prefix(windows[i], windows[i + 1], max_probe=4096) + else: + ov = 0 + overlaps_right.append(ov) + + # 4) start/end Offsets bestimmen + # - wenn note_text vorhanden: vorwärtsgerichtetes Matching ab der letzten Endposition + # - sonst kumulativ (wie in v2.0.1) starts: List[int] = [0] * len(segments) ends: List[int] = [0] * len(segments) pos = 0 - for i, seg in enumerate(segments): - starts[i] = pos - pos += len(seg) - ends[i] = pos + if isinstance(note_text, str) and note_text: + search_pos = 0 + for i, seg in enumerate(segments): + if not seg: + starts[i] = ends[i] = search_pos + continue + j = note_text.find(seg, search_pos) + if j >= 0: + starts[i] = j + ends[i] = j + len(seg) + search_pos = ends[i] + else: + # Fallback: kumulativ + starts[i] = pos + pos += len(seg) + ends[i] = pos + else: + for i, seg in enumerate(segments): + starts[i] = pos + pos += len(seg) + ends[i] = pos - # 4) Payload-Dicts zusammenstellen + # 5) Payload-Dicts zusammenstellen payloads: List[Dict[str, Any]] = [] for i, (win, seg) in enumerate(zip(windows, segments)): - chunk_id = f"{note_id}#{i+1}" + # finale chunk_id: bevorzugt deterministische Vorgabe, sonst Fallback + chunk_id = ids_in[i] or f"{note_id}#{i+1}" + pl: Dict[str, Any] = { # Identität "note_id": note_id, "chunk_id": chunk_id, - "id": chunk_id, # <— WICHTIG: Alias für Abwärtskompatibilität (Edges erwarten 'id') + "id": chunk_id, # Alias für Abwärtskompatibilität - # Indexierung + # Ordnung "chunk_index": i, "seq": seqs[i], - "path": rel_path.replace("\\", "/").lstrip("/"), + + # Pfad + "path": rel_path, # Texte - "window": win, # für Embeddings (inkl. Overlap) - "text": seg, # überlappungsfreier Anteil für exakte Rekonstruktion + "window": win, # mit Overlap + "text": seg, # Overlap entfernt # Offsets & Overlaps "start": starts[i], @@ -166,6 +240,8 @@ def make_chunk_payloads( "overlap_left": overlaps_left[i], "overlap_right": overlaps_right[i], } + + # optionale Metafelder aus Note / Chunk if note_type is not None: pl["type"] = note_type if note_title is not None: @@ -173,18 +249,27 @@ def make_chunk_payloads( if note_tags is not None: pl["tags"] = note_tags + if token_counts[i] is not None: + pl["token_count"] = int(token_counts[i]) + if section_titles[i] is not None: + pl["section_title"] = section_titles[i] + if section_paths[i] is not None: + # normalisiere section_path minimal + sp = str(section_paths[i]).replace("\\", "/") + pl["section_path"] = sp if sp else "/" + payloads.append(pl) return payloads -# __main__ (optional: Mini-Demo) +# __main__ (optionale Mini-Demo) if __name__ == "__main__": # pragma: no cover demo_fm = {"id": "demo", "title": "Demo", "type": "concept"} demo_chunks = [ - {"text": "Alpha Beta Gamma"}, - {"text": "Gamma Delta"}, - {"text": "Delta Epsilon Zeta"}, + {"id": "demo#1", "text": "Alpha Beta Gamma", "token_count": 3, "section_title": "Intro", "section_path": "/intro"}, + {"id": "demo#2", "text": "Gamma Delta", "token_count": 2, "section_title": "Teil 1", "section_path": "/teil-1"}, + {"id": "demo#3", "text": "Delta Epsilon Zeta", "token_count": 3, "section_title": "Teil 2", "section_path": "/teil-2"}, ] pls = make_chunk_payloads(demo_fm, "x/demo.md", demo_chunks, note_text="Alpha Beta Gamma Delta Epsilon Zeta") from pprint import pprint