# mindnet/app/routers/chat.py
# 2025-12-10 13:48:13 +01:00
#
# 345 lines
# 12 KiB
# Python

"""
app/routers/chat.py — RAG Endpunkt (WP-06 Hybrid Router + WP-07 Interview Mode)
Version: 2.4.0 (Interview Support)
Features:
- Hybrid Intent Router (Keyword + LLM)
- Strategic Retrieval (Late Binding via Config)
- Interview Loop (Schema-driven Data Collection)
- Context Enrichment (Payload/Source Fallback)
- Data Flywheel (Feedback Logging Integration)
"""
from fastapi import APIRouter, HTTPException, Depends
from typing import List, Dict, Any
import time
import uuid
import logging
import yaml
from pathlib import Path
from app.config import get_settings
from app.models.dto import ChatRequest, ChatResponse, QueryRequest, QueryHit
from app.services.llm_service import LLMService
from app.core.retriever import Retriever
from app.services.feedback_service import log_search
# Router on which the chat endpoint of this module is registered.
router = APIRouter()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# --- Helper: Config Loader ---
# Process-wide cache for the decision config; stays None until
# get_full_config() populates it on first use.
_DECISION_CONFIG_CACHE = None
def _load_decision_config() -> Dict[str, Any]:
    """Load the decision/strategy configuration from its YAML file.

    The file location is read from ``settings.DECISION_CONFIG_PATH``. A
    minimal default configuration (a single ``FACT`` strategy without trigger
    keywords) is returned when the file is missing, cannot be read or parsed,
    or does not contain a YAML mapping at its top level.

    Returns:
        The parsed configuration mapping, or the built-in default.
    """
    settings = get_settings()
    path = Path(settings.DECISION_CONFIG_PATH)
    default_config: Dict[str, Any] = {
        "strategies": {
            "FACT": {"trigger_keywords": []}
        }
    }
    if not path.exists():
        logger.warning(f"Decision config not found at {path}, using defaults.")
        return default_config
    try:
        with open(path, "r", encoding="utf-8") as f:
            loaded = yaml.safe_load(f)
    except Exception as e:
        # Best effort: a broken config file must not take the endpoint down.
        logger.error(f"Failed to load decision config: {e}")
        return default_config
    if not isinstance(loaded, dict):
        # safe_load() yields None for an empty document and a list/scalar for
        # non-mapping documents; callers rely on dict methods such as .get().
        logger.error(
            f"Decision config at {path} is not a mapping "
            f"(got {type(loaded).__name__}), using defaults."
        )
        return default_config
    return loaded
def get_full_config() -> Dict[str, Any]:
    """Return the decision configuration, loading it on the first call.

    The loaded value is kept in the module-level ``_DECISION_CONFIG_CACHE``
    and reused by later calls for as long as it is not ``None``.
    """
    global _DECISION_CONFIG_CACHE
    if _DECISION_CONFIG_CACHE is not None:
        # Already loaded earlier in this process.
        return _DECISION_CONFIG_CACHE
    _DECISION_CONFIG_CACHE = _load_decision_config()
    return _DECISION_CONFIG_CACHE
def get_decision_strategy(intent: str) -> Dict[str, Any]:
    """Look up the strategy configured for ``intent``.

    Falls back to the ``FACT`` strategy when the intent has no entry, and to
    an empty dict when ``FACT`` is not configured either.
    """
    available = get_full_config().get("strategies", {})
    if intent in available:
        return available[intent]
    return available.get("FACT", {})
