Verbesserung des Chunking-Parsers zur Unterstützung atomarer Blöcke und Gewährleistung der strukturellen Integrität von Callouts. Aktualisierung der Beschreibung und Optimierung der Satz- und Blockverarbeitung, einschließlich präziserer Handhabung von H1-Überschriften und Trennern.

This commit is contained in:
Lars 2025-12-29 21:48:54 +01:00
parent 680c36ab59
commit be265e9cc0
2 changed files with 77 additions and 66 deletions

View File

@@ -1,39 +1,44 @@
""" """
FILE: app/core/chunking/chunking_parser.py FILE: app/core/chunking/chunking_parser.py
DESCRIPTION: Zerlegt Markdown in logische Blöcke (RawBlocks). DESCRIPTION: Zerlegt Markdown in atomare Blöcke. Hält H1-Überschriften im Stream
Gewährleistet, dass H1 und Trenner im Stream verbleiben. und gewährleistet die strukturelle Integrität von Callouts.
""" """
import re import re
from typing import List, Tuple, Set from typing import List, Tuple, Set
from .chunking_models import RawBlock from .chunking_models import RawBlock
from .chunking_utils import extract_frontmatter_from_text from .chunking_utils import extract_frontmatter_from_text
# Compiled once at import time: both patterns are used on every call, and
# hoisting them avoids the per-call lookup in re's internal pattern cache.
_WS_RE = re.compile(r'\s+')
# Split after sentence-ending punctuation when followed by whitespace and an
# uppercase letter (incl. German umlauts), digit, „quote, or opening paren.
_SENT_SPLIT_RE = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])')


def split_sentences(text: str) -> list[str]:
    """Split *text* into sentences, honoring German punctuation conventions.

    Whitespace runs are collapsed to single spaces first; empty input yields
    an empty list. Each returned sentence is stripped and non-empty.
    """
    text = _WS_RE.sub(' ', text.strip())
    if not text:
        return []
    return [part.strip() for part in _SENT_SPLIT_RE.split(text) if part.strip()]
def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
"""Zerlegt Text in logische Einheiten (RawBlocks), inklusive H1-H6.""" """Zerlegt Text in logische Einheiten (RawBlocks), inklusive H1-H6."""
blocks = [] blocks = []
h1_title = "Dokument"; section_path = "/"; current_section_title = None h1_title = "Dokument"
section_path = "/"
current_section_title = None
# Frontmatter entfernen
fm, text_without_fm = extract_frontmatter_from_text(md_text) fm, text_without_fm = extract_frontmatter_from_text(md_text)
# H1 für Metadaten extrahieren # H1 für Note-Titel extrahieren
h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE) h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE)
if h1_match: h1_title = h1_match.group(1).strip() if h1_match:
h1_title = h1_match.group(1).strip()
lines = text_without_fm.split('\n') lines = text_without_fm.split('\n')
buffer = [] buffer = []
for line in lines: for line in lines:
stripped = line.strip() stripped = line.strip()
heading_match = re.match(r'^(#{1,6})\s+(.*)', stripped)
# Heading-Erkennung (H1 bis H6)
heading_match = re.match(r'^(#{1,6})\s+(.*)', stripped)
if heading_match: if heading_match:
if buffer: if buffer:
content = "\n".join(buffer).strip() content = "\n".join(buffer).strip()
@@ -52,14 +57,18 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
blocks.append(RawBlock("heading", stripped, level, section_path, current_section_title)) blocks.append(RawBlock("heading", stripped, level, section_path, current_section_title))
continue continue
if (not stripped or stripped == "---") and not line.startswith('>'): # Trenner (---) beenden Blöcke, Leerzeilen nur wenn nicht in Callout
if stripped == "---" and not line.startswith('>'):
if buffer: if buffer:
content = "\n".join(buffer).strip() content = "\n".join(buffer).strip()
if content: if content: blocks.append(RawBlock("paragraph", content, None, section_path, current_section_title))
blocks.append(RawBlock("paragraph", content, None, section_path, current_section_title)) buffer = []
blocks.append(RawBlock("separator", "---", None, section_path, current_section_title))
elif not stripped and not line.startswith('>'):
if buffer:
content = "\n".join(buffer).strip()
if content: blocks.append(RawBlock("paragraph", content, None, section_path, current_section_title))
buffer = [] buffer = []
if stripped == "---":
blocks.append(RawBlock("separator", "---", None, section_path, current_section_title))
else: else:
buffer.append(line) buffer.append(line)
@@ -70,7 +79,7 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
return blocks, h1_title return blocks, h1_title
def parse_edges_robust(text: str) -> Set[str]: def parse_edges_robust(text: str) -> Set[str]:
"""Extrahiert Kanten aus Wikilinks und Callouts.""" """Extrahiert Kanten-Kandidaten aus Wikilinks und Callouts."""
found_edges = set() found_edges = set()
inlines = re.findall(r'\[\[rel:([^\|\]]+)\|?([^\]]*)\]\]', text) inlines = re.findall(r'\[\[rel:([^\|\]]+)\|?([^\]]*)\]\]', text)
for kind, target in inlines: for kind, target in inlines:

