from __future__ import annotations

import asyncio
import logging
import math
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

import yaml
from markdown_it import MarkdownIt
from markdown_it.token import Token

# Optional service imports with local fallbacks so the module stays importable
# when the app package is unavailable (e.g. isolated tooling or tests).
try:
    from app.services.semantic_analyzer import SemanticAnalyzer, SemanticChunkResult
except ImportError:
    print("WARNUNG: SemanticAnalyzer Service nicht gefunden.")

    @dataclass
    class SemanticChunkResult:
        """Fallback stand-in for the real service result."""
        content: str
        # Each entry is formatted as "kind:Target".
        suggested_edges: List[str]

    class SemanticAnalyzer:
        """Fallback analyzer: echoes the input as a single chunk with no edges."""

        # Accepts the `source_type` keyword used by callers in this module
        # (the old `type` parameter shadowed the builtin and did not match the
        # call site, so the fallback path raised TypeError). Extra keyword
        # arguments are tolerated for forward compatibility.
        async def analyze_and_chunk(self, text, source_type=None, **kwargs):
            return [SemanticChunkResult(content=text, suggested_edges=[])]


# Edge parser import, simulated when derive_edges.py is missing.
try:
    from app.core.derive_edges import build_edges_for_note
except ImportError:
    print("WARNUNG: derive_edges.py nicht gefunden. Kanten-Parsing simuliert.")

    # `None` defaults replace the original mutable list defaults, which would
    # have been shared across calls.
    def build_edges_for_note(md_text, note_id, note_type, chunks=None,
                             note_level_references=None,
                             include_note_scope_refs=False):
        """Fallback: derive no edges at all."""
        return []


logger = logging.getLogger(__name__)


# ==========================================
# 1. FRONTMATTER EXTRACTION
# ==========================================
def extract_frontmatter_from_text(md_text: str) -> Tuple[Dict[str, Any], str]:
    """Split a Markdown document into (frontmatter dict, body text).

    Returns ``({}, md_text)`` when no leading ``--- ... ---`` YAML block is
    present, or an empty dict when the YAML is invalid / not a mapping.
    The body is returned without the frontmatter block and stripped.
    """
    fm_match = re.match(r'^\s*---\s*\n(.*?)\n---', md_text, re.DOTALL)
    if not fm_match:
        return {}, md_text
    try:
        frontmatter = yaml.safe_load(fm_match.group(1))
        if not isinstance(frontmatter, dict):
            frontmatter = {}
    except yaml.YAMLError:
        frontmatter = {}
    # Slice off the matched block instead of re-running the regex via re.sub.
    text_without_fm = md_text[fm_match.end():]
    return frontmatter, text_without_fm.strip()
# ==========================================
# 2. CONFIGURATION LOADER
# ==========================================
BASE_DIR = Path(__file__).resolve().parent.parent.parent
CONFIG_PATH = BASE_DIR / "config" / "types.yaml"

# Used whenever types.yaml is missing or defines no matching profile.
DEFAULT_PROFILE = {"strategy": "sliding_window", "target": 400, "max": 600, "overlap": (50, 80)}

_CONFIG_CACHE: Optional[Dict[str, Any]] = None


def _load_yaml_config() -> Dict[str, Any]:
    """Load config/types.yaml, caching the parsed result module-wide.

    Returns {} when the file is missing, unreadable, or does not contain a
    YAML mapping. A successful load is cached so the file is read at most
    once per process; failures are not cached (the file may appear later).
    """
    global _CONFIG_CACHE
    if _CONFIG_CACHE is not None:
        return _CONFIG_CACHE
    if not CONFIG_PATH.exists():
        return {}
    try:
        with open(CONFIG_PATH, "r", encoding="utf-8") as f:
            loaded = yaml.safe_load(f)
    except Exception:
        # Best-effort loader: fall back to defaults, but leave a trace.
        logger.warning("Could not load chunking config: %s", CONFIG_PATH)
        return {}
    # safe_load returns None for empty files; normalize to a dict so callers
    # can chain .get() without crashing on NoneType.
    data = loaded if isinstance(loaded, dict) else {}
    _CONFIG_CACHE = data
    return data


def get_chunk_config(note_type: str) -> Dict[str, Any]:
    """Resolve the chunking profile for *note_type*.

    Resolution order: type-specific profile name -> global default profile
    name ("sliding_standard") -> hard-coded DEFAULT_PROFILE. A YAML list
    under "overlap" is converted to a tuple. A copy is returned, so callers
    may mutate the result freely.
    """
    full_config = _load_yaml_config()
    profiles = full_config.get("chunking_profiles", {})
    type_def = full_config.get("types", {}).get(note_type.lower(), {})
    profile_name = type_def.get("chunking_profile")
    if not profile_name:
        profile_name = full_config.get("defaults", {}).get("chunking_profile", "sliding_standard")
    config = profiles.get(profile_name, DEFAULT_PROFILE).copy()
    if isinstance(config.get("overlap"), list):
        config["overlap"] = tuple(config["overlap"])
    return config