# --- Helper: Target Type Detection (WP-07) ---
def _detect_target_type(message: str, configured_schemas: Dict[str, Any]) -> str:
    """Guess which note type the user wants to create.

    Matching is substring-based on the lower-cased message and runs in two
    stages:

    1. Direct match against the configured schema keys (case-insensitive;
       the ``default`` key and empty keys are skipped).
    2. German synonym lookup, mapped to a schema key. A synonym only counts
       when its target key is actually configured.

    Args:
        message: Raw user message.
        configured_schemas: Mapping of schema key -> schema definition. May be
            empty or ``None``.

    Returns:
        The matching schema key exactly as configured, or ``"default"`` when
        nothing matches.
    """
    if not configured_schemas:
        return "default"
    message_lower = (message or "").lower()

    # 1. Direct match with schema keys (e.g. "project", "decision").
    for type_key in configured_schemas:
        needle = str(type_key).lower()
        # "default" is the fallback, never a direct match; an empty key would
        # be a substring of every message.
        if not needle or needle == "default":
            continue
        if needle in message_lower:
            return type_key

    # 2. Synonym mapping (German term -> schema key) so users can phrase the
    #    request in German.
    synonyms = {
        "projekt": "project",
        "vorhaben": "project",
        "entscheidung": "decision",
        "beschluss": "decision",
        "ziel": "goal",
        "erfahrung": "experience",
        "lektion": "experience",
        "wert": "value",
        "prinzip": "principle",
        "grundsatz": "principle",
        "notiz": "default",
        "idee": "default"
    }
    for term, schema_key in synonyms.items():
        # Only accept the mapping when the target schema is configured.
        if term in message_lower and schema_key in configured_schemas:
            return schema_key
    return "default"
# --- Dependencies ---
def get_llm_service():
    """Dependency provider: create a new ``LLMService`` instance.

    Referenced through ``Depends(...)`` in the chat endpoint signature.
    """
    service = LLMService()
    return service
def get_retriever():
    """Dependency provider: create a new ``Retriever`` instance.

    Referenced through ``Depends(...)`` in the chat endpoint signature.
    """
    retriever = Retriever()
    return retriever
# --- Logic ---
def _build_enriched_context(hits: List[QueryHit]) -> str:
    """Render retrieval hits as a numbered, labelled context string.

    Each hit becomes one section with its title (``note_id``), its
    upper-cased note type, its score and its text. Sections are numbered
    from 1 and separated by blank lines.

    Args:
        hits: Retrieval hits; each must expose ``source``, ``note_id`` and
            ``total_score``. ``payload`` is read when present.

    Returns:
        The concatenated context, or an empty string for no hits.
    """
    context_parts = []
    for i, hit in enumerate(hits, 1):
        source = hit.source or {}
        # First non-empty text field wins; the keys cover the different
        # naming variants a hit's source may use.
        content = (
            source.get("text") or source.get("content") or
            source.get("page_content") or source.get("chunk_text") or
            "[Kein Text]"
        )
        title = hit.note_id or "Unbekannt"
        # Type lookup order: payload > source > "unknown". Every step is
        # chained with `or` so a present-but-empty value (None / "") falls
        # through instead of being rendered as "NONE" or an empty label.
        payload = getattr(hit, "payload", None) or {}
        note_type = str(
            payload.get("type") or source.get("type") or "unknown"
        ).upper()
        # A missing score must not break the float formatting below.
        score = hit.total_score if hit.total_score is not None else 0.0
        entry = (
            f"### QUELLE {i}: {title}\n"
            f"TYP: [{note_type}] (Score: {score:.2f})\n"
            f"INHALT:\n{content}\n"
        )
        context_parts.append(entry)
    return "\n\n".join(context_parts)
