"""
FILE: app/routers/chat.py

DESCRIPTION: Main chat interface (WP-25b edition).

Combines the specialized interview logic with the new
lazy-prompt orchestration and MoE synthesis.

WP-24c: Integration of the Discovery API for proactive linking.

VERSION: 3.1.0 (WP-24c: Discovery API Integration)
STATUS: Active

FIX:
- WP-24c: New endpoint /query/discover for proactive edge suggestions.
- WP-25b: Interview mode switched to lazy prompts (prompt_key + variables).
- WP-25b: RAG phase delegated to Engine v1.3.0 for consistent MoE control.
- WP-25a: Full preservation of the v3.0.2 logic (interview, schema resolution, fast paths).
"""

from fastapi import APIRouter, HTTPException, Depends
from typing import List, Dict, Any, Optional
from pydantic import BaseModel
import time
import uuid
import logging
import yaml
import os
import asyncio
from pathlib import Path

from app.config import get_settings
from app.models.dto import ChatRequest, ChatResponse, QueryHit, QueryRequest
from app.services.llm_service import LLMService
from app.services.feedback_service import log_search

router = APIRouter()
logger = logging.getLogger(__name__)
# --- LAYER 0: DTOs FOR DISCOVERY (WP-24c) ---

class DiscoveryRequest(BaseModel):
    """Request payload for /query/discover: raw text plus search tuning knobs."""

    content: str                 # Text to analyse for potential edges
    top_k: int = 8               # Maximum number of vector-search candidates
    min_confidence: float = 0.6  # Minimum vector score required to evaluate a candidate
class DiscoveryHit(BaseModel):
    """One suggested edge from new content to an existing note."""

    target_note: str          # Note ID
    target_title: str         # Human-readable title
    suggested_edge_type: str  # Canonical type from edge_vocabulary
    confidence_score: float   # NOTE(review): labelled "combined vector + AI score"
                              # upstream, but discover_edges fills in only the
                              # retriever's total_score — confirm intent
    reasoning: str            # Short justification produced for the suggestion
# --- LAYER 1: CONFIG LOADER & CACHING (WP-25 standard) ---

# Process-wide caches, populated lazily on first access (see get_full_config /
# get_types_config) and never invalidated for the lifetime of the process.
_DECISION_CONFIG_CACHE = None
_TYPES_CONFIG_CACHE = None


def _load_decision_config() -> Dict[str, Any]:
    """Load the strategy configuration from ``settings.DECISION_CONFIG_PATH``.

    Returns the parsed YAML mapping, or ``{"strategies": {}}`` when the file
    is missing or unreadable (errors are logged, never raised).
    """
    settings = get_settings()
    path = Path(settings.DECISION_CONFIG_PATH)
    try:
        if path.exists():
            with open(path, "r", encoding="utf-8") as f:
                return yaml.safe_load(f) or {}
    except Exception as e:
        logger.error(f"Failed to load decision config: {e}")
    return {"strategies": {}}
def _load_types_config() -> Dict[str, Any]:
|
||
"""Lädt die types.yaml für die Typerkennung."""
|
||
path = os.getenv("MINDNET_TYPES_FILE", "config/types.yaml")
|
||
try:
|
||
if os.path.exists(path):
|
||
with open(path, "r", encoding="utf-8") as f:
|
||
return yaml.safe_load(f) or {}
|
||
except Exception as e:
|
||
logger.error(f"Failed to load types config: {e}")
|
||
return {}
|
||
|
||
def get_full_config() -> Dict[str, Any]:
    """Return the decision/strategy configuration, loading it on first call."""
    global _DECISION_CONFIG_CACHE
    if _DECISION_CONFIG_CACHE is None:
        _DECISION_CONFIG_CACHE = _load_decision_config()
    return _DECISION_CONFIG_CACHE
def get_types_config() -> Dict[str, Any]:
    """Return the types.yaml configuration, loading it on first call."""
    global _TYPES_CONFIG_CACHE
    if _TYPES_CONFIG_CACHE is None:
        _TYPES_CONFIG_CACHE = _load_types_config()
    return _TYPES_CONFIG_CACHE