def get_sizes(note_type: str) -> Dict[str, Any]:
    """Convenience view of the chunk config as target/max/overlap sizes."""
    cfg = get_chunk_config(note_type)
    return {"target": (cfg["target"], cfg["target"]), "max": cfg["max"], "overlap": cfg["overlap"]}
# ==========================================
# 3. DATA CLASSES & HELPERS
# ==========================================
_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])')
_WS = re.compile(r'\s+')


def estimate_tokens(text: str) -> int:
    """Rough token estimate: ~4 characters per token, minimum 1."""
    return max(1, math.ceil(len(text.strip()) / 4))


def split_sentences(text: str) -> list[str]:
    """Split *text* into sentences.

    Whitespace runs are collapsed first; the split fires after ``.!?``
    followed by whitespace and an upper-case letter, digit, quote, or
    opening parenthesis. Returns [] for empty/whitespace-only input.
    """
    text = _WS.sub(' ', text.strip())
    if not text:
        return []
    return [p.strip() for p in _SENT_SPLIT.split(text) if p.strip()]


@dataclass
class RawBlock:
    """One structural block of the source document."""
    kind: str                    # e.g. "paragraph" or "heading"
    text: str
    level: Optional[int]         # heading level, None for non-headings
    section_path: str            # e.g. "/Intro/Details"
    section_title: Optional[str]


@dataclass
class Chunk:
    """A retrieval chunk produced from one note."""
    id: str                      # "<note_id>#cNN"
    note_id: str
    index: int
    text: str                    # raw chunk text (edge markers may be appended)
    window: str                  # text plus injected context header
    token_count: int
    section_title: Optional[str]
    section_path: str
    neighbors_prev: Optional[str]
    neighbors_next: Optional[str]
    char_start: int
    char_end: int
    # Declared explicitly; previously this was set as a dynamic attribute by
    # the smart-edge-allocation strategy only. None until assigned.
    suggested_edges: Optional[List[str]] = None


def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
    """Parse Markdown into RawBlocks and return (blocks, document title).

    NOTE(review): simplified implementation — the markdown-it token stream
    is produced but not consumed; the whole body (minus frontmatter) is
    emitted as a single paragraph block with section_path "/" and no
    section title. The title is the first H1 if present, else "Dokument".
    """
    md = MarkdownIt("commonmark").enable("table")
    tokens: List[Token] = md.parse(md_text)
    blocks: List[RawBlock] = []
    h1_title = "Dokument"
    h2, h3 = None, None
    section_path = "/"
    fm, text_without_fm = extract_frontmatter_from_text(md_text)
    if text_without_fm.strip():
        blocks.append(RawBlock(kind="paragraph", text=text_without_fm.strip(),
                               level=None, section_path=section_path, section_title=h2))
    h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE)
    if h1_match:
        h1_title = h1_match.group(1).strip()
    return blocks, h1_title
# ==========================================
# 4. STRATEGIES (SYNCHRONOUS)
# ==========================================
def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str,
                             doc_title: str = "", context_prefix: str = "") -> List[Chunk]:
    """Classic sliding-window chunking.

    Blocks accumulate in a buffer; once the buffer would reach the target
    token budget it is flushed as one chunk. A buffer that still exceeds
    the max budget is re-split at sentence boundaries, carrying a
    character-approximated overlap between consecutive chunks. A non-empty
    ``context_prefix`` is prepended to every chunk window.
    """
    target = config.get("target", 400)
    max_tokens = config.get("max", 600)
    overlap_cfg = config.get("overlap", (50, 80))
    overlap = sum(overlap_cfg) // 2 if isinstance(overlap_cfg, tuple) else overlap_cfg

    result: List[Chunk] = []
    pending: List[RawBlock] = []

    def emit(txt: str, win: str, sec: Optional[str], path: str) -> None:
        # Append one finished chunk; id and index derive from current count.
        idx = len(result)
        result.append(Chunk(
            id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx,
            text=txt, window=win, token_count=estimate_tokens(txt),
            section_title=sec, section_path=path,
            neighbors_prev=None, neighbors_next=None,
            char_start=0, char_end=0,
        ))

    def flush() -> None:
        nonlocal pending
        if not pending:
            return
        body = "\n\n".join(b.text for b in pending)
        sec = pending[-1].section_title
        path = pending[-1].section_path
        win_body = f"{context_prefix}\n{body}".strip() if context_prefix else body
        if estimate_tokens(body) <= max_tokens:
            emit(body, win_body, sec, path)
        else:
            # Oversized buffer: re-split at sentence level with overlap.
            sents: List[str] = []
            toks = 0
            for sentence in split_sentences(body):
                s_toks = estimate_tokens(sentence)
                if toks + s_toks > target and sents:
                    joined = "\n".join(sents)
                    win = f"{context_prefix}\n{joined}".strip() if context_prefix else joined
                    emit(joined, win, sec, path)
                    # Carry ~`overlap` tokens (4 chars/token) into the next chunk.
                    carry = " ".join(sents)[-overlap * 4:]
                    sents = [carry, sentence] if carry else [sentence]
                    toks = estimate_tokens(" ".join(sents))
                else:
                    sents.append(sentence)
                    toks += s_toks
            if sents:
                joined = "\n".join(sents)
                win = f"{context_prefix}\n{joined}".strip() if context_prefix else joined
                emit(joined, win, sec, path)
        pending = []

    for blk in blocks:
        if estimate_tokens("\n\n".join([x.text for x in pending] + [blk.text])) >= target:
            flush()
        pending.append(blk)
    flush()
    return result


