From b0d73cb053d78fd129ab65418034ce918f36b193 Mon Sep 17 00:00:00 2001 From: Lars Date: Thu, 25 Dec 2025 21:46:40 +0100 Subject: [PATCH] app/core/ingestion.py aktualisiert --- app/core/ingestion.py | 98 +++++++++++++++++++++++-------------------- 1 file changed, 53 insertions(+), 45 deletions(-) diff --git a/app/core/ingestion.py b/app/core/ingestion.py index 026c77d..fa71d1f 100644 --- a/app/core/ingestion.py +++ b/app/core/ingestion.py @@ -3,11 +3,12 @@ FILE: app/core/ingestion.py DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen. WP-20: Optimiert für OpenRouter (mistralai/mistral-7b-instruct:free). WP-22: Content Lifecycle, Edge Registry Validation & Multi-Hash. -FIX: Policy-Violation Detection & erzwungener Ollama-Fallback bei Cloud-Refusal. - Dies löst das Problem leerer Kantenlisten bei umfangreichen Protokollen. -VERSION: 2.11.13 +FIX: Deep Fallback Logic (v2.11.14). Erkennt Policy Violations auch in validen + JSON-Objekten und erzwingt den lokalen Ollama-Sprung, um Kantenverlust + bei umfangreichen Protokollen zu verhindern. +VERSION: 2.11.14 STATUS: Active -DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.services.llm_service +DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.services.llm_service, app.services.edge_registry """ import os import json @@ -57,7 +58,7 @@ def extract_json_from_response(text: str) -> Any: if not text or not isinstance(text, str): return [] - # 1. Entferne Mistral/Llama Steuerzeichen und Tags (BOS/EOS) + # 1. Entferne Mistral/Llama Steuerzeichen und Tags clean = text.replace("", "").replace("", "") clean = clean.replace("[OUT]", "").replace("[/OUT]", "") clean = clean.strip() @@ -139,8 +140,9 @@ class IngestionService: async def _perform_smart_edge_allocation(self, text: str, note_id: str) -> List[Dict]: """ - KI-Extraktion mit aktiver Erkennung von Cloud-Ablehnungen (Policy Violations). - Erzwingt bei leeren Cloud-Antworten einen automatischen lokalen Ollama-Fallback. + KI-Extraktion mit Deep-Fallback Logik. + Erzwingt den lokalen Ollama-Sprung, wenn die Cloud-Antwort keine verwertbaren + Kanten liefert (häufig bei Policy Violations auf OpenRouter). """ provider = self.settings.MINDNET_LLM_PROVIDER model = self.settings.OPENROUTER_MODEL if provider == "openrouter" else self.settings.GEMINI_MODEL @@ -153,9 +155,8 @@ class IngestionService: template = self.llm.get_prompt("edge_extraction", provider) try: - # Sicherheits-Check: Formatierung des Templates gegen KeyError schützen try: - # Wir senden max 6000 Zeichen (ca. 1500 Token) an das LLM für die Extraktion + # Wir begrenzen den Kontext auf 6000 Zeichen (ca. 1500 Token) prompt = template.format( text=text[:6000], note_id=note_id, @@ -165,53 +166,57 @@ class IngestionService: logger.error(f"❌ [Ingestion] Prompt-Template Fehler (Variable {ke} fehlt).") return [] - # Schritt 1: Anfrage an den primären Provider (Cloud) + # 1. Versuch: Anfrage an den primären Cloud-Provider response_json = await self.llm.generate_raw_response( prompt=prompt, priority="background", force_json=True, provider=provider, model_override=model ) - # Nutzt den verbesserten Mistral-sicheren JSON-Extraktor + # Initiales Parsing raw_data = extract_json_from_response(response_json) - # FALLBACK-LOGIK: Wenn Cloud leer liefert (Policy Violation / No data training), erzwinge lokal - if not raw_data and provider != "ollama" and self.settings.LLM_FALLBACK_ENABLED: - logger.warning( - f"🛑 [Ingestion] Cloud-Provider '{provider}' lieferte keine Daten für {note_id}. " - f"Mögliche Policy Violation. Erzwinge LOKALEN FALLBACK via Ollama..." - ) - response_json = await self.llm.generate_raw_response( - prompt=prompt, priority="background", force_json=True, - provider="ollama" - ) - raw_data = extract_json_from_response(response_json) - - # Recovery: Suche nach Listen in Dictionaries (z.B. {"matches": [...]}) - if isinstance(raw_data, dict): - logger.info(f"ℹ️ [Ingestion] LLM returned dict, trying recovery for {note_id}") - found_list = False + # 2. Dictionary Recovery (Versuche Liste aus Dict zu extrahieren) + candidates = [] + if isinstance(raw_data, list): + candidates = raw_data + elif isinstance(raw_data, dict): + logger.info(f"ℹ️ [Ingestion] LLM returned dict, checking for embedded lists in {note_id}") for k in ["edges", "links", "results", "kanten", "matches", "edge_list"]: if k in raw_data and isinstance(raw_data[k], list): - raw_data = raw_data[k] - found_list = True + candidates = raw_data[k] break - # Ultimativer Dict-Fallback: Key-Value Paare als Kanten interpretieren - if not found_list: - new_list = [] + # Wenn immer noch keine Liste gefunden, versuche Key-Value Paare (Dict Recovery) + if not candidates: for k, v in raw_data.items(): - if isinstance(v, str): new_list.append(f"{k}:{v}") - elif isinstance(v, list): - for target in v: - if isinstance(target, str): new_list.append(f"{k}:{target}") - raw_data = new_list - - if not isinstance(raw_data, list) or not raw_data: - logger.warning(f"⚠️ [Ingestion] LLM lieferte keine extrahierbaren Kanten für {note_id}") + if isinstance(v, str): candidates.append(f"{k}:{v}") + elif isinstance(v, list): [candidates.append(f"{k}:{i}") for i in v if isinstance(i, str)] + + # 3. DEEP FALLBACK: Wenn nach allen Recovery-Versuchen die Liste leer ist UND wir in der Cloud waren + # Triggert den Fallback bei "Data Policy Violations" (leere oder Fehler-JSONs). + if not candidates and provider != "ollama" and self.settings.LLM_FALLBACK_ENABLED: + logger.warning( + f"🛑 [Ingestion] Cloud-Antwort für {note_id} lieferte keine verwertbaren Kanten. " + f"Mögliche Policy Violation oder Refusal. Erzwinge LOKALEN FALLBACK via Ollama..." + ) + response_json_local = await self.llm.generate_raw_response( + prompt=prompt, priority="background", force_json=True, provider="ollama" + ) + raw_data_local = extract_json_from_response(response_json_local) + + # Wiederhole Recovery für lokale Antwort + if isinstance(raw_data_local, list): + candidates = raw_data_local + elif isinstance(raw_data_local, dict): + for k in ["edges", "links", "results"]: + if k in raw_data_local and isinstance(raw_data_local[k], list): + candidates = raw_data_local[k]; break + + if not candidates: + logger.warning(f"⚠️ [Ingestion] Auch nach Fallback keine extrahierbaren Kanten für {note_id}") return [] processed = [] - for item in raw_data: - # Erkennt sowohl Dict als auch String ["kind:target"] + for item in candidates: if isinstance(item, dict) and "to" in item: item["provenance"] = "semantic_ai" item["line"] = f"ai-{provider}" @@ -247,6 +252,7 @@ class IngestionService: except Exception as e: return {**result, "error": f"Validation failed: {str(e)}"} + # WP-22: Filter für Systemdateien und Entwürfe status = fm.get("status", "draft").lower().strip() if status in ["system", "template", "archive", "hidden"]: return {**result, "status": "skipped", "reason": f"lifecycle_{status}"} @@ -261,7 +267,7 @@ class IngestionService: except Exception as e: return {**result, "error": f"Payload failed: {str(e)}"} - # 3. Change Detection + # 3. Change Detection (Strikte DoD Umsetzung) old_payload = None if force_replace else self._fetch_note_payload(note_id) check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}" old_hash = (old_payload or {}).get("hashes", {}).get(check_key) @@ -299,12 +305,12 @@ class IngestionService: edges = [] context = {"file": file_path, "note_id": note_id} - # A. Explizite Kanten (User) + # A. Explizite Kanten (User / Wikilinks) for e in extract_edges_with_context(parsed): e["kind"] = edge_registry.resolve(edge_type=e["kind"], provenance="explicit", context={**context, "line": e.get("line")}) edges.append(e) - # B. KI Kanten (Turbo) + # B. KI Kanten (Turbo Mode mit v2.11.14 Fallback) ai_edges = await self._perform_smart_edge_allocation(body_text, note_id) for e in ai_edges: valid_kind = edge_registry.resolve(edge_type=e.get("kind"), provenance="semantic_ai", context={**context, "line": e.get("line")}) @@ -347,6 +353,7 @@ class IngestionService: return {**result, "error": f"DB Upsert failed: {e}"} def _fetch_note_payload(self, note_id: str) -> Optional[dict]: + """Holt die Metadaten einer Note aus Qdrant.""" from qdrant_client.http import models as rest try: f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) @@ -365,6 +372,7 @@ class IngestionService: except: return True, True def _purge_artifacts(self, note_id: str): + """Löscht verwaiste Chunks/Edges vor einem Re-Import.""" from qdrant_client.http import models as rest f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) for suffix in ["chunks", "edges"]: