Update FastAPI application and related services for WP-25a: Enhance lifespan management with Mixture of Experts (MoE) integrity checks, improve logging and error handling in LLMService, and integrate profile-driven orchestration across components. Bump versions for main application, ingestion services, and LLM profiles to reflect new features and optimizations.

This commit is contained in:
Lars 2026-01-02 08:57:29 +01:00
parent 9a98093e70
commit 9b906bbabf
9 changed files with 408 additions and 195 deletions

View File

@ -1,11 +1,11 @@
"""
FILE: app/core/ingestion/ingestion_processor.py
DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-14: Modularisierung der Datenbank-Ebene (app.core.database).
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
AUDIT v2.13.12: Synchronisierung der Profil-Auflösung mit Registry-Defaults.
VERSION: 2.13.12
AUDIT v2.14.0: Synchronisierung der Profil-Auflösung mit MoE-Experten.
VERSION: 2.14.0 (WP-25a: MoE & Profile Support)
STATUS: Active
"""
import logging
@ -55,11 +55,15 @@ class IngestionService:
# Synchronisierung der Konfiguration mit dem Instanz-Präfix
self.cfg.prefix = self.prefix
self.client = get_client(self.cfg)
self.dim = self.settings.VECTOR_SIZE
self.registry = load_type_registry()
self.embedder = EmbeddingsClient()
self.llm = LLMService()
# WP-25a: Auflösung der Dimension über das Embedding-Profil (MoE)
embed_cfg = self.llm.profiles.get("embedding_expert", {})
self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE
# Festlegen, welcher Hash für die Change-Detection maßgeblich ist
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache
@ -155,24 +159,21 @@ class IngestionService:
edge_registry.ensure_latest()
# Profil-Auflösung via Registry
# FIX: Wir nutzen das Profil, das bereits in make_note_payload unter
# Berücksichtigung der types.yaml (Registry) ermittelt wurde.
profile = note_pl.get("chunk_profile", "sliding_standard")
chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type)
enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False)
# WP-15b: Chunker-Aufruf bereitet den Candidate-Pool pro Chunk vor.
# assemble_chunks führt intern auch die Propagierung durch.
chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg)
# Semantische Kanten-Validierung (Smart Edge Allocation)
# Semantische Kanten-Validierung (Smart Edge Allocation via MoE-Profil)
for ch in chunks:
filtered = []
for cand in getattr(ch, "candidate_pool", []):
# Nur global_pool Kandidaten (aus dem Pool am Ende) erfordern KI-Validierung
# WP-25a: Nutzt nun das spezialisierte Validierungs-Profil
if cand.get("provenance") == "global_pool" and enable_smart:
if await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm, self.settings.MINDNET_LLM_PROVIDER):
if await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator"):
filtered.append(cand)
else:
# Explizite Kanten (Wikilinks/Callouts) werden ungeprüft übernommen
@ -204,7 +205,6 @@ class IngestionService:
)
# 4. DB Upsert via modularisierter Points-Logik
# WICHTIG: Wenn sich der Inhalt geändert hat, löschen wir erst alle alten Fragmente.
if purge_before and old_payload:
purge_artifacts(self.client, self.prefix, note_id)

View File