def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str,
                         doc_title: str = "") -> List[Chunk]:
    """Hard split at headings, injecting a breadcrumb context header.

    Non-heading blocks are grouped by section path (in first-seen order).
    A section within the max token budget becomes exactly one chunk; a
    larger section falls back to the sliding-window strategy with the same
    context header as prefix.
    """
    chunks: List[Chunk] = []
    by_path: Dict[str, List[RawBlock]] = {}
    path_order: List[str] = []

    for blk in blocks:
        if blk.kind == "heading":
            continue
        if blk.section_path not in by_path:
            by_path[blk.section_path] = []
            path_order.append(blk.section_path)
        by_path[blk.section_path].append(blk)

    for path in path_order:
        section = by_path[path]
        if not section:
            continue
        breadcrumbs = path.strip("/").replace("/", " > ")
        header = f"# {doc_title}\n## {breadcrumbs}"
        body = "\n\n".join(b.text for b in section)
        if estimate_tokens(body) <= config.get("max", 600):
            idx = len(chunks)
            chunks.append(Chunk(
                id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx,
                text=body, window=f"{header}\n{body}",
                token_count=estimate_tokens(body),
                section_title=section[0].section_title, section_path=path,
                neighbors_prev=None, neighbors_next=None,
                char_start=0, char_end=0,
            ))
        else:
            # Section too large: sliding window with the same context header.
            for sub_chunk in _strategy_sliding_window(section, config, note_id,
                                                      doc_title, context_prefix=header):
                sub_chunk.index = len(chunks)
                sub_chunk.id = f"{note_id}#c{sub_chunk.index:02d}"
                chunks.append(sub_chunk)
    return chunks
# ==========================================
# 5. ORCHESTRATION STRATEGY (ASYNC)
# ==========================================
_semantic_analyzer_instance = None


def _get_semantic_analyzer_instance() -> SemanticAnalyzer:
    """Lazily create and reuse a single SemanticAnalyzer per process."""
    global _semantic_analyzer_instance
    if _semantic_analyzer_instance is None:
        _semantic_analyzer_instance = SemanticAnalyzer()
    return _semantic_analyzer_instance


def _extract_all_edges_from_md(md_text: str, note_id: str, note_type: str) -> List[str]:
    """Derive note-level edges and return them as unique "kind:Target" strings.

    Structural edge kinds (belongs_to/next/prev) are excluded; only edges
    with a truthy target_id are kept. Order of the result is unspecified
    (set-based deduplication).
    """
    # Positional md_text, everything else by keyword, matching the
    # build_edges_for_note signature.
    raw_edges: List[Dict] = build_edges_for_note(
        md_text,
        note_id=note_id,
        note_type=note_type,
        chunks=[],
        note_level_references=[],
        include_note_scope_refs=False
    )
    all_note_edges: Set[str] = set()
    for edge in raw_edges:
        if edge.get("target_id") and edge.get("kind") not in ("belongs_to", "next", "prev"):
            all_note_edges.add(f"{edge['kind']}:{edge['target_id']}")
    return list(all_note_edges)


def _append_edge_markers(chunk: Chunk, edges: List[str]) -> None:
    """Append ``[[rel:kind | Target]]`` markers to a chunk's text and window.

    The markers are later parsed by derive_edges.py. Entries without a ":"
    separator are skipped. (A leading newline is always appended, matching
    the previous inline implementation.)
    """
    injection_block = "\n"
    for edge_str in edges:
        if ":" in edge_str:
            kind, target = edge_str.split(":", 1)
            injection_block += f"[[rel:{kind} | {target}]] "
    chunk.text = chunk.text + injection_block
    chunk.window = chunk.window + injection_block


