fix and rework: fast and slow lanes for UI and batch

Lars 2025-12-12 19:00:35 +01:00
parent 1fe9582cbe
commit fc4c868eaa
2 changed files with 40 additions and 35 deletions
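In short: LLM traffic is split into an explicit fast lane (realtime chat, never throttled) and a slow lane (background jobs such as import and analysis, throttled by a shared asyncio.Semaphore(2)). The lane is selected via an explicit priority parameter instead of being inferred from the requested output format.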

app/services/llm_service.py

@@ -1,6 +1,6 @@
 """
-app/services/llm_service.py LLM Client (Ollama)
-Version: 0.5.2 (Fix: Removed strict limits, increased Context)
+app/services/llm_service.py LLM Client
+Version: 2.7.0 (Clean Architecture: Explicit Priority Queues)
 """
 import httpx
@@ -9,13 +9,12 @@ import logging
 import os
 import asyncio
 from pathlib import Path
-from typing import Optional, Dict, Any
+from typing import Optional, Dict, Any, Literal
 logger = logging.getLogger(__name__)
 class Settings:
     OLLAMA_URL = os.getenv("MINDNET_OLLAMA_URL", "http://127.0.0.1:11434")
-    # Timeout for generation (long-running)
     LLM_TIMEOUT = float(os.getenv("MINDNET_LLM_TIMEOUT", 300.0))
     LLM_MODEL = os.getenv("MINDNET_LLM_MODEL", "phi3:mini")
     PROMPTS_PATH = os.getenv("MINDNET_PROMPTS_PATH", "./config/prompts.yaml")
@@ -24,16 +23,13 @@ def get_settings():
     return Settings()
 class LLMService:
+    # GLOBAL SEMAPHORE (throttles background processes)
+    _background_semaphore = asyncio.Semaphore(2)
     def __init__(self):
         self.settings = get_settings()
         self.prompts = self._load_prompts()
-        # FIX 1: No more artificial limits. The httpx defaults (100) are better.
-        # We don't want the chat to wait just because embeddings are running in the background.
-        # Timeout configuration:
-        # connect=10.0: if Ollama is not reachable, fail fast.
-        # read=LLM_TIMEOUT: while Ollama is thinking, give it time.
         self.timeout = httpx.Timeout(self.settings.LLM_TIMEOUT, connect=10.0)
         self.client = httpx.AsyncClient(
@@ -43,11 +39,9 @@ class LLMService:
     def _load_prompts(self) -> dict:
         path = Path(self.settings.PROMPTS_PATH)
-        if not path.exists():
-            return {}
+        if not path.exists(): return {}
         try:
-            with open(path, "r", encoding="utf-8") as f:
-                return yaml.safe_load(f)
+            with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f)
         except Exception as e:
             logger.error(f"Failed to load prompts: {e}")
             return {}
@@ -58,19 +52,31 @@ class LLMService:
         system: str = None,
         force_json: bool = False,
         max_retries: int = 0,
-        base_delay: float = 2.0
+        base_delay: float = 2.0,
+        priority: Literal["realtime", "background"] = "realtime"  # <--- NEW & EXPLICIT
     ) -> str:
         """
         Executes an LLM call.
+        priority="realtime": chat (immediate, no throttling).
+        priority="background": import/analysis (throttled by the semaphore).
         """
+        # Decide based on the explicit parameter, not on the output format!
+        use_semaphore = (priority == "background")
+        if use_semaphore:
+            async with LLMService._background_semaphore:
+                return await self._execute_request(prompt, system, force_json, max_retries, base_delay)
+        else:
+            return await self._execute_request(prompt, system, force_json, max_retries, base_delay)
+    async def _execute_request(self, prompt, system, force_json, max_retries, base_delay):
         payload: Dict[str, Any] = {
             "model": self.settings.LLM_MODEL,
             "prompt": prompt,
             "stream": False,
             "options": {
                 "temperature": 0.1 if force_json else 0.7,
-                # FIX 2: Context raised to 8192.
-                # Important for complex schemas and JSON stability.
                 "num_ctx": 8192
             }
         }
@@ -97,7 +103,6 @@ class LLMService:
             attempt += 1
             if attempt > max_retries:
                 logger.error(f"LLM Final Error (attempt {attempt}): {e}")
-                # Re-raise so the router does not interpret "internal error" as the result type
                 raise e
             wait_time = base_delay * (2 ** (attempt - 1))
@@ -106,8 +111,7 @@
     async def generate_rag_response(self, query: str, context_str: str) -> str:
         """
-        IMPORTANT FOR CHAT:
-        No JSON, no retries (user latency).
+        Chat wrapper: always realtime.
         """
         system_prompt = self.prompts.get("system_prompt", "")
         rag_template = self.prompts.get("rag_template", "{context_str}\n\n{query}")
@@ -117,7 +121,9 @@
         return await self.generate_raw_response(
             final_prompt,
             system=system_prompt,
-            max_retries=0
+            max_retries=0,
+            force_json=False,
+            priority="realtime"  # <--- the default
         )
     async def close(self):
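The two-lane routing above is the core of this commit: one shared client, one global asyncio.Semaphore(2), and an explicit priority switch. The following is a minimal, runnable sketch of the same pattern, reduced to its essentials; TwoLaneClient and its sleep-based stub are illustrative stand-ins for the real httpx/Ollama round trip, not project code (Python 3.10+, where asyncio primitives bind to the running loop on first use).

import asyncio
from typing import Literal

class TwoLaneClient:
    # At most two background requests run concurrently; realtime (chat)
    # requests bypass the semaphore entirely, as in the diff above.
    _background_semaphore = asyncio.Semaphore(2)

    async def generate(
        self,
        prompt: str,
        priority: Literal["realtime", "background"] = "realtime",
    ) -> str:
        if priority == "background":
            # Slow lane: wait for a free slot before hitting the LLM.
            async with TwoLaneClient._background_semaphore:
                return await self._execute(prompt)
        # Fast lane: no throttling, straight through.
        return await self._execute(prompt)

    async def _execute(self, prompt: str) -> str:
        await asyncio.sleep(1.0)  # stand-in for the actual LLM round trip
        return f"echo: {prompt}"

async def main():
    client = TwoLaneClient()
    # Five batch jobs plus one chat request: the chat answer arrives after
    # about 1 s, while the jobs pass the semaphore two at a time (~3 s total).
    batch = [client.generate(f"job {i}", priority="background") for i in range(5)]
    chat = client.generate("user question", priority="realtime")
    print(await asyncio.gather(chat, *batch))

asyncio.run(main())

Two details worth noting: the lane is chosen from the explicit priority parameter rather than from force_json, so a JSON-producing chat call would still take the fast lane; and the retry path is untouched by the routing, so with the analyzer's settings (base_delay=5.0, max_retries=5) the formula wait_time = base_delay * (2 ** (attempt - 1)) yields pauses of 5 s, 10 s, 20 s, 40 s and 80 s, i.e. up to 155 s of backoff before the final error is re-raised.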

app/services/semantic_analyzer.py

@@ -1,6 +1,6 @@
 """
 app/services/semantic_analyzer.py Edge Validation & Filtering
-Version: 1.4 (Merged: Retry Strategy + Extended Observability)
+Version: 2.0 (Update: Background Priority for Batch Jobs)
 """
 import json
@@ -24,6 +24,7 @@ class SemanticAnalyzer:
         Features:
         - Retry strategy: waits when overloaded (max_retries=5).
+        - Priority queue: runs as a "background" task so it does not block the chat.
         - Observability: logs input size, raw response, and parsing details.
         """
         if not all_edges:
@@ -44,27 +45,27 @@
         # 2. Format the candidate list
         edges_str = "\n".join([f"- {e}" for e in all_edges])
-        # LOG: Request info (important for debugging)
+        # LOG: Request info
         logger.debug(f"🔍 [SemanticAnalyzer] Request: {len(chunk_text)} chars of text, {len(all_edges)} candidates.")
         # 3. Fill the prompt
         final_prompt = prompt_template.format(
-            chunk_text=chunk_text[:3500],  # a bit more context than before (3000 -> 3500)
+            chunk_text=chunk_text[:3500],
            edge_list=edges_str
         )
         try:
-            # 4. LLM call with forced JSON AND retry logic (merged V1.3)
-            # max_retries=5 means: pauses of 5s -> 10s -> 20s -> 40s -> 80s.
+            # 4. LLM call with traffic control (NEW: priority="background")
+            # We use the "slow lane" so the chat user never has to wait.
             response_json = await self.llm.generate_raw_response(
                 prompt=final_prompt,
                 force_json=True,
                 max_retries=5,
-                base_delay=5.0
+                base_delay=5.0,
+                priority="background"  # <--- IMPORTANT: enables throttling
             )
-            # LOG: Raw response preview (important for seeing what the LLM returns)
-            # Show only the first 200 characters to avoid flooding the log
+            # LOG: Raw response preview
             logger.debug(f"📥 [SemanticAnalyzer] Raw Response (Preview): {response_json[:200]}...")
            # 5. Parsing & cleaning
@@ -77,10 +78,9 @@
             try:
                 data = json.loads(clean_json)
             except json.JSONDecodeError as json_err:
-                # LOG: detailed error report for the user
                 logger.error(f"❌ [SemanticAnalyzer] JSON Decode Error.")
                 logger.error(f"   Reason: {json_err}")
-                logger.error(f"   Received string: {clean_json[:500]}")  # show at most 500 chars of the error
+                logger.error(f"   Received string: {clean_json[:500]}")
                 logger.info("   -> Workaround: falling back to 'all edges' (via the chunker).")
                 return []
@@ -92,7 +92,7 @@
                 valid_edges = [str(e) for e in data if isinstance(e, str) and ":" in e]
             elif isinstance(data, dict):
-                # Handle deviating formats (extended logging V1.2)
+                # Handle deviating formats
                 logger.info(f"   [SemanticAnalyzer] LLM returned a dict instead of a list. Attempting repair. Keys: {list(data.keys())}")
                 for key, val in data.items():
@@ -100,7 +100,7 @@
                     if key.lower() in ["edges", "results", "kanten", "matches"] and isinstance(val, list):
                         valid_edges.extend([str(e) for e in val if isinstance(e, str) and ":" in e])
-                    # Case B: {"kind": "target"} (the format observed in the log)
+                    # Case B: {"kind": "target"}
                     elif isinstance(val, str):
                         valid_edges.append(f"{key}:{val}")
@@ -115,7 +115,6 @@
             # LOG: Result
             if final_result:
-                # Log at info level only when something was found, otherwise this spams on empty chunks
                 logger.info(f"✅ [SemanticAnalyzer] Success. {len(final_result)} edges assigned.")
             else:
                 logger.debug("   [SemanticAnalyzer] No specific edges detected (empty result).")
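The repair branch above can be exercised in isolation. A standalone sketch of the same normalization logic follows, assuming the input is already plain JSON; the helper name parse_edges is illustrative, while the accepted shapes and the key whitelist come straight from the diff.

import json

def parse_edges(raw: str) -> list[str]:
    """Normalize an LLM response into a list of "source:target" edge strings."""
    data = json.loads(raw)
    valid_edges: list[str] = []
    if isinstance(data, list):
        # Expected case: a plain JSON list of edge strings
        valid_edges = [str(e) for e in data if isinstance(e, str) and ":" in e]
    elif isinstance(data, dict):
        for key, val in data.items():
            # Case A: {"edges": [...]} (also "results", "kanten", "matches")
            if key.lower() in ["edges", "results", "kanten", "matches"] and isinstance(val, list):
                valid_edges.extend([str(e) for e in val if isinstance(e, str) and ":" in e])
            # Case B: {"kind": "target"} -> rebuilt as "kind:target"
            elif isinstance(val, str):
                valid_edges.append(f"{key}:{val}")
    return valid_edges

print(parse_edges('["a:b", "c:d"]'))      # ['a:b', 'c:d']
print(parse_edges('{"edges": ["a:b"]}'))  # ['a:b']
print(parse_edges('{"knows": "alice"}'))  # ['knows:alice']

Fence stripping and the JSONDecodeError fallback (return [] and let the chunker assign all edges) are omitted here for brevity.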