mindnet/app/routers/chat.py

362 lines
14 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
FILE: app/routers/chat.py
DESCRIPTION: Haupt-Chat-Interface (WP-25b Edition).
Kombiniert die spezialisierte Interview-Logik mit der neuen
Lazy-Prompt-Orchestration und MoE-Synthese.
WP-24c: Integration der Discovery API für proaktive Vernetzung.
VERSION: 3.1.0 (WP-24c: Discovery API Integration)
STATUS: Active
FIX:
- WP-24c: Neuer Endpunkt /query/discover für proaktive Kanten-Vorschläge.
- WP-25b: Umstellung des Interview-Modus auf Lazy-Prompt (prompt_key + variables).
- WP-25b: Delegation der RAG-Phase an die Engine v1.3.0 für konsistente MoE-Steuerung.
- WP-25a: Voller Erhalt der v3.0.2 Logik (Interview, Schema-Resolution, FastPaths).
"""
from fastapi import APIRouter, HTTPException, Depends
from typing import List, Dict, Any, Optional
from pydantic import BaseModel
import time
import uuid
import logging
import yaml
import os
import asyncio
from pathlib import Path
from app.config import get_settings
from app.models.dto import ChatRequest, ChatResponse, QueryHit, QueryRequest
from app.services.llm_service import LLMService
from app.services.feedback_service import log_search
router = APIRouter()
logger = logging.getLogger(__name__)
# --- EBENE 0: DTOs FÜR DISCOVERY (WP-24c) ---
class DiscoveryRequest(BaseModel):
content: str
top_k: int = 8
min_confidence: float = 0.6
class DiscoveryHit(BaseModel):
target_note: str # Note ID
target_title: str # Menschenlesbarer Titel
suggested_edge_type: str # Kanonischer Typ aus edge_vocabulary
confidence_score: float # Kombinierter Vektor- + KI-Score
reasoning: str # Kurze Begründung der KI
# --- EBENE 1: CONFIG LOADER & CACHING (WP-25 Standard) ---
_DECISION_CONFIG_CACHE = None
_TYPES_CONFIG_CACHE = None
def _load_decision_config() -> Dict[str, Any]:
"""Lädt die Strategie-Konfiguration."""
settings = get_settings()
path = Path(settings.DECISION_CONFIG_PATH)
try:
if path.exists():
with open(path, "r", encoding="utf-8") as f:
return yaml.safe_load(f) or {}
except Exception as e:
logger.error(f"Failed to load decision config: {e}")
return {"strategies": {}}
def _load_types_config() -> Dict[str, Any]:
"""Lädt die types.yaml für die Typerkennung."""
path = os.getenv("MINDNET_TYPES_FILE", "config/types.yaml")
try:
if os.path.exists(path):
with open(path, "r", encoding="utf-8") as f:
return yaml.safe_load(f) or {}
except Exception as e:
logger.error(f"Failed to load types config: {e}")
return {}
def get_full_config() -> Dict[str, Any]:
global _DECISION_CONFIG_CACHE
if _DECISION_CONFIG_CACHE is None:
_DECISION_CONFIG_CACHE = _load_decision_config()
return _DECISION_CONFIG_CACHE
def get_types_config() -> Dict[str, Any]:
global _TYPES_CONFIG_CACHE
if _TYPES_CONFIG_CACHE is None:
_TYPES_CONFIG_CACHE = _load_types_config()
return _TYPES_CONFIG_CACHE
def get_decision_strategy(intent: str) -> Dict[str, Any]:
config = get_full_config()
strategies = config.get("strategies", {})
return strategies.get(intent, strategies.get("FACT_WHAT", {}))
# --- EBENE 2: SPEZIAL-LOGIK (INTERVIEW & DETECTION) ---
def _detect_target_type(message: str, configured_schemas: Dict[str, Any]) -> str:
"""WP-07: Identifiziert den gewünschten Notiz-Typ (Keyword-basiert)."""
message_lower = message.lower()
types_cfg = get_types_config()
types_def = types_cfg.get("types", {})
# 1. Check types.yaml detection_keywords
for type_name, type_data in types_def.items():
keywords = type_data.get("detection_keywords", [])
for kw in keywords:
if kw.lower() in message_lower:
return type_name
# 2. Direkter Match mit Schema-Keys
for type_key in configured_schemas.keys():
if type_key == "default": continue
if type_key in message_lower:
return type_key
# 3. Synonym-Mapping (Legacy)
synonyms = {
"projekt": "project", "entscheidung": "decision", "ziel": "goal",
"erfahrung": "experience", "wert": "value", "prinzip": "principle"
}
for term, schema_key in synonyms.items():
if term in message_lower:
return schema_key
return "default"
def _is_question(query: str) -> bool:
"""Prüft, ob der Input eine Frage ist."""
q = query.strip().lower()
if "?" in q: return True
starters = ["wer", "wie", "was", "wo", "wann", "warum", "weshalb", "wozu", "welche", "bist du"]
return any(q.startswith(s + " ") for s in starters)
async def _classify_intent(query: str, llm: LLMService) -> tuple[str, str]:
"""Hybrid Router: Keyword-Fast-Paths & DecisionEngine LLM Router."""
config = get_full_config()
strategies = config.get("strategies", {})
query_lower = query.lower()
# 1. FAST PATH: Keyword Trigger
for intent_name, strategy in strategies.items():
keywords = strategy.get("trigger_keywords", [])
for k in keywords:
if k.lower() in query_lower:
return intent_name, "Keyword (FastPath)"
# 2. FAST PATH B: Type Keywords -> INTERVIEW
if not _is_question(query_lower):
types_cfg = get_types_config()
for type_name, type_data in types_cfg.get("types", {}).items():
for kw in type_data.get("detection_keywords", []):
if kw.lower() in query_lower:
return "INTERVIEW", "Keyword (Interview)"
# 3. SLOW PATH: DecisionEngine LLM Router (MoE-gesteuert)
intent = await llm.decision_engine._determine_strategy(query)
return intent, "DecisionEngine (LLM)"
# --- EBENE 3: RETRIEVAL AGGREGATION ---
def _collect_all_hits(stream_responses: Dict[str, Any]) -> List[QueryHit]:
"""Sammelt deduplizierte Treffer aus allen Streams für das Tracing."""
all_hits = []
seen_node_ids = set()
for _, response in stream_responses.items():
# Sammeln der Hits aus den QueryResponse Objekten
if hasattr(response, 'results'):
for hit in response.results:
if hit.node_id not in seen_node_ids:
all_hits.append(hit)
seen_node_ids.add(hit.node_id)
return sorted(all_hits, key=lambda h: h.total_score, reverse=True)
# --- EBENE 4: ENDPUNKTE ---
def get_llm_service():
return LLMService()
@router.post("/", response_model=ChatResponse)
async def chat_endpoint(
request: ChatRequest,
llm: LLMService = Depends(get_llm_service)
):
start_time = time.time()
query_id = str(uuid.uuid4())
logger.info(f"🚀 [WP-25b] Chat request [{query_id}]: {request.message[:50]}...")
try:
# 1. Intent Detection
intent, intent_source = await _classify_intent(request.message, llm)
logger.info(f"[{query_id}] Intent: {intent} via {intent_source}")
strategy = get_decision_strategy(intent)
engine = llm.decision_engine
sources_hits = []
answer_text = ""
# 2. INTERVIEW MODE (WP-25b Lazy-Prompt Logik)
if intent == "INTERVIEW":
target_type = _detect_target_type(request.message, strategy.get("schemas", {}))
types_cfg = get_types_config()
type_def = types_cfg.get("types", {}).get(target_type, {})
fields_list = type_def.get("schema", [])
# WP-07: Restaurierte Fallback Logik
if not fields_list:
configured_schemas = strategy.get("schemas", {})
fallback = configured_schemas.get(target_type, configured_schemas.get("default", {}))
fields_list = fallback.get("fields", []) if isinstance(fallback, dict) else (fallback or [])
fields_str = "\n- " + "\n- ".join(fields_list)
template_key = strategy.get("prompt_template", "interview_template")
# WP-25b: Lazy Loading Call
answer_text = await llm.generate_raw_response(
prompt_key=template_key,
variables={
"query": request.message,
"target_type": target_type,
"schema_fields": fields_str
},
system=llm.get_prompt("system_prompt"),
priority="realtime",
profile_name="compression_fast",
max_retries=0
)
sources_hits = []
# 3. RAG MODE (WP-25b Delegation an Engine v1.3.0)
else:
# Phase A & B: Retrieval & Kompression (Delegiert an Engine v1.3.0)
formatted_context_map = await engine._execute_parallel_streams(strategy, request.message)
# Erfassung der Quellen für das Tracing
raw_stream_map = {}
stream_keys = strategy.get("use_streams", [])
library = engine.config.get("streams_library", {})
retrieval_tasks = []
active_streams = []
for key in stream_keys:
if key in library:
active_streams.append(key)
retrieval_tasks.append(engine._run_single_stream(key, library[key], request.message))
responses = await asyncio.gather(*retrieval_tasks, return_exceptions=True)
for name, res in zip(active_streams, responses):
if not isinstance(res, Exception):
raw_stream_map[name] = res
sources_hits = _collect_all_hits(raw_stream_map)
# Phase C: Finale MoE Synthese (Delegiert an Engine v1.3.0)
answer_text = await engine._generate_final_answer(
intent, strategy, request.message, formatted_context_map
)
duration_ms = int((time.time() - start_time) * 1000)
# Logging (WP-15)
try:
log_search(
query_id=query_id, query_text=request.message, results=sources_hits,
mode=f"wp25b_{intent.lower()}", metadata={"strategy": intent, "source": intent_source}
)
except: pass
return ChatResponse(
query_id=query_id, answer=answer_text, sources=sources_hits,
latency_ms=duration_ms, intent=intent, intent_source=intent_source
)
except Exception as e:
logger.error(f"❌ Chat Endpoint Failure: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Fehler bei der Verarbeitung der Anfrage.")
@router.post("/query/discover", response_model=List[DiscoveryHit])
async def discover_edges(
request: DiscoveryRequest,
llm: LLMService = Depends(get_llm_service)
):
"""
WP-24c: Analysiert Text auf potenzielle Kanten zu bestehendem Wissen.
Nutzt Vektor-Suche und DecisionEngine-Logik (WP-25b PROMPT-TRACE konform).
"""
start_time = time.time()
logger.info(f"🔍 [WP-24c] Discovery triggered for content: {request.content[:50]}...")
try:
# 1. Kandidaten-Suche via Retriever (Vektor-Match)
search_req = QueryRequest(
query=request.content,
top_k=request.top_k,
explain=True
)
candidates = await llm.decision_engine.retriever.search(search_req)
if not candidates.results:
logger.info(" No candidates found for discovery.")
return []
# 2. KI-gestützte Beziehungs-Extraktion (WP-25b)
discovery_results = []
# Zugriff auf gültige Kanten-Typen aus der Registry
from app.services.edge_registry import registry as edge_reg
valid_types_str = ", ".join(list(edge_reg.valid_types))
# Parallele Evaluierung der Kandidaten für maximale Performance
async def evaluate_candidate(hit: QueryHit) -> Optional[DiscoveryHit]:
if hit.total_score < request.min_confidence:
return None
try:
# Nutzt ingest_extractor Profil für präzise semantische Analyse
# Wir verwenden das prompt_key Pattern (edge_extraction) gemäß WP-24c Vorgabe
raw_suggestion = await llm.generate_raw_response(
prompt_key="edge_extraction",
variables={
"note_id": "NEUER_INHALT",
"text": f"PROXIMITY_TARGET: {hit.source.get('text', '')}\n\nNEW_CONTENT: {request.content}",
"valid_types": valid_types_str
},
profile_name="ingest_extractor",
priority="realtime"
)
# Parsing der LLM Antwort (Erwartet JSON Liste)
from app.core.ingestion.ingestion_utils import extract_json_from_response
suggestions = extract_json_from_response(raw_suggestion)
if isinstance(suggestions, list) and len(suggestions) > 0:
sugg = suggestions[0] # Wir nehmen den stärksten Vorschlag pro Hit
return DiscoveryHit(
target_note=hit.note_id,
target_title=hit.source.get("title") or hit.note_id,
suggested_edge_type=sugg.get("kind", "related_to"),
confidence_score=hit.total_score,
reasoning=f"Semantische Nähe ({int(hit.total_score*100)}%) entdeckt."
)
except Exception as e:
logger.warning(f"⚠️ Discovery evaluation failed for hit {hit.note_id}: {e}")
return None
tasks = [evaluate_candidate(hit) for hit in candidates.results]
results = await asyncio.gather(*tasks)
# Zusammenführung und Duplikat-Bereinigung
seen_targets = set()
for r in results:
if r and r.target_note not in seen_targets:
discovery_results.append(r)
seen_targets.add(r.target_note)
duration = int((time.time() - start_time) * 1000)
logger.info(f"✨ Discovery finished: found {len(discovery_results)} edges in {duration}ms")
return discovery_results
except Exception as e:
logger.error(f"❌ Discovery API failure: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Discovery-Prozess fehlgeschlagen.")