from __future__ import annotations

import asyncio  # needed for the asynchronous chunking strategies
import math
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import yaml
from markdown_it import MarkdownIt
from markdown_it.token import Token

# Import the required classes directly (replaces get_semantic_analyzer).
# ASSUMPTION: the classes exist in app/services/semantic_analyzer.py
try:
    from app.services.semantic_analyzer import SemanticAnalyzer, SemanticChunkResult
except ImportError:
    # Fallback for tests when the service is not yet on the import path.
    print("WARNUNG: SemanticAnalyzer Service nicht gefunden. Semantic Chunking wird fehlschlagen.")

    class SemanticAnalyzer:
        """No-op stand-in for the real analyzer: returns no semantic chunks."""

        # Parameter renamed from `type` to `note_type`: do not shadow the
        # builtin; callers invoke this positionally, so the rename is safe.
        async def analyze_and_chunk(self, text, note_type):
            return []

    @dataclass
    class SemanticChunkResult:
        content: str
        suggested_edges: List[str]  # format: "kind:Target"

# Used to read the frontmatter.
# ASSUMPTION: extract_frontmatter_from_text exists in app.core.note_payload
from app.core.note_payload import extract_frontmatter_from_text

# ==========================================
# 1.
# CONFIGURATION LOADER (formerly chunk_config.py)
# ==========================================

# Path logic: app/core/chunker.py -> app/core -> app -> root/config/types.yaml
BASE_DIR = Path(__file__).resolve().parent.parent.parent
CONFIG_PATH = BASE_DIR / "config" / "types.yaml"

# Fallback values used when no profile can be resolved.
DEFAULT_PROFILE = {
    "strategy": "sliding_window",
    "target": 400,
    "max": 600,
    "overlap": (50, 80),
}

_CONFIG_CACHE = None  # module-level cache for the parsed types.yaml


def _load_yaml_config() -> Dict[str, Any]:
    """Load config/types.yaml once and cache the parsed result.

    Returns:
        The parsed mapping, or {} when the file is missing, empty or
        unreadable — so callers can always use ``.get(...)`` safely.
    """
    global _CONFIG_CACHE
    if _CONFIG_CACHE is not None:
        return _CONFIG_CACHE
    if not CONFIG_PATH.exists():
        print(f"WARNUNG: types.yaml nicht gefunden unter: {CONFIG_PATH}")
        return {}
    try:
        with open(CONFIG_PATH, "r", encoding="utf-8") as f:
            # BUGFIX: yaml.safe_load returns None for an empty document;
            # normalize to {} so get_chunk_config does not crash on `.get`.
            data = yaml.safe_load(f) or {}
        _CONFIG_CACHE = data
        return data
    except Exception as e:
        print(f"FEHLER beim Laden von {CONFIG_PATH}: {e}")
        return {}


def get_chunk_config(note_type: str) -> Dict[str, Any]:
    """Resolve note type -> profile name -> chunking configuration.

    Falls back to the configured default profile, and finally to
    DEFAULT_PROFILE when the profile name is unknown.
    """
    full_config = _load_yaml_config()
    profiles = full_config.get("chunking_profiles", {})
    type_def = full_config.get("types", {}).get(note_type.lower(), {})

    profile_name = type_def.get("chunking_profile")
    if not profile_name:
        profile_name = full_config.get("defaults", {}).get("chunking_profile", "sliding_standard")

    # Copy so per-call normalization never mutates the cached config.
    config = profiles.get(profile_name, DEFAULT_PROFILE).copy()
    # YAML has no tuple literal; normalize a list overlap to the expected tuple.
    if "overlap" in config and isinstance(config["overlap"], list):
        config["overlap"] = tuple(config["overlap"])
    return config


# Legacy support: old callers expect a (min, max)-style "target" pair.
def get_sizes(note_type: str):
    cfg = get_chunk_config(note_type)
    return {
        "target": (cfg["target"], cfg["target"]),
        "max": cfg["max"],
        "overlap": cfg["overlap"],
    }


