From 8fadec5c2c4a39b64c72ab3e834dc663fa01e2a8 Mon Sep 17 00:00:00 2001 From: Lars Date: Tue, 16 Dec 2025 07:25:34 +0100 Subject: [PATCH] chunker korrigiert zu scmart edges --- app/core/chunker.py | 137 ++++++++++++++++++++++++++++++++------------ 1 file changed, 100 insertions(+), 37 deletions(-) diff --git a/app/core/chunker.py b/app/core/chunker.py index 8348715..0943010 100644 --- a/app/core/chunker.py +++ b/app/core/chunker.py @@ -1,11 +1,11 @@ """ FILE: app/core/chunker.py DESCRIPTION: Zerlegt Texte in Chunks (Sliding Window oder nach Headings). Orchestriert die Smart-Edge-Allocation via SemanticAnalyzer. -VERSION: 2.5.0 +VERSION: 2.6.0 (Fix: Strict Heading Split & Header Retention) STATUS: Active DEPENDENCIES: app.services.semantic_analyzer, app.core.derive_edges, markdown_it, yaml, asyncio EXTERNAL_CONFIG: config/types.yaml -LAST_ANALYSIS: 2025-12-15 +LAST_ANALYSIS: 2025-12-16 """ from __future__ import annotations @@ -15,8 +15,6 @@ import re import math import yaml from pathlib import Path -from markdown_it import MarkdownIt -from markdown_it.token import Token import asyncio import logging @@ -27,7 +25,7 @@ from app.services.semantic_analyzer import get_semantic_analyzer try: from app.core.derive_edges import build_edges_for_note except ImportError: - # Mock für Tests + # Mock für Tests, falls Module fehlen def build_edges_for_note(note_id, chunks, note_level_references=None, include_note_scope_refs=False): return [] logger = logging.getLogger(__name__) @@ -103,7 +101,7 @@ class Chunk: suggested_edges: Optional[List[str]] = None # ========================================== -# 3. PARSING & STRATEGIES (SYNCHRON) +# 3. PARSING & STRATEGIES # ========================================== def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: @@ -125,6 +123,8 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: for line in lines: stripped = line.strip() if stripped.startswith('# '): + # H1 wird für den Titel genutzt, aber nicht als Block für sliding window + # (Außer es ist H1 im Body, aber wir ignorieren H1 hier meist als Title) continue elif stripped.startswith('## '): if buffer: @@ -134,6 +134,7 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: buffer = [] current_h2 = stripped[3:].strip() section_path = f"/{current_h2}" + # WICHTIG: Die Überschrift selbst als Block speichern! blocks.append(RawBlock("heading", stripped, 2, section_path, current_h2)) elif not stripped: if buffer: @@ -151,6 +152,15 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: return blocks, h1_title +def _create_chunk_obj(chunks_list: List[Chunk], note_id: str, txt: str, win: str, sec: Optional[str], path: str): + idx = len(chunks_list) + chunks_list.append(Chunk( + id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, + text=txt, window=win, token_count=estimate_tokens(txt), + section_title=sec, section_path=path, neighbors_prev=None, neighbors_next=None, + suggested_edges=[] + )) + def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "", context_prefix: str = "") -> List[Chunk]: target = config.get("target", 400) max_tokens = config.get("max", 600) @@ -158,15 +168,6 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val chunks = []; buf = [] - def _create_chunk(txt, win, sec, path): - idx = len(chunks) - chunks.append(Chunk( - id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, - text=txt, window=win, token_count=estimate_tokens(txt), - section_title=sec, section_path=path, neighbors_prev=None, neighbors_next=None, - suggested_edges=[] - )) - def flush_buffer(): nonlocal buf if not buf: return @@ -175,18 +176,24 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not win_body = f"{context_prefix}\n{text_body}".strip() if context_prefix else text_body if estimate_tokens(text_body) <= max_tokens: - _create_chunk(text_body, win_body, buf[-1].section_title, buf[-1].section_path) + sec = buf[0].section_title if buf else None + path = buf[0].section_path if buf else "/" + _create_chunk_obj(chunks, note_id, text_body, win_body, sec, path) else: sentences = split_sentences(text_body) current_chunk_sents = [] current_len = 0 + # Basis-Info vom ersten Block im Buffer + sec = buf[0].section_title if buf else None + path = buf[0].section_path if buf else "/" + for sent in sentences: sent_len = estimate_tokens(sent) if current_len + sent_len > target and current_chunk_sents: c_txt = " ".join(current_chunk_sents) c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt - _create_chunk(c_txt, c_win, buf[-1].section_title, buf[-1].section_path) + _create_chunk_obj(chunks, note_id, c_txt, c_win, sec, path) overlap_sents = [] ov_len = 0 @@ -207,27 +214,81 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not if current_chunk_sents: c_txt = " ".join(current_chunk_sents) c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt - _create_chunk(c_txt, c_win, buf[-1].section_title, buf[-1].section_path) + _create_chunk_obj(chunks, note_id, c_txt, c_win, sec, path) buf = [] for b in blocks: - if b.kind == "heading": continue + # Bei Sliding Window ignorieren wir Heading-Blocks als Split-Trigger NICHT zwingend, + # aber wir wollen Headings oft nicht "allein" stehen haben. + # Hier einfache Logik: + if b.kind == "heading": + # Optional: Buffer flushen bei neuem Header, um Kontextwechsel sauberer zu machen + flush_buffer() + current_buf_text = "\n\n".join([x.text for x in buf]) - if estimate_tokens(current_buf_text) + estimate_tokens(b.text) >= target: + if buf and (estimate_tokens(current_buf_text) + estimate_tokens(b.text) >= target): flush_buffer() + buf.append(b) - if estimate_tokens(b.text) >= target: - flush_buffer() - + flush_buffer() return chunks def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: - return _strategy_sliding_window(blocks, config, note_id, doc_title, context_prefix=f"# {doc_title}") + """ + STRICT HEADING SPLIT (Fix v2.6.0): + Trennt den Text konsequent an jeder Überschrift der definierten Ebene. + Behält Überschriften als Teil (erste Zeile) des Chunks bei. + Kein Merging kleiner Abschnitte über Header-Grenzen hinweg. + """ + split_level = config.get("split_level", 2) + chunks = [] + + # Temporärer Speicher für den aktuellen Chunk + current_chunk_blocks = [] + + context_prefix = f"# {doc_title}" + + def flush_current_chunk(): + nonlocal current_chunk_blocks + if not current_chunk_blocks: + return + + # Text zusammenbauen + text_body = "\n\n".join([b.text for b in current_chunk_blocks]) + # Window bauen (hier einfach Text, da Kontext via Header implizit ist) + win_body = f"{context_prefix}\n{text_body}".strip() + + # Metadaten vom ersten Block (üblicherweise der Header) nehmen + first_b = current_chunk_blocks[0] + sec = first_b.section_title + path = first_b.section_path + + _create_chunk_obj(chunks, note_id, text_body, win_body, sec, path) + current_chunk_blocks = [] + + for b in blocks: + # Prüfen, ob dieser Block ein Trenner (Header auf Split-Level) ist + is_splitter = (b.kind == "heading" and b.level == split_level) + + if is_splitter: + # 1. Den bisherigen Chunk abschließen (falls vorhanden) + flush_current_chunk() + + # 2. Den neuen Chunk mit diesem Header beginnen + current_chunk_blocks.append(b) + else: + # Einfach anhängen + current_chunk_blocks.append(b) + + # Letzten Rest flushen + flush_current_chunk() + + return chunks # ========================================== -# 4. ORCHESTRATION (ASYNC) - WP-15 CORE +# 4. ORCHESTRATION (ASYNC) # ========================================== async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]: @@ -240,12 +301,14 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op primary_strategy = config.get("strategy", "sliding_window") enable_smart_edges = config.get("enable_smart_edge_allocation", False) + # Performance/Cost-Guard: Bei Entwürfen keine Smart Edges if enable_smart_edges and note_status in ["draft", "initial_gen"]: logger.info(f"Chunker: Skipping Smart Edges for draft '{note_id}'.") enable_smart_edges = False blocks, doc_title = parse_blocks(md_text) + # Strategie-Auswahl if primary_strategy == "by_heading": chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title) else: @@ -254,10 +317,11 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op if not chunks: return [] + # Smart Edge Allocation (WP-15) if enable_smart_edges: - # Hier rufen wir nun die Smart Edge Allocation auf chunks = await _run_smart_edge_allocation(chunks, md_text, note_id, note_type) + # Verkettung der Chunks (next/prev) for i, ch in enumerate(chunks): ch.neighbors_prev = chunks[i-1].id if i > 0 else None ch.neighbors_next = chunks[i+1].id if i < len(chunks)-1 else None @@ -269,30 +333,25 @@ def _extract_all_edges_from_md(md_text: str, note_id: str, note_type: str) -> Li Hilfsfunktion: Erstellt einen Dummy-Chunk für den gesamten Text und ruft den Edge-Parser auf, um ALLE Kanten der Notiz zu finden. """ - # 1. Dummy Chunk erstellen, der den gesamten Text enthält - # Das ist notwendig, da build_edges_for_note Kanten nur aus Chunks extrahiert. dummy_chunk = { "chunk_id": f"{note_id}#full", "text": md_text, - "content": md_text, # Sicherstellen, dass der Parser Text findet + "content": md_text, "window": md_text, "type": note_type } - - # 2. Aufruf des Parsers (Signatur-Fix!) - # derive_edges.py: build_edges_for_note(note_id, chunks, note_level_references=None, include_note_scope_refs=False) + # Parsing aller Kanten (Inline, Wikilinks, Callouts) raw_edges = build_edges_for_note( note_id, [dummy_chunk], note_level_references=None, include_note_scope_refs=False ) - - # 3. Kanten extrahieren all_candidates = set() for e in raw_edges: kind = e.get("kind") target = e.get("target_id") + # Struktur-Kanten ignorieren wir für die Verteilung if target and kind not in ["belongs_to", "next", "prev", "backlink"]: all_candidates.add(f"{kind}:{target}") @@ -301,7 +360,7 @@ def _extract_all_edges_from_md(md_text: str, note_id: str, note_type: str) -> Li async def _run_smart_edge_allocation(chunks: List[Chunk], full_text: str, note_id: str, note_type: str) -> List[Chunk]: analyzer = get_semantic_analyzer() - # A. Alle potenziellen Kanten der Notiz sammeln (über den Dummy-Chunk Trick) + # A. Alle potenziellen Kanten der Notiz sammeln candidate_list = _extract_all_edges_from_md(full_text, note_id, note_type) if not candidate_list: @@ -314,7 +373,7 @@ async def _run_smart_edge_allocation(chunks: List[Chunk], full_text: str, note_i results_per_chunk = await asyncio.gather(*tasks) - # C. Injection & Fallback + # C. Injection & Fallback Tracking assigned_edges_global = set() for i, confirmed_edges in enumerate(results_per_chunk): @@ -322,14 +381,18 @@ async def _run_smart_edge_allocation(chunks: List[Chunk], full_text: str, note_i chunk.suggested_edges = confirmed_edges assigned_edges_global.update(confirmed_edges) + # Injection: Wir hängen die bestätigten Edges unsichtbar (fürs Embedding) oder sichtbar an + # Hier als "Pseudo-Code" im Text, damit sie embedded werden. if confirmed_edges: + # Format: [[rel:kind|target]] injection_str = "\n" + " ".join([f"[[rel:{e.split(':')[0]}|{e.split(':')[1]}]]" for e in confirmed_edges if ':' in e]) chunk.text += injection_str chunk.window += injection_str - # D. Fallback: Unassigned Kanten überall hin + # D. Fallback: Kanten, die NIRGENDS zugewiesen wurden, werden JEDEM Chunk angehängt (Sicherheit) unassigned = set(candidate_list) - assigned_edges_global if unassigned: + logger.info(f"Chunker: {len(unassigned)} unassigned edges in {note_id}. Distributing to all chunks.") fallback_str = "\n" + " ".join([f"[[rel:{e.split(':')[0]}|{e.split(':')[1]}]]" for e in unassigned if ':' in e]) for chunk in chunks: chunk.text += fallback_str