from __future__ import annotations from dataclasses import dataclass from typing import List, Dict, Optional, Tuple, Any, Set import re import math import yaml from pathlib import Path from markdown_it import MarkdownIt from markdown_it.token import Token import asyncio import logging # Services from app.services.semantic_analyzer import get_semantic_analyzer # Core Imports (mit Fehlerbehandlung für Tests) try: from app.core.derive_edges import build_edges_for_note except ImportError: # Mock für Standalone-Tests ohne vollständige App-Struktur def build_edges_for_note(*args, **kwargs): return [] logger = logging.getLogger(__name__) # ========================================== # 1. HELPER & CONFIG # ========================================== BASE_DIR = Path(__file__).resolve().parent.parent.parent CONFIG_PATH = BASE_DIR / "config" / "types.yaml" DEFAULT_PROFILE = {"strategy": "sliding_window", "target": 400, "max": 600, "overlap": (50, 80)} _CONFIG_CACHE = None def _load_yaml_config() -> Dict[str, Any]: global _CONFIG_CACHE if _CONFIG_CACHE is not None: return _CONFIG_CACHE if not CONFIG_PATH.exists(): return {} try: with open(CONFIG_PATH, "r", encoding="utf-8") as f: data = yaml.safe_load(f) _CONFIG_CACHE = data return data except Exception: return {} def get_chunk_config(note_type: str) -> Dict[str, Any]: full_config = _load_yaml_config() profiles = full_config.get("chunking_profiles", {}) type_def = full_config.get("types", {}).get(note_type.lower(), {}) profile_name = type_def.get("chunking_profile") if not profile_name: profile_name = full_config.get("defaults", {}).get("chunking_profile", "sliding_standard") config = profiles.get(profile_name, DEFAULT_PROFILE).copy() if "overlap" in config and isinstance(config["overlap"], list): config["overlap"] = tuple(config["overlap"]) return config def extract_frontmatter_from_text(md_text: str) -> Tuple[Dict[str, Any], str]: fm_match = re.match(r'^\s*---\s*\n(.*?)\n---', md_text, re.DOTALL) if not fm_match: return {}, 
md_text try: frontmatter = yaml.safe_load(fm_match.group(1)) if not isinstance(frontmatter, dict): frontmatter = {} except yaml.YAMLError: frontmatter = {} text_without_fm = re.sub(r'^\s*---\s*\n(.*?)\n---', '', md_text, flags=re.DOTALL) return frontmatter, text_without_fm.strip() # ========================================== # 2. DATA CLASSES # ========================================== _SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])'); _WS = re.compile(r'\s+') def estimate_tokens(text: str) -> int: return max(1, math.ceil(len(text.strip()) / 4)) def split_sentences(text: str) -> list[str]: text = _WS.sub(' ', text.strip()) if not text: return [] parts = _SENT_SPLIT.split(text) return [p.strip() for p in parts if p.strip()] @dataclass class RawBlock: kind: str; text: str; level: Optional[int]; section_path: str; section_title: Optional[str] @dataclass class Chunk: id: str; note_id: str; index: int; text: str; window: str; token_count: int section_title: Optional[str]; section_path: str neighbors_prev: Optional[str]; neighbors_next: Optional[str] # NEU: Speichert Kanten, die der Algorithmus diesem Chunk zugewiesen hat suggested_edges: Optional[List[str]] = None # ========================================== # 3. 
# PARSING & STRATEGIES (SYNCHRONOUS)
# ==========================================
# First level-1 heading with a non-empty title. Only spaces/tabs may follow
# the '#', so the match cannot run across a line break into the next line.
_H1_TITLE_RE = re.compile(r'^#[ \t]+(\S.*)', re.MULTILINE)
# One or more blank lines separating paragraphs.
_PARAGRAPH_BREAK_RE = re.compile(r'\n\s*\n')


def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
    """Turn a markdown document into raw blocks and determine its title.

    The frontmatter is removed and the whole remaining body is emitted as a
    single ``paragraph`` block at section path ``"/"``.

    Args:
        md_text: The full markdown text, optionally with YAML frontmatter.

    Returns:
        ``(blocks, title)``. ``blocks`` is empty for a blank body. ``title``
        is the text of the first level-1 heading, or ``"Dokument"`` if the
        body has none.
    """
    section_path = "/"
    h1_title = "Dokument"
    blocks: List[RawBlock] = []

    _, text_without_fm = extract_frontmatter_from_text(md_text)

    # Fallback body block
    body = text_without_fm.strip()
    if body:
        blocks.append(RawBlock("paragraph", body, None, section_path, None))

    # Try to find the real title
    h1_match = _H1_TITLE_RE.search(text_without_fm)
    if h1_match:
        h1_title = h1_match.group(1).strip()

    return blocks, h1_title