async def _classify_intent(query: str, llm: LLMService) -> tuple[str, str]:
    """Classify the user query into an intent (hybrid router).

    Fast path: the longest configured trigger keyword contained in the query
    decides the intent (``FACT`` is never chosen via keywords). Slow path:
    when enabled in the config's ``settings`` section, the LLM is asked and
    its answer is scanned for the configured strategy names.

    Args:
        query: Raw user message.
        llm: Service used for the optional LLM routing call.

    Returns:
        A tuple ``(intent, source)`` where ``source`` describes how the
        intent was determined. Falls back to ``"FACT"`` when nothing matches.
    """
    config = get_full_config()
    # `or {}` also covers sections that are present but null in the YAML.
    strategies = config.get("strategies") or {}
    settings = config.get("settings") or {}
    query_lower = query.lower()

    # 1. FAST PATH: keywords. The longest matching keyword wins.
    best_intent = None
    max_match_length = 0
    for intent_name, strategy in strategies.items():
        if intent_name == "FACT":
            continue
        # A strategy or keyword list may be null in the YAML.
        keywords = (strategy or {}).get("trigger_keywords") or []
        for k in keywords:
            keyword = str(k)
            if keyword.lower() in query_lower and len(keyword) > max_match_length:
                max_match_length = len(keyword)
                best_intent = intent_name
    if best_intent:
        return best_intent, "Keyword (Fast Path)"

    # 2. SLOW PATH: LLM router (only if enabled and a prompt is configured).
    if not settings.get("llm_fallback_enabled", False):
        return "FACT", "Default (No Match)"
    router_prompt_template = settings.get("llm_router_prompt", "")
    if not router_prompt_template:
        return "FACT", "Default (No Match)"

    prompt = router_prompt_template.replace("{query}", query)
    logger.info("Keywords failed. Asking LLM for Intent...")
    raw_response = await llm.generate_raw_response(prompt)

    # Scan the answer for configured strategy names. Both sides are
    # upper-cased so keys that are not written in capitals still match; an
    # empty key would match everything and is skipped.
    llm_output_upper = str(raw_response or "").upper()
    found_intents = [
        strat_key for strat_key in strategies
        if str(strat_key) and str(strat_key).upper() in llm_output_upper
    ]
    if len(found_intents) == 1:
        return found_intents[0], "LLM Router (Slow Path)"
    if found_intents:
        # Several names mentioned: take the first one in config order.
        return found_intents[0], f"LLM Ambiguous {found_intents}"
    return "FACT", "LLM Fallback (No Match)"
def _build_interview_prompt(
    request: ChatRequest,
    strategy: dict,
    llm: LLMService,
    prompt_key: str,
    query_id: str,
) -> str:
    """Assemble the interview-mode prompt from the configured schema (no retrieval)."""
    # 1. Schema loading (late binding via config). `or {}` covers a missing
    #    or null `schemas` section.
    schemas = strategy.get("schemas") or {}
    target_type = _detect_target_type(request.message, schemas)
    active_schema = schemas.get(target_type, schemas.get("default"))
    logger.info(f"[{query_id}] Starting Interview for Type: {target_type}")

    # A schema is either a dict with "fields"/"hint" or just a list of fields.
    if isinstance(active_schema, dict):
        fields_list = active_schema.get("fields") or []
        hint_str = active_schema.get("hint") or ""
    else:
        # No schema configured at all -> empty field list instead of a
        # TypeError in the join below.
        fields_list = active_schema or []
        hint_str = ""
    if isinstance(fields_list, str):
        # A single field given as a plain string must not be split into chars.
        fields_list = [fields_list]
    fields_str = "\n- " + "\n- ".join(str(field) for field in fields_list)

    # 2. Context: ChatRequest carries no history here, so a fixed placeholder
    #    text is used as the context.
    context_str = "Bisheriger Verlauf (falls vorhanden): Siehe oben/unten."

    # 3. Prompt assembly.
    template = llm.prompts.get(prompt_key, "")
    return (
        template.replace("{context_str}", context_str)
        .replace("{query}", request.message)
        .replace("{target_type}", str(target_type))
        .replace("{schema_fields}", fields_str)
        .replace("{schema_hint}", str(hint_str))
    )


