From 06fc42ed37d418423ca5f7396ea47681011a0ccd Mon Sep 17 00:00:00 2001 From: Lars Date: Tue, 30 Dec 2025 07:44:30 +0100 Subject: [PATCH] =?UTF-8?q?Aktualisierung=20des=20Chunking-Parsers=20zur?= =?UTF-8?q?=20Einf=C3=BChrung=20der=20Funktion=20`parse=5Fedges=5Frobust`?= =?UTF-8?q?=20zur=20Extraktion=20von=20Kanten-Kandidaten=20aus=20Wikilinks?= =?UTF-8?q?=20und=20Callouts.=20Verbesserung=20der=20Satzverarbeitung=20du?= =?UTF-8?q?rch=20die=20Implementierung=20der=20Funktion=20`split=5Fsentenc?= =?UTF-8?q?es`.=20Aktualisierung=20der=20Sektions-Chunking-Strategie=20auf?= =?UTF-8?q?=20Version=203.9.6=20mit=20optimierter=20Handhabung=20von=20lee?= =?UTF-8?q?ren=20=C3=9Cberschriften=20und=20Carry-Over=20Logik=20zur=20bes?= =?UTF-8?q?seren=20Chunk-Erstellung.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/chunking/chunking_parser.py | 57 +++++++++++++++++----- app/core/chunking/chunking_strategies.py | 60 ++++++++++++++---------- 2 files changed, 80 insertions(+), 37 deletions(-) diff --git a/app/core/chunking/chunking_parser.py b/app/core/chunking/chunking_parser.py index 1448932..efb1a65 100644 --- a/app/core/chunking/chunking_parser.py +++ b/app/core/chunking/chunking_parser.py @@ -1,13 +1,23 @@ """ FILE: app/core/chunking/chunking_parser.py DESCRIPTION: Zerlegt Markdown in atomare Blöcke. Hält H1-Überschriften im Stream - und gewährleistet die strukturelle Integrität von Callouts. + und extrahiert Kanten-Kandidaten (parse_edges_robust). """ import re from typing import List, Tuple, Set from .chunking_models import RawBlock from .chunking_utils import extract_frontmatter_from_text +_WS = re.compile(r'\s+') +_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])') + +def split_sentences(text: str) -> list[str]: + """Teilt Text in Sätze auf unter Berücksichtigung deutscher Interpunktion.""" + text = _WS.sub(' ', text.strip()) + if not text: return [] + # Splittet bei Punkt, Ausrufezeichen oder Fragezeichen, gefolgt von Leerzeichen und Großbuchstabe + return [p.strip() for p in _SENT_SPLIT.split(text) if p.strip()] + def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: """Zerlegt Text in logische Einheiten (RawBlocks), inklusive H1-H6.""" blocks = [] @@ -16,17 +26,18 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: # H1 für Metadaten extrahieren h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE) - if h1_match: h1_title = h1_match.group(1).strip() + if h1_match: + h1_title = h1_match.group(1).strip() lines = text_without_fm.split('\n') buffer = [] for line in lines: stripped = line.strip() - heading_match = re.match(r'^(#{1,6})\s+(.*)', stripped) + # Heading-Erkennung (H1 bis H6) + heading_match = re.match(r'^(#{1,6})\s+(.*)', stripped) if heading_match: - # Vorherigen Text-Block abschließen if buffer: content = "\n".join(buffer).strip() if content: @@ -59,13 +70,37 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: if buffer: content = "\n".join(buffer).strip() - if content: blocks.append(RawBlock("paragraph", content, None, section_path, current_section_title)) + if content: + blocks.append(RawBlock("paragraph", content, None, section_path, current_section_title)) return blocks, h1_title -def split_sentences(text: str) -> list[str]: - """Teilt Text in Sätze auf unter Berücksichtigung deutscher Interpunktion.""" - text = re.sub(r'\s+', ' ', text.strip()) - if not text: return [] - # Splittet bei Satzzeichen, gefolgt von Leerzeichen und Großbuchstaben - return [s.strip() for s in re.split(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])', text) if s.strip()] \ No newline at end of file +def parse_edges_robust(text: str) -> Set[str]: + """Extrahiert Kanten-Kandidaten aus Wikilinks und Callouts.""" + found_edges = set() + # 1. Wikilinks [[rel:kind|target]] + inlines = re.findall(r'\[\[rel:([^\|\]]+)\|?([^\]]*)\]\]', text) + for kind, target in inlines: + k = kind.strip().lower() + t = target.strip() + if k and t: found_edges.add(f"{k}:{t}") + + # 2. Callout Edges > [!edge] kind + lines = text.split('\n') + current_edge_type = None + for line in lines: + stripped = line.strip() + callout_match = re.match(r'>\s*\[!edge\]\s*([^:\s]+)', stripped) + if callout_match: + current_edge_type = callout_match.group(1).strip().lower() + links = re.findall(r'\[\[([^\]]+)\]\]', stripped) + for l in links: + if "rel:" not in l: found_edges.add(f"{current_edge_type}:{l}") + continue + if current_edge_type and stripped.startswith('>'): + links = re.findall(r'\[\[([^\]]+)\]\]', stripped) + for l in links: + if "rel:" not in l: found_edges.add(f"{current_edge_type}:{l}") + elif not stripped.startswith('>'): + current_edge_type = None + return found_edges \ No newline at end of file diff --git a/app/core/chunking/chunking_strategies.py b/app/core/chunking/chunking_strategies.py index 562808b..ba04b68 100644 --- a/app/core/chunking/chunking_strategies.py +++ b/app/core/chunking/chunking_strategies.py @@ -1,7 +1,11 @@ """ FILE: app/core/chunking/chunking_strategies.py -DESCRIPTION: Strategie für atomares Sektions-Chunking v3.9.5. - Implementiert das 'Pack-and-Carry-Over' Verfahren nach Nutzerwunsch. +DESCRIPTION: Strategien für atomares Sektions-Chunking v3.9.6. + Implementiert das 'Pack-and-Carry-Over' Verfahren: + 1. Packt ganze Abschnitte basierend auf Schätzung. + 2. Kein physischer Overflow-Check während des Packens. + 3. Smart-Zerlegung von Übergrößen mit Carry-Over in die Queue. + - Hard-Split-Logik für strict_heading_split integriert. """ from typing import List, Dict, Any, Optional from .chunking_models import RawBlock, Chunk @@ -16,10 +20,7 @@ def _create_win(doc_title: str, sec_title: Optional[str], text: str) -> str: def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: """ - Universelle Sektions-Strategie: - - Smart-Edge=True: Packt Sektionen basierend auf Schätzung (Regel 1-3). - - Smart-Edge=False: Hard Split an Überschriften (außer leere Header). - - Strict=True erzwingt Hard Split Verhalten innerhalb der Smart-Logik. + Universelle Heading-Strategie mit Carry-Over Logik. """ smart_edge = config.get("enable_smart_edge_allocation", True) strict = config.get("strict_heading_split", False) @@ -46,66 +47,73 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: for b in blocks: if b.kind == "heading" and b.level <= split_level: if curr_blocks: - sections.append({"text": "\n\n".join([x.text for x in curr_blocks]), - "meta": curr_blocks[0], "is_empty": len(curr_blocks) == 1}) + sections.append({ + "text": "\n\n".join([x.text for x in curr_blocks]), + "meta": curr_blocks[0], + "is_empty": len(curr_blocks) == 1 and curr_blocks[0].kind == "heading" + }) curr_blocks = [b] - else: curr_blocks.append(b) + else: + curr_sec_has_content = True + curr_blocks.append(b) if curr_blocks: - sections.append({"text": "\n\n".join([x.text for x in curr_blocks]), - "meta": curr_blocks[0], "is_empty": len(curr_blocks) == 1}) + sections.append({ + "text": "\n\n".join([x.text for x in curr_blocks]), + "meta": curr_blocks[0], + "is_empty": len(curr_blocks) == 1 and curr_blocks[0].kind == "heading" + }) # --- SCHRITT 2: Verarbeitung der Queue --- queue = list(sections) current_chunk_text = "" current_meta = {"title": None, "path": "/"} - - # Hard-Split-Bedingung: Entweder Smart-Edge aus ODER Profil ist Strict + + # Bestimmung des Modus: Hard-Split wenn smart_edge=False ODER strict=True is_hard_split_mode = (not smart_edge) or (strict) while queue: item = queue.pop(0) item_text = item["text"] - # Initialisierung für neuen Chunk if not current_chunk_text: current_meta["title"] = item["meta"].section_title current_meta["path"] = item["meta"].section_path - # FALL A: Hard Split Modus (Regel: Trenne bei jeder Sektion <= Level) + # FALL A: HARD SPLIT MODUS if is_hard_split_mode: - # Regel: Leere Überschriften verbleiben am nächsten Chunk + # Leere Überschriften (H1 vor H2) werden mit dem nächsten Item verschmolzen if item.get("is_empty", False) and queue: current_chunk_text = (current_chunk_text + "\n\n" + item_text).strip() - continue # Nimm das nächste Item dazu + continue combined = (current_chunk_text + "\n\n" + item_text).strip() + # Wenn durch das Verschmelzen das Limit gesprengt würde, flashen wir vorher if estimate_tokens(combined) > max_tokens and current_chunk_text: - # Falls es trotz Hard-Split zu groß wird, flashen wir erst den alten Teil _emit(current_chunk_text, current_meta["title"], current_meta["path"]) current_chunk_text = item_text else: current_chunk_text = combined - # Im Hard Split flashen wir nach jeder Sektion, die nicht leer ist + # Im Hard-Split wird nach jeder nicht-leeren Sektion geflasht _emit(current_chunk_text, current_meta["title"], current_meta["path"]) current_chunk_text = "" continue - # FALL B: Smart Mode (Regel 1-3) + # FALL B: SMART MODE (Regel 1-3) combined_text = (current_chunk_text + "\n\n" + item_text).strip() if current_chunk_text else item_text combined_est = estimate_tokens(combined_text) if combined_est <= max_tokens: - # Regel 1 & 2: Passt nach Schätzung -> Aufnehmen + # Regel 1 & 2: Passt rein -> Aufnehmen current_chunk_text = combined_text else: - # Regel 3: Passt nicht -> Entweder Puffer flashen oder Item zerlegen if current_chunk_text: + # Regel 2: Flashen an Sektionsgrenze, Item zurücklegen _emit(current_chunk_text, current_meta["title"], current_meta["path"]) current_chunk_text = "" - queue.insert(0, item) # Item für neuen Chunk zurücklegen + queue.insert(0, item) else: - # Einzelne Sektion zu groß -> Smart Zerlegung + # Regel 3: Einzelne Sektion zu groß -> Smart Zerlegung sents = split_sentences(item_text) header_prefix = item["meta"].text if item["meta"].kind == "heading" else "" @@ -119,11 +127,11 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: _emit(" ".join(take_sents), current_meta["title"], current_meta["path"]) - # Carry-Over: Rest wird vorne in die Queue geschoben if sents: remainder = " ".join(sents) if header_prefix and not remainder.startswith(header_prefix): remainder = header_prefix + "\n\n" + remainder + # Carry-Over: Rest wird vorne in die Queue geschoben queue.insert(0, {"text": remainder, "meta": item["meta"], "is_split": True}) if current_chunk_text: @@ -132,7 +140,7 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: return chunks def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: - """Standard Sliding Window Strategie.""" + """Basis-Sliding-Window für flache Texte.""" target = config.get("target", 400); max_tokens = config.get("max", 600) chunks: List[Chunk] = []; buf: List[RawBlock] = [] for b in blocks: