"""
FILE: app/core/chunker.py
DESCRIPTION: Splits note text into chunks (sliding window or by heading) and
             orchestrates the smart edge allocation via the SemanticAnalyzer.
FIX V3: Support for multi-line callouts and section propagation.
VERSION: 3.1.0 (Full Compatibility Merge)
"""

from __future__ import annotations

import asyncio
import logging
import math
import re
from dataclasses import dataclass
from pathlib import Path
from typing import List, Dict, Optional, Tuple, Any, Set

import yaml

# Services
from app.services.semantic_analyzer import get_semantic_analyzer

# Core imports
# build_edges_for_note is imported only to stay compatible with its signature
# (and as a source of edge candidates); a stub is used when it is unavailable.
try:
    from app.core.derive_edges import build_edges_for_note
except ImportError:
    # Stub for tests
    def build_edges_for_note(note_id, chunks, note_level_references=None,
                             include_note_scope_refs=False):
        return []

logger = logging.getLogger(__name__)

# ==========================================
# 1. HELPER & CONFIG
# ==========================================

BASE_DIR = Path(__file__).resolve().parent.parent.parent
CONFIG_PATH = BASE_DIR / "config" / "types.yaml"

# Fallback profile used when types.yaml is missing or lacks the profile.
DEFAULT_PROFILE = {"strategy": "sliding_window", "target": 400, "max": 600, "overlap": (50, 80)}

_CONFIG_CACHE = None

# Leading YAML frontmatter delimited by '---' lines.
_FRONTMATTER_RE = re.compile(r'^\s*---\s*\n(.*?)\n---', re.DOTALL)


def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _load_yaml_config() -> Dict[str, Any]:
    """
    Load and cache the parsed content of CONFIG_PATH.

    Returns an empty dict when the file does not exist, cannot be read or
    parsed, or does not contain a mapping at top level. Only a successful
    load is cached; failures are retried on the next call.
    """
    global _CONFIG_CACHE
    if _CONFIG_CACHE is not None:
        return _CONFIG_CACHE
    if not CONFIG_PATH.exists():
        return {}
    try:
        with open(CONFIG_PATH, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except Exception:
        # Best effort: the chunker must keep working with built-in defaults.
        logger.warning("Chunker: could not load %s; using defaults.", CONFIG_PATH, exc_info=True)
        return {}
    # safe_load yields None for an empty file and may yield a list/scalar;
    # callers rely on dict semantics (.get).
    data = _as_dict(data)
    _CONFIG_CACHE = data
    return data


def get_chunk_config(note_type: str) -> Dict[str, Any]:
    """
    Resolve the chunking profile for a note type from types.yaml.

    Lookup order: types[<note_type>].chunking_profile, then
    defaults.chunking_profile, then the name "sliding_standard". If the
    resolved name is not defined under chunking_profiles, DEFAULT_PROFILE is
    used. A copy is returned, so callers may mutate the result.
    """
    full_config = _load_yaml_config()
    profiles = _as_dict(full_config.get("chunking_profiles"))
    types = _as_dict(full_config.get("types"))
    defaults = _as_dict(full_config.get("defaults"))

    # Sections written as "key:" without a body parse to None; note_type may
    # also be missing. Both are treated as "not configured".
    type_def = _as_dict(types.get(str(note_type or "").lower()))

    # Which profile does this type use? (e.g. 'sliding_smart_edges')
    profile_name = type_def.get("chunking_profile")
    if not profile_name:
        profile_name = defaults.get("chunking_profile", "sliding_standard")

    profile = profiles.get(profile_name)
    if not isinstance(profile, dict):
        profile = DEFAULT_PROFILE
    config = dict(profile)

    # YAML has no tuple type, so overlap usually arrives as a list.
    if isinstance(config.get("overlap"), list):
        config["overlap"] = tuple(config["overlap"])
    return config


def extract_frontmatter_from_text(md_text: str) -> Tuple[Dict[str, Any], str]:
    """
    Split leading YAML frontmatter from a markdown text.

    Returns (frontmatter, body). Without a frontmatter block the text is
    returned unchanged together with an empty dict. With one, the body is the
    remainder after the closing '---', stripped. Invalid YAML or a non-mapping
    frontmatter yields an empty dict.
    """
    fm_match = _FRONTMATTER_RE.match(md_text)
    if not fm_match:
        return {}, md_text

    try:
        frontmatter = yaml.safe_load(fm_match.group(1))
    except yaml.YAMLError:
        frontmatter = {}
    if not isinstance(frontmatter, dict):
        frontmatter = {}

    # The pattern is anchored at the start, so cutting at the match end is
    # the same as substituting the match away.
    return frontmatter, md_text[fm_match.end():].strip()


# ==========================================
# 2. DATA CLASSES & TEXT TOOLS
# ==========================================

# Sentence boundary: terminal punctuation, whitespace, then an upper-case
# letter (incl. German umlauts), a digit, an opening low quote or a paren.
_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])')
_WS = re.compile(r'\s+')


def estimate_tokens(text: str) -> int:
    """Estimate the token count as one token per four characters (minimum 1)."""
    return max(1, math.ceil(len(text.strip()) / 4))


def split_sentences(text: str) -> list[str]:
    """Collapse all whitespace to single spaces and split into sentences."""
    text = _WS.sub(' ', text.strip())
    if not text:
        return []
    parts = _SENT_SPLIT.split(text)
    return [p.strip() for p in parts if p.strip()]


@dataclass
class RawBlock:
    # "paragraph" or "heading" (as produced by parse_blocks)
    kind: str
    # Paragraph content, or the stripped heading line including its '## '
    text: str
    # 2 for H2 headings, None for paragraphs
    level: Optional[int]
    # "/" before the first H2, otherwise "/<h2 title>"
    section_path: str
    # Title of the enclosing H2, None before the first H2
    section_title: Optional[str]


@dataclass
class Chunk:
    # "<note_id>#c<index, two digits>"
    id: str
    note_id: str
    # Position of the chunk within the note (0-based)
    index: int
    text: str
    # text with the optional context prefix prepended
    window: str
    # estimate_tokens(text) at creation time; not updated by later injections
    token_count: int
    section_title: Optional[str]
    section_path: str
    # Neighbour chunk ids, filled in by assemble_chunks
    neighbors_prev: Optional[str]
    neighbors_next: Optional[str]
    suggested_edges: Optional[List[str]] = None


# ==========================================
# 3. PARSING & STRATEGIES
# ==========================================

def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
    """
    Split a markdown text into logical blocks (paragraphs, H2 headings).

    Frontmatter is removed first. Paragraphs are separated by blank lines;
    each '## ' line emits a heading block and starts a new section. Lines
    starting with '# ' are dropped from the body; the first one found is
    returned as the document title (default "Dokument").

    NOTE(review): fenced code blocks are not tracked, so '# ' / '## ' lines
    inside a code fence are treated like headings as well.

    Returns (blocks, h1_title).
    """
    blocks: List[RawBlock] = []
    h1_title = "Dokument"
    section_path = "/"
    current_h2 = None

    _fm, text_without_fm = extract_frontmatter_from_text(md_text)

    h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE)
    if h1_match:
        h1_title = h1_match.group(1).strip()

    buffer: List[str] = []

    def flush_paragraph() -> None:
        # Emit the buffered lines as one paragraph in the current section.
        nonlocal buffer
        if not buffer:
            return
        content = "\n".join(buffer).strip()
        if content:
            blocks.append(RawBlock("paragraph", content, None, section_path, current_h2))
        buffer = []

    for line in text_without_fm.split('\n'):
        stripped = line.strip()
        if stripped.startswith('# '):
            # H1 lines only feed the title; they do not end a paragraph.
            continue
        if stripped.startswith('## '):
            # Close the running paragraph under the *previous* section first.
            flush_paragraph()
            current_h2 = stripped[3:].strip()
            section_path = f"/{current_h2}"
            blocks.append(RawBlock("heading", stripped, 2, section_path, current_h2))
        elif not stripped:
            flush_paragraph()
        else:
            buffer.append(line)

    flush_paragraph()
    return blocks, h1_title


def _resolve_overlap(overlap_val: Any) -> Any:
    """
    Reduce an overlap setting to a single token budget.

    A (min, max) pair is reduced to half of its sum; lists are accepted as
    well because a caller-supplied config may not have gone through the
    list-to-tuple conversion of get_chunk_config. Any other value is
    returned unchanged.
    """
    if isinstance(overlap_val, (list, tuple)):
        return sum(overlap_val) // 2
    return overlap_val


