From 65d697b7be3fe7033fc4c0fc20fc16f20274dbb0 Mon Sep 17 00:00:00 2001 From: Lars Date: Tue, 30 Dec 2025 07:54:54 +0100 Subject: [PATCH] =?UTF-8?q?Aktualisierung=20der=20atomaren=20Sektions-Chun?= =?UTF-8?q?king-Strategie=20auf=20Version=203.9.8=20mit=20verbesserten=20I?= =?UTF-8?q?mplementierungen=20des=20'Pack-and-Carry-Over'=20Verfahrens.=20?= =?UTF-8?q?Einf=C3=BChrung=20von=20Look-Ahead=20zur=20strikten=20Einhaltun?= =?UTF-8?q?g=20von=20Sektionsgrenzen=20und=20Vermeidung=20redundanter=20Ka?= =?UTF-8?q?nten-Injektionen.=20Anpassungen=20an=20der=20Chunk-Erstellung?= =?UTF-8?q?=20und=20Optimierung=20der=20Handhabung=20von=20leeren=20=C3=9C?= =?UTF-8?q?berschriften.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/chunking/chunking_parser.py | 21 +++++++++++----- app/core/chunking/chunking_strategies.py | 31 +++++++++++++----------- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/app/core/chunking/chunking_parser.py b/app/core/chunking/chunking_parser.py index efb1a65..e36ff0e 100644 --- a/app/core/chunking/chunking_parser.py +++ b/app/core/chunking/chunking_parser.py @@ -1,7 +1,8 @@ """ FILE: app/core/chunking/chunking_parser.py -DESCRIPTION: Zerlegt Markdown in atomare Blöcke. Hält H1-Überschriften im Stream - und extrahiert Kanten-Kandidaten (parse_edges_robust). +DESCRIPTION: Zerlegt Markdown in logische Einheiten (RawBlocks). + Hält alle Überschriftenebenen (H1-H6) im Stream. + Stellt die Funktion parse_edges_robust zur Verfügung. """ import re from typing import List, Tuple, Set @@ -21,10 +22,14 @@ def split_sentences(text: str) -> list[str]: def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: """Zerlegt Text in logische Einheiten (RawBlocks), inklusive H1-H6.""" blocks = [] - h1_title = "Dokument"; section_path = "/"; current_section_title = None + h1_title = "Dokument" + section_path = "/" + current_section_title = None + + # Frontmatter entfernen fm, text_without_fm = extract_frontmatter_from_text(md_text) - # H1 für Metadaten extrahieren + # H1 für Note-Titel extrahieren (Metadaten-Zweck) h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE) if h1_match: h1_title = h1_match.group(1).strip() @@ -38,6 +43,7 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: # Heading-Erkennung (H1 bis H6) heading_match = re.match(r'^(#{1,6})\s+(.*)', stripped) if heading_match: + # Vorherigen Text-Block abschließen if buffer: content = "\n".join(buffer).strip() if content: @@ -47,16 +53,17 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: level = len(heading_match.group(1)) title = heading_match.group(2).strip() - # Pfad- und Titel-Update + # Pfad- und Titel-Update für die Metadaten der folgenden Blöcke if level == 1: current_section_title = title; section_path = "/" elif level == 2: current_section_title = title; section_path = f"/{current_section_title}" + # Die Überschrift selbst als regulären Block hinzufügen blocks.append(RawBlock("heading", stripped, level, section_path, current_section_title)) continue - # Trenner oder Leerzeilen beenden Blöcke, außer innerhalb von Callouts + # Trenner (---) oder Leerzeilen beenden Blöcke, außer innerhalb von Callouts if (not stripped or stripped == "---") and not line.startswith('>'): if buffer: content = "\n".join(buffer).strip() @@ -93,10 +100,12 @@ def parse_edges_robust(text: str) -> Set[str]: callout_match = re.match(r'>\s*\[!edge\]\s*([^:\s]+)', stripped) if callout_match: current_edge_type = callout_match.group(1).strip().lower() + # Links in der gleichen Zeile des Callouts links = re.findall(r'\[\[([^\]]+)\]\]', stripped) for l in links: if "rel:" not in l: found_edges.add(f"{current_edge_type}:{l}") continue + # Links in Folgezeilen des Callouts if current_edge_type and stripped.startswith('>'): links = re.findall(r'\[\[([^\]]+)\]\]', stripped) for l in links: diff --git a/app/core/chunking/chunking_strategies.py b/app/core/chunking/chunking_strategies.py index ba04b68..e16121a 100644 --- a/app/core/chunking/chunking_strategies.py +++ b/app/core/chunking/chunking_strategies.py @@ -1,11 +1,9 @@ """ FILE: app/core/chunking/chunking_strategies.py -DESCRIPTION: Strategien für atomares Sektions-Chunking v3.9.6. - Implementiert das 'Pack-and-Carry-Over' Verfahren: - 1. Packt ganze Abschnitte basierend auf Schätzung. - 2. Kein physischer Overflow-Check während des Packens. - 3. Smart-Zerlegung von Übergrößen mit Carry-Over in die Queue. - - Hard-Split-Logik für strict_heading_split integriert. +DESCRIPTION: Strategien für atomares Sektions-Chunking v3.9.8. + Implementiert das 'Pack-and-Carry-Over' Verfahren nach Regel 1-3. + - Keine redundante Kanten-Injektion. + - Strikte Einhaltung von Sektionsgrenzen via Look-Ahead. """ from typing import List, Dict, Any, Optional from .chunking_models import RawBlock, Chunk @@ -13,6 +11,7 @@ from .chunking_utils import estimate_tokens from .chunking_parser import split_sentences def _create_win(doc_title: str, sec_title: Optional[str], text: str) -> str: + """Baut den Breadcrumb-Kontext für das Embedding-Fenster.""" parts = [doc_title] if doc_title else [] if sec_title and sec_title != doc_title: parts.append(sec_title) prefix = " > ".join(parts) @@ -33,6 +32,7 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: chunks: List[Chunk] = [] def _emit(txt, title, path): + """Schreibt den finalen Chunk ohne Text-Modifikationen.""" idx = len(chunks) win = _create_win(doc_title, title, txt) chunks.append(Chunk( @@ -54,7 +54,6 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: }) curr_blocks = [b] else: - curr_sec_has_content = True curr_blocks.append(b) if curr_blocks: sections.append({ @@ -75,26 +74,27 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: item = queue.pop(0) item_text = item["text"] + # Initialisierung für neuen Chunk if not current_chunk_text: current_meta["title"] = item["meta"].section_title current_meta["path"] = item["meta"].section_path # FALL A: HARD SPLIT MODUS if is_hard_split_mode: - # Leere Überschriften (H1 vor H2) werden mit dem nächsten Item verschmolzen + # Leere Überschriften (z.B. H1 direkt vor H2) verbleiben am nächsten Chunk if item.get("is_empty", False) and queue: current_chunk_text = (current_chunk_text + "\n\n" + item_text).strip() continue combined = (current_chunk_text + "\n\n" + item_text).strip() - # Wenn durch das Verschmelzen das Limit gesprengt würde, flashen wir vorher + # Wenn durch Verschmelzung das Limit gesprengt würde, vorher flashen if estimate_tokens(combined) > max_tokens and current_chunk_text: _emit(current_chunk_text, current_meta["title"], current_meta["path"]) current_chunk_text = item_text else: current_chunk_text = combined - # Im Hard-Split wird nach jeder nicht-leeren Sektion geflasht + # Im Hard-Split wird nach jeder Sektion geflasht _emit(current_chunk_text, current_meta["title"], current_meta["path"]) current_chunk_text = "" continue @@ -104,7 +104,7 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: combined_est = estimate_tokens(combined_text) if combined_est <= max_tokens: - # Regel 1 & 2: Passt rein -> Aufnehmen + # Regel 1 & 2: Passt rein laut Schätzung -> Aufnehmen current_chunk_text = combined_text else: if current_chunk_text: @@ -119,8 +119,7 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: take_sents = []; take_len = 0 while sents: - s = sents.pop(0) - slen = estimate_tokens(s) + s = sents.pop(0); slen = estimate_tokens(s) if take_len + slen > target and take_sents: sents.insert(0, s); break take_sents.append(s); take_len += slen @@ -129,6 +128,7 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: if sents: remainder = " ".join(sents) + # Kontext-Erhalt: Überschrift für den Rest wiederholen if header_prefix and not remainder.startswith(header_prefix): remainder = header_prefix + "\n\n" + remainder # Carry-Over: Rest wird vorne in die Queue geschoben @@ -140,9 +140,10 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: return chunks def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: - """Basis-Sliding-Window für flache Texte.""" + """Standard-Sliding-Window für flache Texte ohne Sektionsfokus.""" target = config.get("target", 400); max_tokens = config.get("max", 600) chunks: List[Chunk] = []; buf: List[RawBlock] = [] + for b in blocks: b_tokens = estimate_tokens(b.text) curr_tokens = sum(estimate_tokens(x.text) for x in buf) if buf else 0 @@ -152,8 +153,10 @@ def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=curr_tokens, section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None)) buf = [] buf.append(b) + if buf: txt = "\n\n".join([x.text for x in buf]); idx = len(chunks) win = _create_win(doc_title, buf[0].section_title, txt) chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=estimate_tokens(txt), section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None)) + return chunks \ No newline at end of file