async def _build_rag_prompt(
    request: ChatRequest,
    strategy: dict,
    llm: LLMService,
    retriever: Retriever,
    prompt_key: str,
    query_id: str,
) -> tuple[str, list]:
    """Run primary and strategic retrieval and return ``(prompt, hits)``."""
    inject_types = strategy.get("inject_types", [])
    prepend_instr = strategy.get("prepend_instruction", "")

    # Primary retrieval.
    query_req = QueryRequest(
        query=request.message,
        mode="hybrid",
        top_k=request.top_k,
        explain=request.explain
    )
    retrieve_result = await retriever.search(query_req)
    # Work on a copy so the retriever's result object is not mutated.
    hits = list(retrieve_result.results or [])

    # Strategic retrieval: additionally fetch notes of the configured types.
    if inject_types:
        logger.info(f"[{query_id}] Executing Strategic Retrieval for types: {inject_types}...")
        strategy_req = QueryRequest(
            query=request.message,
            mode="hybrid",
            top_k=3,
            filters={"type": inject_types},
            explain=False
        )
        strategy_result = await retriever.search(strategy_req)
        seen_ids = {h.node_id for h in hits}
        for strat_hit in strategy_result.results or []:
            if strat_hit.node_id not in seen_ids:
                # Track appended ids too, so the strategic result cannot
                # introduce duplicates of itself.
                seen_ids.add(strat_hit.node_id)
                hits.append(strat_hit)

    # Context building.
    if not hits:
        context_str = "Keine relevanten Notizen gefunden."
    else:
        context_str = _build_enriched_context(hits)

    # Generation setup.
    template = llm.prompts.get(prompt_key, "{context_str}\n\n{query}")
    if prepend_instr:
        context_str = f"{prepend_instr}\n\n{context_str}"
    final_prompt = template.replace("{context_str}", context_str).replace("{query}", request.message)
    return final_prompt, hits


@router.post("/", response_model=ChatResponse)
async def chat_endpoint(
    request: ChatRequest,
    llm: LLMService = Depends(get_llm_service),
    retriever: Retriever = Depends(get_retriever)
):
    """Answer a chat message in interview mode or standard RAG mode.

    The intent of the message selects a strategy from the decision config.
    ``INTERVIEW`` builds a schema-driven prompt without retrieval; every
    other intent runs retrieval and builds a context-enriched prompt. The
    generated answer is logged via ``log_search`` (failures there are only
    logged) and returned together with sources, latency and intent info.

    Raises:
        HTTPException: Status 500 for any unexpected error; an
            ``HTTPException`` raised inside is passed through unchanged.
    """
    # perf_counter is monotonic, so the measured latency is not affected by
    # system clock adjustments.
    start_time = time.perf_counter()
    query_id = str(uuid.uuid4())
    logger.info(f"Chat request [{query_id}]: {request.message[:50]}...")
    try:
        # 1. Intent detection (including how it was determined).
        intent, intent_source = await _classify_intent(request.message, llm)
        logger.info(f"[{query_id}] Final Intent: {intent} via {intent_source}")

        # Strategy load.
        strategy = get_decision_strategy(intent) or {}
        prompt_key = strategy.get("prompt_template", "rag_template")

        # --- SPLIT LOGIC: INTERVIEW vs. RAG ---
        if intent == "INTERVIEW":
            final_prompt = _build_interview_prompt(
                request, strategy, llm, prompt_key, query_id
            )
            # No hits in interview mode.
            sources_hits = []
        else:
            final_prompt, sources_hits = await _build_rag_prompt(
                request, strategy, llm, retriever, prompt_key, query_id
            )

        # --- COMMON GENERATION ---
        system_prompt = llm.prompts.get("system_prompt", "")
        logger.info(f"[{query_id}] Sending to LLM (Intent: {intent}, Template: {prompt_key})...")
        # The system prompt is passed separately from the user prompt.
        answer_text = await llm.generate_raw_response(prompt=final_prompt, system=system_prompt)
        duration_ms = int((time.perf_counter() - start_time) * 1000)

        # Logging is best effort: a failure here must not fail the request.
        try:
            log_search(
                query_id=query_id,
                query_text=request.message,
                results=sources_hits,
                mode="interview" if intent == "INTERVIEW" else "chat_rag",
                metadata={
                    "intent": intent,
                    "intent_source": intent_source,
                    "generated_answer": answer_text,
                    "model": llm.settings.LLM_MODEL
                }
            )
        except Exception as e:
            logger.error(f"Logging failed: {e}")

        return ChatResponse(
            query_id=query_id,
            answer=answer_text,
            sources=sources_hits,
            latency_ms=duration_ms,
            intent=intent,
            intent_source=intent_source
        )
    except HTTPException:
        # Keep deliberate HTTP errors (status/detail) intact.
        raise
    except Exception as e:
        logger.error(f"Error in chat endpoint: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e