View File

@@ -1,39 +1,42 @@
""" """
FILE: app/core/chunking/chunking_strategies.py FILE: app/core/chunking/chunking_strategies.py
DESCRIPTION: Universelle Strategie für atomares Sektions-Chunking v3.6.0. DESCRIPTION: Strategie für atomares Sektions-Chunking v3.7.0.
Garantiert Sektions-Integrität durch präventives Chunk-Management. Garantiert Sektions-Integrität durch ein flexibles Toleranz-Limit.
Kein Splitting von Sektionen, solange sie 'ungefähr' passen.
""" """
import math import math
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
from .chunking_models import RawBlock, Chunk from .chunking_models import RawBlock, Chunk
from .chunking_parser import split_sentences from .chunking_parser import split_sentences
def _accurate_estimate_tokens(text: str) -> int: # Toleranz-Faktor: Erlaubt Chunks, bis zu 15% über 'max' zu wachsen,
"""Konservative Schätzung für deutschen Text (len/2.5 statt len/4).""" # um eine Sektion vollständig zu erhalten.
return max(1, math.ceil(len(text.strip()) / 2.5)) FLEX_FACTOR = 1.15
def _safe_estimate(text: str) -> int:
"""Sicherere Token-Schätzung für MD/Deutsch (Faktor 3.0 statt 4.0)."""
return max(1, math.ceil(len(text.strip()) / 3.0))
def _create_context_win(doc_title: str, sec_title: Optional[str], text: str) -> str: def _create_context_win(doc_title: str, sec_title: Optional[str], text: str) -> str:
parts = [] parts = []
if doc_title: parts.append(doc_title) if doc_title: parts.append(doc_title)
if sec_title and sec_title != doc_title: parts.append(sec_title) if sec_title and sec_title != doc_title: parts.append(sec_title)
prefix = " > ".join(parts) prefix = " > ".join(parts); return f"{prefix}\n{text}".strip() if prefix else text
return f"{prefix}\n{text}".strip() if prefix else text
def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]:
"""
Sektions-Chunking: Packt komplette Abschnitte in Chunks.
Bei Überlauf wird die Sektion ohne Ausnahme in den nächsten Chunk geschoben.
"""
target = config.get("target", 400) target = config.get("target", 400)
max_tokens = config.get("max", 600) max_tokens = config.get("max", 600)
split_level = config.get("split_level", 2) split_level = config.get("split_level", 2)
overlap_cfg = config.get("overlap", (50, 80)) overlap_cfg = config.get("overlap", (50, 80))
overlap = sum(overlap_cfg) // 2 if isinstance(overlap_cfg, (list, tuple)) else overlap_cfg overlap = sum(overlap_cfg) // 2 if isinstance(overlap_cfg, (list, tuple)) else overlap_cfg
# Das flexible Maximum, das Sektionen unzertrennt lässt
soft_max = int(max_tokens * FLEX_FACTOR)
chunks: List[Chunk] = [] chunks: List[Chunk] = []
def _emit_chunk(block_list: List[RawBlock]): def _emit_chunk(block_list: List[RawBlock]):
"""Schreibt eine Liste von Blöcken als einen einzigen, ungeteilten Chunk.""" """Schreibt eine Liste von Blöcken als einen einzigen Chunk ohne internes Splitting."""
if not block_list: return if not block_list: return
txt = "\n\n".join([b.text for b in block_list]) txt = "\n\n".join([b.text for b in block_list])
idx = len(chunks) idx = len(chunks)
@@ -42,40 +45,36 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id:
win = _create_context_win(doc_title, title, txt) win = _create_context_win(doc_title, title, txt)
chunks.append(Chunk( chunks.append(Chunk(
id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx,
text=txt, window=win, token_count=_accurate_estimate_tokens(txt), text=txt, window=win, token_count=_safe_estimate(txt),
section_title=title, section_path=path, section_title=title, section_path=path,
neighbors_prev=None, neighbors_next=None neighbors_prev=None, neighbors_next=None
)) ))
def _split_giant_section(sec_blocks: List[RawBlock]): def _split_giant_section(sec_blocks: List[RawBlock]):
"""Notfall-Split: Nur wenn eine EINZELNE Sektion bereits > max ist.""" """Notfall-Split: Nur wenn eine EINZELNE Sektion bereits > soft_max ist."""
full_text = "\n\n".join([b.text for b in sec_blocks]) full_text = "\n\n".join([b.text for b in sec_blocks])
main_title = sec_blocks[0].section_title main_title = sec_blocks[0].section_title; main_path = sec_blocks[0].section_path
main_path = sec_blocks[0].section_path
header_text = sec_blocks[0].text if sec_blocks[0].kind == "heading" else "" header_text = sec_blocks[0].text if sec_blocks[0].kind == "heading" else ""
sents = split_sentences(full_text) sents = split_sentences(full_text)
cur_sents = []; sub_len = 0 cur_sents = []; sub_len = 0
for s in sents: for s in sents:
slen = _accurate_estimate_tokens(s) slen = _safe_estimate(s)
if sub_len + slen > target and cur_sents: if sub_len + slen > target and cur_sents:
_emit_chunk([RawBlock("paragraph", " ".join(cur_sents), None, main_path, main_title)]) _emit_chunk([RawBlock("paragraph", " ".join(cur_sents), None, main_path, main_title)])
ov_s = [header_text] if header_text else [] ov_s = [header_text] if header_text else []
ov_l = _accurate_estimate_tokens(header_text) if header_text else 0 ov_l = _safe_estimate(header_text) if header_text else 0
for os in reversed(cur_sents): for os in reversed(cur_sents):
if os == header_text: continue if os == header_text: continue
t_len = _accurate_estimate_tokens(os) t_len = _safe_estimate(os)
if ov_l + t_len < overlap: if ov_l + t_len < overlap:
ov_s.insert(len(ov_s)-1 if header_text else 0, os) ov_s.insert(len(ov_s)-1 if header_text else 0, os); ov_l += t_len
ov_l += t_len
else: break else: break
cur_sents = list(ov_s); cur_sents.append(s); sub_len = ov_l + slen cur_sents = list(ov_s); cur_sents.append(s); sub_len = ov_l + slen
else: cur_sents.append(s); sub_len += slen else: cur_sents.append(s); sub_len += slen
if cur_sents: _emit_chunk([RawBlock("paragraph", " ".join(cur_sents), None, main_path, main_title)]) if cur_sents: _emit_chunk([RawBlock("paragraph", " ".join(cur_sents), None, main_path, main_title)])
# 1. Gruppierung in atomare Einheiten # 1. Gruppierung in atomare Sektions-Einheiten
sections: List[List[RawBlock]] = [] sections: List[List[RawBlock]] = []
curr_sec: List[RawBlock] = [] curr_sec: List[RawBlock] = []
for b in blocks: for b in blocks:
@@ -85,41 +84,44 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id:
else: curr_sec.append(b) else: curr_sec.append(b)
if curr_sec: sections.append(curr_sec) if curr_sec: sections.append(curr_sec)
# 2. Das Pack-Verfahren (Kein Zerschneiden beim Flashen!) # 2. Das flexible Pack-Verfahren
candidate_chunk: List[RawBlock] = [] current_chunk_buf: List[RawBlock] = []
candidate_tokens = 0 current_tokens = 0
for sec in sections: for sec in sections:
sec_text = "\n\n".join([b.text for b in sec]) sec_text = "\n\n".join([b.text for b in sec])
sec_tokens = _accurate_estimate_tokens(sec_text) sec_tokens = _safe_estimate(sec_text)
# Prüfung: Passt die Sektion noch dazu? if current_chunk_buf:
if candidate_tokens + sec_tokens <= max_tokens: # PRÜFUNG: Würde die neue Sektion das FLEXIBLE Limit sprengen?
candidate_chunk.extend(sec) if (current_tokens + sec_tokens > soft_max):
candidate_tokens = _accurate_estimate_tokens("\n\n".join([b.text for b in candidate_chunk])) _emit_chunk(current_chunk_buf)
else: current_chunk_buf = []
# Chunk ist voll -> Abschluss an Sektionsgrenze current_tokens = 0
if candidate_chunk: # Haben wir das Ziel-Maß erreicht und es kommt eine neue Sektion?
_emit_chunk(candidate_chunk) elif (current_tokens >= target):
candidate_chunk = [] _emit_chunk(current_chunk_buf)
candidate_tokens = 0 current_chunk_buf = []
current_tokens = 0
# Neue Sektion allein prüfen
if sec_tokens > max_tokens: # Wenn eine EINZELNE Sektion alleine schon das weiche Limit sprengt
_split_giant_section(sec) if not current_chunk_buf and sec_tokens > soft_max:
else: _split_giant_section(sec)
candidate_chunk = list(sec) else:
candidate_tokens = sec_tokens current_chunk_buf.extend(sec)
current_tokens = _safe_estimate("\n\n".join([b.text for b in current_chunk_buf]))
if current_chunk_buf:
_emit_chunk(current_chunk_buf)
if candidate_chunk: _emit_chunk(candidate_chunk)
return chunks return chunks
def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]: def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]:
target = config.get("target", 400); max_tokens = config.get("max", 600) target = config.get("target", 400); max_tokens = config.get("max", 600)
chunks: List[Chunk] = []; buf: List[RawBlock] = [] chunks: List[Chunk] = []; buf: List[RawBlock] = []
for b in blocks: for b in blocks:
b_tokens = _accurate_estimate_tokens(b.text) b_tokens = _safe_estimate(b.text)
current_tokens = sum(_accurate_estimate_tokens(x.text) for x in buf) if buf else 0 current_tokens = sum(_safe_estimate(x.text) for x in buf) if buf else 0
if current_tokens + b_tokens > max_tokens and buf: if current_tokens + b_tokens > max_tokens and buf:
txt = "\n\n".join([x.text for x in buf]); idx = len(chunks) txt = "\n\n".join([x.text for x in buf]); idx = len(chunks)
win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt
@@ -129,5 +131,5 @@ def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note
if buf: if buf:
txt = "\n\n".join([x.text for x in buf]); idx = len(chunks) txt = "\n\n".join([x.text for x in buf]); idx = len(chunks)
win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt
chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=_accurate_estimate_tokens(txt), section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None)) chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=_safe_estimate(txt), section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None))
return chunks return chunks