# ==========================================
# 2.
# DATA CLASSES & HELPERS
# ==========================================

# --- Helpers ---
_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])')
_WS = re.compile(r'\s+')

# Frontmatter must start at the very beginning of the document. The old
# unanchored `---.*?---` pattern also swallowed arbitrary text between two
# horizontal rules in the middle of a note.
_FRONTMATTER = re.compile(r'\A---\s*\n.*?\n---\s*\n?', re.DOTALL)


def estimate_tokens(text: str) -> int:
    """Rough token estimate: ~4 characters per token, minimum 1."""
    t = len(text.strip())
    return max(1, math.ceil(t / 4))


def split_sentences(text: str) -> list[str]:
    """Split on sentence-final punctuation followed by an upper-case/digit start."""
    text = _WS.sub(' ', text.strip())
    if not text:
        return []
    parts = _SENT_SPLIT.split(text)
    return [p.strip() for p in parts if p.strip()]


@dataclass
class RawBlock:
    kind: str                     # e.g. "paragraph", "heading"
    text: str
    level: Optional[int]          # heading level, None for non-headings
    section_path: str             # e.g. "/" or "/Section/Subsection"
    section_title: Optional[str]


@dataclass
class Chunk:
    id: str
    note_id: str
    index: int
    text: str                 # plain text for display (incl. injected links for LLM/heading)
    window: str               # text + context, used for embeddings
    token_count: int
    section_title: Optional[str]
    section_path: str
    neighbors_prev: Optional[str]
    neighbors_next: Optional[str]
    char_start: int
    char_end: int


# --- Markdown parser ---
def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
    """Parse markdown and return (blocks, H1 title).

    NOTE(review): this is a rudimentary single-paragraph parser kept for the
    non-LLM strategies; the real Mindnet system has a more detailed
    heading-aware implementation here.
    """
    md = MarkdownIt("commonmark").enable("table")
    tokens: List[Token] = md.parse(md_text)
    blocks: List[RawBlock] = []
    h1_title = "Dokument"
    h2, h3 = None, None
    section_path = "/"

    # BUGFIX: strip only frontmatter anchored at the start of the document
    # (the previous unanchored pattern deleted arbitrary `--- ... ---` spans).
    text_without_fm = _FRONTMATTER.sub('', md_text)
    if text_without_fm.strip():
        blocks.append(RawBlock(kind="paragraph", text=text_without_fm.strip(), level=None,
                               section_path=section_path, section_title=h2))

    # Extract the H1 title if one is present.
    h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE)
    if h1_match:
        h1_title = h1_match.group(1).strip()
    return blocks, h1_title


# ==========================================
# 3.
# STRATEGIES (SYNCHRONOUS)
# ==========================================

