Aktualisierung des Chunking-Parsers zur Verbesserung der Blockverarbeitung und Beschreibung. Anpassungen an der atomaren Sektions-Chunking-Strategie mit optimierter Token-Schätzung und neuen Hilfsfunktionen zur besseren Handhabung von großen Sektionen. Einführung einer präziseren Schätzung für deutsche Texte und Anpassungen an der Logik zur Handhabung von Sektionen.

This commit is contained in:
Lars 2025-12-29 21:45:14 +01:00
parent 96b4f65cd1
commit 680c36ab59
2 changed files with 47 additions and 75 deletions

View File

@ -1,7 +1,7 @@
"""
FILE: app/core/chunking/chunking_parser.py
DESCRIPTION: Zerlegt Markdown in logische Einheiten (RawBlocks).
Hält H1-Überschriften im Inhalts-Stream.
DESCRIPTION: Zerlegt Markdown in logische Blöcke (RawBlocks).
Gewährleistet, dass H1 und Trenner im Stream verbleiben.
"""
import re
from typing import List, Tuple, Set
@ -20,28 +20,21 @@ def split_sentences(text: str) -> list[str]:
def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
"""Zerlegt Text in logische Einheiten (RawBlocks), inklusive H1-H6."""
blocks = []
h1_title = "Dokument"
section_path = "/"
current_section_title = None
# Frontmatter entfernen
h1_title = "Dokument"; section_path = "/"; current_section_title = None
fm, text_without_fm = extract_frontmatter_from_text(md_text)
# H1 für Note-Titel extrahieren
# H1 für Metadaten extrahieren
h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE)
if h1_match:
h1_title = h1_match.group(1).strip()
if h1_match: h1_title = h1_match.group(1).strip()
lines = text_without_fm.split('\n')
buffer = []
for line in lines:
stripped = line.strip()
# Heading-Erkennung (H1 bis H6)
heading_match = re.match(r'^(#{1,6})\s+(.*)', stripped)
if heading_match:
# Vorherigen Text-Block abschließen
if buffer:
content = "\n".join(buffer).strip()
if content:
@ -51,19 +44,14 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
level = len(heading_match.group(1))
title = heading_match.group(2).strip()
# Pfad- und Titel-Update für die Metadaten der folgenden Blöcke
if level == 1:
current_section_title = title
section_path = "/"
current_section_title = title; section_path = "/"
elif level == 2:
current_section_title = title
section_path = f"/{current_section_title}"
current_section_title = title; section_path = f"/{current_section_title}"
# Die Überschrift selbst als Block hinzufügen
blocks.append(RawBlock("heading", stripped, level, section_path, current_section_title))
continue
# Trenner oder Leerzeilen beenden Blöcke, außer innerhalb von Callouts
if (not stripped or stripped == "---") and not line.startswith('>'):
if buffer:
content = "\n".join(buffer).strip()
@ -77,22 +65,19 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
if buffer:
content = "\n".join(buffer).strip()
if content:
blocks.append(RawBlock("paragraph", content, None, section_path, current_section_title))
if content: blocks.append(RawBlock("paragraph", content, None, section_path, current_section_title))
return blocks, h1_title
def parse_edges_robust(text: str) -> Set[str]:
"""Extrahiert Kanten-Kandidaten aus Wikilinks und Callouts."""
"""Extrahiert Kanten aus Wikilinks und Callouts."""
found_edges = set()
inlines = re.findall(r'\[\[rel:([^\|\]]+)\|?([^\]]*)\]\]', text)
for kind, target in inlines:
k = kind.strip().lower()
t = target.strip()
k = kind.strip().lower(); t = target.strip()
if k and t: found_edges.add(f"{k}:{t}")
lines = text.split('\n')
current_edge_type = None
lines = text.split('\n'); current_edge_type = None
for line in lines:
stripped = line.strip()
callout_match = re.match(r'>\s*\[!edge\]\s*([^:\s]+)', stripped)
@ -106,6 +91,5 @@ def parse_edges_robust(text: str) -> Set[str]:
links = re.findall(r'\[\[([^\]]+)\]\]', stripped)
for l in links:
if "rel:" not in l: found_edges.add(f"{current_edge_type}:{l}")
elif not stripped.startswith('>'):
current_edge_type = None
elif not stripped.startswith('>'): current_edge_type = None
return found_edges