async def _strategy_smart_edge_allocation(md_text: str, config: Dict, note_id: str, note_type: str) -> List[Chunk]:
    """Run the 5-step smart edge allocation workflow.

    1. Collect all note-level edges from the full Markdown text.
    2. Chunk deterministically with the configured primary strategy.
    3. Ask the SemanticAnalyzer (one parallel LLM call per chunk) which of
       the note edges belong to each chunk.
    4. Store the per-chunk edges and inject them as markers into the text.
    5. Fallback: edges no chunk claimed are injected into every chunk.
    """
    analyzer = _get_semantic_analyzer_instance()

    # Steps 1-2: collect edges from the whole note.
    all_note_edges_list = _extract_all_edges_from_md(md_text, note_id, note_type)

    # Step 3: deterministic primary chunking (sync code runs off the loop).
    primary_strategy = config.get("strategy", "sliding_window")
    blocks, doc_title = parse_blocks(md_text)
    if primary_strategy == "by_heading":
        chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title)
    else:
        chunks = await asyncio.to_thread(_strategy_sliding_window, blocks, config, note_id, doc_title)

    # Step 4: per-chunk LLM filtering, fanned out in parallel.
    unassigned_edges: Set[str] = set(all_note_edges_list)
    if all_note_edges_list:
        llm_tasks = [
            analyzer.analyze_and_chunk(
                text=chunk.text,
                source_type=note_type,
                # NOTE(review): all_note_edges / target_type_resolver are
                # said to be required by the SemanticAnalyzer but are not
                # passed here — confirm against the service signature.
            )
            for chunk in chunks
        ]
        filtered_edges_results: List[List[str]] = await asyncio.gather(*llm_tasks)
        for chunk, filtered_edges_list in zip(chunks, filtered_edges_results):
            chunk.suggested_edges = filtered_edges_list
            unassigned_edges.difference_update(filtered_edges_list)
            # Inject the assigned edges so derive_edges.py can parse them.
            _append_edge_markers(chunk, chunk.suggested_edges)

    # Step 5: fallback — attach edges no chunk claimed to every chunk.
    unassigned_edges_list = list(unassigned_edges)
    if unassigned_edges_list:
        logger.info(
            "Adding %d unassigned edges as fallback to all chunks for note %s",
            len(unassigned_edges_list), note_id
        )
        for chunk in chunks:
            _append_edge_markers(chunk, unassigned_edges_list)

    return chunks


# ==========================================
# 6. MAIN ENTRY POINT (ASYNC)
# ==========================================
async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]:
    """Main chunking entry point (async).

    Picks a strategy from the (optionally injected) config, runs it, and
    links neighbor ids between consecutive chunks.

    Args:
        note_id: Stable id used as prefix for chunk ids.
        md_text: Full Markdown source including optional frontmatter.
        note_type: Type key used to resolve the chunking profile.
        config: Optional override of the runtime configuration (for tests).
    """
    # 1. Load configuration unless the caller injected one.
    if config is None:
        config = get_chunk_config(note_type)

    # 2. Inspect frontmatter (double-LLM prevention for drafts).
    fm, body = extract_frontmatter_from_text(md_text)
    # str()/or-guard: YAML may yield None or a non-string status value,
    # which previously crashed on .lower().
    note_status = str(fm.get("status") or "").lower()

    strategy = config.get("strategy", "sliding_window")
    enable_smart_edge = config.get("enable_smart_edge_allocation", False)

    # 3a. Draft override: drafts skip the LLM pass and chunk deterministically.
    if enable_smart_edge and note_status in ("draft", "initial_gen"):
        logger.info("Overriding Smart Edge Allocation for draft status. Using 'by_heading' for deterministic chunking.")
        enable_smart_edge = False
        strategy = "by_heading"

    # 3b. Dispatch. Sync strategies run in a thread to keep the loop free.
    blocks, doc_title = parse_blocks(md_text)
    if enable_smart_edge:
        chunks = await _strategy_smart_edge_allocation(md_text, config, note_id, note_type)
    elif strategy == "by_heading":
        chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title)
    else:  # sliding_window (default)
        chunks = await asyncio.to_thread(_strategy_sliding_window, blocks, config, note_id, doc_title)

    # 4. Post-process: link neighbor chunk ids.
    for i, ch in enumerate(chunks):
        ch.neighbors_prev = chunks[i - 1].id if i > 0 else None
        ch.neighbors_next = chunks[i + 1].id if i < len(chunks) - 1 else None
    return chunks