"""
|
||
FILE: app/services/semantic_analyzer.py
|
||
DESCRIPTION: KI-gestützte Kanten-Validierung. Nutzt LLM (Background-Priority), um Kanten präzise einem Chunk zuzuordnen.
|
||
WP-20 Fix: Volle Kompatibilität mit der provider-basierten Routing-Logik (OpenRouter Primary).
|
||
WP-22: Integration von valid_types zur Halluzinations-Vermeidung.
|
||
FIX: Mistral-sicheres JSON-Parsing (<s> & [OUT] Handling) und 100% Logik-Erhalt.
|
||
VERSION: 2.2.6
|
||
STATUS: Active
|
||
DEPENDENCIES: app.services.llm_service, app.services.edge_registry, json, logging, re
|
||
"""
|
||
|
||
import json
import logging
import re
from typing import List, Any

# Imports
from app.services.llm_service import LLMService
# WP-22: Registry for vocabulary enforcement
from app.services.edge_registry import registry as edge_registry

logger = logging.getLogger(__name__)


class SemanticAnalyzer:
    def __init__(self):
        self.llm = LLMService()

    def _is_valid_edge_string(self, edge_str: str) -> bool:
        """
        Checks whether a string is a valid edge in the 'kind:target' format.
        Prevents LLM chatter from slipping through as an edge.
        """
        if not isinstance(edge_str, str) or ":" not in edge_str:
            return False

        parts = edge_str.split(":", 1)
        kind = parts[0].strip()
        target = parts[1].strip()

        # Rule 1: A 'kind' (relationship type) must not contain spaces.
        if " " in kind:
            return False

        # Rule 2: Plausible length for the type (avoids whole sentences as a type)
        if len(kind) > 40 or len(kind) < 2:
            return False

        # Rule 3: The target must not be empty
        if not target:
            return False

        return True
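    # Illustrative inputs and outcomes for _is_valid_edge_string (derived from the
    # rules above, not from additional project conventions):
    #   "supports:Project Alpha"  -> True  (kind has no spaces, plausible length, target set)
    #   "relates to:Note B"       -> False (space inside the kind)
    #   "x:Note B"                -> False (kind shorter than 2 characters)
    #   "explains:"               -> False (empty target)
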
    def _extract_json_safely(self, text: str) -> Any:
        """
        Extracts JSON data and strips LLM control tokens (Mistral/Llama).
        Implements robust recovery logic for cloud providers.
        """
        if not text:
            return []

        # 1. Remove Mistral/Llama control tokens and tags
        clean = text.replace("<s>", "").replace("</s>", "")
        clean = clean.replace("[OUT]", "").replace("[/OUT]", "")
        clean = clean.strip()

        # 2. Look for Markdown JSON blocks
        match = re.search(r"```(?:json)?\s*(.*?)\s*```", clean, re.DOTALL)
        payload = match.group(1) if match else clean

        try:
            return json.loads(payload.strip())
        except json.JSONDecodeError:
            # 3. Recovery: look for the first '[' and the last ']'
            start = payload.find('[')
            end = payload.rfind(']') + 1
            if start != -1 and end > start:
                try:
                    return json.loads(payload[start:end])
                except json.JSONDecodeError:
                    pass

            # 4. Second recovery: look for the first '{' and the last '}'
            start_obj = payload.find('{')
            end_obj = payload.rfind('}') + 1
            if start_obj != -1 and end_obj > start_obj:
                try:
                    return json.loads(payload[start_obj:end_obj])
                except json.JSONDecodeError:
                    pass
            return []
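    # Illustrative raw responses that _extract_json_safely is meant to recover from
    # (assumed shapes, based on the cleanup steps above):
    #   '<s>["supports:Note A", "explains:Note B"]</s>'   -> list parsed after tag removal
    #   '```json\n{"edges": ["supports:Note A"]}\n```'    -> dict parsed from the Markdown fence
    #   'Result: ["supports:Note A"] - hope this helps!'  -> list recovered via the [...] fallback
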
    async def assign_edges_to_chunk(self, chunk_text: str, all_edges: List[str], note_type: str) -> List[str]:
        """
        Sends a chunk and a list of potential edges to the LLM.
        The LLM filters out which edges are relevant for this chunk.
        WP-20: Primarily uses the configured provider (e.g. OpenRouter).
        """
        if not all_edges:
            return []

        # 1. Determine provider and model (dynamically via settings)
        provider = self.llm.settings.MINDNET_LLM_PROVIDER
        model = self.llm.settings.OPENROUTER_MODEL if provider == "openrouter" else self.llm.settings.GEMINI_MODEL

        # 2. Load the prompt (provider-specific via get_prompt)
        prompt_template = self.llm.get_prompt("edge_allocation_template", provider)

        if not prompt_template or not isinstance(prompt_template, str):
            logger.warning("⚠️ [SemanticAnalyzer] Prompt 'edge_allocation_template' invalid. Using recovery template.")
            prompt_template = (
                "TASK: From the candidates, select the edges that are relevant to the text.\n"
                "TEXT: {chunk_text}\n"
                "CANDIDATES: {edge_list}\n"
                "OUTPUT: JSON list of strings [\"kind:target\"]."
            )

        # 3. Prepare data for the template (vocabulary check)
        edge_registry.ensure_latest()
        valid_types_str = ", ".join(sorted(edge_registry.valid_types))
        edges_str = "\n".join([f"- {e}" for e in all_edges])

        logger.debug(f"🔍 [SemanticAnalyzer] Request: {len(chunk_text)} chars text, {len(all_edges)} candidates.")

        # 4. Fill the prompt with a format check (no shortcuts)
        try:
            # Limit the text to a reasonable length for the context window
            final_prompt = prompt_template.format(
                chunk_text=chunk_text[:6000],
                edge_list=edges_str,
                valid_types=valid_types_str
            )
        except Exception as format_err:
            logger.error(f"❌ [SemanticAnalyzer] Prompt formatting failed: {format_err}")
            return []

        try:
            # 5. LLM call with background priority & semaphore control
            response_json = await self.llm.generate_raw_response(
                prompt=final_prompt,
                force_json=True,
                max_retries=3,
                base_delay=2.0,
                priority="background",
                provider=provider,
                model_override=model
            )

            # 6. Mistral-safe JSON parsing via the helper
            data = self._extract_json_safely(response_json)

            if not data:
                return []

            # 7. Robust normalisation (list vs dict recovery)
            raw_candidates = []
            if isinstance(data, list):
                raw_candidates = data
            elif isinstance(data, dict):
                logger.info("ℹ️ [SemanticAnalyzer] LLM returned dict, trying recovery.")
                for key in ["edges", "results", "kanten", "matches"]:
                    if key in data and isinstance(data[key], list):
                        raw_candidates.extend(data[key])
                        break
                # If still empty, fall back to key-value pairs
                if not raw_candidates:
                    for k, v in data.items():
                        if isinstance(v, str):
                            raw_candidates.append(f"{k}:{v}")
                        elif isinstance(v, list):
                            for target in v:
                                if isinstance(target, str):
                                    raw_candidates.append(f"{k}:{target}")

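            # Examples of the normalisation above (assumed response shapes, for illustration):
            #   ["supports:Note A", "explains:Note B"]  -> used as-is
            #   {"edges": ["supports:Note A"]}          -> list taken from the "edges" key
            #   {"supports": ["Note A", "Note B"]}      -> rebuilt as ["supports:Note A", "supports:Note B"]
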
            # 8. Strict validation against the edge format
            valid_edges = []
            for e in raw_candidates:
                e_str = str(e).strip()
                if self._is_valid_edge_string(e_str):
                    valid_edges.append(e_str)
                else:
                    logger.debug(f" [SemanticAnalyzer] Rejected invalid edge format: '{e_str}'")

            if valid_edges:
                logger.info(f"✅ [SemanticAnalyzer] Assigned {len(valid_edges)} edges to chunk.")
            return valid_edges

        except Exception as e:
            logger.error(f"💥 [SemanticAnalyzer] Critical error during analysis: {e}", exc_info=True)
            return []

    async def close(self):
        if self.llm:
            await self.llm.close()


# Singleton instantiation
_analyzer_instance = None

def get_semantic_analyzer():
    global _analyzer_instance
    if _analyzer_instance is None:
        _analyzer_instance = SemanticAnalyzer()
    return _analyzer_instance
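
# Minimal usage sketch (illustrative only; assumes LLMService and the edge registry are
# configured, e.g. MINDNET_LLM_PROVIDER and the corresponding model set in the settings):
#
#   import asyncio
#
#   async def _demo():
#       analyzer = get_semantic_analyzer()
#       edges = await analyzer.assign_edges_to_chunk(
#           chunk_text="Project Alpha builds on the results of Note B.",
#           all_edges=["builds_on:Note B", "contradicts:Note C"],
#           note_type="note",
#       )
#       print(edges)  # e.g. ["builds_on:Note B"], depending on the LLM response
#       await analyzer.close()
#
#   asyncio.run(_demo())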