Enhance decision_engine.py to support context reuse during compression failures. Implement error handling to return original content when compression fails, ensuring robust fallback mechanisms without re-retrieval. Update logging for better traceability of compression and fallback processes, improving overall reliability in stream handling.

This commit is contained in:
Lars 2026-01-11 19:14:15 +01:00
parent 3dc81ade0f
commit 716a063849

View File

@ -151,15 +151,21 @@ class DecisionEngine:
retrieval_results = await asyncio.gather(*retrieval_tasks, return_exceptions=True) retrieval_results = await asyncio.gather(*retrieval_tasks, return_exceptions=True)
# Phase 2: Formatierung und optionale Kompression # Phase 2: Formatierung und optionale Kompression
# WP-24c v4.5.5: Context-Reuse - Sicherstellen, dass formatted_context auch bei Kompressions-Fehlern erhalten bleibt
final_stream_tasks = [] final_stream_tasks = []
formatted_contexts = {} # WP-24c v4.5.5: Persistenz für Fallback-Zugriff
for name, res in zip(active_streams, retrieval_results): for name, res in zip(active_streams, retrieval_results):
if isinstance(res, Exception): if isinstance(res, Exception):
logger.error(f"Stream '{name}' failed during retrieval: {res}") logger.error(f"Stream '{name}' failed during retrieval: {res}")
async def _err(): return f"[Fehler im Wissens-Stream {name}]" error_msg = f"[Fehler im Wissens-Stream {name}]"
formatted_contexts[name] = error_msg
async def _err(msg=error_msg): return msg
final_stream_tasks.append(_err()) final_stream_tasks.append(_err())
continue continue
formatted_context = self._format_stream_context(res) formatted_context = self._format_stream_context(res)
formatted_contexts[name] = formatted_context # WP-24c v4.5.5: Persistenz für Fallback
# WP-25a: Kompressions-Check (Inhaltsverdichtung) # WP-25a: Kompressions-Check (Inhaltsverdichtung)
stream_cfg = library.get(name, {}) stream_cfg = library.get(name, {})
@ -168,6 +174,7 @@ class DecisionEngine:
if len(formatted_context) > threshold: if len(formatted_context) > threshold:
logger.info(f"⚙️ [WP-25b] Triggering Lazy-Compression for stream '{name}'...") logger.info(f"⚙️ [WP-25b] Triggering Lazy-Compression for stream '{name}'...")
comp_profile = stream_cfg.get("compression_profile") comp_profile = stream_cfg.get("compression_profile")
# WP-24c v4.5.5: Kompression mit Context-Reuse - bei Fehler wird formatted_context zurückgegeben
final_stream_tasks.append( final_stream_tasks.append(
self._compress_stream_content(name, formatted_context, query, comp_profile) self._compress_stream_content(name, formatted_context, query, comp_profile)
) )
@ -176,12 +183,31 @@ class DecisionEngine:
final_stream_tasks.append(_direct()) final_stream_tasks.append(_direct())
# Finale Inhalte parallel fertigstellen # Finale Inhalte parallel fertigstellen
final_contents = await asyncio.gather(*final_stream_tasks) # WP-24c v4.5.5: Bei Kompressions-Fehlern wird der Original-Content zurückgegeben (siehe _compress_stream_content)
return dict(zip(active_streams, final_contents)) final_contents = await asyncio.gather(*final_stream_tasks, return_exceptions=True)
# WP-24c v4.5.5: Exception-Handling für finale Inhalte - verwende Original-Content bei Fehlern
final_results = {}
for name, content in zip(active_streams, final_contents):
if isinstance(content, Exception):
logger.warning(f"⚠️ [CONTEXT-REUSE] Stream '{name}' Fehler in finaler Verarbeitung: {content}. Verwende Original-Context.")
final_results[name] = formatted_contexts.get(name, f"[Fehler im Stream {name}]")
else:
final_results[name] = content
logger.debug(f"📊 [STREAMS] Finale Stream-Ergebnisse: {[(k, len(v)) for k, v in final_results.items()]}")
return final_results
async def _compress_stream_content(self, stream_name: str, content: str, query: str, profile: Optional[str]) -> str: async def _compress_stream_content(self, stream_name: str, content: str, query: str, profile: Optional[str]) -> str:
"""WP-25b: Inhaltsverdichtung via Lazy-Loading 'compression_template'.""" """
WP-25b: Inhaltsverdichtung via Lazy-Loading 'compression_template'.
WP-24c v4.5.5: Context-Reuse - Bei Fehlern wird der Original-Content zurückgegeben,
um Re-Retrieval zu vermeiden.
"""
try: try:
# WP-24c v4.5.5: Logging für LLM-Trace im Kompressions-Modus
logger.debug(f"🔧 [COMPRESSION] Starte Kompression für Stream '{stream_name}' (Content-Länge: {len(content)})")
summary = await self.llm_service.generate_raw_response( summary = await self.llm_service.generate_raw_response(
prompt_key="compression_template", prompt_key="compression_template",
variables={ variables={
@ -193,9 +219,19 @@ class DecisionEngine:
priority="background", priority="background",
max_retries=1 max_retries=1
) )
return summary.strip() if (summary and len(summary.strip()) > 10) else content
# WP-24c v4.5.5: Validierung des Kompressions-Ergebnisses
if summary and len(summary.strip()) > 10:
logger.debug(f"✅ [COMPRESSION] Kompression erfolgreich für '{stream_name}' (Original: {len(content)}, Komprimiert: {len(summary)})")
return summary.strip()
else:
logger.warning(f"⚠️ [COMPRESSION] Kompressions-Ergebnis zu kurz für '{stream_name}', verwende Original-Content")
return content
except Exception as e: except Exception as e:
logger.error(f"❌ Compression of {stream_name} failed: {e}") # WP-24c v4.5.5: Context-Reuse - Bei Fehlern Original-Content zurückgeben (kein Re-Retrieval)
logger.error(f"❌ [COMPRESSION] Kompression von '{stream_name}' fehlgeschlagen: {e}")
logger.info(f"🔄 [CONTEXT-REUSE] Verwende Original-Content für '{stream_name}' (Länge: {len(content)}) - KEIN Re-Retrieval")
return content return content
async def _run_single_stream(self, name: str, cfg: Dict, query: str) -> QueryResponse: async def _run_single_stream(self, name: str, cfg: Dict, query: str) -> QueryResponse:
@ -289,19 +325,42 @@ class DecisionEngine:
except Exception as e: except Exception as e:
logger.error(f"Final Synthesis failed: {e}") logger.error(f"Final Synthesis failed: {e}")
# ROBUST FALLBACK (v1.2.1 Gate): Versuche eine minimale Antwort zu generieren # WP-24c v4.5.5: ROBUST FALLBACK mit Context-Reuse
# WP-25b FIX: Konsistente Nutzung von prompt_key statt hardcodiertem Prompt # WICHTIG: stream_results werden Wiederverwendet - KEIN Re-Retrieval
logger.info(f"🔄 [FALLBACK] Verwende vorhandene stream_results (KEIN Re-Retrieval)")
logger.debug(f" -> Verfügbare Streams: {list(stream_results.keys())}")
logger.debug(f" -> Stream-Längen: {[(k, len(v)) for k, v in stream_results.items()]}")
# WP-24c v4.5.5: Context-Reuse - Nutze vorhandene stream_results
fallback_context = "\n\n".join([v for v in stream_results.values() if len(v) > 20]) fallback_context = "\n\n".join([v for v in stream_results.values() if len(v) > 20])
if not fallback_context or len(fallback_context.strip()) < 20:
logger.warning(f"⚠️ [FALLBACK] Fallback-Context zu kurz ({len(fallback_context)} Zeichen). Stream-Ergebnisse möglicherweise leer.")
return f"Entschuldigung, ich konnte keine relevanten Informationen zu Ihrer Anfrage finden. (Fehler: {str(e)})"
try: try:
return await self.llm_service.generate_raw_response( # WP-24c v4.5.5: Fallback-Synthese mit LLM-Trace-Logging
logger.info(f"🔄 [FALLBACK] Starte Fallback-Synthese mit vorhandenem Context (Länge: {len(fallback_context)})")
logger.debug(f" -> Fallback-Profile: {profile}, Template: fallback_synthesis")
result = await self.llm_service.generate_raw_response(
prompt_key="fallback_synthesis", prompt_key="fallback_synthesis",
variables={"query": query, "context": fallback_context}, variables={"query": query, "context": fallback_context},
system=system_prompt, priority="realtime", profile_name=profile system=system_prompt, priority="realtime", profile_name=profile
) )
except (ValueError, KeyError):
logger.info(f"✅ [FALLBACK] Fallback-Synthese erfolgreich (Antwort-Länge: {len(result) if result else 0})")
return result
except (ValueError, KeyError) as template_error:
# Fallback auf direkten Prompt, falls Template nicht existiert # Fallback auf direkten Prompt, falls Template nicht existiert
logger.warning("⚠️ Fallback template 'fallback_synthesis' not found. Using direct prompt.") logger.warning(f"⚠️ [FALLBACK] Template 'fallback_synthesis' nicht gefunden: {template_error}. Verwende direkten Prompt.")
return await self.llm_service.generate_raw_response( logger.debug(f" -> Direkter Prompt mit Context-Länge: {len(fallback_context)}")
result = await self.llm_service.generate_raw_response(
prompt=f"Beantworte: {query}\n\nKontext:\n{fallback_context}", prompt=f"Beantworte: {query}\n\nKontext:\n{fallback_context}",
system=system_prompt, priority="realtime", profile_name=profile system=system_prompt, priority="realtime", profile_name=profile
) )
logger.info(f"✅ [FALLBACK] Direkter Prompt erfolgreich (Antwort-Länge: {len(result) if result else 0})")
return result