From 838083b9095710154a6bd7f1d8f3a987b07aafc0 Mon Sep 17 00:00:00 2001 From: Lars Date: Mon, 29 Dec 2025 20:33:43 +0100 Subject: [PATCH] =?UTF-8?q?Verbesserung=20des=20Chunking-Parsers=20zur=20U?= =?UTF-8?q?nterst=C3=BCtzung=20von=20H1-=C3=9Cberschriften=20und=20Anpassu?= =?UTF-8?q?ng=20der=20Metadatenlogik.=20Implementierung=20einer=20atomaren?= =?UTF-8?q?=20Sektions-Chunking-Strategie,=20die=20=C3=9Cberschriften=20un?= =?UTF-8?q?d=20deren=20Inhalte=20zusammenh=C3=A4lt.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/chunking/chunking_parser.py | 45 +++++++++-------- app/core/chunking/chunking_strategies.py | 64 +++++++++++++----------- 2 files changed, 61 insertions(+), 48 deletions(-) diff --git a/app/core/chunking/chunking_parser.py b/app/core/chunking/chunking_parser.py index 3d56f55..2ec45bc 100644 --- a/app/core/chunking/chunking_parser.py +++ b/app/core/chunking/chunking_parser.py @@ -17,53 +17,58 @@ def split_sentences(text: str) -> list[str]: return [p.strip() for p in _SENT_SPLIT.split(text) if p.strip()] def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: - """Zerlegt Text in logische Einheiten.""" + """Zerlegt Text in logische Einheiten, inklusive H1.""" blocks = [] - h1_title = "Dokument"; section_path = "/"; current_h2 = None + h1_title = "Dokument"; section_path = "/"; current_section_title = None fm, text_without_fm = extract_frontmatter_from_text(md_text) + + # H1 für Note-Metadaten extrahieren h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE) if h1_match: h1_title = h1_match.group(1).strip() + lines = text_without_fm.split('\n') buffer = [] for line in lines: stripped = line.strip() - # H1 ignorieren (ist Doc Title) - if stripped.startswith('# '): - continue - - # Generische Heading-Erkennung (H2 bis H6) für flexible Split-Levels - heading_match = re.match(r'^(#{2,6})\s+(.*)', stripped) + # Heading-Erkennung (H1 bis H6) + heading_match = re.match(r'^(#{1,6})\s+(.*)', stripped) if heading_match: - # Buffer leeren (vorherigen Text abschließen) if buffer: content = "\n".join(buffer).strip() - if content: blocks.append(RawBlock("paragraph", content, None, section_path, current_h2)) + if content: + blocks.append(RawBlock("paragraph", content, None, section_path, current_section_title)) buffer = [] level = len(heading_match.group(1)) title = heading_match.group(2).strip() - # Pfad-Logik: H2 setzt den Haupt-Pfad - if level == 2: - current_h2 = title - section_path = f"/{current_h2}" - # Bei H3+ bleibt der section_path beim Parent, aber das Level wird korrekt gesetzt + # Metadaten-Update + if level == 1: + current_section_title = title + section_path = "/" + elif level == 2: + current_section_title = title + section_path = f"/{current_section_title}" - blocks.append(RawBlock("heading", stripped, level, section_path, current_h2)) - - elif not stripped: + blocks.append(RawBlock("heading", stripped, level, section_path, current_section_title)) + continue + + if not stripped: if buffer: content = "\n".join(buffer).strip() - if content: blocks.append(RawBlock("paragraph", content, None, section_path, current_h2)) + if content: + blocks.append(RawBlock("paragraph", content, None, section_path, current_section_title)) buffer = [] else: buffer.append(line) if buffer: content = "\n".join(buffer).strip() - if content: blocks.append(RawBlock("paragraph", content, None, section_path, current_h2)) + if content: + blocks.append(RawBlock("paragraph", content, None, section_path, current_section_title)) + return blocks, h1_title def parse_edges_robust(text: str) -> Set[str]: diff --git a/app/core/chunking/chunking_strategies.py b/app/core/chunking/chunking_strategies.py index 5e09512..3c939ec 100644 --- a/app/core/chunking/chunking_strategies.py +++ b/app/core/chunking/chunking_strategies.py @@ -20,7 +20,8 @@ def _create_context_win(doc_title: str, sec_title: Optional[str], text: str) -> def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: """ - Splittet Text basierend auf Markdown-Überschriften mit atomarem Block-Erhalt. + Implementiert atomares Sektions-Chunking. + Hält Überschriften und ihren Inhalt (inkl. Edges) zusammen. """ strict = config.get("strict_heading_split", False) target = config.get("target", 400) @@ -45,23 +46,21 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: def _flush(): nonlocal buf, cur_tokens if not buf: return - - # Metadaten stammen immer vom ersten Block im Puffer (meist die Überschrift) main_title = buf[0].section_title main_path = buf[0].section_path full_text = "\n\n".join([b.text for b in buf]) - # Falls der gesamte Puffer in einen Chunk passt if estimate_tokens(full_text) <= max_tokens: _add_to_chunks(full_text, main_title, main_path) else: - # Nur wenn ein einzelner Abschnitt größer als 'max' ist, wird intern gesplittet + # Fallback: Nur wenn eine Sektion ALLEINE zu groß ist, wird intern gesplittet sents = split_sentences(full_text) cur_sents = []; sub_len = 0 for s in sents: slen = estimate_tokens(s) if sub_len + slen > target and cur_sents: _add_to_chunks(" ".join(cur_sents), main_title, main_path) + # Overlap-Logik... ov_s = []; ov_l = 0 for os in reversed(cur_sents): if ov_l + estimate_tokens(os) < overlap: @@ -70,34 +69,43 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: cur_sents = list(ov_s); cur_sents.append(s); sub_len = ov_l + slen else: cur_sents.append(s); sub_len += slen if cur_sents: _add_to_chunks(" ".join(cur_sents), main_title, main_path) - buf = []; cur_tokens = 0 + # SCHRITT 1: Gruppierung in atomare Sektions-Einheiten + sections = [] + curr_sec = [] for b in blocks: - b_tokens = estimate_tokens(b.text) - - # Prüfung auf Split-Trigger (Überschriften) - is_split_trigger = False - if b.kind == "heading": - if b.level < split_level: - is_split_trigger = True - elif b.level == split_level: - if strict or cur_tokens >= target: - is_split_trigger = True - - if is_split_trigger: - _flush() # Vorherigen Puffer leeren - buf.append(b) # Neue Überschrift in den neuen Puffer aufnehmen - cur_tokens = b_tokens + # Ein Split-Trigger startet eine neue Sektion + if b.kind == "heading" and b.level <= split_level: + if curr_sec: sections.append(curr_sec) + curr_sec = [b] else: - # Atomarer Check: Wenn der neue Block den aktuellen Chunk sprengen würde - if cur_tokens + b_tokens > max_tokens and buf: - _flush() # Puffer leeren, Block 'b' wird Teil des nächsten Chunks - - buf.append(b) - cur_tokens += b_tokens + curr_sec.append(b) + if curr_sec: sections.append(curr_sec) - _flush() # Letzten Puffer leeren + # SCHRITT 2: Verarbeitung der Sektionen mit Vorausschau + for sec in sections: + sec_tokens = sum(estimate_tokens(b.text) for b in sec) + + if buf: + # PRÜFUNG: Passt die gesamte Sektion noch in den aktuellen Chunk? + if cur_tokens + sec_tokens > max_tokens: + _flush() + # PRÜFUNG: Harter Split gefordert? + elif strict: + _flush() + # PRÜFUNG: Weicher Split (Target erreicht)? + elif cur_tokens >= target: + _flush() + + buf.extend(sec) + cur_tokens += sec_tokens + + # Falls die Sektion selbst das Limit sprengt, sofort flashen + if cur_tokens >= max_tokens: + _flush() + + _flush() return chunks def strategy_sliding_window(blocks: List[RawBlock],