diff --git a/app/services/llm_service.py b/app/services/llm_service.py index fa7576b..0565b87 100644 --- a/app/services/llm_service.py +++ b/app/services/llm_service.py @@ -1,6 +1,6 @@ """ -app/services/llm_service.py — LLM Client (Ollama) -Version: 0.5.2 (Fix: Removed strict limits, increased Context) +app/services/llm_service.py — LLM Client +Version: 2.7.0 (Clean Architecture: Explicit Priority Queues) """ import httpx @@ -9,13 +9,12 @@ import logging import os import asyncio from pathlib import Path -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, Literal logger = logging.getLogger(__name__) class Settings: OLLAMA_URL = os.getenv("MINDNET_OLLAMA_URL", "http://127.0.0.1:11434") - # Timeout für die Generierung (lang) LLM_TIMEOUT = float(os.getenv("MINDNET_LLM_TIMEOUT", 300.0)) LLM_MODEL = os.getenv("MINDNET_LLM_MODEL", "phi3:mini") PROMPTS_PATH = os.getenv("MINDNET_PROMPTS_PATH", "./config/prompts.yaml") @@ -24,16 +23,13 @@ def get_settings(): return Settings() class LLMService: + # GLOBALER SEMAPHOR (Drosselung für Hintergrund-Prozesse) + _background_semaphore = asyncio.Semaphore(2) + def __init__(self): self.settings = get_settings() self.prompts = self._load_prompts() - # FIX 1: Keine künstlichen Limits mehr. httpx defaults (100) sind besser. - # Wir wollen nicht, dass der Chat wartet, nur weil im Hintergrund Embeddings laufen. - - # Timeout-Konfiguration: - # connect=10.0: Wenn Ollama nicht da ist, failen wir schnell. - # read=LLM_TIMEOUT: Wenn Ollama denkt, geben wir ihm Zeit. self.timeout = httpx.Timeout(self.settings.LLM_TIMEOUT, connect=10.0) self.client = httpx.AsyncClient( @@ -43,11 +39,9 @@ class LLMService: def _load_prompts(self) -> dict: path = Path(self.settings.PROMPTS_PATH) - if not path.exists(): - return {} + if not path.exists(): return {} try: - with open(path, "r", encoding="utf-8") as f: - return yaml.safe_load(f) + with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) except Exception as e: logger.error(f"Failed to load prompts: {e}") return {} @@ -58,19 +52,31 @@ class LLMService: system: str = None, force_json: bool = False, max_retries: int = 0, - base_delay: float = 2.0 + base_delay: float = 2.0, + priority: Literal["realtime", "background"] = "realtime" # <--- NEU & EXPLIZIT ) -> str: """ - Führt einen LLM Call aus. + Führt einen LLM Call aus. + priority="realtime": Chat (Sofort, keine Bremse). + priority="background": Import/Analyse (Gedrosselt durch Semaphore). """ + + # Entscheidung basierend auf explizitem Parameter, nicht Format! + use_semaphore = (priority == "background") + + if use_semaphore: + async with LLMService._background_semaphore: + return await self._execute_request(prompt, system, force_json, max_retries, base_delay) + else: + return await self._execute_request(prompt, system, force_json, max_retries, base_delay) + + async def _execute_request(self, prompt, system, force_json, max_retries, base_delay): payload: Dict[str, Any] = { "model": self.settings.LLM_MODEL, "prompt": prompt, "stream": False, "options": { "temperature": 0.1 if force_json else 0.7, - # FIX 2: Kontext auf 8192 erhöht. - # Wichtig für komplexe Schemas und JSON-Stabilität. "num_ctx": 8192 } } @@ -97,7 +103,6 @@ class LLMService: attempt += 1 if attempt > max_retries: logger.error(f"LLM Final Error (Versuch {attempt}): {e}") - # Wir werfen den Fehler weiter, damit der Router nicht "Interner Fehler" als Typ interpretiert raise e wait_time = base_delay * (2 ** (attempt - 1)) @@ -106,8 +111,7 @@ class LLMService: async def generate_rag_response(self, query: str, context_str: str) -> str: """ - WICHTIG FÜR CHAT: - Kein JSON, keine Retries (User-Latency). + Chat-Wrapper: Immer Realtime. """ system_prompt = self.prompts.get("system_prompt", "") rag_template = self.prompts.get("rag_template", "{context_str}\n\n{query}") @@ -117,7 +121,9 @@ class LLMService: return await self.generate_raw_response( final_prompt, system=system_prompt, - max_retries=0 + max_retries=0, + force_json=False, + priority="realtime" # <--- Standard ) async def close(self): diff --git a/app/services/semantic_analyzer.py b/app/services/semantic_analyzer.py index cefe15e..3a971d6 100644 --- a/app/services/semantic_analyzer.py +++ b/app/services/semantic_analyzer.py @@ -1,6 +1,6 @@ """ app/services/semantic_analyzer.py — Edge Validation & Filtering -Version: 1.4 (Merged: Retry Strategy + Extended Observability) +Version: 2.0 (Update: Background Priority for Batch Jobs) """ import json @@ -24,6 +24,7 @@ class SemanticAnalyzer: Features: - Retry Strategy: Wartet bei Überlastung (max_retries=5). + - Priority Queue: Läuft als "background" Task, um den Chat nicht zu blockieren. - Observability: Loggt Input-Größe, Raw-Response und Parsing-Details. """ if not all_edges: @@ -44,27 +45,27 @@ class SemanticAnalyzer: # 2. Kandidaten-Liste formatieren edges_str = "\n".join([f"- {e}" for e in all_edges]) - # LOG: Request Info (Wichtig für Debugging) + # LOG: Request Info logger.debug(f"🔍 [SemanticAnalyzer] Request: {len(chunk_text)} chars Text, {len(all_edges)} Candidates.") # 3. Prompt füllen final_prompt = prompt_template.format( - chunk_text=chunk_text[:3500], # Etwas mehr Kontext als früher (3000 -> 3500) + chunk_text=chunk_text[:3500], edge_list=edges_str ) try: - # 4. LLM Call mit JSON Erzwingung UND Retry-Logik (Merged V1.3) - # max_retries=5 bedeutet: 5s -> 10s -> 20s -> 40s -> 80s Pause. + # 4. LLM Call mit Traffic Control (NEU: priority="background") + # Wir nutzen die "Slow Lane", damit der User im Chat nicht warten muss. response_json = await self.llm.generate_raw_response( prompt=final_prompt, force_json=True, max_retries=5, - base_delay=5.0 + base_delay=5.0, + priority="background" # <--- WICHTIG: Drosselung aktivieren ) - # LOG: Raw Response Preview (Wichtig um zu sehen, was das LLM liefert) - # Zeigt nur die ersten 200 Zeichen, um Log nicht zu fluten + # LOG: Raw Response Preview logger.debug(f"📥 [SemanticAnalyzer] Raw Response (Preview): {response_json[:200]}...") # 5. Parsing & Cleaning @@ -77,10 +78,9 @@ class SemanticAnalyzer: try: data = json.loads(clean_json) except json.JSONDecodeError as json_err: - # LOG: Detaillierter Fehlerbericht für den User logger.error(f"❌ [SemanticAnalyzer] JSON Decode Error.") logger.error(f" Grund: {json_err}") - logger.error(f" Empfangener String: {clean_json[:500]}") # Zeige max 500 chars des Fehlers + logger.error(f" Empfangener String: {clean_json[:500]}") logger.info(" -> Workaround: Fallback auf 'Alle Kanten' (durch Chunker).") return [] @@ -92,7 +92,7 @@ class SemanticAnalyzer: valid_edges = [str(e) for e in data if isinstance(e, str) and ":" in e] elif isinstance(data, dict): - # Abweichende Formate behandeln (Extended Logging V1.2) + # Abweichende Formate behandeln logger.info(f"ℹ️ [SemanticAnalyzer] LLM lieferte Dict statt Liste. Versuche Reparatur. Keys: {list(data.keys())}") for key, val in data.items(): @@ -100,7 +100,7 @@ class SemanticAnalyzer: if key.lower() in ["edges", "results", "kanten", "matches"] and isinstance(val, list): valid_edges.extend([str(e) for e in val if isinstance(e, str) and ":" in e]) - # Fall B: {"kind": "target"} (Das beobachtete Format im Log) + # Fall B: {"kind": "target"} elif isinstance(val, str): valid_edges.append(f"{key}:{val}") @@ -115,7 +115,6 @@ class SemanticAnalyzer: # LOG: Ergebnis if final_result: - # Nur Info, wenn wirklich was gefunden wurde, sonst spammt es bei leeren Chunks logger.info(f"✅ [SemanticAnalyzer] Success. {len(final_result)} Kanten zugewiesen.") else: logger.debug(" [SemanticAnalyzer] Keine spezifischen Kanten erkannt (Empty Result).")