def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str,
                             context_prefix: str = "") -> List[Chunk]:
    """Classic sliding window over block texts.

    Blocks accumulate until roughly `target` tokens; an oversized buffer is
    re-split at sentence boundaries with a character-level overlap of about
    `overlap` tokens (~4 chars/token). `context_prefix`, when given, is
    prepended to each chunk's embedding window only.
    """
    target = config.get("target", 400)
    max_tokens = config.get("max", 600)
    overlap_val = config.get("overlap", (50, 80))
    # Accept tuple OR list (YAML loads sequences as lists); use the mean.
    overlap = sum(overlap_val) // 2 if isinstance(overlap_val, (tuple, list)) else overlap_val

    chunks: List[Chunk] = []
    buf: List[RawBlock] = []

    def _add_chunk(txt, win, sec, path):
        # Append a Chunk with a sequential, zero-padded id.
        idx = len(chunks)
        chunks.append(Chunk(
            id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx,
            text=txt, window=win, token_count=estimate_tokens(txt),
            section_title=sec, section_path=path,
            neighbors_prev=None, neighbors_next=None,
            char_start=0, char_end=0
        ))

    def flush_buffer():
        nonlocal buf
        if not buf:
            return
        text_body = "\n\n".join(b.text for b in buf)
        sec_title = buf[-1].section_title
        sec_path = buf[-1].section_path
        window_body = f"{context_prefix}\n{text_body}".strip() if context_prefix else text_body

        if estimate_tokens(text_body) > max_tokens:
            # Oversized buffer: re-split at sentence boundaries.
            sentences = split_sentences(text_body)
            current_sents = []
            cur_toks = 0
            for s in sentences:
                st = estimate_tokens(s)
                if cur_toks + st > target and current_sents:
                    txt = "\n".join(current_sents)
                    win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt
                    _add_chunk(txt, win, sec_title, sec_path)
                    # BUGFIX: with overlap == 0 the old slice [-0:] kept the
                    # ENTIRE previous chunk as "overlap" instead of nothing.
                    ov_txt = " ".join(current_sents)[-overlap * 4:] if overlap > 0 else ""
                    current_sents = [ov_txt, s] if ov_txt else [s]
                    cur_toks = estimate_tokens(" ".join(current_sents))
                else:
                    current_sents.append(s)
                    cur_toks += st
            if current_sents:
                txt = "\n".join(current_sents)
                win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt
                _add_chunk(txt, win, sec_title, sec_path)
        else:
            _add_chunk(text_body, window_body, sec_title, sec_path)
        buf = []

    for b in blocks:
        # Flush BEFORE appending once the buffer would reach the target size.
        if estimate_tokens("\n\n".join([x.text for x in buf] + [b.text])) >= target:
            flush_buffer()
        buf.append(b)
    flush_buffer()
    return chunks


def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str,
                         doc_title: str) -> List[Chunk]:
    """Hard split at headings with context injection.

    Note: the original parse_blocks logic for heading detection was more
    detailed; here we rely on the rudimentary RawBlock structure.
    """
    chunks: List[Chunk] = []
    sections: Dict[str, List[RawBlock]] = {}
    ordered: List[str] = []

    # Group blocks by section path, preserving first-seen order.
    for b in blocks:
        if b.kind == "heading":
            continue
        if b.section_path not in sections:
            sections[b.section_path] = []
            ordered.append(b.section_path)
        sections[b.section_path].append(b)

    for path in ordered:
        s_blocks = sections[path]
        if not s_blocks:
            continue
        breadcrumbs = path.strip("/").replace("/", " > ")
        context_header = f"# {doc_title}\n## {breadcrumbs}"
        full_text = "\n\n".join(b.text for b in s_blocks)

        if estimate_tokens(full_text) <= config.get("max", 600):
            # Section fits in one chunk; inject the breadcrumb header into
            # the embedding window only.
            chunks.append(Chunk(
                id=f"{note_id}#c{len(chunks):02d}", note_id=note_id, index=len(chunks),
                text=full_text,
                window=f"{context_header}\n{full_text}",
                token_count=estimate_tokens(full_text),
                section_title=s_blocks[0].section_title,
                section_path=path,
                neighbors_prev=None, neighbors_next=None,
                char_start=0, char_end=0
            ))
        else:
            # Fallback to sliding window with context injection; re-index the
            # sub-chunks so ids stay globally sequential.
            sub = _strategy_sliding_window(s_blocks, config, note_id, context_prefix=context_header)
            base = len(chunks)
            for i, sc in enumerate(sub):
                sc.index = base + i
                sc.id = f"{note_id}#c{sc.index:02d}"
                chunks.append(sc)
    return chunks


# ==========================================
# 4.
# STRATEGY (ASYNCHRONOUS)
# ==========================================

# Singleton instance for the analyzer
_semantic_analyzer_instance = None


