"""
app/services/semantic_analyzer.py — Edge Validation & Filtering

Version: 1.4 (Merged: Retry Strategy + Extended Observability)
"""
import json
import logging
from typing import List, Optional
from dataclasses import dataclass

# Imports
from app.services.llm_service import LLMService

logger = logging.getLogger(__name__)
class SemanticAnalyzer:
    """Filters candidate graph edges for a text chunk via an LLM call."""

    def __init__(self):
        # LLMService owns the model backend connection and the prompt store.
        self.llm = LLMService()

    async def assign_edges_to_chunk(self, chunk_text: str, all_edges: List[str], note_type: str) -> List[str]:
        """
        Send a chunk and a list of candidate edges to the LLM; the LLM filters
        out which edges are relevant for this chunk.

        Features:
        - Retry strategy: waits on overload (max_retries=5).
        - Observability: logs input size, raw response and parsing details.

        Args:
            chunk_text: Chunk text (truncated to 3500 chars for the prompt).
            all_edges: Candidate edges, each formatted as "kind:target".
            note_type: Note type of the source document (currently unused in
                the body; kept for interface compatibility with callers).

        Returns:
            The subset of candidates the LLM deemed relevant. Empty list on
            empty input, empty/unparseable LLM output, or any runtime error
            (edge assignment must never crash the caller).
        """
        if not all_edges:
            return []

        # 1. Load the prompt template; fall back to an inline template if the
        #    prompt store does not provide one.
        prompt_template = self.llm.prompts.get("edge_allocation_template")
        if not prompt_template:
            logger.warning("⚠️ [SemanticAnalyzer] Prompt 'edge_allocation_template' fehlt. Nutze Fallback.")
            prompt_template = (
                "TASK: Wähle aus den Kandidaten die relevanten Kanten für den Text.\n"
                "TEXT: {chunk_text}\n"
                "KANDIDATEN: {edge_list}\n"
                "OUTPUT: JSON Liste von Strings [\"kind:target\"]."
            )

        # 2. Format the candidate list as a bullet list.
        edges_str = "\n".join(f"- {e}" for e in all_edges)

        # LOG: request info (important for debugging). Lazy %-args so the
        # message is only formatted when DEBUG is enabled.
        logger.debug("🔍 [SemanticAnalyzer] Request: %s chars Text, %s Candidates.", len(chunk_text), len(all_edges))

        # 3. Fill the prompt. Chunk is truncated to bound prompt size
        #    (3000 -> 3500: slightly more context than before).
        final_prompt = prompt_template.format(
            chunk_text=chunk_text[:3500],
            edge_list=edges_str,
        )

        try:
            # 4. LLM call with forced JSON output AND retry logic (merged V1.3).
            # max_retries=5 means: 5s -> 10s -> 20s -> 40s -> 80s backoff.
            response_json = await self.llm.generate_raw_response(
                prompt=final_prompt,
                force_json=True,
                max_retries=5,
                base_delay=5.0,
            )

            # LOG: raw response preview (first 200 chars only, to avoid
            # flooding the log).
            logger.debug("📥 [SemanticAnalyzer] Raw Response (Preview): %s...", response_json[:200])

            # 5. Parsing & cleaning: strip markdown code fences some models emit.
            clean_json = response_json.replace("```json", "").replace("```", "").strip()

            if not clean_json:
                logger.warning("⚠️ [SemanticAnalyzer] Leere Antwort vom LLM erhalten. Trigger Fallback.")
                return []

            try:
                data = json.loads(clean_json)
            except json.JSONDecodeError as json_err:
                # LOG: detailed error report for the user.
                logger.error("❌ [SemanticAnalyzer] JSON Decode Error.")
                logger.error("   Grund: %s", json_err)
                logger.error("   Empfangener String: %s", clean_json[:500])  # show at most 500 chars
                logger.info("   -> Workaround: Fallback auf 'Alle Kanten' (durch Chunker).")
                return []

            # 6. Normalize list/dict payloads and keep only valid-looking edges.
            final_result = self._extract_edges(data)

            # LOG: result. Info level only when something was found, otherwise
            # empty chunks would spam the log.
            if final_result:
                logger.info("✅ [SemanticAnalyzer] Success. %s Kanten zugewiesen.", len(final_result))
            else:
                logger.debug("   [SemanticAnalyzer] Keine spezifischen Kanten erkannt (Empty Result).")

            return final_result

        except Exception as e:
            # Catch-all boundary: never let edge assignment crash ingestion.
            logger.error("💥 [SemanticAnalyzer] Kritischer Fehler: %s", e, exc_info=True)
            return []

    @staticmethod
    def _extract_edges(data) -> List[str]:
        """Normalize a decoded LLM payload (list or dict) to 'kind:target' strings.

        Robust validation for the formats observed in practice:
        - list:  ["kind:target", ...]                       (standard case)
        - dict:  {"edges": [...]} wrapper keys,
                 {"kind": "target"}, {"kind": ["t1", "t2"]} (repair cases)
        Entries without a ':' separator are dropped.
        """
        valid_edges: List[str] = []

        if isinstance(data, list):
            # Standard case: ["kind:target", ...]
            valid_edges = [str(e) for e in data if isinstance(e, str) and ":" in e]

        elif isinstance(data, dict):
            # Handle deviating formats (extended logging V1.2).
            logger.info("ℹ️ [SemanticAnalyzer] LLM lieferte Dict statt Liste. Versuche Reparatur. Keys: %s", list(data.keys()))

            for key, val in data.items():
                # Case A: {"edges": ["kind:target"]}
                if key.lower() in ["edges", "results", "kanten", "matches"] and isinstance(val, list):
                    valid_edges.extend(str(e) for e in val if isinstance(e, str) and ":" in e)

                # Case B: {"kind": "target"} (the format observed in the logs)
                elif isinstance(val, str):
                    valid_edges.append(f"{key}:{val}")

                # Case C: {"kind": ["target1", "target2"]}
                elif isinstance(val, list):
                    valid_edges.extend(f"{key}:{target}" for target in val if isinstance(target, str))

        # Safety: keep only edges that look at least halfway valid.
        return [e for e in valid_edges if ":" in e]

    async def close(self):
        """Release the underlying LLM client resources."""
        if self.llm:
            await self.llm.close()
# Singleton Helper
_analyzer_instance = None

def get_semantic_analyzer():
    """Return the process-wide SemanticAnalyzer, creating it lazily on first call."""
    global _analyzer_instance
    if _analyzer_instance is not None:
        return _analyzer_instance
    _analyzer_instance = SemanticAnalyzer()
    return _analyzer_instance