diff --git a/app/core/chunker.py b/app/core/chunker.py index ee20b0a..c425e3b 100644 --- a/app/core/chunker.py +++ b/app/core/chunker.py @@ -8,15 +8,17 @@ from pathlib import Path from markdown_it import MarkdownIt from markdown_it.token import Token +# NEUE IMPORTS +# Import des Semantic Analyzer Services +from app.services.semantic_analyzer import get_semantic_analyzer +import asyncio # Für den asynchronen Aufruf des Chunkers + # ========================================== -# 1. CONFIGURATION LOADER (Updated for config/ dir) +# 1. CONFIGURATION LOADER (Ehemals chunk_config.py) # ========================================== -# Pfad-Logik: -# Wir gehen 3 Ebenen hoch: app/core/chunker.py -> app/core -> app -> root +# Pfad-Logik: app/core/chunker.py -> app/core -> app -> root/config/types.yaml BASE_DIR = Path(__file__).resolve().parent.parent.parent - -# KORREKTUR: types.yaml liegt im Unterordner "config" CONFIG_PATH = BASE_DIR / "config" / "types.yaml" # Fallback Values @@ -38,7 +40,6 @@ def _load_yaml_config() -> Dict[str, Any]: if not CONFIG_PATH.exists(): # Debugging-Hilfe: Zeigt an, wo gesucht wurde print(f"WARNUNG: types.yaml nicht gefunden unter: {CONFIG_PATH}") - print(f" (Basis-Verzeichnis war: {BASE_DIR})") return {} try: @@ -51,32 +52,24 @@ def _load_yaml_config() -> Dict[str, Any]: return {} def get_chunk_config(note_type: str) -> Dict[str, Any]: - """ - Löst Typ -> Profil -> Konfiguration auf. - """ + """Löst Typ -> Profil -> Konfiguration auf.""" full_config = _load_yaml_config() - # 1. Profile holen profiles = full_config.get("chunking_profiles", {}) - - # 2. Typ-Definition holen type_def = full_config.get("types", {}).get(note_type.lower(), {}) - - # 3. Profil-Namen ermitteln (Fallback auf defaults) profile_name = type_def.get("chunking_profile") + if not profile_name: profile_name = full_config.get("defaults", {}).get("chunking_profile", "sliding_standard") - # 4. Config bauen config = profiles.get(profile_name, DEFAULT_PROFILE).copy() - # Sicherstellen, dass Overlap ein Tuple ist if "overlap" in config and isinstance(config["overlap"], list): config["overlap"] = tuple(config["overlap"]) return config -# Legacy Support für alten Code +# Legacy Support def get_sizes(note_type: str): cfg = get_chunk_config(note_type) return { @@ -86,7 +79,7 @@ def get_sizes(note_type: str): } # ========================================== -# 2. CHUNKING LOGIC & PARSER +# 2. DATA CLASSES & HELPERS # ========================================== # --- Hilfen --- @@ -94,7 +87,6 @@ _SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])') _WS = re.compile(r'\s+') def estimate_tokens(text: str) -> int: - # 1 Token ≈ 4 chars t = len(text.strip()) return max(1, math.ceil(t / 4)) @@ -117,8 +109,8 @@ class Chunk: id: str note_id: str index: int - text: str # Reintext für Anzeige - window: str # Text + Context für Embeddings + text: str # Reintext für Anzeige (JETZT INKL. INJIZIERTER LINKS) + window: str # Text + Context für Embeddings (WIE 'text' BEI LLM-CHUNK) token_count: int section_title: Optional[str] section_path: str @@ -193,7 +185,9 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: i += 1 return blocks, h1_title -# --- Strategien --- +# ========================================== +# 3. STRATEGIES (SYNCHRON) +# ========================================== def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]: target = config.get("target", 400) @@ -266,6 +260,8 @@ def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id for path in ordered: s_blocks = sections[path] + if not s_blocks: continue + breadcrumbs = path.strip("/").replace("/", " > ") context_header = f"# {doc_title}\n## {breadcrumbs}" full_text = "\n\n".join([b.text for b in s_blocks]) @@ -279,6 +275,7 @@ def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id neighbors_prev=None, neighbors_next=None, char_start=0, char_end=0 )) else: + # Fallback auf Sliding Window mit Context Injection sub = _strategy_sliding_window(s_blocks, config, note_id, context_prefix=context_header) base = len(chunks) for i, sc in enumerate(sub): @@ -287,19 +284,79 @@ def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id chunks.append(sc) return chunks -# --- Main Entry Point --- +# ========================================== +# 4. STRATEGY (ASYNCHRON) +# ========================================== -def assemble_chunks(note_id: str, md_text: str, note_type: str) -> List[Chunk]: +async def _strategy_semantic_llm(md_text: str, config: Dict[str, Any], note_id: str, note_type: str) -> List[Chunk]: + """ + NEUE STRATEGIE: Delegiert die Zerlegung und Kanten-Extraktion an ein LLM. + """ + analyzer = get_semantic_analyzer() + + # Text-Splitting wird hier vom LLM übernommen + semantic_chunks = await analyzer.analyze_and_chunk(md_text, note_type) + + chunks: List[Chunk] = [] + + for i, sc in enumerate(semantic_chunks): + # 1. Edge Injection für derive_edges.py + # Wir formatieren die LLM-generierten Kanten in die Inline-Syntax, + # damit die bestehende derive_edges.py (Regex) sie findet. + + injection_block = "\n" + for edge_str in sc.suggested_edges: + kind, target = edge_str.split(":", 1) + # Nutzt die Syntax: [[rel:kind | Target]] + injection_block += f"[[rel:{kind} | {target}]] " + + full_text = sc.content + injection_block + + # 2. Chunk Objekt bauen + chunks.append(Chunk( + id=f"{note_id}#sem{i:02d}", + note_id=note_id, + index=i, + text=full_text.strip(), # Enthält die Links (für derive_edges) + window=full_text.strip(), # Auch das Embedding "sieht" die Links (gut für Retrieval) + token_count=estimate_tokens(full_text), + section_title="Semantic Section", + section_path="/LLM", + neighbors_prev=None, neighbors_next=None, + char_start=0, char_end=0 + )) + + return chunks + +# ========================================== +# 5. MAIN ENTRY POINT (ASYNC) +# ========================================== + +async def assemble_chunks(note_id: str, md_text: str, note_type: str) -> List[Chunk]: + """ + Hauptfunktion. Analysiert Config und wählt Strategie. MUSS ASYNC SEIN. + """ config = get_chunk_config(note_type) strategy = config.get("strategy", "sliding_window") - blocks, doc_title = parse_blocks(md_text) - if strategy == "by_heading": - chunks = _strategy_by_heading(blocks, config, note_id, doc_title) - else: - chunks = _strategy_sliding_window(blocks, config, note_id) + # Die beiden bestehenden Strategien rufen wir über einen Sync-Wrapper auf, + # damit assemble_chunks ASYNC bleiben kann. + if strategy == "semantic_llm": + chunks = await _strategy_semantic_llm(md_text, config, note_id, note_type) + + elif strategy == "by_heading": + blocks, doc_title = parse_blocks(md_text) + # Blockiert nur kurz für die sync-Rechenarbeit + chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title) + else: # sliding_window (Default) + blocks, doc_title = parse_blocks(md_text) + # Blockiert nur kurz für die sync-Rechenarbeit + chunks = await asyncio.to_thread(_strategy_sliding_window, blocks, config, note_id) + + # Post-Process: Neighbors setzen for i, ch in enumerate(chunks): ch.neighbors_prev = chunks[i-1].id if i > 0 else None ch.neighbors_next = chunks[i+1].id if i < len(chunks)-1 else None + return chunks \ No newline at end of file diff --git a/app/services/semantic_analyzer.py b/app/services/semantic_analyzer.py new file mode 100644 index 0000000..fece3f1 --- /dev/null +++ b/app/services/semantic_analyzer.py @@ -0,0 +1,98 @@ +""" +app/services/semantic_analyzer.py +Kapselt die LLM-Strategie für Chunking und Kanten-Extraktion. +Nutzt die Matrix-Logik aus DiscoveryService für konsistente Kanten-Typen. +""" + +import json +import logging +import re +from typing import List, Dict, Any, Optional +from dataclasses import dataclass + +from app.services.llm_service import LLMService +from app.services.discovery import DiscoveryService + +logger = logging.getLogger(__name__) + +@dataclass +class SemanticChunkResult: + content: str + suggested_edges: List[str] # Format: "kind:Target" + +class SemanticAnalyzer: + def __init__(self): + self.llm = LLMService() + self.discovery = DiscoveryService() # Wiederverwendung der Matrix-Logik + + async def analyze_and_chunk(self, text: str, source_type: str) -> List[SemanticChunkResult]: + """ + Zerlegt Text mittels LLM in semantische Abschnitte und extrahiert Kanten. + """ + # 1. Prompt bauen + system_prompt = ( + "Du bist ein Knowledge Graph Experte. Deine Aufgabe ist es, Rohtext in " + "thematisch geschlossene Abschnitte (Chunks) zu zerlegen.\n" + "Analysiere jeden Abschnitt auf Beziehungen zu anderen Konzepten.\n" + "Antworte AUSSCHLIESSLICH mit validem JSON in diesem Format:\n" + "[\n" + " {\n" + " \"content\": \"Der Text des Abschnitts...\",\n" + " \"relations\": [{\"target\": \"Qdrant\", \"type\": \"depends_on\"}]\n" + " }\n" + "]\n" + "Halte die Chunks mittellang (ca. 100-300 Wörter). Verändere den Inhalt nicht, nur die Struktur." + ) + + user_prompt = f"Dokument-Typ: {source_type}\n\nTEXT:\n{text}" + + try: + # 2. LLM Call + response_json = await self.llm.generate_raw_response(user_prompt, system=system_prompt) + + # 3. JSON Parsing & Validierung + # Markdown Code-Block entfernen falls vorhanden + clean_json = response_json.replace("```json", "").replace("```", "").strip() + data = json.loads(clean_json) + + results = [] + for item in data: + content = item.get("content", "").strip() + if not content: continue + + raw_rels = item.get("relations", []) + refined_edges = [] + + for rel in raw_rels: + target = rel.get("target") + raw_type = rel.get("type", "related_to") + + if target: + # 4. Matrix-Logik anwenden (Active Intelligence) + # Wir versuchen, den Typ des Ziels zu erraten oder nutzen Matrix blind + # Hier vereinfacht: Wir nutzen Discovery Logic um den Edge-Typ zu validieren + # (Wir nehmen an, Target Type ist unbekannt -> 'concept') + final_kind = self.discovery._resolve_edge_type(source_type, "concept") + + # Wenn LLM spezifischer war (z.B. 'blocks'), nehmen wir das LLM, + # sonst den Matrix-Vorschlag + if raw_type in ["related_to", "link"] and final_kind != "related_to": + edge_str = f"{final_kind}:{target}" + else: + edge_str = f"{raw_type}:{target}" + + refined_edges.append(edge_str) + + results.append(SemanticChunkResult(content=content, suggested_edges=refined_edges)) + + return results + + except json.JSONDecodeError: + logger.warning("SemanticAnalyzer: LLM lieferte kein valides JSON. Fallback auf Raw Text.") + return [SemanticChunkResult(content=text, suggested_edges=[])] + except Exception as e: + logger.error(f"SemanticAnalyzer Error: {e}") + return [SemanticChunkResult(content=text, suggested_edges=[])] + + async def close(self): + await self.llm.close() \ No newline at end of file diff --git a/config/types.yaml b/config/types.yaml index e715ff5..4ff8ff8 100644 --- a/config/types.yaml +++ b/config/types.yaml @@ -29,7 +29,14 @@ chunking_profiles: max: 600 # Fallback Limit target: 400 # Fallback Target bei Sub-Chunking overlap: [50, 80] # Overlap bei Sub-Chunking - + + # NEU: LLM-basierte semantische Zerlegung (Chunker.py ruft semantic_analyzer.py) + semantic_llm: + strategy: semantic_llm + # Da das LLM die Längensteuerung übernimmt, dienen diese als Fallback/Empfehlung + target: 400 + max: 800 + defaults: retriever_weight: 1.0 chunking_profile: sliding_standard # Fallback Profil @@ -54,7 +61,7 @@ types: # --- IDENTITÄT & PERSÖNLICHKEIT --- profile: - chunking_profile: structured_strict # H2 Split wichtig für Profile + chunking_profile: structured_strict retriever_weight: 0.70 edge_defaults: ["references", "related_to"] @@ -85,7 +92,7 @@ types: edge_defaults: ["depends_on", "related_to"] decision: - chunking_profile: structured_strict # ADRs sind oft strukturiert + chunking_profile: structured_strict retriever_weight: 1.00 edge_defaults: ["caused_by", "references"] @@ -101,7 +108,7 @@ types: # --- OPERATIV --- project: - chunking_profile: sliding_large # Projekte haben viel Text + chunking_profile: sliding_large retriever_weight: 0.97 edge_defaults: ["references", "depends_on"] @@ -111,6 +118,7 @@ types: edge_defaults: ["depends_on", "part_of"] journal: - chunking_profile: sliding_standard + # NEUE ZUWEISUNG: Journale profitieren am meisten von der semantischen Analyse + chunking_profile: semantic_llm retriever_weight: 0.80 edge_defaults: ["references", "related_to"] \ No newline at end of file