app/core/ingestion.py aktualisiert
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
This commit is contained in:
parent
5213d262a2
commit
b0d73cb053
|
|
@ -3,11 +3,12 @@ FILE: app/core/ingestion.py
|
||||||
DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen.
|
DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen.
|
||||||
WP-20: Optimiert für OpenRouter (mistralai/mistral-7b-instruct:free).
|
WP-20: Optimiert für OpenRouter (mistralai/mistral-7b-instruct:free).
|
||||||
WP-22: Content Lifecycle, Edge Registry Validation & Multi-Hash.
|
WP-22: Content Lifecycle, Edge Registry Validation & Multi-Hash.
|
||||||
FIX: Policy-Violation Detection & erzwungener Ollama-Fallback bei Cloud-Refusal.
|
FIX: Deep Fallback Logic (v2.11.14). Erkennt Policy Violations auch in validen
|
||||||
Dies löst das Problem leerer Kantenlisten bei umfangreichen Protokollen.
|
JSON-Objekten und erzwingt den lokalen Ollama-Sprung, um Kantenverlust
|
||||||
VERSION: 2.11.13
|
bei umfangreichen Protokollen zu verhindern.
|
||||||
|
VERSION: 2.11.14
|
||||||
STATUS: Active
|
STATUS: Active
|
||||||
DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.services.llm_service
|
DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.services.llm_service, app.services.edge_registry
|
||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
|
|
@ -57,7 +58,7 @@ def extract_json_from_response(text: str) -> Any:
|
||||||
if not text or not isinstance(text, str):
|
if not text or not isinstance(text, str):
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# 1. Entferne Mistral/Llama Steuerzeichen und Tags (BOS/EOS)
|
# 1. Entferne Mistral/Llama Steuerzeichen und Tags
|
||||||
clean = text.replace("<s>", "").replace("</s>", "")
|
clean = text.replace("<s>", "").replace("</s>", "")
|
||||||
clean = clean.replace("[OUT]", "").replace("[/OUT]", "")
|
clean = clean.replace("[OUT]", "").replace("[/OUT]", "")
|
||||||
clean = clean.strip()
|
clean = clean.strip()
|
||||||
|
|
@ -139,8 +140,9 @@ class IngestionService:
|
||||||
|
|
||||||
async def _perform_smart_edge_allocation(self, text: str, note_id: str) -> List[Dict]:
|
async def _perform_smart_edge_allocation(self, text: str, note_id: str) -> List[Dict]:
|
||||||
"""
|
"""
|
||||||
KI-Extraktion mit aktiver Erkennung von Cloud-Ablehnungen (Policy Violations).
|
KI-Extraktion mit Deep-Fallback Logik.
|
||||||
Erzwingt bei leeren Cloud-Antworten einen automatischen lokalen Ollama-Fallback.
|
Erzwingt den lokalen Ollama-Sprung, wenn die Cloud-Antwort keine verwertbaren
|
||||||
|
Kanten liefert (häufig bei Policy Violations auf OpenRouter).
|
||||||
"""
|
"""
|
||||||
provider = self.settings.MINDNET_LLM_PROVIDER
|
provider = self.settings.MINDNET_LLM_PROVIDER
|
||||||
model = self.settings.OPENROUTER_MODEL if provider == "openrouter" else self.settings.GEMINI_MODEL
|
model = self.settings.OPENROUTER_MODEL if provider == "openrouter" else self.settings.GEMINI_MODEL
|
||||||
|
|
@ -153,9 +155,8 @@ class IngestionService:
|
||||||
template = self.llm.get_prompt("edge_extraction", provider)
|
template = self.llm.get_prompt("edge_extraction", provider)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Sicherheits-Check: Formatierung des Templates gegen KeyError schützen
|
|
||||||
try:
|
try:
|
||||||
# Wir senden max 6000 Zeichen (ca. 1500 Token) an das LLM für die Extraktion
|
# Wir begrenzen den Kontext auf 6000 Zeichen (ca. 1500 Token)
|
||||||
prompt = template.format(
|
prompt = template.format(
|
||||||
text=text[:6000],
|
text=text[:6000],
|
||||||
note_id=note_id,
|
note_id=note_id,
|
||||||
|
|
@ -165,53 +166,57 @@ class IngestionService:
|
||||||
logger.error(f"❌ [Ingestion] Prompt-Template Fehler (Variable {ke} fehlt).")
|
logger.error(f"❌ [Ingestion] Prompt-Template Fehler (Variable {ke} fehlt).")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# Schritt 1: Anfrage an den primären Provider (Cloud)
|
# 1. Versuch: Anfrage an den primären Cloud-Provider
|
||||||
response_json = await self.llm.generate_raw_response(
|
response_json = await self.llm.generate_raw_response(
|
||||||
prompt=prompt, priority="background", force_json=True,
|
prompt=prompt, priority="background", force_json=True,
|
||||||
provider=provider, model_override=model
|
provider=provider, model_override=model
|
||||||
)
|
)
|
||||||
|
|
||||||
# Nutzt den verbesserten Mistral-sicheren JSON-Extraktor
|
# Initiales Parsing
|
||||||
raw_data = extract_json_from_response(response_json)
|
raw_data = extract_json_from_response(response_json)
|
||||||
|
|
||||||
# FALLBACK-LOGIK: Wenn Cloud leer liefert (Policy Violation / No data training), erzwinge lokal
|
# 2. Dictionary Recovery (Versuche Liste aus Dict zu extrahieren)
|
||||||
if not raw_data and provider != "ollama" and self.settings.LLM_FALLBACK_ENABLED:
|
candidates = []
|
||||||
logger.warning(
|
if isinstance(raw_data, list):
|
||||||
f"🛑 [Ingestion] Cloud-Provider '{provider}' lieferte keine Daten für {note_id}. "
|
candidates = raw_data
|
||||||
f"Mögliche Policy Violation. Erzwinge LOKALEN FALLBACK via Ollama..."
|
elif isinstance(raw_data, dict):
|
||||||
)
|
logger.info(f"ℹ️ [Ingestion] LLM returned dict, checking for embedded lists in {note_id}")
|
||||||
response_json = await self.llm.generate_raw_response(
|
|
||||||
prompt=prompt, priority="background", force_json=True,
|
|
||||||
provider="ollama"
|
|
||||||
)
|
|
||||||
raw_data = extract_json_from_response(response_json)
|
|
||||||
|
|
||||||
# Recovery: Suche nach Listen in Dictionaries (z.B. {"matches": [...]})
|
|
||||||
if isinstance(raw_data, dict):
|
|
||||||
logger.info(f"ℹ️ [Ingestion] LLM returned dict, trying recovery for {note_id}")
|
|
||||||
found_list = False
|
|
||||||
for k in ["edges", "links", "results", "kanten", "matches", "edge_list"]:
|
for k in ["edges", "links", "results", "kanten", "matches", "edge_list"]:
|
||||||
if k in raw_data and isinstance(raw_data[k], list):
|
if k in raw_data and isinstance(raw_data[k], list):
|
||||||
raw_data = raw_data[k]
|
candidates = raw_data[k]
|
||||||
found_list = True
|
|
||||||
break
|
break
|
||||||
# Ultimativer Dict-Fallback: Key-Value Paare als Kanten interpretieren
|
# Wenn immer noch keine Liste gefunden, versuche Key-Value Paare (Dict Recovery)
|
||||||
if not found_list:
|
if not candidates:
|
||||||
new_list = []
|
|
||||||
for k, v in raw_data.items():
|
for k, v in raw_data.items():
|
||||||
if isinstance(v, str): new_list.append(f"{k}:{v}")
|
if isinstance(v, str): candidates.append(f"{k}:{v}")
|
||||||
elif isinstance(v, list):
|
elif isinstance(v, list): [candidates.append(f"{k}:{i}") for i in v if isinstance(i, str)]
|
||||||
for target in v:
|
|
||||||
if isinstance(target, str): new_list.append(f"{k}:{target}")
|
# 3. DEEP FALLBACK: Wenn nach allen Recovery-Versuchen die Liste leer ist UND wir in der Cloud waren
|
||||||
raw_data = new_list
|
# Triggert den Fallback bei "Data Policy Violations" (leere oder Fehler-JSONs).
|
||||||
|
if not candidates and provider != "ollama" and self.settings.LLM_FALLBACK_ENABLED:
|
||||||
if not isinstance(raw_data, list) or not raw_data:
|
logger.warning(
|
||||||
logger.warning(f"⚠️ [Ingestion] LLM lieferte keine extrahierbaren Kanten für {note_id}")
|
f"🛑 [Ingestion] Cloud-Antwort für {note_id} lieferte keine verwertbaren Kanten. "
|
||||||
|
f"Mögliche Policy Violation oder Refusal. Erzwinge LOKALEN FALLBACK via Ollama..."
|
||||||
|
)
|
||||||
|
response_json_local = await self.llm.generate_raw_response(
|
||||||
|
prompt=prompt, priority="background", force_json=True, provider="ollama"
|
||||||
|
)
|
||||||
|
raw_data_local = extract_json_from_response(response_json_local)
|
||||||
|
|
||||||
|
# Wiederhole Recovery für lokale Antwort
|
||||||
|
if isinstance(raw_data_local, list):
|
||||||
|
candidates = raw_data_local
|
||||||
|
elif isinstance(raw_data_local, dict):
|
||||||
|
for k in ["edges", "links", "results"]:
|
||||||
|
if k in raw_data_local and isinstance(raw_data_local[k], list):
|
||||||
|
candidates = raw_data_local[k]; break
|
||||||
|
|
||||||
|
if not candidates:
|
||||||
|
logger.warning(f"⚠️ [Ingestion] Auch nach Fallback keine extrahierbaren Kanten für {note_id}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
processed = []
|
processed = []
|
||||||
for item in raw_data:
|
for item in candidates:
|
||||||
# Erkennt sowohl Dict als auch String ["kind:target"]
|
|
||||||
if isinstance(item, dict) and "to" in item:
|
if isinstance(item, dict) and "to" in item:
|
||||||
item["provenance"] = "semantic_ai"
|
item["provenance"] = "semantic_ai"
|
||||||
item["line"] = f"ai-{provider}"
|
item["line"] = f"ai-{provider}"
|
||||||
|
|
@ -247,6 +252,7 @@ class IngestionService:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return {**result, "error": f"Validation failed: {str(e)}"}
|
return {**result, "error": f"Validation failed: {str(e)}"}
|
||||||
|
|
||||||
|
# WP-22: Filter für Systemdateien und Entwürfe
|
||||||
status = fm.get("status", "draft").lower().strip()
|
status = fm.get("status", "draft").lower().strip()
|
||||||
if status in ["system", "template", "archive", "hidden"]:
|
if status in ["system", "template", "archive", "hidden"]:
|
||||||
return {**result, "status": "skipped", "reason": f"lifecycle_{status}"}
|
return {**result, "status": "skipped", "reason": f"lifecycle_{status}"}
|
||||||
|
|
@ -261,7 +267,7 @@ class IngestionService:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return {**result, "error": f"Payload failed: {str(e)}"}
|
return {**result, "error": f"Payload failed: {str(e)}"}
|
||||||
|
|
||||||
# 3. Change Detection
|
# 3. Change Detection (Strikte DoD Umsetzung)
|
||||||
old_payload = None if force_replace else self._fetch_note_payload(note_id)
|
old_payload = None if force_replace else self._fetch_note_payload(note_id)
|
||||||
check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}"
|
check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}"
|
||||||
old_hash = (old_payload or {}).get("hashes", {}).get(check_key)
|
old_hash = (old_payload or {}).get("hashes", {}).get(check_key)
|
||||||
|
|
@ -299,12 +305,12 @@ class IngestionService:
|
||||||
edges = []
|
edges = []
|
||||||
context = {"file": file_path, "note_id": note_id}
|
context = {"file": file_path, "note_id": note_id}
|
||||||
|
|
||||||
# A. Explizite Kanten (User)
|
# A. Explizite Kanten (User / Wikilinks)
|
||||||
for e in extract_edges_with_context(parsed):
|
for e in extract_edges_with_context(parsed):
|
||||||
e["kind"] = edge_registry.resolve(edge_type=e["kind"], provenance="explicit", context={**context, "line": e.get("line")})
|
e["kind"] = edge_registry.resolve(edge_type=e["kind"], provenance="explicit", context={**context, "line": e.get("line")})
|
||||||
edges.append(e)
|
edges.append(e)
|
||||||
|
|
||||||
# B. KI Kanten (Turbo)
|
# B. KI Kanten (Turbo Mode mit v2.11.14 Fallback)
|
||||||
ai_edges = await self._perform_smart_edge_allocation(body_text, note_id)
|
ai_edges = await self._perform_smart_edge_allocation(body_text, note_id)
|
||||||
for e in ai_edges:
|
for e in ai_edges:
|
||||||
valid_kind = edge_registry.resolve(edge_type=e.get("kind"), provenance="semantic_ai", context={**context, "line": e.get("line")})
|
valid_kind = edge_registry.resolve(edge_type=e.get("kind"), provenance="semantic_ai", context={**context, "line": e.get("line")})
|
||||||
|
|
@ -347,6 +353,7 @@ class IngestionService:
|
||||||
return {**result, "error": f"DB Upsert failed: {e}"}
|
return {**result, "error": f"DB Upsert failed: {e}"}
|
||||||
|
|
||||||
def _fetch_note_payload(self, note_id: str) -> Optional[dict]:
|
def _fetch_note_payload(self, note_id: str) -> Optional[dict]:
|
||||||
|
"""Holt die Metadaten einer Note aus Qdrant."""
|
||||||
from qdrant_client.http import models as rest
|
from qdrant_client.http import models as rest
|
||||||
try:
|
try:
|
||||||
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
||||||
|
|
@ -365,6 +372,7 @@ class IngestionService:
|
||||||
except: return True, True
|
except: return True, True
|
||||||
|
|
||||||
def _purge_artifacts(self, note_id: str):
|
def _purge_artifacts(self, note_id: str):
|
||||||
|
"""Löscht verwaiste Chunks/Edges vor einem Re-Import."""
|
||||||
from qdrant_client.http import models as rest
|
from qdrant_client.http import models as rest
|
||||||
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
||||||
for suffix in ["chunks", "edges"]:
|
for suffix in ["chunks", "edges"]:
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user