def _split_oversized_text(text: str, soft_limit: int, hard_limit: int) -> List[str]:
    """Split *text* into pieces of at most *hard_limit* estimated tokens.

    Natural boundaries are preferred: paragraphs first, then lines, then
    sentences, and fixed-width character slices as a last resort. Adjacent
    pieces are merged again while the result stays within *soft_limit*.
    Sentence splitting collapses whitespace inside the affected line.
    """

    def pack(parts, joiner, split_further):
        packed: List[str] = []
        current = ""
        for part in parts:
            piece = part.rstrip()
            if not piece.strip():
                continue
            candidate = f"{current}{joiner}{piece}" if current else piece
            if estimate_tokens(candidate) <= soft_limit:
                current = candidate
                continue
            if current:
                packed.append(current)
                current = ""
            if estimate_tokens(piece) <= hard_limit:
                current = piece
            else:
                # Too large even on its own: descend to the next finer level.
                packed.extend(split_further(piece))
        if current:
            packed.append(current)
        return packed

    def hard_slices(sentence: str) -> List[str]:
        # estimate_tokens counts four characters per token.
        width = hard_limit * 4
        slices = (sentence[i:i + width] for i in range(0, len(sentence), width))
        return [s for s in slices if s.strip()]

    def by_sentence(line: str) -> List[str]:
        return pack(split_sentences(line), " ", hard_slices)

    def by_line(paragraph: str) -> List[str]:
        return pack(paragraph.split("\n"), "\n", by_sentence)

    return pack(_PARAGRAPH_BREAK_RE.split(text), "\n\n", by_line)


def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "", context_prefix: str = "") -> List[Chunk]:
    """Group blocks into chunks of roughly ``target`` tokens.

    Blocks are buffered until adding the next one would reach ``target``;
    the buffer is then emitted as one chunk. A buffer larger than ``max``
    tokens is split into several chunks instead of being cut off, so no
    text is lost. The configured ``overlap`` is not applied here.

    Args:
        blocks: Blocks in document order.
        config: Chunking profile; reads ``target`` (default 400) and
            ``max`` (default 600).
        note_id: Used to build chunk ids of the form ``<note_id>#cNN``.
        doc_title: Accepted for a uniform strategy signature; unused.
        context_prefix: If non-empty, prepended on its own line to each
            chunk's ``window`` (not to its ``text``).

    Returns:
        The chunks in order; neighbour links are left as ``None``.
    """
    # `or` also covers null values coming from the YAML profile.
    target = config.get("target") or 400
    max_tokens = max(1, config.get("max") or 600)
    soft_limit = max(1, min(target, max_tokens))

    chunks: List[Chunk] = []
    buf: List[RawBlock] = []

    def _add_chunk(txt: str, sec: Optional[str], path: str) -> None:
        win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt
        position = len(chunks)
        chunks.append(Chunk(
            id=f"{note_id}#c{position:02d}",
            note_id=note_id,
            index=position,
            text=txt,
            window=win,
            token_count=estimate_tokens(txt),
            section_title=sec,
            section_path=path,
            neighbors_prev=None,
            neighbors_next=None,
            suggested_edges=[],
        ))

    def flush_buffer() -> None:
        if not buf:
            return
        text_body = "\n\n".join(b.text for b in buf)
        title, path = buf[-1].section_title, buf[-1].section_path
        if estimate_tokens(text_body) <= max_tokens:
            _add_chunk(text_body, title, path)
        else:
            for piece in _split_oversized_text(text_body, soft_limit, max_tokens):
                _add_chunk(piece, title, path)
        buf.clear()

    for b in blocks:
        # Close the current chunk before the next block would reach the target.
        if estimate_tokens("\n\n".join([x.text for x in buf] + [b.text])) >= target:
            flush_buffer()
        buf.append(b)
    flush_buffer()

    return chunks
def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]:
    """Structure-based chunking.

    Currently a wrapper: it delegates to the sliding-window strategy and
    passes ``# <doc_title>`` as the context prefix of every chunk window.
    """
    return _strategy_sliding_window(blocks, config, note_id, doc_title, context_prefix=f"# {doc_title}")


# ==========================================
# 4. ORCHESTRATION (ASYNC) - WP-15 CORE
# ==========================================
# Note statuses for which the smart edge allocation is skipped.
_DRAFT_STATUSES = ("draft", "initial_gen")
# Internal structural edge kinds that are never offered as candidates.
_STRUCTURAL_EDGE_KINDS = ("next", "prev", "belongs_to")