def get_decision_strategy(intent: str) -> Dict[str, Any]:
    """Return the strategy block for *intent*.

    Falls back to the ``FACT_WHAT`` strategy, then to ``{}``, when the
    intent is not configured.
    """
    config = get_full_config()
    strategies = config.get("strategies", {})
    return strategies.get(intent, strategies.get("FACT_WHAT", {}))
# --- LAYER 2: SPECIAL LOGIC (INTERVIEW & DETECTION) ---

def _detect_target_type(message: str, configured_schemas: Dict[str, Any]) -> str:
    """WP-07: Identify the desired note type (keyword-based).

    Resolution order:
    1. ``detection_keywords`` from types.yaml,
    2. direct substring match against the configured schema keys,
    3. legacy German-to-English synonym mapping,
    falling back to ``"default"`` when nothing matches.
    """
    message_lower = message.lower()
    types_cfg = get_types_config()
    types_def = types_cfg.get("types", {})

    # 1. Check types.yaml detection_keywords
    for type_name, type_data in types_def.items():
        keywords = type_data.get("detection_keywords", [])
        for kw in keywords:
            if kw.lower() in message_lower:
                return type_name

    # 2. Direct match against schema keys ("default" is reserved as fallback)
    for type_key in configured_schemas.keys():
        if type_key == "default":
            continue
        if type_key in message_lower:
            return type_key

    # 3. Synonym mapping (legacy)
    synonyms = {
        "projekt": "project", "entscheidung": "decision", "ziel": "goal",
        "erfahrung": "experience", "wert": "value", "prinzip": "principle"
    }
    for term, schema_key in synonyms.items():
        if term in message_lower:
            return schema_key

    return "default"
def _is_question(query: str) -> bool:
|
||
"""Prüft, ob der Input eine Frage ist."""
|
||
q = query.strip().lower()
|
||
if "?" in q: return True
|
||
starters = ["wer", "wie", "was", "wo", "wann", "warum", "weshalb", "wozu", "welche", "bist du"]
|
||
return any(q.startswith(s + " ") for s in starters)
|
||
|
||
async def _classify_intent(query: str, llm: LLMService) -> tuple[str, str]:
    """Hybrid router: keyword fast paths, then the DecisionEngine LLM router.

    Returns a ``(intent, source)`` tuple where *source* describes which
    routing stage produced the decision.
    """
    config = get_full_config()
    strategies = config.get("strategies", {})
    query_lower = query.lower()

    # 1. FAST PATH: strategy trigger keywords
    for intent_name, strategy in strategies.items():
        keywords = strategy.get("trigger_keywords", [])
        for k in keywords:
            if k.lower() in query_lower:
                return intent_name, "Keyword (FastPath)"

    # 2. FAST PATH B: type keywords -> INTERVIEW (only for non-questions)
    if not _is_question(query_lower):
        types_cfg = get_types_config()
        for type_name, type_data in types_cfg.get("types", {}).items():
            for kw in type_data.get("detection_keywords", []):
                if kw.lower() in query_lower:
                    return "INTERVIEW", "Keyword (Interview)"

    # 3. SLOW PATH: DecisionEngine LLM router (MoE-controlled)
    intent = await llm.decision_engine._determine_strategy(query)
    return intent, "DecisionEngine (LLM)"
# --- EBENE 3: RETRIEVAL AGGREGATION ---
|
||
|
||
def _collect_all_hits(stream_responses: Dict[str, Any]) -> List[QueryHit]:
|
||
"""Sammelt deduplizierte Treffer aus allen Streams für das Tracing."""
|
||
all_hits = []
|
||
seen_node_ids = set()
|
||
for _, response in stream_responses.items():
|
||
# Sammeln der Hits aus den QueryResponse Objekten
|
||
if hasattr(response, 'results'):
|
||
for hit in response.results:
|
||
if hit.node_id not in seen_node_ids:
|
||
all_hits.append(hit)
|
||
seen_node_ids.add(hit.node_id)
|
||
return sorted(all_hits, key=lambda h: h.total_score, reverse=True)
|
||
|
||
# --- LAYER 4: ENDPOINTS ---