def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str,
                             doc_title: str = "", context_prefix: str = "") -> List[Chunk]:
    """
    Default strategy (WP-15): merge paragraph blocks into chunks of roughly
    'target' tokens; buffers above 'max' tokens are re-split sentence-wise
    with an overlap carried into the next chunk.

    Args:
        blocks: Output of parse_blocks. Heading blocks are skipped; they do
            not force a cut.
        config: Reads "target" (default 400), "max" (600), "overlap"
            ((50, 80); number, tuple or list).
        note_id: Used to build chunk ids.
        doc_title: Accepted for signature compatibility; not used here.
        context_prefix: If set, prepended to each chunk's window (not text).

    Returns:
        The chunks in document order. A chunk built from several blocks
        carries the section title/path of its last block.
    """
    target = config.get("target", 400)
    max_tokens = config.get("max", 600)
    overlap = _resolve_overlap(config.get("overlap", (50, 80)))

    chunks: List[Chunk] = []
    buf: List[RawBlock] = []

    def _create_chunk(txt: str, sec: Optional[str], path: str) -> None:
        idx = len(chunks)
        # Context prefix (e.g. the H1) improves embedding quality.
        win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt
        chunks.append(Chunk(
            id=f"{note_id}#c{idx:02d}",
            note_id=note_id,
            index=idx,
            text=txt,
            window=win,
            token_count=estimate_tokens(txt),
            section_title=sec,
            section_path=path,
            neighbors_prev=None,
            neighbors_next=None,
            suggested_edges=[],
        ))

    def flush_buffer() -> None:
        nonlocal buf
        if not buf:
            return

        text_body = "\n\n".join(b.text for b in buf)
        sec_title = buf[-1].section_title
        sec_path = buf[-1].section_path
        buf = []

        if estimate_tokens(text_body) <= max_tokens:
            _create_chunk(text_body, sec_title, sec_path)
            return

        # Too large -> split sentence by sentence.
        pending: List[str] = []
        pending_len = 0
        for sent in split_sentences(text_body):
            sent_len = estimate_tokens(sent)
            if pending and pending_len + sent_len > target:
                _create_chunk(" ".join(pending), sec_title, sec_path)

                # Carry trailing sentences over as overlap, newest first,
                # while they stay strictly below the overlap budget.
                carried: List[str] = []
                carried_len = 0
                for s in reversed(pending):
                    s_len = estimate_tokens(s)
                    if carried_len + s_len >= overlap:
                        break
                    carried.insert(0, s)
                    carried_len += s_len
                pending = carried
                pending_len = carried_len

            pending.append(sent)
            pending_len += sent_len

        # Remainder
        if pending:
            _create_chunk(" ".join(pending), sec_title, sec_path)

    for b in blocks:
        if b.kind == "heading":
            continue

        current_buf_text = "\n\n".join(x.text for x in buf)
        if estimate_tokens(current_buf_text) + estimate_tokens(b.text) >= target:
            flush_buffer()

        buf.append(b)
        # A block that alone reaches the target becomes its own chunk.
        if estimate_tokens(b.text) >= target:
            flush_buffer()

    flush_buffer()
    return chunks


def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str,
                         doc_title: str = "") -> List[Chunk]:
    """
    Strategy for structured notes (profiles, values).

    Delegates to _strategy_sliding_window and adds "# <doc_title>" as the
    context prefix of every chunk window.

    NOTE(review): heading blocks are skipped by the sliding window without
    flushing, so a chunk can still span an H2 boundary.
    """
    return _strategy_sliding_window(blocks, config, note_id, doc_title,
                                    context_prefix=f"# {doc_title}")


# ==========================================
# 4. ROBUST EDGE PARSING & PROPAGATION
# ==========================================

# Inline edge: [[rel:type|target]]
_INLINE_REL_RE = re.compile(r'\[\[rel:([^\|\]]+)\|?([^\]]*)\]\]')
# Callout head: > [!edge] type   (an optional colon may follow the type).
# '[' is excluded from the type so that "> [!edge] [[Link]]" (type missing)
# is not mistaken for a callout of type "[[Link]]".
_EDGE_CALLOUT_RE = re.compile(r'>\s*\[!edge\]\s*([^:\s\[]+)')
# Plain wikilink: [[Target]]
_WIKILINK_RE = re.compile(r'\[\[([^\]]+)\]\]')


def _format_rel_token(edge: str) -> Optional[str]:
    """
    Turn a "kind:target" string into an inline "[[rel:kind|target]]" token.

    Only the first colon separates kind and target, so targets may contain
    colons. Returns None when *edge* has no colon.
    """
    kind, sep, target = edge.partition(':')
    if not sep:
        return None
    return f"[[rel:{kind}|{target}]]"


