From 135c02bc9a02522862cb027a1797a4b6204c9dcb Mon Sep 17 00:00:00 2001 From: Lars Date: Fri, 12 Dec 2025 10:25:01 +0100 Subject: [PATCH] bug fixing --- app/core/chunker.py | 433 ++++++++++++++++++++++++------ app/services/semantic_analyzer.py | 151 +++++++---- 2 files changed, 452 insertions(+), 132 deletions(-) diff --git a/app/core/chunker.py b/app/core/chunker.py index 8c56394..7caeea5 100644 --- a/app/core/chunker.py +++ b/app/core/chunker.py @@ -8,126 +8,393 @@ from pathlib import Path from markdown_it import MarkdownIt from markdown_it.token import Token import asyncio +import logging # NEUE IMPORTS -from app.services.semantic_analyzer import get_semantic_analyzer -from app.core.note_payload import extract_frontmatter_from_text -# WICHTIG: Import der Edge Derivations Logik -from app.core.derive_edges import build_edges_for_note # <-- Muss importiert werden +try: + from app.services.semantic_analyzer import SemanticAnalyzer, SemanticChunkResult +except ImportError: + # Fallback für Tests, wenn der Service noch nicht auf dem Pfad ist + print("WARNUNG: SemanticAnalyzer Service nicht gefunden.") + class SemanticAnalyzer: + async def analyze_and_chunk(self, text, type): return [SemanticChunkResult(content=text, suggested_edges=[])] + @dataclass + class SemanticChunkResult: + content: str + suggested_edges: List[str] # Format: "kind:Target" -# ... bestehender Code (Konfiguration, Hilfsfunktionen, RawBlock, Chunk) +# Import des Edge Parsers +try: + from app.core.derive_edges import build_edges_for_note +except ImportError: + print("WARNUNG: derive_edges.py nicht gefunden. Kanten-Parsing simuliert.") + def build_edges_for_note(md_text, note_id, note_type, chunks=[], note_level_references=[], include_note_scope_refs=False): + return [] -# --- NEUE STRATEGIE: SMART EDGE ALLOCATION (Ersetzt _strategy_semantic_llm) --- -async def _strategy_smart_edge_allocation(md_text: str, config: Dict, note_id: str, note_type: str) -> List[Chunk]: - """ - [WP-15, Neue Logik] Zerlegt Note deterministisch und nutzt LLM zur Zuweisung von Kanten (Schritte 1-5). - """ - # 0. Initialisierung - analyzer = get_semantic_analyzer() +logger = logging.getLogger(__name__) + +# ========================================== +# 1. FUNKTION ZUM AUSLESEN DES FRONTMATTERS +# ========================================== + +def extract_frontmatter_from_text(md_text: str) -> Tuple[Dict[str, Any], str]: + fm_match = re.match(r'^\s*---\s*\n(.*?)\n---', md_text, re.DOTALL) - # 1. [Schritt 2 des Workflows] Sammeln ALLER Kanten (Inline & Defaults) - # Führt die Edge-Derivation für die gesamte Notiz aus, basierend auf Text und Typ. + if not fm_match: + return {}, md_text + + frontmatter_yaml = fm_match.group(1) + + try: + frontmatter = yaml.safe_load(frontmatter_yaml) + if not isinstance(frontmatter, dict): + frontmatter = {} + except yaml.YAMLError: + frontmatter = {} + + text_without_fm = re.sub(r'^\s*---\s*\n(.*?)\n---', '', md_text, flags=re.DOTALL) + + return frontmatter, text_without_fm.strip() + + +# ========================================== +# 2. CONFIGURATION LOADER +# ========================================== + +BASE_DIR = Path(__file__).resolve().parent.parent.parent +CONFIG_PATH = BASE_DIR / "config" / "types.yaml" +DEFAULT_PROFILE = {"strategy": "sliding_window", "target": 400, "max": 600, "overlap": (50, 80)} +_CONFIG_CACHE = None + +def _load_yaml_config() -> Dict[str, Any]: + global _CONFIG_CACHE + # FEHLER BEHOBEN: Zeilenumbruch eingefügt + if _CONFIG_CACHE is not None: + return _CONFIG_CACHE + + if not CONFIG_PATH.exists(): + return {} + + try: + with open(CONFIG_PATH, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + _CONFIG_CACHE = data + return data + except Exception as e: + return {} + +def get_chunk_config(note_type: str) -> Dict[str, Any]: + full_config = _load_yaml_config() + profiles = full_config.get("chunking_profiles", {}) + type_def = full_config.get("types", {}).get(note_type.lower(), {}) + profile_name = type_def.get("chunking_profile") + + if not profile_name: + profile_name = full_config.get("defaults", {}).get("chunking_profile", "sliding_standard") + + config = profiles.get(profile_name, DEFAULT_PROFILE).copy() + + if "overlap" in config and isinstance(config["overlap"], list): + config["overlap"] = tuple(config["overlap"]) + + return config + +def get_sizes(note_type: str): + cfg = get_chunk_config(note_type) + return {"target": (cfg["target"], cfg["target"]), "max": cfg["max"], "overlap": cfg["overlap"]} + + +# ========================================== +# 3. DATA CLASSES & HELPERS +# ========================================== + +_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])') +_WS = re.compile(r'\s+') + +def estimate_tokens(text: str) -> int: + t = len(text.strip()) + return max(1, math.ceil(t / 4)) + +def split_sentences(text: str) -> list[str]: + text = _WS.sub(' ', text.strip()) + # FEHLER BEHOBEN: Zeilenumbruch eingefügt + if not text: + return [] + + parts = _SENT_SPLIT.split(text) + return [p.strip() for p in parts if p.strip()] + +@dataclass +class RawBlock: + kind: str; text: str; level: Optional[int]; section_path: str; section_title: Optional[str] + +@dataclass +class Chunk: + id: str; note_id: str; index: int; text: str; window: str; token_count: int; section_title: Optional[str]; section_path: str; neighbors_prev: Optional[str]; neighbors_next: Optional[str]; char_start: int; char_end: int + +def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: + md = MarkdownIt("commonmark").enable("table") + tokens: List[Token] = md.parse(md_text) + blocks: List[RawBlock] = [] + h1_title = "Dokument"; h2, h3 = None, None; section_path = "/" + fm, text_without_fm = extract_frontmatter_from_text(md_text) + + if text_without_fm.strip(): + blocks.append(RawBlock(kind="paragraph", text=text_without_fm.strip(), level=None, section_path=section_path, section_title=h2)) + + h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE) + + if h1_match: + h1_title = h1_match.group(1).strip() + + return blocks, h1_title + +# ========================================== +# 4. STRATEGIES (SYNCHRON) +# ========================================== + +def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "", context_prefix: str = "") -> List[Chunk]: + """Klassisches Sliding Window.""" + target = config.get("target", 400) + max_tokens = config.get("max", 600) + overlap_val = config.get("overlap", (50, 80)) + overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val + chunks: List[Chunk] = []; buf: List[RawBlock] = [] + + def flush_buffer(): + nonlocal buf + if not buf: return + text_body = "\n\n".join([b.text for b in buf]) + sec_title = buf[-1].section_title if buf else None + sec_path = buf[-1].section_path if buf else "/" + window_body = f"{context_prefix}\n{text_body}".strip() if context_prefix else text_body + + if estimate_tokens(text_body) > max_tokens: + sentences = split_sentences(text_body) + current_sents = [] + cur_toks = 0 + for s in sentences: + st = estimate_tokens(s) + if cur_toks + st > target and current_sents: + txt = "\n".join(current_sents) + win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt + _add_chunk(txt, win, sec_title, sec_path) + ov_txt = " ".join(current_sents)[-overlap*4:] + current_sents = [ov_txt, s] if ov_txt else [s] + cur_toks = estimate_tokens(" ".join(current_sents)) + else: + current_sents.append(s) + cur_toks += st + if current_sents: + txt = "\n".join(current_sents) + win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt + _add_chunk(txt, win, sec_title, sec_path) + else: + _add_chunk(text_body, window_body, sec_title, sec_path) + buf = [] + + def _add_chunk(txt, win, sec, path): + chunks.append(Chunk(id=f"{note_id}#c{len(chunks):02d}", note_id=note_id, index=len(chunks), text=txt, window=win, token_count=estimate_tokens(txt), section_title=sec, section_path=path, neighbors_prev=None, neighbors_next=None, char_start=0, char_end=0)) + + for b in blocks: + if estimate_tokens("\n\n".join([x.text for x in buf] + [b.text])) >= target: + flush_buffer() + buf.append(b) + flush_buffer() + return chunks + +def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: + """Harter Split an Überschriften mit Context Injection.""" + chunks: List[Chunk] = [] + sections: Dict[str, List[RawBlock]] = {} + ordered = [] + + for b in blocks: + if b.kind == "heading": continue + if b.section_path not in sections: + sections[b.section_path] = [] + ordered.append(b.section_path) + sections[b.section_path].append(b) + + for path in ordered: + s_blocks = sections[path] + # FEHLER BEHOBEN: Zeilenumbruch eingefügt + if not s_blocks: + continue + + breadcrumbs = path.strip("/").replace("/", " > ") + context_header = f"# {doc_title}\n## {breadcrumbs}" + full_text = "\n\n".join([b.text for b in s_blocks]) + + if estimate_tokens(full_text) <= config.get("max", 600): + chunks.append(Chunk(id=f"{note_id}#c{len(chunks):02d}", note_id=note_id, index=len(chunks), text=full_text, window=f"{context_header}\n{full_text}", token_count=estimate_tokens(full_text), section_title=s_blocks[0].section_title if s_blocks else None, section_path=path, neighbors_prev=None, neighbors_next=None, char_start=0, char_end=0)) + else: + # Fallback auf Sliding Window mit Context Injection + sub = _strategy_sliding_window(s_blocks, config, note_id, doc_title, context_prefix=context_header) + base = len(chunks) + for i, sc in enumerate(sub): + sc.index = base + i + sc.id = f"{note_id}#c{sc.index:02d}" + chunks.append(sc) + return chunks + + +# ========================================== +# 5. ORCHESTRATION STRATEGY (ASYNC) +# ========================================== + +_semantic_analyzer_instance = None +def _get_semantic_analyzer_instance() -> SemanticAnalyzer: + global _semantic_analyzer_instance + # FEHLER BEHOBEN: Zeilenumbruch eingefügt + if _semantic_analyzer_instance is None: + _semantic_analyzer_instance = SemanticAnalyzer() + return _semantic_analyzer_instance + +# NEU: Abstrakte Funktion zum Extrahieren der Kanten (ersetzt die Simulation) +def _extract_all_edges_from_md(md_text: str, note_id: str, note_type: str) -> List[str]: + """ + Ruft die Edge-Derivation auf Note-Ebene auf und gibt die Kanten im Format "kind:Target" zurück. + """ + + # Korrigierte Argumentreihenfolge raw_edges: List[Dict] = build_edges_for_note( - text=md_text, - note_id=note_id, - note_type=note_type, - # Leere Listen übergeben, da wir noch keine Chunks haben und nur die Note selbst analysieren. + md_text, + note_id, + note_type, chunks=[], - references=[] + note_level_references=[], + include_note_scope_refs=False ) - # Kanten im Format "kind:Target" sammeln (ohne Duplikate) + # Filtert die Kanten auf das Format "kind:Target" all_note_edges = set() for edge in raw_edges: - # Extrahiere nur Kanten, die relevant für das Chunking sind (Explizite oder Defaults) - if edge.get("target_id") and edge.get("kind"): - # Nutze target_id, da dies der Notiz-ID entspricht + if edge.get("target_id") and edge.get("kind") not in ["belongs_to", "next", "prev"]: all_note_edges.add(f"{edge['kind']}:{edge['target_id']}") + + return list(all_note_edges) + + +async def _strategy_smart_edge_allocation(md_text: str, config: Dict, note_id: str, note_type: str) -> List[Chunk]: + """ + Führt den 5-Schritte-Workflow zur intelligenten Kantenzuweisung aus. + """ + analyzer = _get_semantic_analyzer_instance() - all_note_edges_list = list(all_note_edges) + # 1. [Schritt 2] Kanten sammeln (vom gesamten MD-Text) + all_note_edges_list = _extract_all_edges_from_md(md_text, note_id, note_type) - - # 2. [Schritt 3 des Workflows] Deterministic Chunking - # Nutzt die in der Config angegebene deterministische Strategie (z.B. by_heading) + # 2. [Schritt 3] Deterministic Chunking (Primärzerlegung) + primary_strategy = config.get("strategy", "sliding_window") blocks, doc_title = parse_blocks(md_text) - # Nutze _strategy_by_heading (oder _strategy_sliding_window, je nach Config-Intent), - # da dies die robusteste deterministische Strategie ist. Die Konfiguration kommt - # vom "structured_strict" oder ähnlichem Profil. - chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title) - - # Fallback, falls by_heading nur einen Chunk liefert oder fehlschlägt - if not chunks or len(chunks) <= 1: - # Erhöht die Robustheit bei unstrukturierten Texten + if primary_strategy == "by_heading": + chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title) + else: chunks = await asyncio.to_thread(_strategy_sliding_window, blocks, config, note_id, doc_title) - if not chunks: - # Absoluter Fallback: Ganzer Text ist 1 Chunk. - text = " ".join([b.text for b in blocks if b.kind not in ("heading", "code")]).strip() - if text: - chunks = [Chunk(id=f"{note_id}-c0", note_id=note_id, index=0, text=text, token_count=estimate_tokens(text), section_title=doc_title, section_path="", neighbors_prev=None, neighbors_next=None, char_start=0, char_end=len(text))] - else: - return [] - - # 3. [Schritt 4 des Workflows] Kanten pro Chunk zuweisen/filtern - + # 3. [Schritt 4] Kanten pro Chunk zuweisen/filtern (LLM-Call pro Chunk) unassigned_edges: Set[str] = set(all_note_edges_list) llm_tasks = [] - for chunk in chunks: - # Starte den LLM-Filter-Call für jeden Chunk parallel - task = analyzer.filter_edges_for_chunk(chunk.text, all_note_edges_list, note_type) - llm_tasks.append(task) + if all_note_edges_list: + for chunk in chunks: + # Starte den LLM-Filter-Call für jeden Chunk parallel + task = analyzer.analyze_and_chunk( + chunk_text=chunk.text, + all_note_edges=all_note_edges_list, + note_type=note_type, + ) + llm_tasks.append(task) + + filtered_edges_results: List[List[str]] = await asyncio.gather(*llm_tasks) - # Warte auf alle LLM-Antworten (Batch-Processing) - filtered_edges_results: List[List[str]] = await asyncio.gather(*llm_tasks) - - - # 4. Ergebnisse zuweisen und Unassigned Edges sammeln - for i, filtered_edges_list in enumerate(filtered_edges_results): - chunk = chunks[i] - - # Lege die vom LLM gefilterten Edges in den Chunk-Payload - # Die Chunk-Klasse muss ein Feld 'suggested_edges' haben (wie im alten SemanticChunkResult) - chunk.suggested_edges = filtered_edges_list - - # Unassigned Edges: Subtrahiere alle Edges, die in diesem Chunk gefunden wurden - unassigned_edges.difference_update(set(filtered_edges_list)) + for i, filtered_edges_list in enumerate(filtered_edges_results): + chunk = chunks[i] + + # 4. Ergebnisse zuweisen und Unassigned Edges sammeln + chunk.suggested_edges = filtered_edges_list + unassigned_edges.difference_update(set(filtered_edges_list)) - - # 5. [Schritt 5 des Workflows] Fallback: Nicht zugeordnete Kanten zuweisen - # Alle Kanten, die in KEINEM Chunk explizit zugewiesen wurden, werden JEDEM Chunk zugewiesen. + # 5. Kanten in den Text injizieren (für derive_edges.py) + injection_block = "\n" + for edge_str in chunk.suggested_edges: + if ":" in edge_str: + kind, target = edge_str.split(":", 1) + injection_block += f"[[rel:{kind} | {target}]] " + + chunk.text = chunk.text + injection_block + chunk.window = chunk.window + injection_block + + + # 6. Fallback: Nicht zugeordnete Kanten JEDEM Chunk zuweisen (Schritt 5) unassigned_edges_list = list(unassigned_edges) if unassigned_edges_list: logger.info(f"Adding {len(unassigned_edges_list)} unassigned edges as fallback to all chunks for note {note_id}") for chunk in chunks: - # Füge die unassigned Edges hinzu (Set-Operation für Duplikat-Schutz) - existing_edges = set(chunk.suggested_edges) - chunk.suggested_edges = list(existing_edges.union(unassigned_edges_list)) + # Füge die Kanten in den Text des Chunks ein (für den Edge-Parser) + injection_block = "\n" + for edge_str in unassigned_edges_list: + if ":" in edge_str: + kind, target = edge_str.split(":", 1) + injection_block += f"[[rel:{kind} | {target}]] " + + chunk.text = chunk.text + injection_block + chunk.window = chunk.window + injection_block + - # 6. Return Chunks return chunks -# --- UPDATE DISPATCHER: chunk_note_async --- -async def chunk_note_async(md_text: str, note_id: str, note_type: str, note_status: str, path_arg: str = None) -> List[Chunk]: +# ========================================== +# 6. MAIN ENTRY POINT (ASYNC) +# ========================================== + +async def assemble_chunks(note_id: str, md_text: str, note_type: str) -> List[Chunk]: + """ + Hauptfunktion. Analysiert Config und wählt Strategie (MUSS ASYNC SEIN). + """ - # ... bestehender Code (Frontmatter, Config, etc.) + # 1. Frontmatter prüfen (Double-LLM-Prevention) + fm, body = extract_frontmatter_from_text(md_text) + note_status = fm.get("status", "").lower() - # 3. Execution (Dispatcher) + config = get_chunk_config(note_type) + strategy = config.get("strategy", "sliding_window") + + # Neue Konfigurationsprüfung + enable_smart_edge = config.get("enable_smart_edge_allocation", False) - # Update: Rufe die NEUE Strategie auf, wenn 'semantic_llm' konfiguriert ist. - if strategy == "semantic_llm": - chunks = await _strategy_smart_edge_allocation(md_text, config, note_id, note_type) + # 2. Strategie-Auswahl + + # A. Override bei Draft-Status + if enable_smart_edge and note_status in ["draft", "initial_gen"]: + logger.info(f"Overriding Smart Edge Allocation for draft status. Using 'by_heading' for deterministic chunking.") + enable_smart_edge = False + strategy = "by_heading" + + # B. Execution (Dispatcher) + + blocks, doc_title = parse_blocks(md_text) + + if enable_smart_edge: + # Führt die neue Orchestrierung aus (Smart Edge Allocation) + chunks = await _strategy_smart_edge_allocation(md_text, config, note_id, note_type) elif strategy == "by_heading": - blocks, doc_title = parse_blocks(md_text) - # ... bestehender Code + # Synchronen Code in einem Thread ausführen + chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title) else: # sliding_window (Default) - blocks, doc_title = parse_blocks(md_text) - # ... bestehender Code - - # ... bestehender Code (Post-Processing) \ No newline at end of file + # Synchronen Code in einem Thread ausführen + chunks = await asyncio.to_thread(_strategy_sliding_window, blocks, config, note_id, doc_title) + + # 4. Post-Process: Neighbors setzen + for i, ch in enumerate(chunks): + ch.neighbors_prev = chunks[i-1].id if i > 0 else None + ch.neighbors_next = chunks[i+1].id if i < len(chunks)-1 else None + + return chunks \ No newline at end of file diff --git a/app/services/semantic_analyzer.py b/app/services/semantic_analyzer.py index 8a1feec..1cf7cda 100644 --- a/app/services/semantic_analyzer.py +++ b/app/services/semantic_analyzer.py @@ -1,87 +1,140 @@ """ app/services/semantic_analyzer.py — Edge Validation & Filtering -Der Service ist nun primär dafür zuständig, Kanten aus einer Liste dem gegebenen Chunk zuzuordnen. +Version: Final (Entkoppelt von internen Typ-Simulationen) """ + import json import logging from typing import List, Dict, Any, Optional +from dataclasses import dataclass -# Import der benötigten Services (Annahme: llm_service ist verfügbar.) +# Import der benötigten Services (Annahme: llm_service und discovery sind verfügbar.) from app.services.llm_service import LLMService +# ANNAHME: DiscoveryService ist für die Matrix-Logik verfügbar. +from app.services.discovery import DiscoveryService logger = logging.getLogger(__name__) -# Ein Singleton-Muster für den Analyzer (wie zuvor) -_analyzer_instance: Optional['SemanticAnalyzer'] = None +@dataclass +class SemanticChunkResult: + content: str + suggested_edges: List[str] # Format: "kind:Target" -def get_semantic_analyzer(): - global _analyzer_instance - if _analyzer_instance is None: - _analyzer_instance = SemanticAnalyzer() - return _analyzer_instance +# Die Klasse muss den TargetTypeResolver als DI-Abhängigkeit erhalten, um flexibel zu sein. +# Da dies aber im Mindnet-System noch nicht etabliert ist, muss der Aufrufer den Resolver bereitstellen. class SemanticAnalyzer: def __init__(self): - # Der DiscoveryService wird hier nicht mehr direkt benötigt. self.llm = LLMService() + self.discovery = DiscoveryService() + self.MAX_CONTEXT_TOKENS = 3000 - async def filter_edges_for_chunk(self, chunk_text: str, all_note_edges: List[str], note_type: str) -> List[str]: + async def analyze_and_chunk( + self, + text: str, + source_type: str, + # NEU: Erfordert die Auflösungsfunktion als Eingabe (DI-Prinzip) + target_type_resolver: Optional[callable] = None + ) -> List[SemanticChunkResult]: """ - [Schritt 4 des Workflows] Sendet Chunk und alle Kanten an LLM, um die relevanten Kanten für diesen Chunk zu filtern. - :param chunk_text: Der Text des Chunks zur Analyse. - :param all_note_edges: Alle für die gesamte Notiz gefundenen Kanten (Format: "kind:Target"). - :param note_type: Der Typ der Notiz. - :return: Liste der relevanten Kanten für diesen Chunk. + Zerlegt Text mittels LLM in semantische Abschnitte und extrahiert Kanten. """ - if not all_note_edges: - return [] - - edge_list_str = "\n".join([f"- {e}" for e in all_note_edges]) - system_prompt = ( - "Du bist ein Edge Filter Agent. Deine Aufgabe ist es, aus einer gegebenen Liste von potentiellen " - "Knowledge Graph Kanten (Edges) jene auszuwählen, die *semantisch relevant* für den vorliegenden " - "Textausschnitt sind. Alle Kanten beziehen sich auf die Hauptnotiz.\n" - "Antworte AUSSCHLIESSLICH mit einer validen JSON-Liste von Kanten-Strings, die im Text direkt erwähnt oder " - "klar impliziert werden. Es ist KEIN Array von Objekten, sondern ein Array von Strings.\n" - "Format: [\"kind:Target\", \"kind:Target\", ...]\n" - "Wähle nur Kanten, die der Chunk *aktiv* benötigt oder referenziert." - ) + # Standard-Resolver verwenden, wenn keiner übergeben wird + if target_type_resolver is None: + target_type_resolver = self._default_target_type_resolver - user_prompt = ( - f"Notiz-Typ: {note_type}\n" - f"Textausschnitt:\n---\n{chunk_text}\n---\n\n" - f"Gesamte Kanten der Notiz (AUSWAHL):\n{edge_list_str}\n\n" - "Welche der oben genannten Kanten sind für diesen Textabschnitt relevant? Liste sie im JSON-Array auf." + system_prompt = ( + "Du bist ein Knowledge Graph Experte. Deine Aufgabe ist es, Rohtext in " + "thematisch geschlossene Abschnitte (Chunks) zu zerlegen.\n" + "Analysiere jeden Abschnitt auf Beziehungen zu anderen Konzepten (Entitäten, Personen, etc.).\n" + "Antworte AUSSCHLIESSLICH mit validem JSON in diesem Format:\n" + "[\n" + " {\n" + " \"content\": \"Der Text des Abschnitts...\",\n" + " \"relations\": [{\"target\": \"Entität X\", \"type\": \"related_to\"}]\n" + " }\n" + "]\n" + "Halte die Chunks mittellang (ca. 100-300 Wörter). Verändere den Inhalt nicht, nur die Struktur." ) + + user_prompt = f"Dokument-Typ: {source_type}\n\nTEXT:\n{text}" try: - # 1. LLM Call response_json = await self.llm.generate_raw_response( user_prompt, system=system_prompt, force_json=True ) - # 2. Robustes JSON Parsing clean_json = response_json.replace("```json", "").replace("```", "").strip() data = json.loads(clean_json) - if isinstance(data, list): - # Filtere nach Strings, die den Doppelpunkt enthalten, um das Format "kind:Target" zu garantieren. - return [s for s in data if isinstance(s, str) and ":" in s] - - logger.warning(f"SemanticAnalyzer: LLM lieferte non-list beim Edge-Filtern: {data}") - return [] + if isinstance(data, dict): + data = [data] + elif not isinstance(data, list): + logger.error("SemanticAnalyzer: JSON root ist weder Array noch Objekt. Fehlerhafte LLM-Antwort.") + raise ValueError("Root element is not a list or dictionary.") - except json.JSONDecodeError as e: - logger.error(f"SemanticAnalyzer: LLM lieferte KEIN valides JSON beim Edge-Filtern: {e}") - return [] + results = [] + for item in data: + if not isinstance(item, dict): + logger.warning(f"SemanticAnalyzer: Ungültiges Chunk-Element ignoriert: {item}") + continue + + content = item.get("content", "").strip() + if not content: continue + + raw_rels = item.get("relations", []) + refined_edges = [] + + for rel in raw_rels: + if not isinstance(rel, dict): + logger.warning(f"SemanticAnalyzer: Ignoriere ungültige Relation: {rel}") + continue + + target = rel.get("target") + raw_type = rel.get("type", "related_to") + + if target: + # 1. Typ-Auflösung über die injizierte Funktion + target_entity_type = target_type_resolver(target) # <--- NUTZT DEN INJIZIERTEN RESOLVER + + # 2. Matrix-Logik anwenden: + final_kind = self.discovery._resolve_edge_type(source_type, target_entity_type) + + # 3. Priorisierung: Wählt den Matrix-Vorschlag, wenn er spezifischer ist. + if final_kind not in ["related_to", "references"] and target_entity_type != "concept": + edge_str = f"{final_kind}:{target}" + else: + edge_str = f"{raw_type}:{target}" + + refined_edges.append(edge_str) + + results.append(SemanticChunkResult(content=content, suggested_edges=refined_edges)) + + return results + + except json.JSONDecodeError: + logger.error("SemanticAnalyzer: LLM lieferte KEIN valides JSON. Fallback auf Raw Text.") + return [SemanticChunkResult(content=text, suggested_edges=[])] except Exception as e: - logger.error(f"SemanticAnalyzer Unbehandelter Fehler beim Edge-Filtern: {e}") - return [] + logger.error(f"SemanticAnalyzer Unbehandelter Fehler: {e}") + return [SemanticChunkResult(content=text, suggested_edges=[])] + + # NEU: Abstrakter Fallback-Resolver (muss außerhalb des Kernmoduls verbleiben) + def _default_target_type_resolver(self, title: str) -> str: + """Standard-Fallback, wenn kein Resolver übergeben wird (immer 'concept').""" + return "concept" async def close(self): - # Stellt sicher, dass der AsyncClient geschlossen wird (gute Praxis) if self.llm: - await self.llm.close() \ No newline at end of file + await self.llm.close() + +# Export des Singleton-Helpers +_analyzer_instance = None +def get_semantic_analyzer(): + global _analyzer_instance + if _analyzer_instance is None: + _analyzer_instance = SemanticAnalyzer() + return _analyzer_instance \ No newline at end of file