def get_llm_service():
    """FastAPI dependency factory: returns a new LLMService instance."""
    return LLMService()
@router.post("/", response_model=ChatResponse)
async def chat_endpoint(
    request: ChatRequest,
    llm: LLMService = Depends(get_llm_service)
):
    """Main chat entry point: routes the message to INTERVIEW or RAG mode.

    1. Classifies the intent (keyword fast paths, then LLM router).
    2. INTERVIEW: resolves the target note type/schema and queries the LLM
       via the WP-25b lazy-prompt mechanism (prompt_key + variables).
    3. Otherwise (RAG): delegates retrieval/compression and the final MoE
       synthesis to the DecisionEngine, collecting source hits for tracing.

    Any failure is logged and surfaced as HTTP 500.
    """
    start_time = time.time()
    query_id = str(uuid.uuid4())
    logger.info(f"🚀 [WP-25b] Chat request [{query_id}]: {request.message[:50]}...")

    try:
        # 1. Intent detection
        intent, intent_source = await _classify_intent(request.message, llm)
        logger.info(f"[{query_id}] Intent: {intent} via {intent_source}")

        strategy = get_decision_strategy(intent)
        engine = llm.decision_engine

        sources_hits = []
        answer_text = ""

        # 2. INTERVIEW MODE (WP-25b lazy-prompt logic)
        if intent == "INTERVIEW":
            target_type = _detect_target_type(request.message, strategy.get("schemas", {}))
            types_cfg = get_types_config()
            type_def = types_cfg.get("types", {}).get(target_type, {})
            fields_list = type_def.get("schema", [])

            # WP-07: restored fallback logic — when types.yaml provides no
            # schema, fall back to the strategy's configured schemas.
            if not fields_list:
                configured_schemas = strategy.get("schemas", {})
                fallback = configured_schemas.get(target_type, configured_schemas.get("default", {}))
                fields_list = fallback.get("fields", []) if isinstance(fallback, dict) else (fallback or [])

            fields_str = "\n- " + "\n- ".join(fields_list)
            template_key = strategy.get("prompt_template", "interview_template")

            # WP-25b: lazy loading call
            answer_text = await llm.generate_raw_response(
                prompt_key=template_key,
                variables={
                    "query": request.message,
                    "target_type": target_type,
                    "schema_fields": fields_str
                },
                system=llm.get_prompt("system_prompt"),
                priority="realtime",
                profile_name="compression_fast",
                max_retries=0
            )
            sources_hits = []

        # 3. RAG MODE (WP-25b delegation to Engine v1.3.0)
        else:
            # Phase A & B: retrieval & compression (delegated to Engine v1.3.0)
            formatted_context_map = await engine._execute_parallel_streams(strategy, request.message)

            # Capture the raw per-stream hits for tracing.
            # NOTE(review): this runs the configured streams again and appears
            # to duplicate retrieval work done inside
            # _execute_parallel_streams — confirm whether the engine can
            # expose its raw hits instead.
            raw_stream_map = {}
            stream_keys = strategy.get("use_streams", [])
            library = engine.config.get("streams_library", {})

            retrieval_tasks = []
            active_streams = []
            for key in stream_keys:
                if key in library:
                    active_streams.append(key)
                    retrieval_tasks.append(engine._run_single_stream(key, library[key], request.message))

            responses = await asyncio.gather(*retrieval_tasks, return_exceptions=True)
            for name, res in zip(active_streams, responses):
                # Failed streams are dropped silently; tracing is best effort.
                if not isinstance(res, Exception):
                    raw_stream_map[name] = res

            sources_hits = _collect_all_hits(raw_stream_map)

            # Phase C: final MoE synthesis (delegated to Engine v1.3.0)
            answer_text = await engine._generate_final_answer(
                intent, strategy, request.message, formatted_context_map
            )

        duration_ms = int((time.time() - start_time) * 1000)

        # Logging (WP-15) — best effort: a tracing failure must never break
        # the response, but it should not be swallowed silently either.
        try:
            log_search(
                query_id=query_id, query_text=request.message, results=sources_hits,
                mode=f"wp25b_{intent.lower()}", metadata={"strategy": intent, "source": intent_source}
            )
        except Exception as log_err:
            logger.warning(f"Search logging failed for [{query_id}]: {log_err}")

        return ChatResponse(
            query_id=query_id, answer=answer_text, sources=sources_hits,
            latency_ms=duration_ms, intent=intent, intent_source=intent_source
        )

    except Exception as e:
        logger.error(f"❌ Chat Endpoint Failure: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Fehler bei der Verarbeitung der Anfrage.")