View File

@ -1,15 +1,18 @@
"""
FILE: app/core/chunking/chunking_strategies.py
DESCRIPTION: Universelle Strategie für atomares Sektions-Chunking v3.5.0.
DESCRIPTION: Universelle Strategie für atomares Sektions-Chunking v3.6.0.
Garantiert Sektions-Integrität durch präventives Chunk-Management.
"""
import math
from typing import List, Dict, Any, Optional
from .chunking_models import RawBlock, Chunk
from .chunking_utils import estimate_tokens
from .chunking_parser import split_sentences
def _accurate_estimate_tokens(text: str) -> int:
"""Konservative Schätzung für deutschen Text (len/2.5 statt len/4)."""
return max(1, math.ceil(len(text.strip()) / 2.5))
def _create_context_win(doc_title: str, sec_title: Optional[str], text: str) -> str:
"""Baut den Breadcrumb-Kontext für das Embedding-Fenster."""
parts = []
if doc_title: parts.append(doc_title)
if sec_title and sec_title != doc_title: parts.append(sec_title)
@ -18,8 +21,8 @@ def _create_context_win(doc_title: str, sec_title: Optional[str], text: str) ->
def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]:
"""
Universelles Sektions-Chunking: Packt Sektionen in Chunks.
Bei Überlauf wird die komplette Sektion in den nächsten Chunk geschoben.
Sektions-Chunking: Packt komplette Abschnitte in Chunks.
Bei Überlauf wird die Sektion ohne Ausnahme in den nächsten Chunk geschoben.
"""
target = config.get("target", 400)
max_tokens = config.get("max", 600)
@ -29,8 +32,8 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id:
chunks: List[Chunk] = []
# --- HILFSFUNKTION: Erzeugt einen Chunk aus einer Blockliste ---
def _create_chunk_from_blocks(block_list: List[RawBlock]):
def _emit_chunk(block_list: List[RawBlock]):
"""Schreibt eine Liste von Blöcken als einen einzigen, ungeteilten Chunk."""
if not block_list: return
txt = "\n\n".join([b.text for b in block_list])
idx = len(chunks)
@ -39,13 +42,13 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id:
win = _create_context_win(doc_title, title, txt)
chunks.append(Chunk(
id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx,
text=txt, window=win, token_count=estimate_tokens(txt),
text=txt, window=win, token_count=_accurate_estimate_tokens(txt),
section_title=title, section_path=path,
neighbors_prev=None, neighbors_next=None
))
# --- HILFSFUNKTION: Splittet eine einzelne Sektion, die > max ist ---
def _split_giant_section(sec_blocks: List[RawBlock]):
"""Notfall-Split: Nur wenn eine EINZELNE Sektion bereits > max ist."""
full_text = "\n\n".join([b.text for b in sec_blocks])
main_title = sec_blocks[0].section_title
main_path = sec_blocks[0].section_path
@ -55,85 +58,70 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id:
cur_sents = []; sub_len = 0
for s in sents:
slen = estimate_tokens(s)
slen = _accurate_estimate_tokens(s)
if sub_len + slen > target and cur_sents:
combined_text = " ".join(cur_sents)
_create_chunk_from_blocks([RawBlock("paragraph", combined_text, None, main_path, main_title)])
# Context Injection: Überschrift für den nächsten Teil-Chunk
_emit_chunk([RawBlock("paragraph", " ".join(cur_sents), None, main_path, main_title)])
ov_s = [header_text] if header_text else []
ov_l = estimate_tokens(header_text) if header_text else 0
ov_l = _accurate_estimate_tokens(header_text) if header_text else 0
for os in reversed(cur_sents):
if os == header_text: continue
t_len = estimate_tokens(os)
t_len = _accurate_estimate_tokens(os)
if ov_l + t_len < overlap:
ov_s.insert(len(ov_s)-1 if header_text else 0, os)
ov_l += t_len
else: break
cur_sents = list(ov_s); cur_sents.append(s); sub_len = ov_l + slen
else:
cur_sents.append(s); sub_len += slen
else: cur_sents.append(s); sub_len += slen
if cur_sents:
_create_chunk_from_blocks([RawBlock("paragraph", " ".join(cur_sents), None, main_path, main_title)])
if cur_sents: _emit_chunk([RawBlock("paragraph", " ".join(cur_sents), None, main_path, main_title)])
# 1. SCHRITT: Gruppierung in atomare Sektions-Einheiten
# 1. Gruppierung in atomare Einheiten
sections: List[List[RawBlock]] = []
curr_sec: List[RawBlock] = []
for b in blocks:
# Eine neue Überschrift auf oder unter dem split_level startet eine neue Sektion
if b.kind == "heading" and b.level <= split_level:
if curr_sec: sections.append(curr_sec)
curr_sec = [b]
else:
curr_sec.append(b)
else: curr_sec.append(b)
if curr_sec: sections.append(curr_sec)
# 2. SCHRITT: Sektionen in Chunks packen (Das universelle Pack-Verfahren)
# 2. Das Pack-Verfahren (Kein Zerschneiden beim Flashen!)
candidate_chunk: List[RawBlock] = []
candidate_tokens = 0
for sec in sections:
sec_text = "\n\n".join([b.text for b in sec])
sec_tokens = estimate_tokens(sec_text)
sec_tokens = _accurate_estimate_tokens(sec_text)
# Passt diese gesamte Sektion noch in den laufenden Chunk?
# Prüfung: Passt die Sektion noch dazu?
if candidate_tokens + sec_tokens <= max_tokens:
candidate_chunk.extend(sec)
candidate_tokens = estimate_tokens("\n\n".join([b.text for b in candidate_chunk]))
candidate_tokens = _accurate_estimate_tokens("\n\n".join([b.text for b in candidate_chunk]))
else:
# Falls der aktuelle Chunk nicht leer ist: Raus damit, bevor die neue Sektion kommt
# Chunk ist voll -> Abschluss an Sektionsgrenze
if candidate_chunk:
_create_chunk_from_blocks(candidate_chunk)
_emit_chunk(candidate_chunk)
candidate_chunk = []
candidate_tokens = 0
# Die neue Sektion ist nun allein. Ist sie selbst zu groß?
# Neue Sektion allein prüfen
if sec_tokens > max_tokens:
_split_giant_section(sec)
else:
candidate_chunk = list(sec)
candidate_tokens = sec_tokens
# Letzten Rest wegschreiben
if candidate_chunk:
_create_chunk_from_blocks(candidate_chunk)
if candidate_chunk: _emit_chunk(candidate_chunk)
return chunks
def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]:
"""Standard-Sliding-Window für flache Texte ohne Sektionsfokus."""
target = config.get("target", 400)
max_tokens = config.get("max", 600)
chunks: List[Chunk] = []
buf: List[RawBlock] = []
target = config.get("target", 400); max_tokens = config.get("max", 600)
chunks: List[Chunk] = []; buf: List[RawBlock] = []
for b in blocks:
b_tokens = estimate_tokens(b.text)
current_tokens = sum(estimate_tokens(x.text) for x in buf) if buf else 0
b_tokens = _accurate_estimate_tokens(b.text)
current_tokens = sum(_accurate_estimate_tokens(x.text) for x in buf) if buf else 0
if current_tokens + b_tokens > max_tokens and buf:
txt = "\n\n".join([x.text for x in buf])
idx = len(chunks)
txt = "\n\n".join([x.text for x in buf]); idx = len(chunks)
win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt
chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=current_tokens, section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None))
buf = []
@ -141,5 +129,5 @@ def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note
if buf:
txt = "\n\n".join([x.text for x in buf]); idx = len(chunks)
win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt
chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=estimate_tokens(txt), section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None))
chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=_accurate_estimate_tokens(txt), section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None))
return chunks