async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]:
    """Main entry point: orchestrate the chunking of one note.

    Args:
        note_id: Id of the note; used for chunk ids and logging.
        md_text: Full markdown text, optionally with YAML frontmatter.
        note_type: Note type, used to look up the chunking profile.
        config: Optional chunking profile. When given it replaces the
            profile lookup (dependency injection for tests).

    Returns:
        The chunks in document order with ``neighbors_prev`` /
        ``neighbors_next`` set to the adjacent chunk ids; an empty list if
        the strategy produced no chunks.
    """
    # 1. Config & status
    if config is None:
        config = get_chunk_config(note_type)

    fm, _ = extract_frontmatter_from_text(md_text)
    # The status may be missing, null or not a string in the frontmatter.
    note_status = str(fm.get("status") or "").lower()

    primary_strategy = config.get("strategy", "sliding_window")
    enable_smart_edges = config.get("enable_smart_edge_allocation", False)

    # 2. Safety override: no AI allocation for drafts (saves resources/time)
    if enable_smart_edges and note_status in _DRAFT_STATUSES:
        logger.info("Chunker: Skipping Smart Edges for draft '%s'.", note_id)
        enable_smart_edges = False

    # 3. Step 1: parsing & primary split (deterministic)
    blocks, doc_title = parse_blocks(md_text)

    # Choose the strategy
    if primary_strategy == "by_heading":
        strategy = _strategy_by_heading
    else:
        strategy = _strategy_sliding_window
    chunks = await asyncio.to_thread(strategy, blocks, config, note_id, doc_title)

    if not chunks:
        return []

    # 4. Step 2: smart edge allocation (optional)
    if enable_smart_edges:
        chunks = await _run_smart_edge_allocation(chunks, md_text, note_id, note_type)

    # 5. Post-processing (neighbours)
    last_index = len(chunks) - 1
    for i, ch in enumerate(chunks):
        ch.neighbors_prev = chunks[i - 1].id if i > 0 else None
        ch.neighbors_next = chunks[i + 1].id if i < last_index else None

    return chunks


def _format_edge_links(edges) -> str:
    """Render ``kind:target`` strings as space-separated ``[[rel:kind|target]]`` links.

    Only the first ``:`` separates kind and target, so targets that contain
    a colon are kept whole. Entries without a colon are skipped.
    """
    links = []
    for edge in edges:
        if not isinstance(edge, str):
            continue
        kind, sep, target = edge.partition(":")
        if sep:
            links.append(f"[[rel:{kind}|{target}]]")
    return " ".join(links)


async def _run_smart_edge_allocation(chunks: List[Chunk], full_text: str, note_id: str, note_type: str) -> List[Chunk]:
    """Run the analyzer-based assignment of note edges to chunks.

    All edge candidates of the note are collected, the semantic analyzer
    selects the matching ones for every chunk concurrently, and the result
    is stored in ``suggested_edges`` and appended to the chunk's ``text``
    and ``window`` as ``[[rel:kind|target]]`` links. Candidates that no
    chunk received are added to every chunk.

    Args:
        chunks: The chunks to enrich; modified in place.
        full_text: The full text of the note.
        note_id: Id of the note.
        note_type: Note type, forwarded to edge derivation and analyzer.

    Returns:
        The same list of chunks.
    """
    analyzer = get_semantic_analyzer()

    # A. Collect all potential edges of the note.
    # derive_edges is called on the WHOLE text.
    # IMPORTANT: pass chunks=[] so that it only finds note-level references.
    raw_edges = build_edges_for_note(
        text=full_text,
        note_id=note_id,
        note_type=note_type,
        chunks=[],
        references=[],
    )

    # Format as a "kind:Target" list. Only edges with both a target and a
    # kind, no internal structural edges. Sorted so the injected text is
    # identical from run to run.
    candidate_list = sorted({
        f"{e['kind']}:{e['target_id']}"
        for e in raw_edges
        if e.get("target_id")
        and e.get("kind")
        and e["kind"] not in _STRUCTURAL_EDGE_KINDS
    })

    if not candidate_list:
        return chunks  # No edges to distribute

    # B. Analyzer filtering per chunk (in parallel)
    results_per_chunk = await asyncio.gather(*(
        analyzer.assign_edges_to_chunk(chunk.text, candidate_list, note_type)
        for chunk in chunks
    ))

    # C. Injection
    assigned_edges_global = set()
    for chunk, result in zip(chunks, results_per_chunk):
        # Copy so the later fallback extend() cannot mutate the analyzer's
        # own list; a None result counts as "no edges".
        confirmed_edges = list(result or [])

        # Store the confirmed edges
        chunk.suggested_edges = confirmed_edges
        assigned_edges_global.update(confirmed_edges)

        # Inject into the text (for indexing)
        links = _format_edge_links(confirmed_edges)
        if links:
            injection_str = "\n" + links
            chunk.text += injection_str
            chunk.window += injection_str

    # D. Fallback: edges that were assigned NOWHERE end up in all chunks (safety)
    unassigned = [c for c in candidate_list if c not in assigned_edges_global]
    if unassigned:
        fallback_str = "\n" + _format_edge_links(unassigned)
        for chunk in chunks:
            chunk.text += fallback_str
            chunk.window += fallback_str
            chunk.suggested_edges.extend(unassigned)

    return chunks