@router.post("/query/discover", response_model=List[DiscoveryHit])
async def discover_edges(
    request: DiscoveryRequest,
    llm: LLMService = Depends(get_llm_service)
):
    """
    WP-24c: Analyse text for potential edges to existing knowledge.

    Runs a vector search for candidate notes, then asks the LLM (in
    parallel, one call per candidate above ``min_confidence``) to suggest a
    canonical edge type for each. Results are deduplicated per target note.
    Uses DecisionEngine logic (WP-25b PROMPT-TRACE compliant).
    """
    start_time = time.time()
    logger.info(f"🔍 [WP-24c] Discovery triggered for content: {request.content[:50]}...")

    try:
        # 1. Candidate search via retriever (vector match)
        search_req = QueryRequest(
            query=request.content,
            top_k=request.top_k,
            explain=True
        )
        candidates = await llm.decision_engine.retriever.search(search_req)

        if not candidates.results:
            logger.info("ℹ️ No candidates found for discovery.")
            return []

        # 2. LLM-assisted relationship extraction (WP-25b)
        discovery_results = []

        # Lazy import: pull the valid edge types from the registry.
        from app.services.edge_registry import registry as edge_reg
        valid_types_str = ", ".join(list(edge_reg.valid_types))

        # Evaluate candidates in parallel for maximum throughput.
        async def evaluate_candidate(hit: QueryHit) -> Optional[DiscoveryHit]:
            """Ask the LLM for an edge suggestion; None below threshold or on error."""
            if hit.total_score < request.min_confidence:
                return None

            try:
                # Uses the ingest_extractor profile for precise semantic analysis.
                # Follows the prompt_key pattern (edge_extraction) per the WP-24c spec.
                raw_suggestion = await llm.generate_raw_response(
                    prompt_key="edge_extraction",
                    variables={
                        "note_id": "NEUER_INHALT",
                        "text": f"PROXIMITY_TARGET: {hit.source.get('text', '')}\n\nNEW_CONTENT: {request.content}",
                        "valid_types": valid_types_str
                    },
                    profile_name="ingest_extractor",
                    priority="realtime"
                )

                # Parse the LLM answer (a JSON list is expected).
                from app.core.ingestion.ingestion_utils import extract_json_from_response
                suggestions = extract_json_from_response(raw_suggestion)

                if isinstance(suggestions, list) and len(suggestions) > 0:
                    sugg = suggestions[0]  # keep only the strongest suggestion per hit
                    return DiscoveryHit(
                        target_note=hit.note_id,
                        target_title=hit.source.get("title") or hit.note_id,
                        suggested_edge_type=sugg.get("kind", "related_to"),
                        confidence_score=hit.total_score,
                        reasoning=f"Semantische Nähe ({int(hit.total_score*100)}%) entdeckt."
                    )
            except Exception as e:
                logger.warning(f"⚠️ Discovery evaluation failed for hit {hit.note_id}: {e}")
            return None

        tasks = [evaluate_candidate(hit) for hit in candidates.results]
        results = await asyncio.gather(*tasks)

        # Merge and deduplicate by target note (first suggestion wins).
        seen_targets = set()
        for r in results:
            if r and r.target_note not in seen_targets:
                discovery_results.append(r)
                seen_targets.add(r.target_note)

        duration = int((time.time() - start_time) * 1000)
        logger.info(f"✨ Discovery finished: found {len(discovery_results)} edges in {duration}ms")

        return discovery_results

    except Exception as e:
        logger.error(f"❌ Discovery API failure: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Discovery-Prozess fehlgeschlagen.")