mindnet/app/core/ingestion.py
Lars b0d73cb053
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
app/core/ingestion.py aktualisiert
2025-12-25 21:46:40 +01:00

390 lines
18 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
FILE: app/core/ingestion.py
DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen.
WP-20: Optimiert für OpenRouter (mistralai/mistral-7b-instruct:free).
WP-22: Content Lifecycle, Edge Registry Validation & Multi-Hash.
FIX: Deep Fallback Logic (v2.11.14). Erkennt Policy Violations auch in validen
JSON-Objekten und erzwingt den lokalen Ollama-Sprung, um Kantenverlust
bei umfangreichen Protokollen zu verhindern.
VERSION: 2.11.14
STATUS: Active
DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.services.llm_service, app.services.edge_registry
"""
import os
import json
import re
import logging
import asyncio
import time
from typing import Dict, List, Optional, Tuple, Any
# Core Module Imports
from app.core.parser import (
read_markdown,
normalize_frontmatter,
validate_required_frontmatter,
extract_edges_with_context,
)
from app.core.note_payload import make_note_payload
from app.core.chunker import assemble_chunks, get_chunk_config
from app.core.chunk_payload import make_chunk_payloads
# Fallback for structural edges: keep the module importable when the optional
# derive_edges module is missing, but make the degradation visible in the log.
try:
    from app.core.derive_edges import build_edges_for_note
except ImportError:
    logging.getLogger(__name__).warning(
        "app.core.derive_edges is unavailable; structural edges will not be generated."
    )

    def build_edges_for_note(*args, **kwargs):
        """Stand-in used when derive_edges cannot be imported.

        Accepts any arguments and always returns an empty edge list.
        """
        return []
from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
from app.core.qdrant_points import (
points_for_chunks,
points_for_note,
points_for_edges,
upsert_batch,
)
from app.services.embeddings_client import EmbeddingsClient
from app.services.edge_registry import registry as edge_registry
from app.services.llm_service import LLMService
logger = logging.getLogger(__name__)
# --- Global Helpers ---
def extract_json_from_response(text: str) -> Any:
    """Parse JSON out of a raw LLM response, tolerating common wrappers.

    Strips the Mistral/Llama control tokens ``<s>``, ``</s>``, ``[OUT]`` and
    ``[/OUT]``, unwraps a Markdown code fence if one is present, and then
    tries progressively looser parses.

    Args:
        text: Raw response text from the model.

    Returns:
        The decoded JSON value (usually a list or dict). An empty list is
        returned when ``text`` is empty, is not a string, or contains nothing
        that can be decoded.
    """
    if not text or not isinstance(text, str):
        return []

    # 1. Remove Mistral/Llama control tokens and tags.
    clean = text
    for token in ("<s>", "</s>", "[OUT]", "[/OUT]"):
        clean = clean.replace(token, "")
    clean = clean.strip()

    # 2. Prefer the content of a Markdown fence (```json ... ```), if any.
    match = re.search(r"```(?:json)?\s*(.*?)\s*```", clean, re.DOTALL)
    payload = (match.group(1) if match else clean).strip()

    try:
        return json.loads(payload)
    except json.JSONDecodeError:
        pass

    # 3./4. Recovery: first try the outermost [...] span (list), then the
    # outermost {...} span (object). Only decoding failures are swallowed;
    # anything else (e.g. KeyboardInterrupt) must propagate.
    for opener, closer in (("[", "]"), ("{", "}")):
        start = payload.find(opener)
        end = payload.rfind(closer) + 1
        if start != -1 and end > start:
            try:
                return json.loads(payload[start:end])
            except (ValueError, RecursionError):
                continue
    return []
def load_type_registry(custom_path: Optional[str] = None) -> dict:
    """Load the types.yaml registry that steers type-specific ingestion.

    Args:
        custom_path: Optional path to the registry file. When omitted, the
            ``MINDNET_TYPES_FILE`` setting is used.

    Returns:
        The parsed registry as a dict. An empty dict is returned when no path
        is configured, the file does not exist, it cannot be read or parsed,
        or its top-level YAML value is not a mapping.
    """
    import yaml
    from app.config import get_settings

    settings = get_settings()
    path = custom_path or settings.MINDNET_TYPES_FILE
    if not path or not os.path.exists(path):
        return {}

    try:
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except Exception as exc:
        # Best effort: a broken registry must not stop ingestion, but it
        # should be visible in the logs instead of being silently ignored.
        logger.warning(f"Type registry '{path}' could not be loaded: {exc}")
        return {}

    # Callers use .get() on the result, so only a mapping is acceptable.
    return data if isinstance(data, dict) else {}
# --- Service Class ---
class IngestionService:
    """Ingestion pipeline: turns a Markdown note into note, chunk and edge points in Qdrant."""

    # Keys under which an LLM may nest the edge list when it answers with an object.
    _EDGE_LIST_KEYS = ("edges", "links", "results", "kanten", "matches", "edge_list")

    def __init__(self, collection_prefix: Optional[str] = None):
        """Wire up settings, Qdrant client, type registry, embedder and LLM service.

        Args:
            collection_prefix: Collection prefix to use; falls back to the
                ``COLLECTION_PREFIX`` setting when not given.
        """
        from app.config import get_settings

        self.settings = get_settings()
        self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
        self.cfg = QdrantConfig.from_env()
        self.cfg.prefix = self.prefix
        self.client = get_client(self.cfg)
        self.dim = self.settings.VECTOR_SIZE
        self.registry = load_type_registry()
        self.embedder = EmbeddingsClient()
        self.llm = LLMService()
        self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
        try:
            ensure_collections(self.client, self.prefix, self.dim)
            ensure_payload_indexes(self.client, self.prefix)
        except Exception as e:
            # Collection setup is best effort; the service stays usable.
            logger.warning(f"DB init warning: {e}")

    def _resolve_note_type(self, requested: Optional[str]) -> str:
        """Return ``requested`` if the registry knows it, otherwise ``"concept"``."""
        # "or {}" also covers a registry whose "types" entry is present but null.
        types = self.registry.get("types") or {}
        try:
            known = bool(requested) and requested in types
        except TypeError:
            # Unhashable frontmatter value (e.g. a YAML list) cannot be a type name.
            known = False
        return requested if known else "concept"

    def _get_chunk_config_by_profile(self, profile_name: str, note_type: str) -> Dict[str, Any]:
        """Return the chunker parameters for a profile from the registry.

        Falls back to ``get_chunk_config(note_type)`` when the profile is not
        defined as a mapping in the registry. A list-valued ``overlap`` is
        converted to a tuple on a copy; the registry itself is not modified.
        """
        profiles = self.registry.get("chunking_profiles") or {}
        if profile_name in profiles and isinstance(profiles[profile_name], dict):
            cfg = dict(profiles[profile_name])
            if isinstance(cfg.get("overlap"), list):
                cfg["overlap"] = tuple(cfg["overlap"])
            return cfg
        return get_chunk_config(note_type)

    @classmethod
    def _collect_edge_candidates(cls, raw_data: Any) -> List[Any]:
        """Pull a flat list of edge candidates out of a decoded LLM answer.

        A list is used as is. For a dict, the first list found under one of
        ``_EDGE_LIST_KEYS`` is used; if that yields nothing, string values and
        string list items are turned into ``"key:value"`` candidates. Any
        other input yields an empty list.
        """
        if isinstance(raw_data, list):
            return raw_data
        if not isinstance(raw_data, dict):
            return []

        candidates: List[Any] = []
        for key in cls._EDGE_LIST_KEYS:
            if isinstance(raw_data.get(key), list):
                candidates = raw_data[key]
                break

        if not candidates:
            # Key/value recovery for answers shaped like {"kind": "target", ...}.
            candidates = []
            for key, value in raw_data.items():
                if isinstance(value, str):
                    candidates.append(f"{key}:{value}")
                elif isinstance(value, list):
                    candidates.extend(f"{key}:{entry}" for entry in value if isinstance(entry, str))
        return candidates

    @staticmethod
    def _normalize_edge_candidates(candidates: List[Any], line_label: str) -> List[Dict]:
        """Convert raw candidates into edge dicts tagged as ``semantic_ai``.

        Dicts carrying a ``"to"`` key are tagged in place; ``"kind:target"``
        strings are split on the first colon. Everything else is dropped.
        """
        processed: List[Dict] = []
        for item in candidates:
            if isinstance(item, dict) and "to" in item:
                item["provenance"] = "semantic_ai"
                item["line"] = line_label
                processed.append(item)
            elif isinstance(item, str) and ":" in item:
                kind, target = item.split(":", 1)
                processed.append({
                    "to": target.strip(),
                    "kind": kind.strip(),
                    "provenance": "semantic_ai",
                    "line": line_label,
                })
        return processed

    async def _perform_smart_edge_allocation(self, text: str, note_id: str) -> List[Dict]:
        """Extract edges via the LLM, with a local Ollama fallback.

        The configured provider is asked first. If its answer yields no usable
        candidates, the provider is not ``"ollama"`` and ``LLM_FALLBACK_ENABLED``
        is set, the same prompt is sent to Ollama. Both answers go through the
        same candidate recovery.

        Returns:
            Edge dicts tagged with ``provenance="semantic_ai"`` and a ``line``
            label naming the provider that actually produced them. An empty
            list is returned on any failure inside the extraction.
        """
        provider = self.settings.MINDNET_LLM_PROVIDER
        # NOTE(review): GEMINI_MODEL is used for every provider other than
        # "openrouter", including "ollama" - confirm this is intended.
        model = self.settings.OPENROUTER_MODEL if provider == "openrouter" else self.settings.GEMINI_MODEL
        logger.info(f"🚀 [Ingestion] Turbo-Mode: Extracting edges for '{note_id}' using {model} on {provider}")
        edge_registry.ensure_latest()
        valid_types_str = ", ".join(sorted(edge_registry.valid_types))
        template = self.llm.get_prompt("edge_extraction", provider)
        try:
            try:
                # Only the first 6000 characters of the note are sent as context.
                prompt = template.format(
                    text=text[:6000],
                    note_id=note_id,
                    valid_types=valid_types_str
                )
            except KeyError as ke:
                logger.error(f"❌ [Ingestion] Prompt-Template Fehler (Variable {ke} fehlt).")
                return []

            # 1. First attempt: the configured primary provider.
            response_json = await self.llm.generate_raw_response(
                prompt=prompt, priority="background", force_json=True,
                provider=provider, model_override=model
            )
            raw_data = extract_json_from_response(response_json)

            # 2. Candidate recovery (list, nested list, or key/value pairs).
            if isinstance(raw_data, dict):
                logger.info(f" [Ingestion] LLM returned dict, checking for embedded lists in {note_id}")
            candidates = self._collect_edge_candidates(raw_data)
            answered_by = provider

            # 3. Deep fallback: nothing usable came back from a non-local provider.
            if not candidates and provider != "ollama" and self.settings.LLM_FALLBACK_ENABLED:
                logger.warning(
                    f"🛑 [Ingestion] Cloud-Antwort für {note_id} lieferte keine verwertbaren Kanten. "
                    f"Mögliche Policy Violation oder Refusal. Erzwinge LOKALEN FALLBACK via Ollama..."
                )
                response_json_local = await self.llm.generate_raw_response(
                    prompt=prompt, priority="background", force_json=True, provider="ollama"
                )
                candidates = self._collect_edge_candidates(extract_json_from_response(response_json_local))
                answered_by = "ollama"

            if not candidates:
                logger.warning(f"⚠️ [Ingestion] Auch nach Fallback keine extrahierbaren Kanten für {note_id}")
                return []

            return self._normalize_edge_candidates(candidates, f"ai-{answered_by}")
        except Exception as e:
            logger.warning(f"⚠️ [Ingestion] Smart Edge Allocation failed for {note_id}: {e}")
            return []

    async def process_file(
        self, file_path: str, vault_root: str,
        force_replace: bool = False, apply: bool = False, purge_before: bool = False,
        note_scope_refs: bool = False, hash_source: str = "parsed", hash_normalize: str = "canonical"
    ) -> Dict[str, Any]:
        """Transform one Markdown file into graph data (note, chunks, edges).

        Args:
            file_path: Path of the Markdown file to ingest.
            vault_root: Vault root passed on to the note payload builder.
            force_replace: Skip the stored-payload lookup and always write.
            apply: When false, stop after change detection ("dry-run").
            purge_before: Delete existing chunks/edges of the note before upserting.
            note_scope_refs: Forwarded to ``build_edges_for_note``.
            hash_source: Hash source component of the change-detection key.
            hash_normalize: Normalization component of the change-detection key.

        Returns:
            A result dict with at least ``path`` and ``status``. Failures are
            reported through the ``error`` field rather than raised.
        """
        result = {"path": file_path, "status": "skipped", "changed": False, "error": None}

        # 1. Parse & lifecycle gate
        try:
            parsed = read_markdown(file_path)
            if not parsed:
                return {**result, "error": "Empty file"}
            fm = normalize_frontmatter(parsed.frontmatter)
            validate_required_frontmatter(fm)
        except Exception as e:
            return {**result, "error": f"Validation failed: {str(e)}"}

        # Skip system, template, archived and hidden notes. str()/"or" guard
        # against a null or non-string status in the frontmatter.
        status = str(fm.get("status") or "draft").lower().strip()
        if status in ["system", "template", "archive", "hidden"]:
            return {**result, "status": "skipped", "reason": f"lifecycle_{status}"}

        # 2. Config resolution & payload construction
        note_type = self._resolve_note_type(fm.get("type"))
        fm["type"] = note_type
        try:
            note_pl = make_note_payload(
                parsed, vault_root=vault_root, hash_normalize=hash_normalize,
                hash_source=hash_source, file_path=file_path
            )
            note_id = note_pl["note_id"]
        except Exception as e:
            return {**result, "error": f"Payload failed: {str(e)}"}

        # 3. Change detection
        old_payload = None if force_replace else self._fetch_note_payload(note_id)
        check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}"
        old_hash = (old_payload or {}).get("hashes", {}).get(check_key)
        new_hash = note_pl.get("hashes", {}).get(check_key)

        # Also rewrite when chunks or edges of the note are absent in Qdrant.
        chunks_missing, edges_missing = self._artifacts_missing(note_id)
        should_write = (
            force_replace or (not old_payload) or (old_hash != new_hash)
            or chunks_missing or edges_missing
        )
        if not should_write:
            return {**result, "status": "unchanged", "note_id": note_id}
        if not apply:
            return {**result, "status": "dry-run", "changed": True, "note_id": note_id}

        # 4. Processing (chunking, embedding, AI edges)
        try:
            body_text = getattr(parsed, "body", "") or ""
            edge_registry.ensure_latest()

            # Profile-driven chunking
            profile = fm.get("chunk_profile") or fm.get("chunking_profile") or "sliding_standard"
            chunk_cfg = self._get_chunk_config_by_profile(profile, note_type)
            chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_cfg)
            chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)

            # Embedding
            vecs = []
            if chunk_pls:
                texts = [c.get("window") or c.get("text") or "" for c in chunk_pls]
                vecs = await self.embedder.embed_documents(texts)

            # Edge extraction
            edges = []
            context = {"file": file_path, "note_id": note_id}

            # A. Explicit edges (user / wikilinks)
            for e in extract_edges_with_context(parsed):
                e["kind"] = edge_registry.resolve(
                    edge_type=e["kind"], provenance="explicit",
                    context={**context, "line": e.get("line")}
                )
                edges.append(e)

            # B. AI edges (with local fallback)
            ai_edges = await self._perform_smart_edge_allocation(body_text, note_id)
            for e in ai_edges:
                valid_kind = edge_registry.resolve(
                    edge_type=e.get("kind"), provenance="semantic_ai",
                    context={**context, "line": e.get("line")}
                )
                e["kind"] = valid_kind
                edges.append(e)

            # C. System edges (structure). Retry with the short signature if
            # the extended call fails.
            try:
                sys_edges = build_edges_for_note(
                    note_id, chunk_pls,
                    note_level_references=note_pl.get("references", []),
                    include_note_scope_refs=note_scope_refs
                )
            except Exception as sig_err:
                logger.debug(f"build_edges_for_note extended call failed for {note_id}: {sig_err}")
                sys_edges = build_edges_for_note(note_id, chunk_pls)
            for e in sys_edges:
                valid_kind = edge_registry.resolve(
                    edge_type=e.get("kind", "belongs_to"), provenance="structure",
                    context={**context, "line": "system"}
                )
                if valid_kind:
                    e["kind"] = valid_kind
                    edges.append(e)
        except Exception as e:
            logger.error(f"Processing failed for {file_path}: {e}", exc_info=True)
            return {**result, "error": f"Processing failed: {str(e)}"}

        # 5. DB upsert
        try:
            # With force_replace the stored payload is never fetched, so
            # old_payload is always None; purge must still run in that case,
            # otherwise stale chunks/edges survive a forced re-import.
            if purge_before and (old_payload or force_replace):
                self._purge_artifacts(note_id)

            n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
            upsert_batch(self.client, n_name, n_pts)
            if chunk_pls and vecs:
                c_name, c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)
                upsert_batch(self.client, c_name, c_pts)
            if edges:
                e_name, e_pts = points_for_edges(self.prefix, edges)
                upsert_batch(self.client, e_name, e_pts)
            return {
                "path": file_path, "status": "success", "changed": True, "note_id": note_id,
                "chunks_count": len(chunk_pls), "edges_count": len(edges)
            }
        except Exception as e:
            logger.error(f"DB upsert failed for {file_path}: {e}", exc_info=True)
            return {**result, "error": f"DB Upsert failed: {e}"}

    def _fetch_note_payload(self, note_id: str) -> Optional[dict]:
        """Return the stored payload of a note from Qdrant, or None if absent or on error."""
        from qdrant_client.http import models as rest

        try:
            f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
            pts, _ = self.client.scroll(
                collection_name=f"{self.prefix}_notes", scroll_filter=f, limit=1, with_payload=True
            )
            return pts[0].payload if pts else None
        except Exception as e:
            # A failed lookup is treated like a missing note.
            logger.debug(f"Note payload lookup failed for {note_id}: {e}")
            return None

    def _artifacts_missing(self, note_id: str) -> Tuple[bool, bool]:
        """Check Qdrant for chunks and edges of a note.

        Returns:
            ``(chunks_missing, edges_missing)``; ``(True, True)`` when the
            lookup itself fails.
        """
        from qdrant_client.http import models as rest

        try:
            f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
            c_pts, _ = self.client.scroll(collection_name=f"{self.prefix}_chunks", scroll_filter=f, limit=1)
            e_pts, _ = self.client.scroll(collection_name=f"{self.prefix}_edges", scroll_filter=f, limit=1)
            return (not bool(c_pts)), (not bool(e_pts))
        except Exception as e:
            logger.debug(f"Artifact check failed for {note_id}: {e}")
            return True, True

    def _purge_artifacts(self, note_id: str):
        """Delete the chunks and edges of a note before a re-import (best effort)."""
        from qdrant_client.http import models as rest

        f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
        for suffix in ["chunks", "edges"]:
            try:
                self.client.delete(
                    collection_name=f"{self.prefix}_{suffix}",
                    points_selector=rest.FilterSelector(filter=f)
                )
            except Exception as e:
                # Keep going: a failed purge of one collection must not block the other.
                logger.warning(f"Purge of {suffix} failed for {note_id}: {e}")

    async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
        """Write ``markdown_content`` to ``vault_root/folder/filename`` and ingest it.

        The file is ingested with ``apply``, ``force_replace`` and
        ``purge_before`` all enabled.
        """
        # NOTE(review): filename/folder are joined unchecked; if they can come
        # from untrusted input, confirm the caller prevents path traversal.
        target_dir = os.path.join(vault_root, folder)
        os.makedirs(target_dir, exist_ok=True)
        file_path = os.path.join(target_dir, filename)
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(markdown_content)
        await asyncio.sleep(0.1)
        return await self.process_file(
            file_path=file_path, vault_root=vault_root,
            apply=True, force_replace=True, purge_before=True
        )