mindnet/app/core/ingestion.py
Lars 5213d262a2
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 4s
app/core/ingestion.py aktualisiert
2025-12-25 21:28:54 +01:00

382 lines
17 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
FILE: app/core/ingestion.py
DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen.
WP-20: Optimiert für OpenRouter (mistralai/mistral-7b-instruct:free).
WP-22: Content Lifecycle, Edge Registry Validation & Multi-Hash.
FIX: Policy-Violation Detection & erzwungener Ollama-Fallback bei Cloud-Refusal.
Dies löst das Problem leerer Kantenlisten bei umfangreichen Protokollen.
VERSION: 2.11.13
STATUS: Active
DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.services.llm_service
"""
import os
import json
import re
import logging
import asyncio
import time
from typing import Dict, List, Optional, Tuple, Any
# Core Module Imports
from app.core.parser import (
read_markdown,
normalize_frontmatter,
validate_required_frontmatter,
extract_edges_with_context,
)
from app.core.note_payload import make_note_payload
from app.core.chunker import assemble_chunks, get_chunk_config
from app.core.chunk_payload import make_chunk_payloads
# Fallback für Edges
try:
from app.core.derive_edges import build_edges_for_note
except ImportError:
def build_edges_for_note(*args, **kwargs): return []
from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
from app.core.qdrant_points import (
points_for_chunks,
points_for_note,
points_for_edges,
upsert_batch,
)
from app.services.embeddings_client import EmbeddingsClient
from app.services.edge_registry import registry as edge_registry
from app.services.llm_service import LLMService
logger = logging.getLogger(__name__)
# --- Global Helpers ---
def extract_json_from_response(text: str) -> Any:
"""
Extrahiert JSON-Daten und bereinigt LLM-Steuerzeichen (Mistral/Llama).
Entfernt <s>, [OUT], [/OUT] und Markdown-Blöcke für maximale Robustheit.
"""
if not text or not isinstance(text, str):
return []
# 1. Entferne Mistral/Llama Steuerzeichen und Tags (BOS/EOS)
clean = text.replace("<s>", "").replace("</s>", "")
clean = clean.replace("[OUT]", "").replace("[/OUT]", "")
clean = clean.strip()
# 2. Suche nach Markdown JSON-Blöcken (```json ... ```)
match = re.search(r"```(?:json)?\s*(.*?)\s*```", clean, re.DOTALL)
payload = match.group(1) if match else clean
try:
return json.loads(payload.strip())
except json.JSONDecodeError:
# 3. Recovery: Suche nach der ersten [ und letzten ] (Liste)
start = payload.find('[')
end = payload.rfind(']') + 1
if start != -1 and end > start:
try:
return json.loads(payload[start:end])
except: pass
# 4. Zweite Recovery: Suche nach der ersten { und letzten } (Objekt)
start_obj = payload.find('{')
end_obj = payload.rfind('}') + 1
if start_obj != -1 and end_obj > start_obj:
try:
return json.loads(payload[start_obj:end_obj])
except: pass
return []
def load_type_registry(custom_path: Optional[str] = None) -> dict:
"""Lädt die types.yaml zur Steuerung der typ-spezifischen Ingestion."""
import yaml
from app.config import get_settings
settings = get_settings()
path = custom_path or settings.MINDNET_TYPES_FILE
if not os.path.exists(path): return {}
try:
with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {}
except Exception: return {}
# --- Service Class ---
class IngestionService:
def __init__(self, collection_prefix: str = None):
from app.config import get_settings
self.settings = get_settings()
self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
self.cfg = QdrantConfig.from_env()
self.cfg.prefix = self.prefix
self.client = get_client(self.cfg)
self.dim = self.settings.VECTOR_SIZE
self.registry = load_type_registry()
self.embedder = EmbeddingsClient()
self.llm = LLMService()
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
try:
ensure_collections(self.client, self.prefix, self.dim)
ensure_payload_indexes(self.client, self.prefix)
except Exception as e:
logger.warning(f"DB init warning: {e}")
def _resolve_note_type(self, requested: Optional[str]) -> str:
"""Bestimmt den finalen Notiz-Typ (Fallback auf 'concept')."""
types = self.registry.get("types", {})
if requested and requested in types: return requested
return "concept"
def _get_chunk_config_by_profile(self, profile_name: str, note_type: str) -> Dict[str, Any]:
"""Holt die Chunker-Parameter für ein spezifisches Profil aus der Registry."""
profiles = self.registry.get("chunking_profiles", {})
if profile_name in profiles:
cfg = profiles[profile_name].copy()
if "overlap" in cfg and isinstance(cfg["overlap"], list):
cfg["overlap"] = tuple(cfg["overlap"])
return cfg
return get_chunk_config(note_type)
async def _perform_smart_edge_allocation(self, text: str, note_id: str) -> List[Dict]:
"""
KI-Extraktion mit aktiver Erkennung von Cloud-Ablehnungen (Policy Violations).
Erzwingt bei leeren Cloud-Antworten einen automatischen lokalen Ollama-Fallback.
"""
provider = self.settings.MINDNET_LLM_PROVIDER
model = self.settings.OPENROUTER_MODEL if provider == "openrouter" else self.settings.GEMINI_MODEL
logger.info(f"🚀 [Ingestion] Turbo-Mode: Extracting edges for '{note_id}' using {model} on {provider}")
edge_registry.ensure_latest()
valid_types_str = ", ".join(sorted(list(edge_registry.valid_types)))
template = self.llm.get_prompt("edge_extraction", provider)
try:
# Sicherheits-Check: Formatierung des Templates gegen KeyError schützen
try:
# Wir senden max 6000 Zeichen (ca. 1500 Token) an das LLM für die Extraktion
prompt = template.format(
text=text[:6000],
note_id=note_id,
valid_types=valid_types_str
)
except KeyError as ke:
logger.error(f"❌ [Ingestion] Prompt-Template Fehler (Variable {ke} fehlt).")
return []
# Schritt 1: Anfrage an den primären Provider (Cloud)
response_json = await self.llm.generate_raw_response(
prompt=prompt, priority="background", force_json=True,
provider=provider, model_override=model
)
# Nutzt den verbesserten Mistral-sicheren JSON-Extraktor
raw_data = extract_json_from_response(response_json)
# FALLBACK-LOGIK: Wenn Cloud leer liefert (Policy Violation / No data training), erzwinge lokal
if not raw_data and provider != "ollama" and self.settings.LLM_FALLBACK_ENABLED:
logger.warning(
f"🛑 [Ingestion] Cloud-Provider '{provider}' lieferte keine Daten für {note_id}. "
f"Mögliche Policy Violation. Erzwinge LOKALEN FALLBACK via Ollama..."
)
response_json = await self.llm.generate_raw_response(
prompt=prompt, priority="background", force_json=True,
provider="ollama"
)
raw_data = extract_json_from_response(response_json)
# Recovery: Suche nach Listen in Dictionaries (z.B. {"matches": [...]})
if isinstance(raw_data, dict):
logger.info(f" [Ingestion] LLM returned dict, trying recovery for {note_id}")
found_list = False
for k in ["edges", "links", "results", "kanten", "matches", "edge_list"]:
if k in raw_data and isinstance(raw_data[k], list):
raw_data = raw_data[k]
found_list = True
break
# Ultimativer Dict-Fallback: Key-Value Paare als Kanten interpretieren
if not found_list:
new_list = []
for k, v in raw_data.items():
if isinstance(v, str): new_list.append(f"{k}:{v}")
elif isinstance(v, list):
for target in v:
if isinstance(target, str): new_list.append(f"{k}:{target}")
raw_data = new_list
if not isinstance(raw_data, list) or not raw_data:
logger.warning(f"⚠️ [Ingestion] LLM lieferte keine extrahierbaren Kanten für {note_id}")
return []
processed = []
for item in raw_data:
# Erkennt sowohl Dict als auch String ["kind:target"]
if isinstance(item, dict) and "to" in item:
item["provenance"] = "semantic_ai"
item["line"] = f"ai-{provider}"
processed.append(item)
elif isinstance(item, str) and ":" in item:
parts = item.split(":", 1)
processed.append({
"to": parts[1].strip(),
"kind": parts[0].strip(),
"provenance": "semantic_ai",
"line": f"ai-{provider}"
})
return processed
except Exception as e:
logger.warning(f"⚠️ [Ingestion] Smart Edge Allocation failed for {note_id}: {e}")
return []
async def process_file(
self, file_path: str, vault_root: str,
force_replace: bool = False, apply: bool = False, purge_before: bool = False,
note_scope_refs: bool = False, hash_source: str = "parsed", hash_normalize: str = "canonical"
) -> Dict[str, Any]:
"""Transformiert eine Markdown-Datei in den Graphen (Notes, Chunks, Edges)."""
result = {"path": file_path, "status": "skipped", "changed": False, "error": None}
# 1. Parse & Lifecycle Gate
try:
parsed = read_markdown(file_path)
if not parsed: return {**result, "error": "Empty file"}
fm = normalize_frontmatter(parsed.frontmatter)
validate_required_frontmatter(fm)
except Exception as e:
return {**result, "error": f"Validation failed: {str(e)}"}
status = fm.get("status", "draft").lower().strip()
if status in ["system", "template", "archive", "hidden"]:
return {**result, "status": "skipped", "reason": f"lifecycle_{status}"}
# 2. Config Resolution & Payload Construction
note_type = self._resolve_note_type(fm.get("type"))
fm["type"] = note_type
try:
note_pl = make_note_payload(parsed, vault_root=vault_root, hash_normalize=hash_normalize, hash_source=hash_source, file_path=file_path)
note_id = note_pl["note_id"]
except Exception as e:
return {**result, "error": f"Payload failed: {str(e)}"}
# 3. Change Detection
old_payload = None if force_replace else self._fetch_note_payload(note_id)
check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}"
old_hash = (old_payload or {}).get("hashes", {}).get(check_key)
new_hash = note_pl.get("hashes", {}).get(check_key)
# Prüfung auf fehlende Artefakte in Qdrant
chunks_missing, edges_missing = self._artifacts_missing(note_id)
should_write = force_replace or (not old_payload) or (old_hash != new_hash) or chunks_missing or edges_missing
if not should_write:
return {**result, "status": "unchanged", "note_id": note_id}
if not apply:
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
# 4. Processing (Chunking, Embedding, AI Edges)
try:
body_text = getattr(parsed, "body", "") or ""
edge_registry.ensure_latest()
# Profil-gesteuertes Chunking
profile = fm.get("chunk_profile") or fm.get("chunking_profile") or "sliding_standard"
chunk_cfg = self._get_chunk_config_by_profile(profile, note_type)
chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_cfg)
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)
# Vektorisierung
vecs = []
if chunk_pls:
texts = [c.get("window") or c.get("text") or "" for c in chunk_pls]
vecs = await self.embedder.embed_documents(texts)
# Kanten-Extraktion
edges = []
context = {"file": file_path, "note_id": note_id}
# A. Explizite Kanten (User)
for e in extract_edges_with_context(parsed):
e["kind"] = edge_registry.resolve(edge_type=e["kind"], provenance="explicit", context={**context, "line": e.get("line")})
edges.append(e)
# B. KI Kanten (Turbo)
ai_edges = await self._perform_smart_edge_allocation(body_text, note_id)
for e in ai_edges:
valid_kind = edge_registry.resolve(edge_type=e.get("kind"), provenance="semantic_ai", context={**context, "line": e.get("line")})
e["kind"] = valid_kind
edges.append(e)
# C. System Kanten (Struktur)
try:
sys_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []), include_note_scope_refs=note_scope_refs)
except:
sys_edges = build_edges_for_note(note_id, chunk_pls)
for e in sys_edges:
valid_kind = edge_registry.resolve(edge_type=e.get("kind", "belongs_to"), provenance="structure", context={**context, "line": "system"})
if valid_kind:
e["kind"] = valid_kind
edges.append(e)
except Exception as e:
logger.error(f"Processing failed for {file_path}: {e}", exc_info=True)
return {**result, "error": f"Processing failed: {str(e)}"}
# 5. DB Upsert
try:
if purge_before and old_payload: self._purge_artifacts(note_id)
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
upsert_batch(self.client, n_name, n_pts)
if chunk_pls and vecs:
c_name, c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)
upsert_batch(self.client, c_name, c_pts)
if edges:
e_name, e_pts = points_for_edges(self.prefix, edges)
upsert_batch(self.client, e_name, e_pts)
return {"path": file_path, "status": "success", "changed": True, "note_id": note_id, "chunks_count": len(chunk_pls), "edges_count": len(edges)}
except Exception as e:
return {**result, "error": f"DB Upsert failed: {e}"}
def _fetch_note_payload(self, note_id: str) -> Optional[dict]:
from qdrant_client.http import models as rest
try:
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
pts, _ = self.client.scroll(collection_name=f"{self.prefix}_notes", scroll_filter=f, limit=1, with_payload=True)
return pts[0].payload if pts else None
except: return None
def _artifacts_missing(self, note_id: str) -> Tuple[bool, bool]:
"""Prüft Qdrant aktiv auf vorhandene Chunks und Edges."""
from qdrant_client.http import models as rest
try:
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
c_pts, _ = self.client.scroll(collection_name=f"{self.prefix}_chunks", scroll_filter=f, limit=1)
e_pts, _ = self.client.scroll(collection_name=f"{self.prefix}_edges", scroll_filter=f, limit=1)
return (not bool(c_pts)), (not bool(e_pts))
except: return True, True
def _purge_artifacts(self, note_id: str):
from qdrant_client.http import models as rest
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
for suffix in ["chunks", "edges"]:
try: self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=rest.FilterSelector(filter=f))
except: pass
async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
"""Hilfsmethode zur Erstellung einer Note aus einem Textstream."""
target_dir = os.path.join(vault_root, folder)
os.makedirs(target_dir, exist_ok=True)
file_path = os.path.join(target_dir, filename)
with open(file_path, "w", encoding="utf-8") as f:
f.write(markdown_content)
await asyncio.sleep(0.1)
return await self.process_file(file_path=file_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)