diff --git a/app/core/chunking/chunking_parser.py b/app/core/chunking/chunking_parser.py index 95e2fad..6bc866d 100644 --- a/app/core/chunking/chunking_parser.py +++ b/app/core/chunking/chunking_parser.py @@ -1,6 +1,7 @@ """ FILE: app/core/chunking/chunking_parser.py -DESCRIPTION: Zerlegt Markdown in Blöcke. Hält H1-Überschriften im Stream. +DESCRIPTION: Zerlegt Markdown in Blöcke. Hält H1-Überschriften im Stream + und optimiert die Block-Trennung für atomares Chunking. """ import re from typing import List, Tuple, Set @@ -24,7 +25,8 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: # H1 für Note-Titel extrahieren (Metadaten) h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE) - if h1_match: h1_title = h1_match.group(1).strip() + if h1_match: + h1_title = h1_match.group(1).strip() lines = text_without_fm.split('\n') buffer = [] @@ -45,7 +47,7 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: level = len(heading_match.group(1)) title = heading_match.group(2).strip() - # Pfad- und Titel-Update + # Pfad- und Titel-Update für die Metadaten der folgenden Blöcke if level == 1: current_section_title = title section_path = "/" @@ -53,10 +55,12 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: current_section_title = title section_path = f"/{current_section_title}" + # Die Überschrift selbst als Block hinzufügen (Fix: H1 wird nicht mehr gefiltert) blocks.append(RawBlock("heading", stripped, level, section_path, current_section_title)) continue - if not stripped and not line.startswith('>'): # Leerzeilen (außer in Callouts) trennen Blöcke + # Leerzeilen trennen Blöcke, außer innerhalb von Callouts + if not stripped and not line.startswith('>'): if buffer: content = "\n".join(buffer).strip() if content: diff --git a/app/core/chunking/chunking_strategies.py b/app/core/chunking/chunking_strategies.py index 1a15bba..af19d4d 100644 --- a/app/core/chunking/chunking_strategies.py +++ b/app/core/chunking/chunking_strategies.py @@ -1,6 +1,7 @@ """ FILE: app/core/chunking/chunking_strategies.py DESCRIPTION: Strategien für atomares Sektions-Chunking (WP-15b konform). + Fix: Vorausschauende Trennung zur Wahrung von Sektionsgrenzen. """ from typing import List, Dict, Any, Optional from .chunking_models import RawBlock, Chunk @@ -18,6 +19,7 @@ def _create_context_win(doc_title: str, sec_title: Optional[str], text: str) -> def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: """ Gruppiert Blöcke zu Sektionen und hält diese atomar zusammen. + Nutzt Look-Ahead, um Sektions-Überhänge zu vermeiden. """ strict = config.get("strict_heading_split", False) target = config.get("target", 400) @@ -47,25 +49,38 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: main_path = buf[0].section_path full_text = "\n\n".join([b.text for b in buf]) + # Falls die gruppierten Sektionen in das Limit passen if estimate_tokens(full_text) <= max_tokens: _add_to_chunks(full_text, main_title, main_path) else: # Nur wenn eine Sektion ALLEINE zu groß ist, wird intern gesplittet sents = split_sentences(full_text) cur_sents = []; sub_len = 0 + # Kontext-Sicherung: Heading für Teil-Chunks merken + header_text = buf[0].text if buf[0].kind == "heading" else "" + for s in sents: slen = estimate_tokens(s) if sub_len + slen > target and cur_sents: _add_to_chunks(" ".join(cur_sents), main_title, main_path) - # Overlap Logic - ov_s = []; ov_l = 0 + + # Overlap-Erzeugung und Header-Injektion für Folgeschritte + ov_s = [header_text] if header_text else [] + ov_l = estimate_tokens(header_text) if header_text else 0 for os in reversed(cur_sents): - if ov_l + estimate_tokens(os) < overlap: - ov_s.insert(0, os); ov_l += estimate_tokens(os) + if os == header_text: continue + t_len = estimate_tokens(os) + if ov_l + t_len < overlap: + ov_s.insert(len(ov_s)-1 if header_text else 0, os) + ov_l += t_len else: break - cur_sents = list(ov_s); cur_sents.append(s); sub_len = ov_l + slen - else: cur_sents.append(s); sub_len += slen - if cur_sents: _add_to_chunks(" ".join(cur_sents), main_title, main_path) + cur_sents = list(ov_s); cur_sents.append(s) + sub_len = ov_l + slen + else: + cur_sents.append(s); sub_len += slen + + if cur_sents: + _add_to_chunks(" ".join(cur_sents), main_title, main_path) buf = []; cur_tokens = 0 @@ -74,7 +89,7 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: curr_sec: List[RawBlock] = [] for b in blocks: - # Ein Split-Trigger startet eine neue Sektion + # Ein Split-Trigger (H1 oder H2) startet eine neue atomare Sektion if b.kind == "heading" and b.level <= split_level: if curr_sec: sections.append(curr_sec) curr_sec = [b] @@ -82,16 +97,15 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: curr_sec.append(b) if curr_sec: sections.append(curr_sec) - # SCHRITT 2: Verarbeitung der Sektionen mit Vorausschau + # SCHRITT 2: Verarbeitung der Sektionen mit Vorausschau (Look-Ahead) for sec in sections: - # Token-Schätzung für die gesamte Sektion inkl. Newline-Overhead sec_text = "\n\n".join([b.text for b in sec]) sec_tokens = estimate_tokens(sec_text) if buf: - # Passt die Sektion noch in den aktuellen Chunk? + # VORAUSSCHAU: Würde die neue Sektion das Limit sprengen? if cur_tokens + sec_tokens > max_tokens: - _flush_buffer() + _flush_buffer() # Beende den aktuellen Chunk sauber VOR der neuen Sektion # Wenn strict: Jede neue Sektion auf split_level erzwingt neuen Chunk elif strict and sec[0].kind == "heading" and sec[0].level == split_level: _flush_buffer() @@ -102,7 +116,7 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: buf.extend(sec) cur_tokens += sec_tokens - # Falls der Puffer (selbst nach flush) durch eine Riesen-Sektion zu groß ist + # Falls eine einzelne Sektion nach dem Flush (oder als erste) schon zu groß ist if cur_tokens >= max_tokens: _flush_buffer() @@ -110,8 +124,7 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: return chunks def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]: - # (Identische Korrektur wie oben für Sliding Window, falls benötigt) - # Hier halten wir es einfach: Blöcke nacheinander bis target. + """Basis-Sliding-Window für flache Texte ohne Sektionsfokus.""" target = config.get("target", 400) max_tokens = config.get("max", 600)