diff --git a/app/core/chunker.py b/app/core/chunker.py index 1d6f625..07b5f47 100644 --- a/app/core/chunker.py +++ b/app/core/chunker.py @@ -1,11 +1,9 @@ """ FILE: app/core/chunker.py -DESCRIPTION: Zerlegt Texte in Chunks (Sliding Window oder nach Headings). Orchestriert die Smart-Edge-Allocation via SemanticAnalyzer. -VERSION: 2.9.0 (Feat: Hybrid Strict Splitting with Size Safety) -STATUS: Active -DEPENDENCIES: app.services.semantic_analyzer, app.core.derive_edges, markdown_it, yaml, asyncio -EXTERNAL_CONFIG: config/types.yaml -LAST_ANALYSIS: 2025-12-16 +DESCRIPTION: Zerlegt Texte in Chunks (Sliding Window oder nach Headings). + Orchestriert die Smart-Edge-Allocation via SemanticAnalyzer. + FIX V3: Support für mehrzeilige Callouts und Section-Propagation. +VERSION: 3.1.0 (Full Compatibility Merge) """ from __future__ import annotations @@ -22,6 +20,8 @@ import logging from app.services.semantic_analyzer import get_semantic_analyzer # Core Imports +# Wir importieren build_edges_for_note nur, um kompatibel zur Signatur zu bleiben +# oder für den Fallback. try: from app.core.derive_edges import build_edges_for_note except ImportError: @@ -36,6 +36,7 @@ logger = logging.getLogger(__name__) BASE_DIR = Path(__file__).resolve().parent.parent.parent CONFIG_PATH = BASE_DIR / "config" / "types.yaml" +# Fallback Default, falls types.yaml fehlt DEFAULT_PROFILE = {"strategy": "sliding_window", "target": 400, "max": 600, "overlap": (50, 80)} _CONFIG_CACHE = None @@ -51,16 +52,26 @@ def _load_yaml_config() -> Dict[str, Any]: except Exception: return {} def get_chunk_config(note_type: str) -> Dict[str, Any]: + """ + Lädt die Chunking-Strategie basierend auf dem Note-Type aus types.yaml. + Dies sichert die Kompatibilität zu WP-15 (Profile). + """ full_config = _load_yaml_config() profiles = full_config.get("chunking_profiles", {}) type_def = full_config.get("types", {}).get(note_type.lower(), {}) + + # Welches Profil nutzt dieser Typ? (z.B. 'sliding_smart_edges') profile_name = type_def.get("chunking_profile") + if not profile_name: profile_name = full_config.get("defaults", {}).get("chunking_profile", "sliding_standard") config = profiles.get(profile_name, DEFAULT_PROFILE).copy() + + # Tupel-Konvertierung für Overlap (YAML liest oft Listen) if "overlap" in config and isinstance(config["overlap"], list): config["overlap"] = tuple(config["overlap"]) + return config def extract_frontmatter_from_text(md_text: str) -> Tuple[Dict[str, Any], str]: @@ -75,7 +86,7 @@ def extract_frontmatter_from_text(md_text: str) -> Tuple[Dict[str, Any], str]: return frontmatter, text_without_fm.strip() # ========================================== -# 2. DATA CLASSES +# 2. DATA CLASSES & TEXT TOOLS # ========================================== _SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])'); _WS = re.compile(r'\s+') @@ -105,7 +116,10 @@ class Chunk: # ========================================== def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: - """Zerlegt Text in logische Blöcke (Absätze, Header).""" + """ + Zerlegt Text in logische Blöcke (Absätze, Header). + Wichtig für die Strategie 'by_heading'. + """ blocks = [] h1_title = "Dokument" section_path = "/" @@ -122,14 +136,8 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: for line in lines: stripped = line.strip() - if stripped.startswith('# '): - if buffer: - content = "\n".join(buffer).strip() - if content: - blocks.append(RawBlock("paragraph", content, None, section_path, current_h2)) - buffer = [] - blocks.append(RawBlock("heading", stripped, 1, section_path, current_h2)) - + if stripped.startswith('# '): + continue elif stripped.startswith('## '): if buffer: content = "\n".join(buffer).strip() @@ -139,15 +147,6 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: current_h2 = stripped[3:].strip() section_path = f"/{current_h2}" blocks.append(RawBlock("heading", stripped, 2, section_path, current_h2)) - - elif stripped.startswith('### '): - if buffer: - content = "\n".join(buffer).strip() - if content: - blocks.append(RawBlock("paragraph", content, None, section_path, current_h2)) - buffer = [] - blocks.append(RawBlock("heading", stripped, 3, section_path, current_h2)) - elif not stripped: if buffer: content = "\n".join(buffer).strip() @@ -164,37 +163,41 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: return blocks, h1_title -def _create_chunk_obj(chunks_list: List[Chunk], note_id: str, txt: str, win: str, sec: Optional[str], path: str): - idx = len(chunks_list) - chunks_list.append(Chunk( - id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, - text=txt, window=win, token_count=estimate_tokens(txt), - section_title=sec, section_path=path, neighbors_prev=None, neighbors_next=None, - suggested_edges=[] - )) - def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "", context_prefix: str = "") -> List[Chunk]: + """ + Die Standard-Strategie aus WP-15. + Fasst Blöcke zusammen und schneidet bei 'target' Tokens (mit Satz-Rücksicht). + """ target = config.get("target", 400) max_tokens = config.get("max", 600) overlap_val = config.get("overlap", (50, 80)) overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val chunks = []; buf = [] + def _create_chunk(txt, win, sec, path): + idx = len(chunks) + chunks.append(Chunk( + id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, + text=txt, window=win, token_count=estimate_tokens(txt), + section_title=sec, section_path=path, neighbors_prev=None, neighbors_next=None, + suggested_edges=[] + )) + def flush_buffer(): nonlocal buf if not buf: return text_body = "\n\n".join([b.text for b in buf]) + sec_title = buf[-1].section_title if buf else None + sec_path = buf[-1].section_path if buf else "/" + + # Context Prefix (z.B. H1) voranstellen für Embedding-Qualität win_body = f"{context_prefix}\n{text_body}".strip() if context_prefix else text_body - # Basis-Info vom ersten Block im Buffer - sec = buf[0].section_title if buf else None - path = buf[0].section_path if buf else "/" - if estimate_tokens(text_body) <= max_tokens: - _create_chunk_obj(chunks, note_id, text_body, win_body, sec, path) + _create_chunk(text_body, win_body, sec_title, sec_path) else: - # Fallback: Wenn Block zu groß, intern splitten (Sentence-Level) + # Zu groß -> Satzweiser Split sentences = split_sentences(text_body) current_chunk_sents = [] current_len = 0 @@ -204,8 +207,9 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not if current_len + sent_len > target and current_chunk_sents: c_txt = " ".join(current_chunk_sents) c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt - _create_chunk_obj(chunks, note_id, c_txt, c_win, sec, path) + _create_chunk(c_txt, c_win, sec_title, sec_path) + # Overlap für nächsten Chunk overlap_sents = [] ov_len = 0 for s in reversed(current_chunk_sents): @@ -222,117 +226,139 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not current_chunk_sents.append(sent) current_len += sent_len + # Rest if current_chunk_sents: c_txt = " ".join(current_chunk_sents) c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt - _create_chunk_obj(chunks, note_id, c_txt, c_win, sec, path) + _create_chunk(c_txt, c_win, sec_title, sec_path) buf = [] for b in blocks: - if b.kind == "heading": - flush_buffer() - + if b.kind == "heading": continue current_buf_text = "\n\n".join([x.text for x in buf]) - if buf and (estimate_tokens(current_buf_text) + estimate_tokens(b.text) >= target): + if estimate_tokens(current_buf_text) + estimate_tokens(b.text) >= target: flush_buffer() - buf.append(b) - + if estimate_tokens(b.text) >= target: + flush_buffer() + flush_buffer() return chunks def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: """ - MODUS: Structured / Heading Split - - split_level: Ebene für logische Trennung (z.B. H2). - - strict_heading_split: - True: Trennt an jedem Header <= split_level. - NEU v2.9: Wenn Inhalt > max_tokens, wird trotzdem gesplittet (Safety Split). - False: Fasst zusammen bis 'target' erreicht ist. + Strategie für strukturierte Daten (Profile, Werte). + Nutzt sliding_window, forciert aber Schnitte an Headings (via parse_blocks Vorarbeit). """ - split_level = config.get("split_level", 2) - target = config.get("target", 400) - max_limit = config.get("max", 600) - strict_mode = config.get("strict_heading_split", False) + return _strategy_sliding_window(blocks, config, note_id, doc_title, context_prefix=f"# {doc_title}") + +# ========================================== +# 4. ROBUST EDGE PARSING & PROPAGATION (NEU) +# ========================================== + +def _parse_edges_robust(text: str) -> Set[str]: + """ + NEU: Findet Kanten im Text, auch wenn sie mehrzeilig oder 'kaputt' formatiert sind. + Erkennt: + > [!edge] type + > [[Link]] + Returns: Set von Strings "kind:target" + """ + found_edges = set() - chunks = [] - current_chunk_blocks = [] + # A. Inline [[rel:type|target]] (Standard) + inlines = re.findall(r'\[\[rel:([^\|\]]+)\|?([^\]]*)\]\]', text) + for kind, target in inlines: + k = kind.strip() + t = target.strip() + if k and t: found_edges.add(f"{k}:{t}") + + # B. Multiline Callouts Parsing (Der Fix für dein Problem) + lines = text.split('\n') + current_edge_type = None - context_prefix = f"# {doc_title}" - - def has_content(blk_list): - return any(b.kind != "heading" for b in blk_list) - - def flush_current_chunk(): - nonlocal current_chunk_blocks - if not current_chunk_blocks: - return + for line in lines: + stripped = line.strip() - text_body = "\n\n".join([b.text for b in current_chunk_blocks]) - win_body = f"{context_prefix}\n{text_body}".strip() - - first_b = current_chunk_blocks[0] - sec = first_b.section_title - path = first_b.section_path - - _create_chunk_obj(chunks, note_id, text_body, win_body, sec, path) - current_chunk_blocks = [] - - def get_current_size(): - txt = "\n\n".join([b.text for b in current_chunk_blocks]) - return estimate_tokens(txt) - - for b in blocks: - # 1. Header Logic (Struktur-Trigger) - is_splitter = (b.kind == "heading" and b.level is not None and b.level <= split_level) - - if is_splitter: - is_higher_hierarchy = (b.level < split_level) + # 1. Start Blockquote: > [!edge] type + # (Erlaubt optionalen Doppelpunkt) + callout_match = re.match(r'>\s*\[!edge\]\s*([^:\s]+)', stripped) + if callout_match: + current_edge_type = callout_match.group(1).strip() - if strict_mode: - # STRICT: - # Wir splitten immer, außer der Vor-Chunk ist leer. - if current_chunk_blocks and has_content(current_chunk_blocks): - flush_current_chunk() - current_chunk_blocks.append(b) - else: - # SOFT: - # Split bei Hierarchie-Wechsel ODER wenn voll. - if is_higher_hierarchy: - flush_current_chunk() - current_chunk_blocks.append(b) - elif current_chunk_blocks and get_current_size() >= target: - flush_current_chunk() - current_chunk_blocks.append(b) - else: - current_chunk_blocks.append(b) - else: - # 2. Content Logic (Safety Trigger für Monster-Abschnitte) - # Bevor wir den Block anhängen: Würde er das Fass zum Überlaufen bringen? - # Wir nutzen hier 'max' als harte Grenze für den Safety-Split. - current_size = get_current_size() - block_size = estimate_tokens(b.text) + # Check: Sind Links noch in der GLEICHEN Zeile? + links = re.findall(r'\[\[([^\]]+)\]\]', stripped) + for l in links: + if "rel:" not in l: + found_edges.add(f"{current_edge_type}:{l}") + continue - if current_chunk_blocks and (current_size + block_size > max_limit): - # NOTBREMSE: Chunk wird zu groß. - # Wir splitten hier, auch wenn kein Header da ist. - # Der Kontext (Section Title) bleibt erhalten, da er aus `current_h2` kommt (siehe parse_blocks). - flush_current_chunk() - current_chunk_blocks.append(b) - else: - current_chunk_blocks.append(b) - - # Letzten Rest flushen - flush_current_chunk() + # 2. Continuation Line: > [[Target]] + # Wenn wir noch im 'edge mode' sind und die Zeile ein Zitat ist + if current_edge_type and stripped.startswith('>'): + links = re.findall(r'\[\[([^\]]+)\]\]', stripped) + for l in links: + if "rel:" not in l: + found_edges.add(f"{current_edge_type}:{l}") + + # 3. End of Blockquote (kein '>') -> Reset Type + elif not stripped.startswith('>'): + current_edge_type = None + + return found_edges +def _propagate_section_edges(chunks: List[Chunk]) -> List[Chunk]: + """ + NEU: Verteilt Kanten innerhalb einer Sektion. + Löst das Problem: Callout steht oben im Kapitel, gilt aber für alle Chunks darunter. + """ + # Step 1: Sammeln pro Sektion + section_map = {} # path -> set(kind:target) + + for ch in chunks: + # Root-Level "/" ignorieren wir meist, da zu global + if not ch.section_path or ch.section_path == "/": continue + + edges = _parse_edges_robust(ch.text) + if edges: + if ch.section_path not in section_map: + section_map[ch.section_path] = set() + section_map[ch.section_path].update(edges) + + # Step 2: Injizieren (Broadcasting) + for ch in chunks: + if ch.section_path in section_map: + edges_to_add = section_map[ch.section_path] + if not edges_to_add: continue + + injections = [] + for e_str in edges_to_add: + kind, target = e_str.split(':', 1) + # Check: Kante schon im Text? + token = f"[[rel:{kind}|{target}]]" + if token not in ch.text: + injections.append(token) + + if injections: + # Wir schreiben die Kanten "hart" in den Text. + # Damit findet sie derive_edges.py später garantiert. + block = "\n\n\n" + " ".join(injections) + ch.text += block + # Auch ins Window schreiben für Embedding-Kontext + ch.window += block + return chunks # ========================================== -# 4. ORCHESTRATION (ASYNC) +# 5. ORCHESTRATION (ASYNC) # ========================================== async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]: + """ + Hauptfunktion. Verbindet Parsing, Splitting und Edge-Allocation. + """ + # 1. Config laden (WP-15 Kompatibilität) if config is None: config = get_chunk_config(note_type) @@ -342,10 +368,12 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op primary_strategy = config.get("strategy", "sliding_window") enable_smart_edges = config.get("enable_smart_edge_allocation", False) + # Drafts skippen LLM um Kosten/Zeit zu sparen if enable_smart_edges and note_status in ["draft", "initial_gen"]: logger.info(f"Chunker: Skipping Smart Edges for draft '{note_id}'.") enable_smart_edges = False + # 2. Parsing & Splitting blocks, doc_title = parse_blocks(md_text) if primary_strategy == "by_heading": @@ -356,9 +384,15 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op if not chunks: return [] + # 3. NEU: Propagation VOR Smart Edge Allocation + # Das repariert die fehlenden Kanten aus deinen Callouts. + chunks = _propagate_section_edges(chunks) + + # 4. Smart Edges (LLM) if enable_smart_edges: chunks = await _run_smart_edge_allocation(chunks, md_text, note_id, note_type) + # 5. Linking for i, ch in enumerate(chunks): ch.neighbors_prev = chunks[i-1].id if i > 0 else None ch.neighbors_next = chunks[i+1].id if i < len(chunks)-1 else None @@ -366,13 +400,18 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op return chunks def _extract_all_edges_from_md(md_text: str, note_id: str, note_type: str) -> List[str]: + """ + Hilfsfunktion: Sammelt ALLE Kanten für den LLM-Kandidaten-Pool. + """ + # A. Via derive_edges (Standard) dummy_chunk = { "chunk_id": f"{note_id}#full", "text": md_text, - "content": md_text, + "content": md_text, "window": md_text, "type": note_type } + # Signatur-Anpassung beachten (WP-15 Fix) raw_edges = build_edges_for_note( note_id, [dummy_chunk], @@ -385,9 +424,17 @@ def _extract_all_edges_from_md(md_text: str, note_id: str, note_type: str) -> Li target = e.get("target_id") if target and kind not in ["belongs_to", "next", "prev", "backlink"]: all_candidates.add(f"{kind}:{target}") + + # B. Via Robust Parser (NEU) - fängt die multiline Callouts + robust_edges = _parse_edges_robust(md_text) + all_candidates.update(robust_edges) + return list(all_candidates) async def _run_smart_edge_allocation(chunks: List[Chunk], full_text: str, note_id: str, note_type: str) -> List[Chunk]: + """ + Der LLM-Schritt (WP-15). Filtert irrelevante Kanten. + """ analyzer = get_semantic_analyzer() candidate_list = _extract_all_edges_from_md(full_text, note_id, note_type) @@ -408,10 +455,13 @@ async def _run_smart_edge_allocation(chunks: List[Chunk], full_text: str, note_i assigned_edges_global.update(confirmed_edges) if confirmed_edges: + # Wir schreiben auch Smart Edges hart in den Text injection_str = "\n" + " ".join([f"[[rel:{e.split(':')[0]}|{e.split(':')[1]}]]" for e in confirmed_edges if ':' in e]) chunk.text += injection_str chunk.window += injection_str + # Fallback für Kanten, die das LLM nirgendwo zugeordnet hat + # (Damit nichts verloren geht -> Safety Fallback) unassigned = set(candidate_list) - assigned_edges_global if unassigned: fallback_str = "\n" + " ".join([f"[[rel:{e.split(':')[0]}|{e.split(':')[1]}]]" for e in unassigned if ':' in e])