def _collect_callout_links(line: str, edge_type: str, found: Set[str]) -> None:
    """Add every plain wikilink on *line* to *found* as "edge_type:link"."""
    for link in _WIKILINK_RE.findall(line):
        # Inline rel links are handled by the inline pass.
        if "rel:" not in link:
            found.add(f"{edge_type}:{link}")


def _parse_edges_robust(text: str) -> Set[str]:
    """
    Find edges in a text, including multi-line edge callouts.

    Recognises:
      * inline tokens      [[rel:type|target]]
      * callouts           > [!edge] type
                           > [[Link]]
        where links may sit on the head line or on following '>' lines.

    Returns:
        Set of strings "kind:target".
    """
    found_edges: Set[str] = set()

    # A. Inline [[rel:type|target]] (standard)
    for kind, target in _INLINE_REL_RE.findall(text):
        k = kind.strip()
        t = target.strip()
        if k and t:
            found_edges.add(f"{k}:{t}")

    # B. Multi-line callouts
    current_edge_type = None
    for line in text.split('\n'):
        stripped = line.strip()

        # 1. Callout head: remember the type, pick up same-line links.
        callout_match = _EDGE_CALLOUT_RE.match(stripped)
        if callout_match:
            current_edge_type = callout_match.group(1).strip()
            _collect_callout_links(stripped, current_edge_type, found_edges)
            continue

        if not stripped.startswith('>'):
            # 3. Blockquote ended -> leave edge mode.
            current_edge_type = None
        elif current_edge_type:
            # 2. Continuation line inside the callout.
            _collect_callout_links(stripped, current_edge_type, found_edges)

    return found_edges


def _propagate_section_edges(chunks: List[Chunk]) -> List[Chunk]:
    """
    Broadcast edges to all chunks of the same section.

    Solves the case where a callout sits at the top of a section but applies
    to every chunk below it. Edges found in any chunk of a section are
    appended as inline tokens to the text and window of every chunk with the
    same section_path, unless that exact token is already in the chunk text.
    Chunks at root level ("/") do not contribute edges.

    The chunks are modified in place and the same list is returned.
    """
    # Step 1: collect per section (path -> set of "kind:target")
    section_map: Dict[str, Set[str]] = {}
    for ch in chunks:
        # Root level is skipped: edges there would be too global.
        if not ch.section_path or ch.section_path == "/":
            continue
        edges = _parse_edges_robust(ch.text)
        if edges:
            section_map.setdefault(ch.section_path, set()).update(edges)

    # Step 2: inject (broadcast)
    for ch in chunks:
        edges_to_add = section_map.get(ch.section_path)
        if not edges_to_add:
            continue

        injections = []
        # Sorted so the injected text is identical across runs; set order
        # depends on per-process string hashing.
        for e_str in sorted(edges_to_add):
            token = _format_rel_token(e_str)
            if token and token not in ch.text:
                injections.append(token)

        if injections:
            # Edges are written into the text itself so that the later
            # edge-derivation step picks them up.
            block = "\n\n\n" + " ".join(injections)
            ch.text += block
            # Also into the window, for embedding context.
            ch.window += block

    return chunks


# ==========================================
# 5. ORCHESTRATION (ASYNC)
# ==========================================

# Edge kinds that describe structure and are never offered as candidates.
_STRUCTURAL_EDGE_KINDS = frozenset({"belongs_to", "next", "prev", "backlink"})


