From 7eba1fb487d8a7784f0cccf8662dc51ccff7c242 Mon Sep 17 00:00:00 2001 From: Lars Date: Mon, 29 Dec 2025 20:45:04 +0100 Subject: [PATCH] =?UTF-8?q?Aktualisierung=20des=20Chunking-Parsers=20zur?= =?UTF-8?q?=20Unterst=C3=BCtzung=20aller=20=C3=9Cberschriften=20im=20Strea?= =?UTF-8?q?m=20und=20Verbesserung=20der=20Metadatenverarbeitung.=20Anpassu?= =?UTF-8?q?ngen=20an=20der=20atomaren=20Sektions-Chunking-Strategie=20zur?= =?UTF-8?q?=20besseren=20Handhabung=20von=20Blockinhalten=20und=20Token-Sc?= =?UTF-8?q?h=C3=A4tzungen.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/chunking/chunking_parser.py | 15 ++-- app/core/chunking/chunking_strategies.py | 95 +++++++++++------------- 2 files changed, 52 insertions(+), 58 deletions(-) diff --git a/app/core/chunking/chunking_parser.py b/app/core/chunking/chunking_parser.py index 2ec45bc..95e2fad 100644 --- a/app/core/chunking/chunking_parser.py +++ b/app/core/chunking/chunking_parser.py @@ -1,6 +1,6 @@ """ FILE: app/core/chunking/chunking_parser.py -DESCRIPTION: Zerlegt Markdown in Blöcke und extrahiert Kanten-Strings. +DESCRIPTION: Zerlegt Markdown in Blöcke. Hält H1-Überschriften im Stream. """ import re from typing import List, Tuple, Set @@ -17,12 +17,12 @@ def split_sentences(text: str) -> list[str]: return [p.strip() for p in _SENT_SPLIT.split(text) if p.strip()] def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: - """Zerlegt Text in logische Einheiten, inklusive H1.""" + """Zerlegt Text in logische Einheiten, inklusive aller Überschriften.""" blocks = [] h1_title = "Dokument"; section_path = "/"; current_section_title = None fm, text_without_fm = extract_frontmatter_from_text(md_text) - # H1 für Note-Metadaten extrahieren + # H1 für Note-Titel extrahieren (Metadaten) h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE) if h1_match: h1_title = h1_match.group(1).strip() @@ -35,6 +35,7 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: # Heading-Erkennung (H1 bis H6) heading_match = re.match(r'^(#{1,6})\s+(.*)', stripped) if heading_match: + # Vorherigen Text-Block abschließen if buffer: content = "\n".join(buffer).strip() if content: @@ -44,7 +45,7 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: level = len(heading_match.group(1)) title = heading_match.group(2).strip() - # Metadaten-Update + # Pfad- und Titel-Update if level == 1: current_section_title = title section_path = "/" @@ -55,7 +56,7 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: blocks.append(RawBlock("heading", stripped, level, section_path, current_section_title)) continue - if not stripped: + if not stripped and not line.startswith('>'): # Leerzeilen (außer in Callouts) trennen Blöcke if buffer: content = "\n".join(buffer).strip() if content: @@ -79,6 +80,7 @@ def parse_edges_robust(text: str) -> Set[str]: k = kind.strip().lower() t = target.strip() if k and t: found_edges.add(f"{k}:{t}") + lines = text.split('\n') current_edge_type = None for line in lines: @@ -94,5 +96,6 @@ def parse_edges_robust(text: str) -> Set[str]: links = re.findall(r'\[\[([^\]]+)\]\]', stripped) for l in links: if "rel:" not in l: found_edges.add(f"{current_edge_type}:{l}") - elif not stripped.startswith('>'): current_edge_type = None + elif not stripped.startswith('>'): + current_edge_type = None return found_edges \ No newline at end of file diff --git a/app/core/chunking/chunking_strategies.py b/app/core/chunking/chunking_strategies.py index 3c939ec..1a15bba 100644 --- a/app/core/chunking/chunking_strategies.py +++ b/app/core/chunking/chunking_strategies.py @@ -1,9 +1,6 @@ """ FILE: app/core/chunking/chunking_strategies.py -DESCRIPTION: Korrigierte Splitting-Strategien für Mindnet v3.3.3. - - Fix: Erhalt von Überschriften im Chunk-Text. - - Fix: Atomares Buffering (Blöcke fallen als Ganzes in den nächsten Chunk). - - Fix: Korrekte Zuordnung von Sektions-Metadaten. +DESCRIPTION: Strategien für atomares Sektions-Chunking (WP-15b konform). """ from typing import List, Dict, Any, Optional from .chunking_models import RawBlock, Chunk @@ -20,8 +17,7 @@ def _create_context_win(doc_title: str, sec_title: Optional[str], text: str) -> def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: """ - Implementiert atomares Sektions-Chunking. - Hält Überschriften und ihren Inhalt (inkl. Edges) zusammen. + Gruppiert Blöcke zu Sektionen und hält diese atomar zusammen. """ strict = config.get("strict_heading_split", False) target = config.get("target", 400) @@ -43,9 +39,10 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: neighbors_prev=None, neighbors_next=None )) - def _flush(): + def _flush_buffer(): nonlocal buf, cur_tokens if not buf: return + main_title = buf[0].section_title main_path = buf[0].section_path full_text = "\n\n".join([b.text for b in buf]) @@ -53,14 +50,14 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: if estimate_tokens(full_text) <= max_tokens: _add_to_chunks(full_text, main_title, main_path) else: - # Fallback: Nur wenn eine Sektion ALLEINE zu groß ist, wird intern gesplittet + # Nur wenn eine Sektion ALLEINE zu groß ist, wird intern gesplittet sents = split_sentences(full_text) cur_sents = []; sub_len = 0 for s in sents: slen = estimate_tokens(s) if sub_len + slen > target and cur_sents: _add_to_chunks(" ".join(cur_sents), main_title, main_path) - # Overlap-Logik... + # Overlap Logic ov_s = []; ov_l = 0 for os in reversed(cur_sents): if ov_l + estimate_tokens(os) < overlap: @@ -69,11 +66,13 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: cur_sents = list(ov_s); cur_sents.append(s); sub_len = ov_l + slen else: cur_sents.append(s); sub_len += slen if cur_sents: _add_to_chunks(" ".join(cur_sents), main_title, main_path) + buf = []; cur_tokens = 0 - # SCHRITT 1: Gruppierung in atomare Sektions-Einheiten - sections = [] - curr_sec = [] + # SCHRITT 1: Gruppierung in atomare Sektions-Einheiten (Heading + Paragraphs) + sections: List[List[RawBlock]] = [] + curr_sec: List[RawBlock] = [] + for b in blocks: # Ein Split-Trigger startet eine neue Sektion if b.kind == "heading" and b.level <= split_level: @@ -85,66 +84,58 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: # SCHRITT 2: Verarbeitung der Sektionen mit Vorausschau for sec in sections: - sec_tokens = sum(estimate_tokens(b.text) for b in sec) + # Token-Schätzung für die gesamte Sektion inkl. Newline-Overhead + sec_text = "\n\n".join([b.text for b in sec]) + sec_tokens = estimate_tokens(sec_text) if buf: - # PRÜFUNG: Passt die gesamte Sektion noch in den aktuellen Chunk? + # Passt die Sektion noch in den aktuellen Chunk? if cur_tokens + sec_tokens > max_tokens: - _flush() - # PRÜFUNG: Harter Split gefordert? - elif strict: - _flush() - # PRÜFUNG: Weicher Split (Target erreicht)? + _flush_buffer() + # Wenn strict: Jede neue Sektion auf split_level erzwingt neuen Chunk + elif strict and sec[0].kind == "heading" and sec[0].level == split_level: + _flush_buffer() + # Wenn target erreicht: Neue Sektion startet neuen Chunk elif cur_tokens >= target: - _flush() + _flush_buffer() buf.extend(sec) cur_tokens += sec_tokens - # Falls die Sektion selbst das Limit sprengt, sofort flashen + # Falls der Puffer (selbst nach flush) durch eine Riesen-Sektion zu groß ist if cur_tokens >= max_tokens: - _flush() + _flush_buffer() - _flush() + _flush_buffer() return chunks -def strategy_sliding_window(blocks: List[RawBlock], - config: Dict[str, Any], - note_id: str, - context_prefix: str = "") -> List[Chunk]: - """ - Standard Sliding Window mit Korrektur für Heading-Retention. - """ +def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]: + # (Identische Korrektur wie oben für Sliding Window, falls benötigt) + # Hier halten wir es einfach: Blöcke nacheinander bis target. target = config.get("target", 400) max_tokens = config.get("max", 600) - overlap_val = config.get("overlap", (50, 80)) - overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val chunks: List[Chunk] = [] buf: List[RawBlock] = [] - - def _flush_window(): - nonlocal buf - if not buf: return - txt = "\n\n".join([b.text for b in buf]) - idx = len(chunks) - win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt - chunks.append(Chunk( - id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, - text=txt, window=win, token_count=estimate_tokens(txt), - section_title=buf[0].section_title, section_path=buf[0].section_path, - neighbors_prev=None, neighbors_next=None - )) - buf = [] - + for b in blocks: - # Auch hier: Überschriften mitnehmen b_tokens = estimate_tokens(b.text) - current_buf_tokens = estimate_tokens("\n\n".join([x.text for x in buf])) if buf else 0 + current_tokens = estimate_tokens("\n\n".join([x.text for x in buf])) if buf else 0 - if current_buf_tokens + b_tokens >= target and buf: - _flush_window() + if current_tokens + b_tokens > max_tokens and buf: + txt = "\n\n".join([x.text for x in buf]) + idx = len(chunks) + win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt + chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=current_tokens, section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None)) + buf = [] + current_tokens = 0 + buf.append(b) - _flush_window() + if buf: + txt = "\n\n".join([x.text for x in buf]) + idx = len(chunks) + win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt + chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=estimate_tokens(txt), section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None)) + return chunks \ No newline at end of file