mindnet/app/routers/chat.py

"""
FILE: app/routers/chat.py
DESCRIPTION: Haupt-Chat-Interface (WP-25a Agentic Edition).
Kombiniert die spezialisierte Interview-Logik und Keyword-Erkennung
mit der neuen MoE-Orchestrierung und Pre-Synthesis Kompression.
VERSION: 3.0.3 (WP-25a: MoE & Compression Support - Full Release)
STATUS: Active
FIX:
- 100% Wiederherstellung der v3.0.2 Logik (Interview Fallbacks, Schema-Resolution).
- WP-25a: Integration der Stream-Kompression (Module A) in den RAG-Workflow.
- WP-25a: Unterstützung der llm_profiles für spezialisierte Synthese (Module B).
- Erhalt der Ollama Context-Throttling Parameter (WP-20) als finaler Schutz.
- Beibehaltung der No-Retry Logik (max_retries=0) für Chat-Stabilität.
"""
from fastapi import APIRouter, HTTPException, Depends
from typing import List, Dict, Any, Optional
import time
import uuid
import logging
import yaml
import os
import asyncio
from pathlib import Path
from app.config import get_settings
from app.models.dto import ChatRequest, ChatResponse, QueryHit
from app.services.llm_service import LLMService
from app.services.feedback_service import log_search

router = APIRouter()
logger = logging.getLogger(__name__)

# --- LAYER 1: CONFIG LOADER & CACHING (restored from v3.0.2) ---
_DECISION_CONFIG_CACHE = None
_TYPES_CONFIG_CACHE = None


def _load_decision_config() -> Dict[str, Any]:
    """Loads the strategy configuration (compatible with WP-25)."""
    settings = get_settings()
    path = Path(settings.DECISION_CONFIG_PATH)
    try:
        if path.exists():
            with open(path, "r", encoding="utf-8") as f:
                return yaml.safe_load(f) or {}
    except Exception as e:
        logger.error(f"Failed to load decision config: {e}")
    return {"strategies": {}}


def _load_types_config() -> Dict[str, Any]:
    """Loads types.yaml for type detection in interview mode."""
    path = os.getenv("MINDNET_TYPES_FILE", "config/types.yaml")
    try:
        if os.path.exists(path):
            with open(path, "r", encoding="utf-8") as f:
                return yaml.safe_load(f) or {}
    except Exception as e:
        logger.error(f"Failed to load types config: {e}")
    return {}
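

# Illustrative shape of types.yaml as this module reads it (the type name,
# keywords and schema fields below are assumptions, not the real config):
#
#   types:
#     project:
#       detection_keywords: ["projekt", "vorhaben"]
#       schema: ["title", "goal", "deadline"]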


def get_full_config() -> Dict[str, Any]:
    global _DECISION_CONFIG_CACHE
    if _DECISION_CONFIG_CACHE is None:
        _DECISION_CONFIG_CACHE = _load_decision_config()
    return _DECISION_CONFIG_CACHE


def get_types_config() -> Dict[str, Any]:
    global _TYPES_CONFIG_CACHE
    if _TYPES_CONFIG_CACHE is None:
        _TYPES_CONFIG_CACHE = _load_types_config()
    return _TYPES_CONFIG_CACHE


def get_decision_strategy(intent: str) -> Dict[str, Any]:
    config = get_full_config()
    strategies = config.get("strategies", {})
    return strategies.get(intent, strategies.get("FACT_WHAT", {}))
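

# Illustrative shape of the decision config consumed above (the keys are the
# ones accessed in this file; concrete names and values are assumptions):
#
#   strategies:
#     FACT_WHAT:
#       trigger_keywords: ["was ist"]
#       use_streams: ["vector", "graph"]
#       prompt_template: "fact_template"
#       preferred_provider: "ollama"
#       schemas:
#         default: {fields: ["title", "notes"]}
#
# The endpoint below additionally reads a streams_library (with per-stream
# compression_threshold / compression_profile) from engine.config, which may
# or may not live in the same YAML file.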


# --- LAYER 2: SPECIAL LOGIC (INTERVIEW & DETECTION) ---
def _detect_target_type(message: str, configured_schemas: Dict[str, Any]) -> str:
    """WP-07: Identifies the requested note type (keyword-based)."""
    message_lower = message.lower()
    types_cfg = get_types_config()
    types_def = types_cfg.get("types", {})
    # 1. Check types.yaml detection_keywords
    for type_name, type_data in types_def.items():
        keywords = type_data.get("detection_keywords", [])
        for kw in keywords:
            if kw.lower() in message_lower:
                return type_name
    # 2. Direct match against schema keys
    for type_key in configured_schemas.keys():
        if type_key == "default":
            continue
        if type_key in message_lower:
            return type_key
    # 3. Synonym mapping (legacy)
    synonyms = {
        "projekt": "project", "entscheidung": "decision", "ziel": "goal",
        "erfahrung": "experience", "wert": "value", "prinzip": "principle"
    }
    for term, schema_key in synonyms.items():
        if term in message_lower:
            return schema_key
    return "default"


def _is_question(query: str) -> bool:
    """Checks whether the input is a question (German W-word detection)."""
    q = query.strip().lower()
    if "?" in q:
        return True
    starters = ["wer", "wie", "was", "wo", "wann", "warum", "weshalb", "wozu", "welche", "bist du"]
    return any(q.startswith(s + " ") for s in starters)
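
# Illustrative behavior:
#   _is_question("Wie funktioniert die Synthese")  -> True   (W-word prefix)
#   _is_question("Ist das korrekt?")               -> True   ("?" present)
#   _is_question("neues projekt anlegen")          -> False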


async def _classify_intent(query: str, llm: LLMService) -> tuple[str, str]:
    """Hybrid router: keyword fast paths plus the DecisionEngine LLM router."""
    config = get_full_config()
    strategies = config.get("strategies", {})
    query_lower = query.lower()
    # 1. FAST PATH: keyword triggers from the strategy config
    for intent_name, strategy in strategies.items():
        keywords = strategy.get("trigger_keywords", [])
        for k in keywords:
            if k.lower() in query_lower:
                return intent_name, "Keyword (FastPath)"
    # 2. FAST PATH B: type keywords -> INTERVIEW (statements only, not questions)
    if not _is_question(query_lower):
        types_cfg = get_types_config()
        for type_name, type_data in types_cfg.get("types", {}).items():
            for kw in type_data.get("detection_keywords", []):
                if kw.lower() in query_lower:
                    return "INTERVIEW", "Keyword (Interview)"
    # 3. SLOW PATH: DecisionEngine LLM router
    intent = await llm.decision_engine._determine_strategy(query)
    return intent, "DecisionEngine (LLM)"
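
# Note: even if the LLM router returns an unknown intent label,
# get_decision_strategy() falls back to the FACT_WHAT strategy, so a
# hallucinated label degrades gracefully instead of failing downstream.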


# --- LAYER 3: RETRIEVAL AGGREGATION ---
def _collect_all_hits(stream_responses: Dict[str, Any]) -> List[QueryHit]:
    """Collects and deduplicates hits from all parallel streams."""
    all_hits = []
    seen_node_ids = set()
    for response in stream_responses.values():
        if hasattr(response, 'results'):
            for hit in response.results:
                if hit.node_id not in seen_node_ids:
                    all_hits.append(hit)
                    seen_node_ids.add(hit.node_id)
    return sorted(all_hits, key=lambda h: h.total_score, reverse=True)
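
# On duplicate node_ids the first stream encountered wins; the merged list
# is then re-ranked globally by total_score (descending), independent of
# which stream a hit came from.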


# --- LAYER 4: ENDPOINT ---
def get_llm_service():
    return LLMService()


@router.post("/", response_model=ChatResponse)
async def chat_endpoint(
    request: ChatRequest,
    llm: LLMService = Depends(get_llm_service)
):
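    """
    Main chat endpoint (WP-25a): classifies the intent, then either runs the
    interview flow or the multi-stream RAG flow with pre-synthesis
    compression, and returns a ChatResponse (query_id, answer, sources,
    latency_ms, intent, intent_source).

    Illustrative request body (assuming ChatRequest requires only `message`;
    the DTO may define further optional fields):

        POST /  {"message": "Was ist MindNet?"}
    """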
    start_time = time.time()
    query_id = str(uuid.uuid4())
    settings = get_settings()
    logger.info(f"🚀 [WP-25a] Chat request [{query_id}]: {request.message[:50]}...")
    try:
        # 1. Intent detection
        intent, intent_source = await _classify_intent(request.message, llm)
        logger.info(f"[{query_id}] Intent: {intent} via {intent_source}")
        strategy = get_decision_strategy(intent)
        engine = llm.decision_engine
        sources_hits = []
        answer_text = ""
        # 2. INTERVIEW MODE (compatibility with v3.0.2)
        if intent == "INTERVIEW":
            target_type = _detect_target_type(request.message, strategy.get("schemas", {}))
            types_cfg = get_types_config()
            type_def = types_cfg.get("types", {}).get(target_type, {})
            fields_list = type_def.get("schema", [])
            # WP-07: restored fallback logic (v3.0.2) -- if types.yaml defines
            # no schema for the type, fall back to the strategy's own schemas.
            if not fields_list:
                configured_schemas = strategy.get("schemas", {})
                fallback = configured_schemas.get(target_type, configured_schemas.get("default", {}))
                fields_list = fallback.get("fields", []) if isinstance(fallback, dict) else (fallback or [])
            fields_str = "\n- " + "\n- ".join(fields_list)
            template = llm.get_prompt(strategy.get("prompt_template", "interview_template"))
            final_prompt = template.replace("{query}", request.message) \
                .replace("{target_type}", target_type) \
                .replace("{schema_fields}", fields_str)
            # WP-25a: uses the specialized compression profile for interviews
            answer_text = await llm.generate_raw_response(
                final_prompt, system=llm.get_prompt("system_prompt"),
                priority="realtime", profile_name="compression_fast", max_retries=0
            )
            sources_hits = []
        # 3. RAG MODE (WP-25a multi-stream + pre-synthesis)
        else:
            stream_keys = strategy.get("use_streams", [])
            library = engine.config.get("streams_library", {})
            # Phase A: retrieval -- run all configured streams in parallel
            tasks = []
            active_streams = []
            for key in stream_keys:
                stream_cfg = library.get(key)
                if stream_cfg:
                    active_streams.append(key)
                    tasks.append(engine._run_single_stream(key, stream_cfg, request.message))
            responses = await asyncio.gather(*tasks, return_exceptions=True)
            raw_stream_map = {}
            formatted_context_tasks = []
            max_chars = getattr(settings, "MAX_OLLAMA_CHARS", 10000)
            provider = strategy.get("preferred_provider") or settings.MINDNET_LLM_PROVIDER
            # Phase B: pre-synthesis & throttling
            for name, res in zip(active_streams, responses):
                if not isinstance(res, Exception):
                    raw_stream_map[name] = res
                    context_text = engine._format_stream_context(res)
                    # WP-25a: automated compression for oversized contexts
                    stream_cfg = library.get(name, {})
                    threshold = stream_cfg.get("compression_threshold", 4000)
                    if len(context_text) > threshold:
                        profile = stream_cfg.get("compression_profile")
                        formatted_context_tasks.append(
                            engine._compress_stream_content(name, context_text, request.message, profile)
                        )
                    else:
                        # WP-20: restored throttling guard as a fallback
                        if provider == "ollama" and len(context_text) > max_chars:
                            context_text = context_text[:max_chars] + "\n[...]"

                        # Wrap the ready-made text in a coroutine so it can be
                        # gathered alongside the compression tasks; the default
                        # argument pins the current loop value of context_text.
                        async def _ident(c=context_text):
                            return c

                        formatted_context_tasks.append(_ident())
                else:
                    async def _err():
                        return "[Stream Error]"

                    formatted_context_tasks.append(_err())
            # Finalize all contexts in parallel
            final_contexts = await asyncio.gather(*formatted_context_tasks)
            formatted_context_map = dict(zip(active_streams, final_contexts))
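
            # At this point formatted_context_map maps each stream key to its
            # final context string -- compressed, truncated, pass-through, or
            # "[Stream Error]" -- e.g. {"vector": "...", "graph": "..."}
            # (stream names are illustrative; they come from use_streams).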
            # Phase C: MoE synthesis
            answer_text = await engine._generate_final_answer(
                intent, strategy, request.message, formatted_context_map
            )
            sources_hits = _collect_all_hits(raw_stream_map)
        duration_ms = int((time.time() - start_time) * 1000)
        # Logging (best effort -- a logging failure must not break the response)
        try:
            log_search(
                query_id=query_id, query_text=request.message, results=sources_hits,
                mode=f"wp25a_{intent.lower()}", metadata={"strategy": intent, "source": intent_source}
            )
        except Exception as log_err:
            logger.warning(f"[{query_id}] log_search failed: {log_err}")
        return ChatResponse(
            query_id=query_id, answer=answer_text, sources=sources_hits,
            latency_ms=duration_ms, intent=intent, intent_source=intent_source
        )
    except Exception as e:
        logger.error(f"❌ Chat Endpoint Failure: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Error while processing the request.")