async def assemble_chunks(note_id: str, md_text: str, note_type: str,
                          config: Optional[Dict] = None) -> List[Chunk]:
    """
    Main entry point: parsing, splitting, edge propagation, optional smart
    edge allocation and neighbour linking.

    Args:
        note_id: Identifier of the note; prefix of all chunk ids.
        md_text: Full markdown text including optional frontmatter.
        note_type: Used to resolve the chunking profile and passed on to the
            semantic analyzer.
        config: Explicit chunking config; resolved via get_chunk_config when
            None.

    Returns:
        The chunks in document order, or an empty list if nothing was chunked.
    """
    # 1. Load config (WP-15 compatibility)
    if config is None:
        config = get_chunk_config(note_type)

    fm, _body_text = extract_frontmatter_from_text(md_text)
    # Frontmatter values are arbitrary YAML ("status:" parses to None).
    note_status = str(fm.get("status") or "").strip().lower()

    primary_strategy = config.get("strategy", "sliding_window")
    enable_smart_edges = config.get("enable_smart_edge_allocation", False)

    # Drafts skip the LLM step to save cost/time.
    if enable_smart_edges and note_status in ("draft", "initial_gen"):
        logger.info("Chunker: Skipping Smart Edges for draft '%s'.", note_id)
        enable_smart_edges = False

    # 2. Parsing & splitting
    blocks, doc_title = parse_blocks(md_text)

    if primary_strategy == "by_heading":
        chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title)
    else:
        chunks = await asyncio.to_thread(_strategy_sliding_window, blocks, config, note_id, doc_title)

    if not chunks:
        return []

    # 3. Propagation BEFORE smart edge allocation, so callout edges are
    #    present in every chunk of their section.
    chunks = _propagate_section_edges(chunks)

    # 4. Smart edges (LLM)
    if enable_smart_edges:
        chunks = await _run_smart_edge_allocation(chunks, md_text, note_id, note_type)

    # 5. Linking
    last = len(chunks) - 1
    for i, ch in enumerate(chunks):
        ch.neighbors_prev = chunks[i - 1].id if i > 0 else None
        ch.neighbors_next = chunks[i + 1].id if i < last else None

    return chunks


def _extract_all_edges_from_md(md_text: str, note_id: str, note_type: str) -> List[str]:
    """
    Collect ALL edges of a note as the candidate pool for the LLM step.

    Combines the edges reported by build_edges_for_note for one pseudo-chunk
    holding the full text (structural kinds excluded) with the result of
    _parse_edges_robust. Returns sorted "kind:target" strings.
    """
    # A. Via derive_edges (standard)
    dummy_chunk = {
        "chunk_id": f"{note_id}#full",
        "text": md_text,
        "content": md_text,
        "window": md_text,
        "type": note_type,
    }
    # Keyword arguments match the signature fixed in WP-15.
    raw_edges = build_edges_for_note(
        note_id,
        [dummy_chunk],
        note_level_references=None,
        include_note_scope_refs=False,
    )

    all_candidates: Set[str] = set()
    for e in raw_edges or []:
        kind = e.get("kind")
        target = e.get("target_id")
        if target and kind not in _STRUCTURAL_EDGE_KINDS:
            all_candidates.add(f"{kind}:{target}")

    # B. Via the robust parser - catches the multi-line callouts
    all_candidates.update(_parse_edges_robust(md_text))

    # Sorted for a stable candidate order across runs.
    return sorted(all_candidates)


async def _run_smart_edge_allocation(chunks: List[Chunk], full_text: str, note_id: str,
                                     note_type: str) -> List[Chunk]:
    """
    LLM step (WP-15): ask the semantic analyzer which candidate edges belong
    to which chunk.

    Confirmed edges are stored in chunk.suggested_edges and appended as
    inline tokens to the chunk text and window. Candidates that were not
    assigned to any chunk are appended to every chunk as a safety fallback.

    The chunks are modified in place and the same list is returned.
    """
    analyzer = get_semantic_analyzer()
    candidate_list = _extract_all_edges_from_md(full_text, note_id, note_type)

    if not candidate_list:
        return chunks

    tasks = [
        analyzer.assign_edges_to_chunk(chunk.text, candidate_list, note_type)
        for chunk in chunks
    ]
    results_per_chunk = await asyncio.gather(*tasks)

    assigned_edges_global: Set[str] = set()

    for chunk, confirmed_edges in zip(chunks, results_per_chunk):
        # Copy: the list is extended below and must not alias analyzer state.
        confirmed = list(confirmed_edges or [])
        chunk.suggested_edges = confirmed
        assigned_edges_global.update(confirmed)

        # Smart edges are written into the text as well.
        tokens = [tok for tok in map(_format_rel_token, confirmed) if tok]
        if tokens:
            injection_str = "\n" + " ".join(tokens)
            chunk.text += injection_str
            chunk.window += injection_str

    # Fallback for edges the LLM did not assign anywhere, so nothing is lost.
    unassigned = sorted(set(candidate_list) - assigned_edges_global)
    if unassigned:
        tokens = [tok for tok in map(_format_rel_token, unassigned) if tok]
        fallback_str = "\n" + " ".join(tokens) if tokens else ""
        for chunk in chunks:
            chunk.text += fallback_str
            chunk.window += fallback_str
            if chunk.suggested_edges is None:
                chunk.suggested_edges = []
            chunk.suggested_edges.extend(unassigned)

    return chunks