Verbesserung des Chunking-Parsers zur Unterstützung von H1-Überschriften und Anpassung der Metadatenlogik. Implementierung einer atomaren Sektions-Chunking-Strategie, die Überschriften und deren Inhalte zusammenhält.

This commit is contained in:
Lars 2025-12-29 20:33:43 +01:00
parent 8f5eb36b5f
commit 838083b909
2 changed files with 61 additions and 48 deletions

View File

@ -17,53 +17,58 @@ def split_sentences(text: str) -> list[str]:
return [p.strip() for p in _SENT_SPLIT.split(text) if p.strip()] return [p.strip() for p in _SENT_SPLIT.split(text) if p.strip()]
def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
    """Split markdown text into logical blocks (headings and paragraphs).

    Recognizes H1..H6 headings. The first H1 in the text becomes the
    document title. H1 resets the section path to "/", H2 sets the main
    section path; H3+ headings keep the parent section's path and title.
    Frontmatter is stripped before parsing.

    Returns:
        A tuple ``(blocks, h1_title)`` where ``blocks`` is the ordered
        list of RawBlock entries and ``h1_title`` is the document title
        (falls back to "Dokument" when no H1 exists).
    """
    blocks: List[RawBlock] = []
    h1_title = "Dokument"
    section_path = "/"
    current_section_title = None

    # Frontmatter must be removed before any heading detection.
    fm, text_without_fm = extract_frontmatter_from_text(md_text)

    # Extract the first H1 as the note/document title.
    h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE)
    if h1_match:
        h1_title = h1_match.group(1).strip()

    buffer: List[str] = []

    def _flush_paragraph() -> None:
        # Emit the buffered lines as a single paragraph block (skipped
        # when the buffer strips down to nothing), then reset the buffer.
        nonlocal buffer
        content = "\n".join(buffer).strip()
        if content:
            blocks.append(RawBlock("paragraph", content, None, section_path, current_section_title))
        buffer = []

    for line in text_without_fm.split('\n'):
        stripped = line.strip()
        # Heading detection (H1 through H6).
        heading_match = re.match(r'^(#{1,6})\s+(.*)', stripped)
        if heading_match:
            # Close the preceding paragraph before the heading starts.
            if buffer:
                _flush_paragraph()
            level = len(heading_match.group(1))
            title = heading_match.group(2).strip()
            # Metadata update: H1 resets the path, H2 sets the main path;
            # deeper levels inherit the current section unchanged.
            if level == 1:
                current_section_title = title
                section_path = "/"
            elif level == 2:
                current_section_title = title
                section_path = f"/{current_section_title}"
            blocks.append(RawBlock("heading", stripped, level, section_path, current_section_title))
        elif not stripped:
            # A blank line terminates the current paragraph.
            if buffer:
                _flush_paragraph()
        else:
            buffer.append(line)

    # Flush any trailing paragraph at end of text.
    if buffer:
        _flush_paragraph()
    return blocks, h1_title
def parse_edges_robust(text: str) -> Set[str]: def parse_edges_robust(text: str) -> Set[str]:

View File

