From 3c5c567077daad8427abbc80ed72ff717a3d8011 Mon Sep 17 00:00:00 2001 From: Lars Date: Tue, 30 Dec 2025 07:41:30 +0100 Subject: [PATCH] =?UTF-8?q?Aktualisierung=20der=20atomaren=20Sektions-Chun?= =?UTF-8?q?king-Strategie=20auf=20Version=203.9.5=20mit=20Implementierung?= =?UTF-8?q?=20des=20'Pack-and-Carry-Over'=20Verfahrens.=20Einf=C3=BChrung?= =?UTF-8?q?=20neuer=20Konfigurationsoptionen=20f=C3=BCr=20Smart-Edge=20und?= =?UTF-8?q?=20strikte=20=C3=9Cberschriftenteilung.=20Verbesserte=20Handhab?= =?UTF-8?q?ung=20von=20leeren=20=C3=9Cberschriften=20und=20Anpassungen=20a?= =?UTF-8?q?n=20der=20Warteschlangen-Verarbeitung=20zur=20Optimierung=20der?= =?UTF-8?q?=20Chunk-Erstellung.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/chunking/chunking_parser.py | 44 ++++------- app/core/chunking/chunking_strategies.py | 95 +++++++++++++----------- 2 files changed, 67 insertions(+), 72 deletions(-) diff --git a/app/core/chunking/chunking_parser.py b/app/core/chunking/chunking_parser.py index e55f032..1448932 100644 --- a/app/core/chunking/chunking_parser.py +++ b/app/core/chunking/chunking_parser.py @@ -8,19 +8,13 @@ from typing import List, Tuple, Set from .chunking_models import RawBlock from .chunking_utils import extract_frontmatter_from_text -def split_sentences(text: str) -> list[str]: - """Teilt Text in Sätze auf unter Berücksichtigung deutscher Interpunktion.""" - text = re.sub(r'\s+', ' ', text.strip()) - if not text: return [] - # Splittet bei Satzzeichen, gefolgt von Leerzeichen und Großbuchstaben - return [s.strip() for s in re.split(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])', text) if s.strip()] - def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: """Zerlegt Text in logische Einheiten (RawBlocks), inklusive H1-H6.""" blocks = [] h1_title = "Dokument"; section_path = "/"; current_section_title = None fm, text_without_fm = extract_frontmatter_from_text(md_text) + # H1 für Metadaten extrahieren h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE) if h1_match: h1_title = h1_match.group(1).strip() @@ -32,20 +26,26 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: heading_match = re.match(r'^(#{1,6})\s+(.*)', stripped) if heading_match: + # Vorherigen Text-Block abschließen if buffer: content = "\n".join(buffer).strip() if content: blocks.append(RawBlock("paragraph", content, None, section_path, current_section_title)) buffer = [] + level = len(heading_match.group(1)) title = heading_match.group(2).strip() + + # Pfad- und Titel-Update if level == 1: current_section_title = title; section_path = "/" elif level == 2: current_section_title = title; section_path = f"/{current_section_title}" + blocks.append(RawBlock("heading", stripped, level, section_path, current_section_title)) continue + # Trenner oder Leerzeilen beenden Blöcke, außer innerhalb von Callouts if (not stripped or stripped == "---") and not line.startswith('>'): if buffer: content = "\n".join(buffer).strip() @@ -60,28 +60,12 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: if buffer: content = "\n".join(buffer).strip() if content: blocks.append(RawBlock("paragraph", content, None, section_path, current_section_title)) + return blocks, h1_title -def parse_edges_robust(text: str) -> Set[str]: - """Extrahiert Kanten-Kandidaten aus Wikilinks und Callouts.""" - found_edges = set() - inlines = re.findall(r'\[\[rel:([^\|\]]+)\|?([^\]]*)\]\]', text) - for kind, target in inlines: - k = kind.strip().lower(); t = target.strip() - if k and t: found_edges.add(f"{k}:{t}") - lines = text.split('\n'); current_edge_type = None - for line in lines: - stripped = line.strip() - callout_match = re.match(r'>\s*\[!edge\]\s*([^:\s]+)', stripped) - if callout_match: - current_edge_type = callout_match.group(1).strip().lower() - links = re.findall(r'\[\[([^\]]+)\]\]', stripped) - for l in links: - if "rel:" not in l: found_edges.add(f"{current_edge_type}:{l}") - continue - if current_edge_type and stripped.startswith('>'): - links = re.findall(r'\[\[([^\]]+)\]\]', stripped) - for l in links: - if "rel:" not in l: found_edges.add(f"{current_edge_type}:{l}") - elif not stripped.startswith('>'): current_edge_type = None - return found_edges \ No newline at end of file +def split_sentences(text: str) -> list[str]: + """Teilt Text in Sätze auf unter Berücksichtigung deutscher Interpunktion.""" + text = re.sub(r'\s+', ' ', text.strip()) + if not text: return [] + # Splittet bei Satzzeichen, gefolgt von Leerzeichen und Großbuchstaben + return [s.strip() for s in re.split(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])', text) if s.strip()] \ No newline at end of file diff --git a/app/core/chunking/chunking_strategies.py b/app/core/chunking/chunking_strategies.py index de995fd..562808b 100644 --- a/app/core/chunking/chunking_strategies.py +++ b/app/core/chunking/chunking_strategies.py @@ -1,7 +1,7 @@ """ FILE: app/core/chunking/chunking_strategies.py -DESCRIPTION: Universelle Strategie für atomares Sektions-Chunking v3.9.0. - Regelkonforme Implementierung: Pack-Sections, Trust Estimation, Carry-Over. +DESCRIPTION: Strategie für atomares Sektions-Chunking v3.9.5. + Implementiert das 'Pack-and-Carry-Over' Verfahren nach Nutzerwunsch. """ from typing import List, Dict, Any, Optional from .chunking_models import RawBlock, Chunk @@ -15,7 +15,14 @@ def _create_win(doc_title: str, sec_title: Optional[str], text: str) -> str: return f"{prefix}\n{text}".strip() if prefix else text def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: + """ + Universelle Sektions-Strategie: + - Smart-Edge=True: Packt Sektionen basierend auf Schätzung (Regel 1-3). + - Smart-Edge=False: Hard Split an Überschriften (außer leere Header). + - Strict=True erzwingt Hard Split Verhalten innerhalb der Smart-Logik. + """ smart_edge = config.get("enable_smart_edge_allocation", True) + strict = config.get("strict_heading_split", False) target = config.get("target", 400) max_tokens = config.get("max", 600) split_level = config.get("split_level", 2) @@ -33,65 +40,72 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: section_title=title, section_path=path, neighbors_prev=None, neighbors_next=None )) - # --- FALL A: HARD SPLIT (enable_smart_edge_allocation: false) --- - if not smart_edge: - buf = [] - for b in blocks: - # Trenne hart bei Überschrift <= split_level - if b.kind == "heading" and b.level <= split_level: - # Prüfe, ob Puffer mehr als nur Überschriften enthält (keine leeren Chunks) - has_content = any(x.kind != "heading" for x in buf) - if buf and has_content: - _emit("\n\n".join([x.text for x in buf]), buf[0].section_title, buf[0].section_path) - buf = [] - buf.append(b) - if buf: _emit("\n\n".join([x.text for x in buf]), buf[0].section_title, buf[0].section_path) - return chunks - - # --- FALL B: SMART EDGE ALLOCATION (Pack-and-Carry-Over) --- - # 1. Gruppierung in atomare Einheiten (Sektions-Isolation) + # --- SCHRITT 1: Gruppierung in atomare Sektions-Einheiten --- sections: List[Dict[str, Any]] = [] curr_blocks = [] for b in blocks: if b.kind == "heading" and b.level <= split_level: if curr_blocks: - sections.append({"text": "\n\n".join([x.text for x in curr_blocks]), "meta": curr_blocks[0]}) + sections.append({"text": "\n\n".join([x.text for x in curr_blocks]), + "meta": curr_blocks[0], "is_empty": len(curr_blocks) == 1}) curr_blocks = [b] else: curr_blocks.append(b) if curr_blocks: - sections.append({"text": "\n\n".join([x.text for x in curr_blocks]), "meta": curr_blocks[0]}) + sections.append({"text": "\n\n".join([x.text for x in curr_blocks]), + "meta": curr_blocks[0], "is_empty": len(curr_blocks) == 1}) - # 2. Warteschlangen-Verarbeitung (Regel 1-3) - # Wir nutzen eine Liste als Queue für Carry-Over-Reste + # --- SCHRITT 2: Verarbeitung der Queue --- queue = list(sections) current_chunk_text = "" current_meta = {"title": None, "path": "/"} + # Hard-Split-Bedingung: Entweder Smart-Edge aus ODER Profil ist Strict + is_hard_split_mode = (not smart_edge) or (strict) + while queue: item = queue.pop(0) item_text = item["text"] - # Initialisiere Metadaten für einen neuen Chunk + # Initialisierung für neuen Chunk if not current_chunk_text: current_meta["title"] = item["meta"].section_title current_meta["path"] = item["meta"].section_path - # Schätzung (Regel 2: Wir verlassen uns darauf) + # FALL A: Hard Split Modus (Regel: Trenne bei jeder Sektion <= Level) + if is_hard_split_mode: + # Regel: Leere Überschriften verbleiben am nächsten Chunk + if item.get("is_empty", False) and queue: + current_chunk_text = (current_chunk_text + "\n\n" + item_text).strip() + continue # Nimm das nächste Item dazu + + combined = (current_chunk_text + "\n\n" + item_text).strip() + if estimate_tokens(combined) > max_tokens and current_chunk_text: + # Falls es trotz Hard-Split zu groß wird, flashen wir erst den alten Teil + _emit(current_chunk_text, current_meta["title"], current_meta["path"]) + current_chunk_text = item_text + else: + current_chunk_text = combined + + # Im Hard Split flashen wir nach jeder Sektion, die nicht leer ist + _emit(current_chunk_text, current_meta["title"], current_meta["path"]) + current_chunk_text = "" + continue + + # FALL B: Smart Mode (Regel 1-3) combined_text = (current_chunk_text + "\n\n" + item_text).strip() if current_chunk_text else item_text combined_est = estimate_tokens(combined_text) if combined_est <= max_tokens: - # Regel 1: Vollständiger Abschnitt passt -> Aufnehmen + # Regel 1 & 2: Passt nach Schätzung -> Aufnehmen current_chunk_text = combined_text else: - # Er passt nicht ganz rein. + # Regel 3: Passt nicht -> Entweder Puffer flashen oder Item zerlegen if current_chunk_text: - # Puffer ist bereits gefüllt -> Wegschreiben, Item zurück in die Queue _emit(current_chunk_text, current_meta["title"], current_meta["path"]) current_chunk_text = "" - queue.insert(0, item) + queue.insert(0, item) # Item für neuen Chunk zurücklegen else: - # Regel 3: Einzelner Abschnitt allein ist > max -> Smart Zerlegung + # Einzelne Sektion zu groß -> Smart Zerlegung sents = split_sentences(item_text) header_prefix = item["meta"].text if item["meta"].kind == "heading" else "" @@ -103,25 +117,22 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: sents.insert(0, s); break take_sents.append(s); take_len += slen - # Ersten Teil emittieren _emit(" ".join(take_sents), current_meta["title"], current_meta["path"]) - # Rest als Carry-Over zurück in die Queue (Regel 3) + # Carry-Over: Rest wird vorne in die Queue geschoben if sents: - remainder_text = " ".join(sents) - # Kontext-Erhalt: Überschrift für den Rest wiederholen - if header_prefix and not remainder_text.startswith(header_prefix): - remainder_text = header_prefix + "\n\n" + remainder_text - queue.insert(0, {"text": remainder_text, "meta": item["meta"]}) + remainder = " ".join(sents) + if header_prefix and not remainder.startswith(header_prefix): + remainder = header_prefix + "\n\n" + remainder + queue.insert(0, {"text": remainder, "meta": item["meta"], "is_split": True}) - # Letzten Rest wegschreiben if current_chunk_text: _emit(current_chunk_text, current_meta["title"], current_meta["path"]) return chunks -def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]: - """Sliding Window: Unverändert erhalten für Standard-Typen.""" +def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: + """Standard Sliding Window Strategie.""" target = config.get("target", 400); max_tokens = config.get("max", 600) chunks: List[Chunk] = []; buf: List[RawBlock] = [] for b in blocks: @@ -129,12 +140,12 @@ def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note curr_tokens = sum(estimate_tokens(x.text) for x in buf) if buf else 0 if curr_tokens + b_tokens > max_tokens and buf: txt = "\n\n".join([x.text for x in buf]); idx = len(chunks) - win = _create_win(doc_title=context_prefix, sec_title=buf[0].section_title, text=txt) + win = _create_win(doc_title, buf[0].section_title, txt) chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=curr_tokens, section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None)) buf = [] buf.append(b) if buf: txt = "\n\n".join([x.text for x in buf]); idx = len(chunks) - win = _create_win(doc_title=context_prefix, sec_title=buf[0].section_title, text=txt) + win = _create_win(doc_title, buf[0].section_title, txt) chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=estimate_tokens(txt), section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None)) return chunks \ No newline at end of file