From b1a897e51cf899bd42f736d25cbffda77ea53e40 Mon Sep 17 00:00:00 2001 From: Lars Date: Mon, 29 Dec 2025 21:26:05 +0100 Subject: [PATCH] =?UTF-8?q?Verbesserung=20des=20Chunking-Parsers=20zur=20U?= =?UTF-8?q?nterst=C3=BCtzung=20aller=20=C3=9Cberschriften=20(H1-H6)=20und?= =?UTF-8?q?=20Optimierung=20der=20Block-Trennung=20f=C3=BCr=20atomares=20S?= =?UTF-8?q?ektions-Chunking.=20Aktualisierung=20der=20Sektions-Chunking-St?= =?UTF-8?q?rategie=20mit=20striktem=20Look-Ahead=20und=20pr=C3=A4ziserer?= =?UTF-8?q?=20Token-Sch=C3=A4tzung=20f=C3=BCr=20eine=20verbesserte=20Handh?= =?UTF-8?q?abung=20von=20gro=C3=9Fen=20Bl=C3=B6cken.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/chunking/chunking_parser.py | 24 +-- app/core/chunking/chunking_strategies.py | 180 ++++++++++------------- 2 files changed, 87 insertions(+), 117 deletions(-) diff --git a/app/core/chunking/chunking_parser.py b/app/core/chunking/chunking_parser.py index 72d696d..ca67598 100644 --- a/app/core/chunking/chunking_parser.py +++ b/app/core/chunking/chunking_parser.py @@ -1,7 +1,7 @@ """ FILE: app/core/chunking/chunking_parser.py -DESCRIPTION: Zerlegt Markdown in Blöcke. Erhält H1-Überschriften und - gewährleistet die Integrität von Callouts und Listen. +DESCRIPTION: Zerlegt Markdown in logische Blöcke. Hält H1-Überschriften im Stream + und optimiert die Block-Trennung für atomares Sektions-Chunking. """ import re from typing import List, Tuple, Set @@ -12,13 +12,14 @@ _WS = re.compile(r'\s+') _SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])') def split_sentences(text: str) -> list[str]: - """Teilt Text in Sätze auf.""" + """Teilt Text in Sätze auf unter Berücksichtigung deutscher Interpunktion.""" text = _WS.sub(' ', text.strip()) if not text: return [] + # Splittet bei Satzzeichen, gefolgt von Leerzeichen und Großbuchstaben return [p.strip() for p in _SENT_SPLIT.split(text) if p.strip()] def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: - """Zerlegt Text in logische Einheiten (RawBlocks), inklusive H1.""" + """Zerlegt Text in logische Einheiten (RawBlocks), inklusive H1-H6.""" blocks = [] h1_title = "Dokument" section_path = "/" @@ -27,7 +28,7 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: # Frontmatter entfernen fm, text_without_fm = extract_frontmatter_from_text(md_text) - # H1 für Note-Titel extrahieren + # H1 für Note-Metadaten extrahieren h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE) if h1_match: h1_title = h1_match.group(1).strip() @@ -41,7 +42,7 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: # Heading-Erkennung (H1 bis H6) heading_match = re.match(r'^(#{1,6})\s+(.*)', stripped) if heading_match: - # Vorherigen Text-Block abschließen + # Vorherigen Block abschließen if buffer: content = "\n".join(buffer).strip() if content: @@ -51,7 +52,7 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: level = len(heading_match.group(1)) title = heading_match.group(2).strip() - # Pfad- und Titel-Update für die Metadaten + # Update der Pfad-Metadaten für die folgenden Blöcke if level == 1: current_section_title = title section_path = "/" @@ -59,11 +60,11 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: current_section_title = title section_path = f"/{current_section_title}" - # Die Überschrift als Block hinzufügen (H1 wird NICHT mehr gefiltert) + # Die Überschrift als regulären Block hinzufügen (Fix: H1 bleibt im Text) blocks.append(RawBlock("heading", stripped, level, section_path, current_section_title)) continue - # Trenner oder Leerzeilen beenden einen Block + # Trenner (---) oder Leerzeilen beenden Blöcke, außer in Callouts if (not stripped or stripped == "---") and not line.startswith('>'): if buffer: content = "\n".join(buffer).strip() @@ -75,6 +76,7 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: else: buffer.append(line) + # Letzten Puffer leeren if buffer: content = "\n".join(buffer).strip() if content: @@ -85,7 +87,7 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: def parse_edges_robust(text: str) -> Set[str]: """Extrahiert Kanten-Kandidaten aus Wikilinks und Callouts.""" found_edges = set() - # 1. Inline Wikilinks [[rel:kind|target]] + # 1. Wikilinks [[rel:kind|target]] inlines = re.findall(r'\[\[rel:([^\|\]]+)\|?([^\]]*)\]\]', text) for kind, target in inlines: k = kind.strip().lower() @@ -100,12 +102,10 @@ def parse_edges_robust(text: str) -> Set[str]: callout_match = re.match(r'>\s*\[!edge\]\s*([^:\s]+)', stripped) if callout_match: current_edge_type = callout_match.group(1).strip().lower() - # Links in der gleichen Zeile links = re.findall(r'\[\[([^\]]+)\]\]', stripped) for l in links: if "rel:" not in l: found_edges.add(f"{current_edge_type}:{l}") continue - # Links in Folgezeilen des Callouts if current_edge_type and stripped.startswith('>'): links = re.findall(r'\[\[([^\]]+)\]\]', stripped) for l in links: diff --git a/app/core/chunking/chunking_strategies.py b/app/core/chunking/chunking_strategies.py index 09ed198..9e2943a 100644 --- a/app/core/chunking/chunking_strategies.py +++ b/app/core/chunking/chunking_strategies.py @@ -1,21 +1,20 @@ """ FILE: app/core/chunking/chunking_strategies.py -DESCRIPTION: Strategien für atomares Sektions-Chunking v3.3.6. - AUDIT: 100% Konformität zur 'by_heading' Spezifikation. - - Block-Aware Flushing: Trennung nur an Blockgrenzen. - - Atomic Section Vorausschau: Verhindert Sektions-Zerreißung. +DESCRIPTION: Strategien für atomares Sektions-Chunking v3.4.1. + Garantiert Sektions-Integrität (Atomic Units) durch Look-Ahead. """ import math from typing import List, Dict, Any, Optional from .chunking_models import RawBlock, Chunk +from .chunking_utils import estimate_tokens from .chunking_parser import split_sentences def _safe_estimate_tokens(text: str) -> int: - """Konservative Token-Schätzung für deutschen Text (len/3 statt len/4).""" - return max(1, math.ceil(len(text.strip()) / 3)) + """Konservative Schätzung für MD und deutsche Texte (len/2.8).""" + return max(1, math.ceil(len(text.strip()) / 2.8)) def _create_context_win(doc_title: str, sec_title: Optional[str], text: str) -> str: - """Baut den Breadcrumb-Kontext für das Embedding-Fenster.""" + """Baut den Breadcrumb-Kontext für das Embedding-Fenster (H1 > H2).""" parts = [] if doc_title: parts.append(doc_title) if sec_title and sec_title != doc_title: parts.append(sec_title) @@ -24,8 +23,8 @@ def _create_context_win(doc_title: str, sec_title: Optional[str], text: str) -> def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: """ - Gruppiert Blöcke zu Sektionen und hält diese atomar zusammen. - Nutzt Block-Aware-Flushing, um Sektionsgrenzen strikt zu wahren. + Sektions-Chunking: Behandelt Abschnitte als unteilbare Einheiten. + Schiebt ganze Abschnitte in den nächsten Chunk, falls das Limit erreicht ist. """ strict = config.get("strict_heading_split", False) target = config.get("target", 400) @@ -35,10 +34,15 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: overlap = sum(overlap_cfg) // 2 if isinstance(overlap_cfg, (list, tuple)) else overlap_cfg chunks: List[Chunk] = [] - buf: List[RawBlock] = [] - def _add_chunk(txt, title, path): + def _emit_chunk(block_list: List[RawBlock]): + """Erzeugt ein finales Chunk-Objekt aus einer Liste von Blöcken.""" + if not block_list: return + txt = "\n\n".join([b.text for b in block_list]) idx = len(chunks) + # Metadaten vom ersten Block der Gruppe (Header) + title = block_list[0].section_title + path = block_list[0].section_path win = _create_context_win(doc_title, title, txt) chunks.append(Chunk( id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, @@ -47,70 +51,39 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: neighbors_prev=None, neighbors_next=None )) - def _flush_buffer(): - nonlocal buf - if not buf: return + def _emit_split_section(sec_blocks: List[RawBlock]): + """Splittet eine einzelne Sektion, die für sich allein zu groß ist.""" + full_text = "\n\n".join([b.text for b in sec_blocks]) + main_title = sec_blocks[0].section_title + main_path = sec_blocks[0].section_path + header_text = sec_blocks[0].text if sec_blocks[0].kind == "heading" else "" - # Block-Aware Processing innerhalb des Puffers - current_blocks = [] - current_len = 0 + sents = split_sentences(full_text) + cur_sents = []; sub_len = 0 - for b in buf: - b_len = _safe_estimate_tokens(b.text) - - # Falls dieser Block den aktuellen Chunk sprengen würde -> Vorher abschließen - if current_len + b_len > max_tokens and current_blocks: - txt = "\n\n".join([cb.text for cb in current_blocks]) - _add_chunk(txt, current_blocks[0].section_title, current_blocks[0].section_path) - current_blocks = [] - current_len = 0 - - # Falls ein einzelner Block alleine zu groß ist (Sliding Window Fallback) - if b_len > max_tokens: - if current_blocks: # Vorherigen Rest wegschreiben - txt = "\n\n".join([cb.text for cb in current_blocks]) - _add_chunk(txt, current_blocks[0].section_title, current_blocks[0].section_path) - current_blocks = [] - current_len = 0 - - # Sätze dieses einen Riesen-Blocks splitten - sents = split_sentences(b.text) - cur_sents = []; sub_len = 0 - header_text = b.text if b.kind == "heading" else "" - - for s in sents: - slen = _safe_estimate_tokens(s) - if sub_len + slen > target and cur_sents: - _add_chunk(" ".join(cur_sents), b.section_title, b.section_path) - # Overlap-Erzeugung & Header-Wiederholung - ov_s = [header_text] if header_text else [] - ov_l = _safe_estimate_tokens(header_text) if header_text else 0 - for os in reversed(cur_sents): - if os == header_text: continue - t_len = _safe_estimate_tokens(os) - if ov_l + t_len < overlap: - ov_s.insert(len(ov_s)-1 if header_text else 0, os) - ov_l += t_len - else: break - cur_sents = list(ov_s); cur_sents.append(s); sub_len = ov_l + slen - else: cur_sents.append(s); sub_len += slen - if cur_sents: - _add_chunk(" ".join(cur_sents), b.section_title, b.section_path) + for s in sents: + slen = _safe_estimate_tokens(s) + if sub_len + slen > target and cur_sents: + _emit_chunk([RawBlock("paragraph", " ".join(cur_sents), None, main_path, main_title)]) + # Header Injection für den Kontext im nächsten Teil-Chunk + ov_s = [header_text] if header_text else [] + ov_l = _safe_estimate_tokens(header_text) if header_text else 0 + for os in reversed(cur_sents): + if os == header_text: continue + t_len = _safe_estimate_tokens(os) + if ov_l + t_len < overlap: + ov_s.insert(len(ov_s)-1 if header_text else 0, os) + ov_l += t_len + else: break + cur_sents = list(ov_s); cur_sents.append(s); sub_len = ov_l + slen else: - current_blocks.append(b) - current_len += b_len - - # Den verbleibenden Rest im Puffer als finalen Chunk schreiben - if current_blocks: - txt = "\n\n".join([cb.text for cb in current_blocks]) - _add_chunk(txt, current_blocks[0].section_title, current_blocks[0].section_path) - - buf = [] + cur_sents.append(s); sub_len += slen + if cur_sents: + _emit_chunk([RawBlock("paragraph", " ".join(cur_sents), None, main_path, main_title)]) - # SCHRITT 1: Gruppierung in atomare Sektions-Einheiten + # SCHRITT 1: Gruppierung in atomare Einheiten (Sektionen) sections: List[List[RawBlock]] = [] curr_sec: List[RawBlock] = [] - for b in blocks: if b.kind == "heading" and b.level <= split_level: if curr_sec: sections.append(curr_sec) @@ -119,66 +92,63 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: curr_sec.append(b) if curr_sec: sections.append(curr_sec) - # SCHRITT 2: Verarbeitung der Sektionen mit Look-Ahead + # SCHRITT 2: Verarbeitung der Sektionen mit strengem Look-Ahead + current_chunk_buf = [] + current_tokens = 0 + for sec in sections: sec_text = "\n\n".join([b.text for b in sec]) sec_tokens = _safe_estimate_tokens(sec_text) - # Aktueller Füllstand des Puffers - cur_buf_tokens = sum(_safe_estimate_tokens(b.text) for b in buf) - - if buf: - # PRÜFUNG: Wenn die neue Sektion den Puffer über das Limit treibt - # ODER wenn der Puffer bereits das Ziel-Format erreicht hat - if (cur_buf_tokens + sec_tokens > max_tokens) or (cur_buf_tokens >= target): - _flush_buffer() - # PRÜFUNG: Strikter Split an Überschriften + if current_chunk_buf: + # PRÜFUNG: Würde die neue Sektion den aktuellen Chunk sprengen? + # ODER: Haben wir das Target bereits erreicht und fangen lieber neu an? + if (current_tokens + sec_tokens > max_tokens) or (current_tokens >= target): + _emit_chunk(current_chunk_buf) + current_chunk_buf = [] + current_tokens = 0 + # PRÜFUNG: Harter Split gefordert an Überschriften elif strict and sec[0].kind == "heading" and sec[0].level == split_level: - _flush_buffer() - - buf.extend(sec) - - # Falls eine Riesen-Sektion hinzugefügt wurde, die sofort raus muss - if sum(_safe_estimate_tokens(b.text) for b in buf) >= max_tokens: - _flush_buffer() + _emit_chunk(current_chunk_buf) + current_chunk_buf = [] + current_tokens = 0 + + # Wenn eine EINZELNE Sektion alleine schon das Limit sprengt + if sec_tokens > max_tokens: + if current_chunk_buf: + _emit_chunk(current_chunk_buf) + current_chunk_buf = [] + current_tokens = 0 + _emit_split_section(sec) + else: + current_chunk_buf.extend(sec) + current_tokens += sec_tokens + 2 # +2 für Newline Join + + # Letzten Puffer schreiben + if current_chunk_buf: + _emit_chunk(current_chunk_buf) - _flush_buffer() return chunks def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]: """Basis-Sliding-Window für flache Texte ohne Sektionsfokus.""" target = config.get("target", 400) max_tokens = config.get("max", 600) - chunks: List[Chunk] = [] buf: List[RawBlock] = [] for b in blocks: b_tokens = _safe_estimate_tokens(b.text) current_tokens = sum(_safe_estimate_tokens(x.text) for x in buf) if buf else 0 - if current_tokens + b_tokens > max_tokens and buf: txt = "\n\n".join([x.text for x in buf]) idx = len(chunks) win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt - chunks.append(Chunk( - id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, - text=txt, window=win, token_count=current_tokens, - section_title=buf[0].section_title, section_path=buf[0].section_path, - neighbors_prev=None, neighbors_next=None)) + chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=current_tokens, section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None)) buf = [] - current_tokens = 0 - buf.append(b) - if buf: - txt = "\n\n".join([x.text for x in buf]) - idx = len(chunks) + txt = "\n\n".join([x.text for x in buf]); idx = len(chunks) win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt - chunks.append(Chunk( - id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, - text=txt, window=win, token_count=_safe_estimate_tokens(txt), - section_title=buf[0].section_title, section_path=buf[0].section_path, - neighbors_prev=None, neighbors_next=None)) - + chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=_safe_estimate_tokens(txt), section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None)) return chunks \ No newline at end of file