@ -20,7 +20,8 @@ def _create_context_win(doc_title: str, sec_title: Optional[str], text: str) ->
def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]:
""" """
Splittet Text basierend auf Markdown-Überschriften mit atomarem Block-Erhalt. Implementiert atomares Sektions-Chunking.
Hält Überschriften und ihren Inhalt (inkl. Edges) zusammen.
""" """
strict = config.get("strict_heading_split", False) strict = config.get("strict_heading_split", False)
target = config.get("target", 400) target = config.get("target", 400)
@ -45,23 +46,21 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id:
def _flush(): def _flush():
nonlocal buf, cur_tokens nonlocal buf, cur_tokens
if not buf: return if not buf: return
# Metadaten stammen immer vom ersten Block im Puffer (meist die Überschrift)
main_title = buf[0].section_title main_title = buf[0].section_title
main_path = buf[0].section_path main_path = buf[0].section_path
full_text = "\n\n".join([b.text for b in buf]) full_text = "\n\n".join([b.text for b in buf])
# Falls der gesamte Puffer in einen Chunk passt
if estimate_tokens(full_text) <= max_tokens: if estimate_tokens(full_text) <= max_tokens:
_add_to_chunks(full_text, main_title, main_path) _add_to_chunks(full_text, main_title, main_path)
else: else:
# Nur wenn ein einzelner Abschnitt größer als 'max' ist, wird intern gesplittet # Fallback: Nur wenn eine Sektion ALLEINE zu groß ist, wird intern gesplittet
sents = split_sentences(full_text) sents = split_sentences(full_text)
cur_sents = []; sub_len = 0 cur_sents = []; sub_len = 0
for s in sents: for s in sents:
slen = estimate_tokens(s) slen = estimate_tokens(s)
if sub_len + slen > target and cur_sents: if sub_len + slen > target and cur_sents:
_add_to_chunks(" ".join(cur_sents), main_title, main_path) _add_to_chunks(" ".join(cur_sents), main_title, main_path)
# Overlap-Logik...
ov_s = []; ov_l = 0 ov_s = []; ov_l = 0
for os in reversed(cur_sents): for os in reversed(cur_sents):
if ov_l + estimate_tokens(os) < overlap: if ov_l + estimate_tokens(os) < overlap:
@ -70,34 +69,43 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id:
cur_sents = list(ov_s); cur_sents.append(s); sub_len = ov_l + slen cur_sents = list(ov_s); cur_sents.append(s); sub_len = ov_l + slen
else: cur_sents.append(s); sub_len += slen else: cur_sents.append(s); sub_len += slen
if cur_sents: _add_to_chunks(" ".join(cur_sents), main_title, main_path) if cur_sents: _add_to_chunks(" ".join(cur_sents), main_title, main_path)
buf = []; cur_tokens = 0 buf = []; cur_tokens = 0
# SCHRITT 1: Gruppierung in atomare Sektions-Einheiten
sections = []
curr_sec = []
for b in blocks: for b in blocks:
b_tokens = estimate_tokens(b.text) # Ein Split-Trigger startet eine neue Sektion
if b.kind == "heading" and b.level <= split_level:
# Prüfung auf Split-Trigger (Überschriften) if curr_sec: sections.append(curr_sec)
is_split_trigger = False curr_sec = [b]
if b.kind == "heading":
if b.level < split_level:
is_split_trigger = True
elif b.level == split_level:
if strict or cur_tokens >= target:
is_split_trigger = True
if is_split_trigger:
_flush() # Vorherigen Puffer leeren
buf.append(b) # Neue Überschrift in den neuen Puffer aufnehmen
cur_tokens = b_tokens
else: else:
# Atomarer Check: Wenn der neue Block den aktuellen Chunk sprengen würde curr_sec.append(b)
if cur_tokens + b_tokens > max_tokens and buf: if curr_sec: sections.append(curr_sec)
_flush() # Puffer leeren, Block 'b' wird Teil des nächsten Chunks
buf.append(b)
cur_tokens += b_tokens
_flush() # Letzten Puffer leeren # SCHRITT 2: Verarbeitung der Sektionen mit Vorausschau
for sec in sections:
sec_tokens = sum(estimate_tokens(b.text) for b in sec)
if buf:
# PRÜFUNG: Passt die gesamte Sektion noch in den aktuellen Chunk?
if cur_tokens + sec_tokens > max_tokens:
_flush()
# PRÜFUNG: Harter Split gefordert?
elif strict:
_flush()
# PRÜFUNG: Weicher Split (Target erreicht)?
elif cur_tokens >= target:
_flush()
buf.extend(sec)
cur_tokens += sec_tokens
# Falls die Sektion selbst das Limit sprengt, sofort flashen
if cur_tokens >= max_tokens:
_flush()
_flush()
return chunks return chunks
def strategy_sliding_window(blocks: List[RawBlock], def strategy_sliding_window(blocks: List[RawBlock],