def _get_semantic_analyzer_instance() -> SemanticAnalyzer:
    """Return the lazily created SemanticAnalyzer singleton."""
    global _semantic_analyzer_instance
    if _semantic_analyzer_instance is None:
        _semantic_analyzer_instance = SemanticAnalyzer()
    return _semantic_analyzer_instance


async def _strategy_semantic_llm(md_text: str, config: Dict[str, Any], note_id: str,
                                 note_type: str) -> List[Chunk]:
    """Strategy: delegate splitting and edge extraction to an LLM (async).

    `config` is currently unused but kept for a uniform strategy signature.
    """
    analyzer = _get_semantic_analyzer_instance()
    # The text splitting itself is performed by the LLM.
    semantic_chunks: List[SemanticChunkResult] = await analyzer.analyze_and_chunk(md_text, note_type)

    chunks: List[Chunk] = []
    for i, sc in enumerate(semantic_chunks):
        # 1. Edge injection for derive_edges.py — only well-formed
        #    "kind:Target" strings are turned into wiki-link markers.
        injection_block = "\n"
        for edge_str in sc.suggested_edges:
            if ":" in edge_str:
                kind, target = edge_str.split(":", 1)
                # Uses the syntax: [[rel:kind | Target]]
                injection_block += f"[[rel:{kind} | {target}]] "
        full_text = sc.content + injection_block

        # 2. Build the Chunk object
        chunks.append(Chunk(
            id=f"{note_id}#sem{i:02d}", note_id=note_id, index=i,
            text=full_text.strip(), window=full_text.strip(),
            token_count=estimate_tokens(full_text),
            section_title="Semantic Section", section_path="/LLM",
            neighbors_prev=None, neighbors_next=None,
            char_start=0, char_end=0
        ))
    return chunks


# ==========================================
# 5. MAIN ENTRY POINT (ASYNC)
# ==========================================

async def assemble_chunks(note_id: str, md_text: str, note_type: str) -> List[Chunk]:
    """Main entry point: resolve the config and dispatch to a strategy (async).

    Contains the double-LLM-prevention logic: notes that were themselves
    LLM-generated (status 'draft'/'initial_gen') are chunked
    deterministically instead of being sent back through the LLM.
    """
    # 1. Inspect the frontmatter (double-LLM prevention).
    fm, _ = extract_frontmatter_from_text(md_text)
    # BUGFIX: `status` may be missing or parsed by YAML as a non-string
    # (e.g. None for a bare `status:` key); coerce before lowercasing.
    note_status = str(fm.get("status") or "").lower()

    config = get_chunk_config(note_type)
    strategy = config.get("strategy", "sliding_window")

    # 2. Strategy selection: if the type uses LLM chunking (semantic_llm)
    #    BUT the status says 'draft' (probably generated by an LLM), fall
    #    back to the deterministic 'by_heading' strategy — LLM generators
    #    emit clean H2 structures.
    if strategy == "semantic_llm" and note_status in ["draft", "initial_gen"]:
        print(f"INFO: Overriding '{strategy}' for draft status. Using 'by_heading' instead.")
        strategy = "by_heading"

    # 3. Execution (dispatcher)
    if strategy == "semantic_llm":
        chunks = await _strategy_semantic_llm(md_text, config, note_id, note_type)
    elif strategy == "by_heading":
        blocks, doc_title = parse_blocks(md_text)
        # Run the synchronous strategy in a worker thread.
        chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title)
    else:  # sliding_window (default)
        blocks, doc_title = parse_blocks(md_text)
        # Run the synchronous strategy in a worker thread.
        chunks = await asyncio.to_thread(_strategy_sliding_window, blocks, config, note_id)

    # 4. Post-process: wire up neighbor chunk ids.
    for i, ch in enumerate(chunks):
        ch.neighbors_prev = chunks[i - 1].id if i > 0 else None
        ch.neighbors_next = chunks[i + 1].id if i < len(chunks) - 1 else None
    return chunks