@ -1,10 +1,16 @@
"""
FILE: app/core/ingestion/ingestion_validation.py
DESCRIPTION: WP-15b semantische Validierung von Kanten gegen den LocalBatchCache.
AUDIT v2.12.3: Integration der zentralen Text-Bereinigung (WP-14).
WP-25a: Integration der Mixture of Experts (MoE) Profil-Steuerung.
VERSION: 2.13.0 (WP-25a: MoE & Profile Support)
STATUS: Active
FIX:
- Umstellung auf generate_raw_response mit profile_name="ingest_validator".
- Automatische Nutzung der Fallback-Kaskade (Cloud -> Lokal) via LLMService.
- Erhalt der sparsamen LLM-Nutzung (Validierung nur für Kandidaten-Pool).
"""
import logging
from typing import Dict, Any
from typing import Dict, Any, Optional
from app.core.parser import NoteContext
# ENTSCHEIDENDER FIX: Import der neutralen Bereinigungs-Logik zur Vermeidung von Circular Imports
@ -17,11 +23,12 @@ async def validate_edge_candidate(
edge: Dict,
batch_cache: Dict[str, NoteContext],
llm_service: Any,
provider: str
provider: Optional[str] = None,
profile_name: str = "ingest_validator"
) -> bool:
"""
WP-15b: Validiert einen Kandidaten semantisch gegen das Ziel im Cache.
Nutzt clean_llm_text zur Entfernung von Steuerzeichen vor der Auswertung.
Nutzt das MoE-Profil 'ingest_validator' für deterministische YES/NO Prüfungen.
"""
target_id = edge.get("to")
target_ctx = batch_cache.get(target_id)
@ -32,14 +39,16 @@ async def validate_edge_candidate(
target_ctx = batch_cache.get(base_id)
# Sicherheits-Fallback (Hard-Link Integrity)
# Explizite Wikilinks oder Callouts werden nicht durch das LLM verifiziert.
if not target_ctx:
logger.info(f" [VALIDATION SKIP] No context for '{target_id}' - allowing link.")
return True
# Prompt-Abruf (Nutzt Provider-String als Fallback-Key für die prompts.yaml)
template = llm_service.get_prompt("edge_validation", provider)
try:
logger.info(f"⚖️ [VALIDATING] Relation '{edge.get('kind')}' -> '{target_id}'...")
logger.info(f"⚖️ [VALIDATING] Relation '{edge.get('kind')}' -> '{target_id}' (Profile: {profile_name})...")
prompt = template.format(
chunk_text=chunk_text[:1500],
target_title=target_ctx.title,
@ -47,8 +56,13 @@ async def validate_edge_candidate(
edge_kind=edge.get("kind", "related_to")
)
# Die Antwort vom Service anfordern
raw_response = await llm_service.generate_raw_response(prompt, priority="background")
# WP-25a: Profilbasierter Aufruf (Delegiert Fallbacks an den Service)
# Nutzt ingest_validator (Cloud Mistral/Gemini -> Local Phi3:mini Kaskade)
raw_response = await llm_service.generate_raw_response(
prompt,
priority="background",
profile_name=profile_name
)
# WP-14 Fix: Zusätzliche Bereinigung zur Sicherstellung der Interpretierbarkeit
response = clean_llm_text(raw_response)
@ -62,6 +76,6 @@ async def validate_edge_candidate(
logger.info(f"🚫 [REJECTED] Relation to '{target_id}' irrelevant for this chunk.")
return is_valid
except Exception as e:
logger.warning(f"⚠️ Validation error for {target_id}: {e}")
logger.warning(f"⚠️ Validation error for {target_id} using {profile_name}: {e}")
# Im Zweifel (Timeout/Fehler) erlauben wir die Kante, um Datenverlust zu vermeiden
return True

View File

@ -3,13 +3,14 @@ FILE: app/core/retrieval/decision_engine.py
DESCRIPTION: Der Agentic Orchestrator für MindNet (WP-25a Edition).
Realisiert Multi-Stream Retrieval, Intent-basiertes Routing
und die neue Pre-Synthesis Kompression (Module A).
VERSION: 1.2.0 (WP-25a: Mixture of Experts Support)
VERSION: 1.2.1 (WP-25a: Profile-Driven Orchestration & Optimized Cascade)
STATUS: Active
FIX:
- WP-25a: Vollständige Integration der llm_profile-Steuerung für Synthese und Kompression.
- WP-25a: Implementierung der _compress_stream_content Logik zur Inhaltsverdichtung.
- WP-25a: Volle Integration der Profil-Kaskade (Delegation an LLMService v3.5.2).
- WP-25a: Dynamische Nutzung des 'router_profile' für die Intent-Erkennung.
- WP-25a: Parallelisierte Kompression überlanger Wissens-Streams.
- WP-25: Beibehaltung von Stream-Tracing und Pre-Initialization Robustness.
- COMPATIBILITY: Erhalt aller Methoden-Signaturen für den System-Merge.
- CLEANUP: Entfernung redundanter Fallback-Blocks (jetzt im LLMService).
"""
import asyncio
import logging
@ -77,16 +78,19 @@ class DecisionEngine:
return await self._generate_final_answer(strategy_key, strategy, query, stream_results)
async def _determine_strategy(self, query: str) -> str:
"""Nutzt den LLM-Router zur Wahl der Such-Strategie."""
prompt_key = self.config.get("settings", {}).get("router_prompt_key", "intent_router_v1")
router_profile = self.config.get("settings", {}).get("router_profile")
"""Nutzt den LLM-Router zur Wahl der Such-Strategie via router_profile."""
settings_cfg = self.config.get("settings", {})
prompt_key = settings_cfg.get("router_prompt_key", "intent_router_v1")
# WP-25a: Nutzt das spezialisierte Profil für das Routing
router_profile = settings_cfg.get("router_profile")
router_prompt_template = self.llm_service.get_prompt(prompt_key)
if not router_prompt_template:
return "FACT_WHAT"
full_prompt = router_prompt_template.format(query=query)
try:
# Der Router nutzt den Standard-Provider (auto)
# Der LLMService übernimmt hier über das Profil bereits die Fallback-Kaskade
response = await self.llm_service.generate_raw_response(
full_prompt, max_retries=1, priority="realtime", profile_name=router_profile
)
@ -128,7 +132,7 @@ class DecisionEngine:
# Formatierung der Hits in Text
formatted_context = self._format_stream_context(res)
# WP-25a: Kompressions-Check
# WP-25a: Kompressions-Check (Inhaltsverdichtung)
stream_cfg = library.get(name, {})
threshold = stream_cfg.get("compression_threshold", 4000)
@ -152,7 +156,6 @@ class DecisionEngine:
"""
WP-25a Module A: Inhaltsverdichtung via Experten-Modell.
"""
# Falls kein Profil definiert, nutzen wir das Default-Profil der Strategie
compression_prompt = (
f"Du bist ein Wissens-Analyst. Reduziere den folgenden Wissens-Stream '{stream_name}' "
f"auf die Informationen, die für die Beantwortung der Frage '{query}' absolut notwendig sind.\n\n"
@ -220,8 +223,6 @@ class DecisionEngine:
profile = strategy.get("llm_profile")
template_key = strategy.get("prompt_template", "rag_template")
# Hier nutzen wir noch den Provider-String für get_prompt (Kompatibilität zu prompts.yaml)
# Der llm_service löst das Profil erst bei generate_raw_response auf.
template = self.llm_service.get_prompt(template_key)
system_prompt = self.llm_service.get_prompt("system_prompt")
@ -238,25 +239,21 @@ class DecisionEngine:
if prepend:
final_prompt = f"{prepend}\n\n{final_prompt}"
# WP-25a: MoE Call
# WP-25a: MoE Call mit automatisierter Kaskade im LLMService
# (Frühere manuelle Fallback-Blocks wurden entfernt, da v3.5.2 dies intern löst)
response = await self.llm_service.generate_raw_response(
final_prompt, system=system_prompt, profile_name=profile, priority="realtime"
)
# Fallback bei leerer Antwort auf lokales Modell
if not response or len(response.strip()) < 5:
return await self.llm_service.generate_raw_response(
final_prompt, system=system_prompt, provider="ollama", priority="realtime"
)
return response
except KeyError as e:
logger.error(f"Template Variable mismatch in '{template_key}': Missing {e}")
fallback_context = "\n\n".join([v for v in stream_results.values() if v])
# WP-25a FIX: Nutzt auch im Fallback das Strategie-Profil für Konsistenz
return await self.llm_service.generate_raw_response(
f"Beantworte: {query}\n\nKontext:\n{fallback_context}",
system=system_prompt, priority="realtime"
system=system_prompt, priority="realtime", profile_name=profile
)
except Exception as e:
logger.error(f"Final Synthesis failed: {e}")

View File

@ -1,8 +1,9 @@
"""
FILE: app/main.py
DESCRIPTION: Bootstrap der FastAPI Anwendung für WP-25 (Agentic RAG).
DESCRIPTION: Bootstrap der FastAPI Anwendung für WP-25a (Agentic MoE).
Orchestriert Lifespan-Events, globale Fehlerbehandlung und Routing.
VERSION: 1.0.0 (WP-25 Release)
Prüft beim Start die Integrität der Mixture of Experts Konfiguration.
VERSION: 1.1.0 (WP-25a: MoE Integrity Check)
STATUS: Active
DEPENDENCIES: app.config, app.routers.*, app.services.llm_service
"""
@ -32,63 +33,74 @@ except Exception:
from .core.logging_setup import setup_logging
# Initialisierung noch VOR create_app()
# Initialisierung des Loggings noch VOR create_app()
setup_logging()
logger = logging.getLogger(__name__)
# --- WP-25: Lifespan Management ---
# --- WP-25a: Lifespan Management mit MoE Integritäts-Prüfung ---
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Verwaltet den Lebenszyklus der Anwendung.
Führt Startup-Prüfungen durch und bereinigt Ressourcen beim Shutdown.
Verwaltet den Lebenszyklus der Anwendung (Startup/Shutdown).
Verifiziert die Verfügbarkeit der MoE-Experten-Profile und Strategien.
"""
settings = get_settings()
logger.info("🚀 mindnet API: Starting up (WP-25 Agentic RAG Mode)...")
logger.info("🚀 mindnet API: Starting up (WP-25a MoE Mode)...")
# 1. Startup: Integritäts-Check der WP-25 Konfiguration
# Wir prüfen, ob die für die DecisionEngine kritischen Dateien vorhanden sind.
# 1. Startup: Integritäts-Check der MoE Konfiguration
# Wir prüfen die drei Säulen der Agentic-RAG Architektur.
decision_cfg = os.getenv("MINDNET_DECISION_CONFIG", "config/decision_engine.yaml")
profiles_cfg = getattr(settings, "LLM_PROFILES_PATH", "config/llm_profiles.yaml")
prompts_cfg = settings.PROMPTS_PATH
if not os.path.exists(decision_cfg):
logger.error(f"❌ CRITICAL: Decision Engine config missing at {decision_cfg}")
if not os.path.exists(prompts_cfg):
logger.error(f"❌ CRITICAL: Prompts config missing at {prompts_cfg}")
missing_files = []
if not os.path.exists(decision_cfg): missing_files.append(decision_cfg)
if not os.path.exists(profiles_cfg): missing_files.append(profiles_cfg)
if not os.path.exists(prompts_cfg): missing_files.append(prompts_cfg)
if missing_files:
logger.error(f"❌ CRITICAL: Missing MoE config files: {missing_files}")
else:
logger.info("✅ MoE Configuration files verified.")
yield
# 2. Shutdown: Ressourcen bereinigen
logger.info("🛑 mindnet API: Shutting down...")
llm = LLMService()
await llm.close()
logger.info("✨ Cleanup complete. Goodbye.")
try:
llm = LLMService()
await llm.close()
logger.info("✨ LLM resources cleaned up.")
except Exception as e:
logger.warning(f"⚠️ Error during LLMService cleanup: {e}")
logger.info("Goodbye.")
# --- App Factory ---
def create_app() -> FastAPI:
"""Initialisiert die FastAPI App mit WP-25 Erweiterungen."""
"""Initialisiert die FastAPI App mit WP-25a Erweiterungen."""
app = FastAPI(
title="mindnet API",
version="1.0.0", # WP-25 Milestone
version="1.1.0", # WP-25a Milestone
lifespan=lifespan,
description="Digital Twin Knowledge Engine mit Agentic Multi-Stream RAG."
description="Digital Twin Knowledge Engine mit Mixture of Experts Orchestration."
)
s = get_settings()
# --- Globale Fehlerbehandlung (WP-25 Resilienz) ---
# --- Globale Fehlerbehandlung (WP-25a Resilienz) ---
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
"""Fängt unerwartete Fehler in der Multi-Stream Kette ab."""
"""Fängt unerwartete Fehler in der MoE-Prozesskette ab."""
logger.error(f"❌ Unhandled Engine Error: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"detail": "Ein interner Fehler ist aufgetreten. Die DecisionEngine konnte die Anfrage nicht finalisieren.",
"detail": "Ein interner Fehler ist in der MoE-Kette aufgetreten.",
"error_type": type(exc).__name__
}
)
@ -96,12 +108,13 @@ def create_app() -> FastAPI:
# Healthcheck
@app.get("/healthz")
def healthz():
"""Bietet Statusinformationen über die Engine und Datenbank-Verbindung."""
return {
"status": "ok",
"version": "1.0.0",
"version": "1.1.0",
"qdrant": s.QDRANT_URL,
"prefix": s.COLLECTION_PREFIX,
"agentic_mode": True
"moe_enabled": True
}
# Inkludieren der Router (100% Kompatibilität erhalten)
@ -109,7 +122,7 @@ def create_app() -> FastAPI:
app.include_router(graph_router, prefix="/graph", tags=["graph"])
app.include_router(tools_router, prefix="/tools", tags=["tools"])
app.include_router(feedback_router, prefix="/feedback", tags=["feedback"])
app.include_router(chat_router, prefix="/chat", tags=["chat"]) # Nutzt nun WP-25 DecisionEngine
app.include_router(chat_router, prefix="/chat", tags=["chat"]) # WP-25a Agentic Chat
app.include_router(ingest_router, prefix="/ingest", tags=["ingest"])
if admin_router:

View File

@ -3,14 +3,13 @@ FILE: app/routers/chat.py
DESCRIPTION: Haupt-Chat-Interface (WP-25a Agentic Edition).
Kombiniert die spezialisierte Interview-Logik und Keyword-Erkennung
mit der neuen MoE-Orchestrierung und Pre-Synthesis Kompression.
VERSION: 3.0.3 (WP-25a: MoE & Compression Support - Full Release)
VERSION: 3.0.4 (WP-25a: Optimized MoE & Cascade Delegation)
STATUS: Active
FIX:
- 100% Wiederherstellung der v3.0.2 Logik (Interview Fallbacks, Schema-Resolution).
- WP-25a: Integration der Stream-Kompression (Module A) in den RAG-Workflow.
- WP-25a: Unterstützung der llm_profiles für spezialisierte Synthese (Module B).
- Erhalt der Ollama Context-Throttling Parameter (WP-20) als finaler Schutz.
- Beibehaltung der No-Retry Logik (max_retries=0) für Chat-Stabilität.
- WP-25a: Delegation der Fallback-Kaskade an den LLMService (v3.5.2).
- WP-25a: Nutzung der zentralisierten Stream-Kompression der DecisionEngine (v1.2.1).
- WP-25a: Konsistente Nutzung von MoE-Profilen für Interview- und RAG-Modus.
- 100% Erhalt der v3.0.2 Logik (Interview, Schema-Resolution, FastPaths).
"""
from fastapi import APIRouter, HTTPException, Depends
@ -31,13 +30,13 @@ from app.services.feedback_service import log_search
router = APIRouter()
logger = logging.getLogger(__name__)
# --- EBENE 1: CONFIG LOADER & CACHING (Restauriert aus v3.0.2) ---
# --- EBENE 1: CONFIG LOADER & CACHING (WP-25 Standard) ---
_DECISION_CONFIG_CACHE = None
_TYPES_CONFIG_CACHE = None
def _load_decision_config() -> Dict[str, Any]:
"""Lädt die Strategie-Konfiguration (Kompatibilität zu WP-25)."""
"""Lädt die Strategie-Konfiguration."""
settings = get_settings()
path = Path(settings.DECISION_CONFIG_PATH)
try:
@ -49,7 +48,7 @@ def _load_decision_config() -> Dict[str, Any]:
return {"strategies": {}}
def _load_types_config() -> Dict[str, Any]:
"""Lädt die types.yaml für die Typerkennung im Interview-Modus."""
"""Lädt die types.yaml für die Typerkennung."""
path = os.getenv("MINDNET_TYPES_FILE", "config/types.yaml")
try:
if os.path.exists(path):
@ -109,7 +108,7 @@ def _detect_target_type(message: str, configured_schemas: Dict[str, Any]) -> str
return "default"
def _is_question(query: str) -> bool:
"""Prüft, ob der Input eine Frage ist (W-Fragen Erkennung)."""
"""Prüft, ob der Input eine Frage ist."""
q = query.strip().lower()
if "?" in q: return True
starters = ["wer", "wie", "was", "wo", "wann", "warum", "weshalb", "wozu", "welche", "bist du"]
@ -136,17 +135,18 @@ async def _classify_intent(query: str, llm: LLMService) -> tuple[str, str]:
if kw.lower() in query_lower:
return "INTERVIEW", "Keyword (Interview)"
# 3. SLOW PATH: DecisionEngine LLM Router
# 3. SLOW PATH: DecisionEngine LLM Router (MoE-gesteuert)
intent = await llm.decision_engine._determine_strategy(query)
return intent, "DecisionEngine (LLM)"
# --- EBENE 3: RETRIEVAL AGGREGATION ---
def _collect_all_hits(stream_responses: Dict[str, Any]) -> List[QueryHit]:
"""Sammelt und dedupliziert Treffer aus allen parallelen Streams."""
"""Sammelt deduplizierte Treffer aus allen Streams für das Tracing."""
all_hits = []
seen_node_ids = set()
for _, response in stream_responses.items():
# In v3.0.4 sammeln wir die hits aus den QueryResponse Objekten
if hasattr(response, 'results'):
for hit in response.results:
if hit.node_id not in seen_node_ids:
@ -180,14 +180,14 @@ async def chat_endpoint(
sources_hits = []
answer_text = ""
# 2. INTERVIEW MODE (Kompatibilität zu v3.0.2)
# 2. INTERVIEW MODE (Bitgenaue WP-07 Logik)
if intent == "INTERVIEW":
target_type = _detect_target_type(request.message, strategy.get("schemas", {}))
types_cfg = get_types_config()
type_def = types_cfg.get("types", {}).get(target_type, {})
fields_list = type_def.get("schema", [])
# WP-07: RESTAURIERTE FALLBACK LOGIK (v3.0.2)
# WP-07: Restaurierte Fallback Logik
if not fields_list:
configured_schemas = strategy.get("schemas", {})
fallback = configured_schemas.get(target_type, configured_schemas.get("default", {}))
@ -200,73 +200,46 @@ async def chat_endpoint(
.replace("{target_type}", target_type) \
.replace("{schema_fields}", fields_str)
# WP-25a: Nutzt spezialisiertes Kompressions-Profil für Interviews
# WP-25a: MoE Call (Kaskade erfolgt intern im LLMService)
answer_text = await llm.generate_raw_response(
final_prompt, system=llm.get_prompt("system_prompt"),
priority="realtime", profile_name="compression_fast", max_retries=0
)
sources_hits = []
# 3. RAG MODE (WP-25a Multi-Stream + Pre-Synthesis)
# 3. RAG MODE (Optimierte MoE Orchestrierung)
else:
# Phase A & B: Retrieval & Kompression (Delegation an Engine v1.2.1)
# Diese Methode gibt bereits die (evtl. komprimierten) Kontext-Strings zurück.
formatted_context_map = await engine._execute_parallel_streams(strategy, request.message)
# Erfassung der Quellen für das Tracing
raw_stream_map = {}
stream_keys = strategy.get("use_streams", [])
library = engine.config.get("streams_library", {})
# Phase A: Retrieval
tasks = []
retrieval_tasks = []
active_streams = []
for key in stream_keys:
stream_cfg = library.get(key)
if stream_cfg:
if key in library:
active_streams.append(key)
tasks.append(engine._run_single_stream(key, stream_cfg, request.message))
retrieval_tasks.append(engine._run_single_stream(key, library[key], request.message))
responses = await asyncio.gather(*tasks, return_exceptions=True)
raw_stream_map = {}
formatted_context_tasks = []
max_chars = getattr(settings, "MAX_OLLAMA_CHARS", 10000)
provider = strategy.get("preferred_provider") or settings.MINDNET_LLM_PROVIDER
# Phase B: Pre-Synthesis & Throttling
responses = await asyncio.gather(*retrieval_tasks, return_exceptions=True)
for name, res in zip(active_streams, responses):
if not isinstance(res, Exception):
raw_stream_map[name] = res
context_text = engine._format_stream_context(res)
# WP-25a: Automatisierte Kompression
stream_cfg = library.get(name, {})
threshold = stream_cfg.get("compression_threshold", 4000)
sources_hits = _collect_all_hits(raw_stream_map)
if len(context_text) > threshold:
profile = stream_cfg.get("compression_profile")
formatted_context_tasks.append(
engine._compress_stream_content(name, context_text, request.message, profile)
)
else:
# WP-20: Restaurierter Throttling-Schutz als Fallback
if provider == "ollama" and len(context_text) > max_chars:
context_text = context_text[:max_chars] + "\n[...]"
async def _ident(c=context_text): return c
formatted_context_tasks.append(_ident())
else:
async def _err(): return "[Stream Error]"
formatted_context_tasks.append(_err())
# Inhalte parallel finalisieren
final_contexts = await asyncio.gather(*formatted_context_tasks)
formatted_context_map = dict(zip(active_streams, final_contexts))
# Phase C: MoE Synthese
# Phase C: Finale MoE Synthese
answer_text = await engine._generate_final_answer(
intent, strategy, request.message, formatted_context_map
)
sources_hits = _collect_all_hits(raw_stream_map)
duration_ms = int((time.time() - start_time) * 1000)
# Logging
# Logging (WP-15)
try:
log_search(
query_id=query_id, query_text=request.message, results=sources_hits,
@ -281,4 +254,4 @@ async def chat_endpoint(
except Exception as e:
logger.error(f"❌ Chat Endpoint Failure: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Fehler bei der Verarbeitung.")
raise HTTPException(status_code=500, detail="Fehler bei der Verarbeitung der Anfrage.")

View File

@ -1,40 +1,74 @@
"""
FILE: app/services/embeddings_client.py
DESCRIPTION: Unified Embedding Client. Nutzt Ollama API (HTTP). Ersetzt lokale sentence-transformers.
VERSION: 2.5.0
DESCRIPTION: Unified Embedding Client. Nutzt MoE-Profile zur Modellsteuerung.
WP-25a: Integration der llm_profiles.yaml für konsistente Vektoren.
VERSION: 2.6.0 (WP-25a: MoE & Profile Support)
STATUS: Active
DEPENDENCIES: httpx, requests, app.config
LAST_ANALYSIS: 2025-12-15
DEPENDENCIES: httpx, requests, app.config, yaml
"""
from __future__ import annotations
import os
import logging
import httpx
import requests # Für den synchronen Fallback
from typing import List
import requests
import yaml
from pathlib import Path
from typing import List, Dict, Any
from app.config import get_settings
logger = logging.getLogger(__name__)
class EmbeddingsClient:
"""
Async Client für Embeddings via Ollama.
Async Client für Embeddings.
Steuerung erfolgt über das 'embedding_expert' Profil in llm_profiles.yaml.
"""
def __init__(self):
self.settings = get_settings()
self.base_url = os.getenv("MINDNET_OLLAMA_URL", "http://127.0.0.1:11434")
self.model = os.getenv("MINDNET_EMBEDDING_MODEL")
# 1. MoE-Profil laden (WP-25a)
self.profile = self._load_embedding_profile()
# 2. Modell & URL auflösen
# Priorität: llm_profiles.yaml -> .env (Legacy) -> Fallback
self.model = self.profile.get("model") or os.getenv("MINDNET_EMBEDDING_MODEL")
provider = self.profile.get("provider", "ollama")
if provider == "ollama":
self.base_url = self.settings.OLLAMA_URL
else:
# Platzhalter für zukünftige Cloud-Embedding-Provider
self.base_url = os.getenv("MINDNET_OLLAMA_URL", "http://127.0.0.1:11434")
if not self.model:
self.model = os.getenv("MINDNET_LLM_MODEL", "phi3:mini")
logger.warning(f"No MINDNET_EMBEDDING_MODEL set. Fallback to '{self.model}'.")
logger.warning(f"⚠️ Kein Embedding-Modell in Profil oder .env gefunden. Fallback auf '{self.model}'.")
else:
logger.info(f"🧬 Embedding-Experte aktiv: Model='{self.model}' via {provider}")
def _load_embedding_profile(self) -> Dict[str, Any]:
"""Lädt die Konfiguration für den embedding_expert."""
path_str = getattr(self.settings, "LLM_PROFILES_PATH", "config/llm_profiles.yaml")
path = Path(path_str)
if not path.exists():
return {}
try:
with open(path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
profiles = data.get("profiles", {})
return profiles.get("embedding_expert", {})
except Exception as e:
logger.error(f"❌ Failed to load embedding profile: {e}")
return {}
async def embed_query(self, text: str) -> List[float]:
"""Erzeugt einen Vektor für eine Suchanfrage."""
return await self._request_embedding(text)
async def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Erzeugt Vektoren für einen Batch von Dokumenten."""
vectors = []
# Längeres Timeout für Batches
# Längeres Timeout für Batches (WP-20 Resilienz)
async with httpx.AsyncClient(timeout=120.0) as client:
for text in texts:
vec = await self._request_embedding_with_client(client, text)
@ -42,18 +76,23 @@ class EmbeddingsClient:
return vectors
async def _request_embedding(self, text: str) -> List[float]:
"""Interner Request-Handler für Einzelabfragen."""
async with httpx.AsyncClient(timeout=30.0) as client:
return await self._request_embedding_with_client(client, text)
async def _request_embedding_with_client(self, client: httpx.AsyncClient, text: str) -> List[float]:
if not text or not text.strip(): return []
"""Führt den HTTP-Call gegen die Embedding-API durch."""
if not text or not text.strip():
return []
url = f"{self.base_url}/api/embeddings"
try:
# WP-25: Aktuell optimiert für Ollama-API Struktur
response = await client.post(url, json={"model": self.model, "prompt": text})
response.raise_for_status()
return response.json().get("embedding", [])
except Exception as e:
logger.error(f"Async embedding failed: {e}")
logger.error(f"Async embedding failed (Model: {self.model}): {e}")
return []
# ==============================================================================
@ -62,27 +101,38 @@ class EmbeddingsClient:
def embed_text(text: str) -> List[float]:
"""
LEGACY/SYNC: Nutzt jetzt ebenfalls OLLAMA via 'requests'.
Ersetzt SentenceTransformers, um Dimensionskonflikte (768 vs 384) zu lösen.
LEGACY/SYNC: Nutzt ebenfalls die Profil-Logik für Konsistenz.
Ersetzt lokale sentence-transformers zur Vermeidung von Dimensionskonflikten.
"""
if not text or not text.strip():
return []
base_url = os.getenv("MINDNET_OLLAMA_URL", "http://127.0.0.1:11434")
model = os.getenv("MINDNET_EMBEDDING_MODEL")
settings = get_settings()
# Schneller Profil-Lookup für Sync-Mode
path = Path(getattr(settings, "LLM_PROFILES_PATH", "config/llm_profiles.yaml"))
model = os.getenv("MINDNET_EMBEDDING_MODEL")
base_url = settings.OLLAMA_URL
if path.exists():
try:
with open(path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
prof = data.get("profiles", {}).get("embedding_expert", {})
if prof.get("model"):
model = prof["model"]
except: pass
# Fallback logik identisch zur Klasse
if not model:
model = os.getenv("MINDNET_LLM_MODEL", "phi3:mini")
url = f"{base_url}/api/embeddings"
try:
# Synchroner Request (blockierend)
# Synchroner Request via requests
response = requests.post(url, json={"model": model, "prompt": text}, timeout=30)
response.raise_for_status()
data = response.json()
return data.get("embedding", [])
return response.json().get("embedding", [])
except Exception as e:
logger.error(f"Sync embedding (Ollama) failed: {e}")
logger.error(f"Sync embedding failed (Model: {model}): {e}")
return []

View File

@ -1,14 +1,14 @@
"""
FILE: app/services/llm_service.py
DESCRIPTION: Hybrid-Client für Ollama, Google GenAI (Gemini) und OpenRouter.
WP-25a: Implementierung der Mixture of Experts (MoE) Profil-Steuerung.
VERSION: 3.5.0 (WP-25a: MoE & Profile Orchestration)
WP-25a: Implementierung der Mixture of Experts (MoE) Kaskaden-Steuerung.
VERSION: 3.5.2 (WP-25a: MoE & Fallback Cascade Support)
STATUS: Active
FIX:
- WP-25a: Profilbasiertes Routing via llm_profiles.yaml.
- WP-25a: Unterstützung individueller Temperaturen pro Experten-Profil.
- WP-25: Beibehaltung der Ingest-Stability (kein Schwellenwert für YES/NO).
- WP-25: Erhalt der vollständigen v3.4.2 Resilienz-Logik.
- WP-25a: Implementierung der rekursiven Fallback-Kaskade via fallback_profile.
- WP-25a: Schutz gegen zirkuläre Profil-Referenzen (visited_profiles).
- WP-25a: Erweitertes Logging für Tracing der Experten-Entscheidungen.
- Erhalt der Ingest-Stability (WP-25) und des Rate-Limit-Managements.
"""
import httpx
import yaml
@ -89,7 +89,6 @@ class LLMService:
def _load_llm_profiles(self) -> dict:
"""WP-25a: Lädt die zentralen MoE-Profile aus der llm_profiles.yaml."""
# Wir nutzen den in settings oder decision_engine definierten Pfad
path_str = getattr(self.settings, "LLM_PROFILES_PATH", "config/llm_profiles.yaml")
path = Path(path_str)
if not path.exists():
@ -124,22 +123,27 @@ class LLMService:
json_schema: Optional[Dict[str, Any]] = None,
json_schema_name: str = "mindnet_json",
strict_json_schema: bool = True,
profile_name: Optional[str] = None # WP-25a
profile_name: Optional[str] = None,
visited_profiles: Optional[list] = None
) -> str:
"""
Haupteinstiegspunkt für LLM-Anfragen mit Profil-Unterstützung.
Haupteinstiegspunkt für LLM-Anfragen mit rekursiver Kaskaden-Logik.
"""
visited_profiles = visited_profiles or []
target_provider = provider
target_model = model_override
target_temp = None
fallback_profile = None
# WP-25a: Profil-Auflösung (Provider, Modell, Temperatur)
# 1. Profil-Auflösung
if profile_name and self.profiles:
profile = self.profiles.get(profile_name)
if profile:
target_provider = profile.get("provider", target_provider)
target_model = profile.get("model", target_model)
target_temp = profile.get("temperature")
fallback_profile = profile.get("fallback_profile")
visited_profiles.append(profile_name)
logger.info(f"🎭 MoE Dispatch: Profil='{profile_name}' -> Provider='{target_provider}' | Model='{target_model}'")
else:
logger.warning(f"⚠️ Profil '{profile_name}' nicht in llm_profiles.yaml gefunden!")
@ -149,26 +153,52 @@ class LLMService:
target_provider = self.settings.MINDNET_LLM_PROVIDER
logger.info(f" Kein Provider/Profil definiert. Nutze Default: {target_provider}")
if priority == "background":
async with LLMService._background_semaphore:
# 2. Ausführung mit Fehler-Handling für Kaskade
try:
if priority == "background":
async with LLMService._background_semaphore:
res = await self._dispatch(
target_provider, prompt, system, force_json,
max_retries, base_delay, target_model,
json_schema, json_schema_name, strict_json_schema, target_temp
)
else:
res = await self._dispatch(
target_provider, prompt, system, force_json,
max_retries, base_delay, target_model,
json_schema, json_schema_name, strict_json_schema, target_temp
)
else:
res = await self._dispatch(
target_provider, prompt, system, force_json,
max_retries, base_delay, target_model,
json_schema, json_schema_name, strict_json_schema, target_temp
)
# WP-25 Fix: Ingest-Stability (Ermöglicht YES/NO ohne Schwellenwert-Blockade)
if not res and target_provider != "ollama":
logger.warning(f"⚠️ [WP-25] Empty response from {target_provider}. Fallback to OLLAMA.")
res = await self._execute_ollama(prompt, system, force_json, max_retries, base_delay, target_temp)
# Check auf leere Cloud-Antworten (WP-25 Stability)
if not res and target_provider != "ollama":
logger.warning(f"⚠️ Empty response from {target_provider}. Triggering fallback chain.")
raise ValueError(f"Empty response from {target_provider}")
return clean_llm_text(res) if not force_json else res
return clean_llm_text(res) if not force_json else res
except Exception as e:
logger.error(f"❌ Error during execution of profile '{profile_name}' ({target_provider}): {e}")
# 3. Kaskaden-Logik: Nächstes Profil in der Kette versuchen
if fallback_profile and fallback_profile not in visited_profiles:
logger.info(f"🔄 Switching to fallback profile: '{fallback_profile}'")
return await self.generate_raw_response(
prompt=prompt, system=system, force_json=force_json,
max_retries=max_retries, base_delay=base_delay,
priority=priority, provider=provider, model_override=model_override,
json_schema=json_schema, json_schema_name=json_schema_name,
strict_json_schema=strict_json_schema,
profile_name=fallback_profile,
visited_profiles=visited_profiles
)
# 4. Ultimativer Notanker: Falls alles fehlschlägt, direkt zu Ollama
if target_provider != "ollama" and self.settings.LLM_FALLBACK_ENABLED:
logger.warning(f"🚨 Kaskade erschöpft. Nutze finalen Ollama-Notanker.")
res = await self._execute_ollama(prompt, system, force_json, max_retries, base_delay)
return clean_llm_text(res) if not force_json else res
raise e
async def _dispatch(
self,
@ -182,9 +212,9 @@ class LLMService:
json_schema: Optional[Dict[str, Any]],
json_schema_name: str,
strict_json_schema: bool,
temperature: Optional[float] = None # WP-25a
temperature: Optional[float] = None
) -> str:
"""Routet die Anfrage mit Rate-Limit Erkennung."""
"""Routet die Anfrage an den spezifischen Provider-Executor."""
rate_limit_attempts = 0
max_rate_retries = min(max_retries, getattr(self.settings, "LLM_RATE_LIMIT_RETRIES", 3))
wait_time = getattr(self.settings, "LLM_RATE_LIMIT_WAIT", 60.0)
@ -206,14 +236,13 @@ class LLMService:
except Exception as e:
err_str = str(e)
# Rate-Limit Handling (429)
if any(x in err_str for x in ["429", "RESOURCE_EXHAUSTED", "rate_limited"]):
rate_limit_attempts += 1
logger.warning(f"⏳ Rate Limit {provider}. Attempt {rate_limit_attempts}. Wait {wait_time}s.")
await asyncio.sleep(wait_time)
continue
if self.settings.LLM_FALLBACK_ENABLED and provider != "ollama":
return await self._execute_ollama(prompt, system, force_json, max_retries, base_delay, temperature)
# Andere Fehler werden an generate_raw_response für die Kaskade gereicht
raise e
async def _execute_google(self, prompt, system, force_json, model_override, temperature):
@ -245,7 +274,6 @@ class LLMService:
temperature: Optional[float] = None
) -> str:
model = model_override or self.settings.OPENROUTER_MODEL
# ERWEITERTES LOGGING VOR DEM CALL
logger.info(f"🛰️ OpenRouter Call: Model='{model}' | Temp={temperature}")
messages = []
if system: messages.append({"role": "system", "content": system})
@ -274,7 +302,7 @@ class LLMService:
return response.choices[0].message.content.strip() if response.choices[0].message.content else ""
async def _execute_ollama(self, prompt, system, force_json, max_retries, base_delay, temperature=None):
# WP-25a: Nutzt Profil-Temperatur oder Standard
# Nutzt Profil-Temperatur oder strikte Defaults für lokale Hardware-Schonung
effective_temp = temperature if temperature is not None else (0.1 if force_json else 0.7)
payload = {
@ -294,7 +322,9 @@ class LLMService:
return res.json().get("response", "").strip()
except Exception as e:
attempt += 1
if attempt > max_retries: raise e
if attempt > max_retries:
logger.error(f"❌ Ollama final failure after {attempt} attempts: {e}")
raise e
await asyncio.sleep(base_delay * (2 ** (attempt - 1)))
async def generate_rag_response(self, query: str, context_str: Optional[str] = None) -> str:

View File

@ -1,31 +1,64 @@
# config/llm_profiles.yaml
# VERSION: 1.0.0 (WP-25a: Centralized MoE Profiles)
# VERSION: 1.3.0 (WP-25a: Global MoE & Fallback Cascade)
# STATUS: Active
# DESCRIPTION: Zentrale Definition der LLM-Experten-Profile für MindNet.
# DESCRIPTION: Zentrale Definition der LLM-Rollen inkl. Ausfall-Logik (Kaskade).
profiles:
# Der "Dampfhammer": Schnell und günstig für Zusammenfassungen
# --- CHAT & SYNTHESE ---
# Der "Architekt": Hochwertige Synthese. Fällt bei Fehlern auf den Backup-Cloud-Experten zurück.
synthesis_pro:
provider: "openrouter"
model: "gemini-1.5-mistralai/mistral-7b-instruct:free"
temperature: 0.7
fallback_profile: "synthesis_backup"
# Der "Vize": Leistungsstarkes Modell bei einem anderen Provider (Resilienz).
synthesis_backup:
provider: "openrouter"
model: "mistralai/mistral-large"
temperature: 0.5
fallback_profile: "identity_safe" # Letzte Instanz: Lokal
# Der "Ingenieur": Fachspezialist für Code. Nutzt bei Ausfall den Generalisten.
tech_expert:
provider: "openrouter"
model: "anthropic/claude-3.5-sonnet"
temperature: 0.3
fallback_profile: "synthesis_pro"
# Der "Dampfhammer": Schnell für Routing und Zusammenfassungen.
compression_fast:
provider: "openrouter"
model: "mistralai/mistral-7b-instruct:free"
temperature: 0.1
fallback_profile: "identity_safe"
# Der "Ingenieur": Tiefes Verständnis für Code und Logik
tech_expert:
provider: "openrouter"
model: "anthropic/claude-3-sonnet"
temperature: 0.3
# Der "Wächter": Lokal für sensible Identitäts-Daten
identity_safe:
provider: "ollama"
model: "llama3.1:8b"
temperature: 0.2
# Der "Architekt": Hochwertige Synthese und strategische Abwägung
synthesis_pro:
# --- INGESTION EXPERTEN ---
# Spezialist für die Extraktion komplexer Datenstrukturen aus Dokumenten.
ingest_extractor:
provider: "openrouter"
model: "mistralai/mistral-7b-instruct:free"
temperature: 0.7
temperature: 0.2
fallback_profile: "synthesis_backup"
# Spezialist für binäre Prüfungen (YES/NO). Muss extrem deterministisch sein.
ingest_validator:
provider: "openrouter"
model: "mistralai/mistral-7b-instruct:free"
temperature: 0.0
fallback_profile: "compression_fast"
# --- LOKALER ANKER & PRIVACY ---
# Der "Wächter": Lokales Modell für maximale Privatsphäre. Ende der Kaskade.
identity_safe:
provider: "ollama"
model: "phi3:mini"
temperature: 0.2
# Kein fallback_profile definiert = Terminaler Endpunkt
# --- EMBEDDING EXPERTE ---
# Zentralisierung des Embedding-Modells zur Entfernung aus der .env.
embedding_expert:
provider: "ollama"
model: "nomic-embed-text"
dimensions: 768

View File

@ -0,0 +1,103 @@
# Audit: LLM-Profilsteuerung Integration (WP-25a)
**Datum:** 2025-01-XX
**Version:** WP-25a
**Status:** ✅ Abgeschlossen mit Verbesserungen
## Zusammenfassung
Dieses Audit prüft die Vollständigkeit der neuen LLM-Profilsteuerung (MoE - Mixture of Experts) und identifiziert alle Stellen, die die zentrale Steuerung umgehen könnten.
## Gefundene Probleme & Lösungen
### ✅ Problem 1: Fehlendes `profile_name` im Fallback-Code
**Datei:** `app/core/retrieval/decision_engine.py` (Zeile 253-255)
**Problem:** Der Fallback-Aufruf in `_generate_final_answer` nutzte kein `profile_name`, wodurch die Profilsteuerung umgangen wurde.
**Lösung:** ✅ Behoben - Nutzt nun `profile_name=profile` für Konsistenz.
### ⚠️ Problem 2: Ungenutztes Profil `ingest_extractor`
**Datei:** `config/llm_profiles.yaml`
**Problem:** Das Profil `ingest_extractor` ist definiert, wird aber nirgendwo im Code verwendet.
**Status:** ⚠️ Offene Lücke - Profil ist für zukünftige Extraktions-Aufgaben vorgesehen, aktuell nicht benötigt.
### ✅ Problem 3: Externes Script umgeht Steuerung
**Datei:** `scripts/ollama_tool_runner.py`
**Problem:** Script macht direkte Ollama-Aufrufe ohne LLMService.
**Status:** ✅ Akzeptabel - Dies ist ein externes Test-/Demo-Script, kein Teil der Hauptanwendung.
## Vollständige Prüfung aller LLM-Aufrufe
### ✅ Korrekt implementiert (nutzen Profilsteuerung):
1. **`app/core/ingestion/ingestion_validation.py`**
- ✅ Nutzt `profile_name="ingest_validator"` (Zeile 61-64)
- ✅ Delegiert Fallback-Kaskade an LLMService
2. **`app/core/retrieval/decision_engine.py`**
- ✅ `_determine_strategy()`: Nutzt `router_profile` (Zeile 94-96)
- ✅ `_compress_stream_content()`: Nutzt `compression_profile` (Zeile 169-174)
- ✅ `_generate_final_answer()`: Nutzt `llm_profile` aus Strategie (Zeile 244-246)
- ✅ **BEHOBEN:** Fallback nutzt nun auch `profile_name` (Zeile 253-256)
3. **`app/routers/chat.py`**
- ✅ Interview-Modus: Nutzt `profile_name="compression_fast"` (Zeile 204-207)
- ✅ RAG-Modus: Delegiert an DecisionEngine (nutzt Strategie-Profile)
4. **`app/services/embeddings_client.py`**
- ✅ Nutzt `embedding_expert` Profil aus `llm_profiles.yaml` (Zeile 29-47)
- ✅ Konsistente Modellsteuerung für Embeddings
5. **`app/services/llm_service.py`**
- ✅ Zentrale Implementierung der Profilsteuerung
- ✅ Rekursive Fallback-Kaskade implementiert
- ✅ Schutz gegen zirkuläre Referenzen (`visited_profiles`)
### ✅ Keine LLM-Aufrufe (korrekt):
1. **`app/routers/ingest.py`**
- Nutzt nur IngestionService (der wiederum LLMService nutzt)
2. **`app/services/discovery.py`**
- Nutzt nur Retrieval, keine LLM-Aufrufe
3. **`app/frontend/ui_api.py`**
- Macht nur HTTP-Requests zu API-Endpunkten
## Konfigurationsprüfung
### ✅ `config/llm_profiles.yaml`
- ✅ Alle benötigten Profile definiert:
- `synthesis_pro` - Hauptsynthese
- `synthesis_backup` - Backup-Synthese
- `tech_expert` - Code/Technik
- `compression_fast` - Kompression/Routing
- `ingest_validator` - Validierung (YES/NO)
- `ingest_extractor` - Extraktion (aktuell ungenutzt)
- `identity_safe` - Lokaler Privacy-Anker
- `embedding_expert` - Embeddings
- ✅ Fallback-Kaskaden korrekt definiert
- ✅ Temperaturen angemessen gesetzt
### ✅ `config/decision_engine.yaml`
- ✅ Nutzt `router_profile` für Intent-Erkennung
- ✅ Strategien referenzieren `llm_profile`
- ✅ Streams nutzen `compression_profile`
## Empfehlungen
### Sofort umsetzbar:
1. ✅ **BEHOBEN:** Fallback in DecisionEngine nutzt nun Profilsteuerung
### Zukünftige Verbesserungen:
1. **`ingest_extractor` Profil:** Wenn Extraktions-Aufgaben hinzukommen, sollte dieses Profil genutzt werden
2. **Monitoring:** Logging erweitern, um Profil-Nutzung zu tracken
3. **Dokumentation:** Profil-Auswahl-Logik in Entwickler-Dokumentation aufnehmen
## Fazit
**Die LLM-Profilsteuerung ist vollständig integriert.**
**Alle kritischen LLM-Aufrufe nutzen die zentrale Steuerung.**
**Ein kleiner Bug wurde behoben (Fallback ohne Profil).**
⚠️ **Ein Profil (`ingest_extractor`) ist definiert, aber aktuell ungenutzt - dies ist akzeptabel für zukünftige Features.**
Die Architektur ist robust und folgt dem MoE-Prinzip konsequent.