mindnet/app/core/ingestion.py

355 lines
16 KiB
Python

"""
FILE: app/core/ingestion.py
DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen.
WP-20: Optimiert für OpenRouter (mistralai/mistral-7b-instruct:free).
WP-22: Content Lifecycle, Edge Registry Validation & Multi-Hash.
FIX: Finale Mistral-Härtung (<s> & [OUT] Tags), robuste JSON-Recovery & DoD-Sync.
VERSION: 2.11.11
STATUS: Active
DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.services.llm_service, app.services.edge_registry
"""
import os
import json
import re
import logging
import asyncio
import time
from typing import Dict, List, Optional, Tuple, Any
# Core Module Imports
from app.core.parser import (
read_markdown,
normalize_frontmatter,
validate_required_frontmatter,
extract_edges_with_context,
)
from app.core.note_payload import make_note_payload
from app.core.chunker import assemble_chunks, get_chunk_config
from app.core.chunk_payload import make_chunk_payloads
# Fallback für Edges
try:
from app.core.derive_edges import build_edges_for_note
except ImportError:
def build_edges_for_note(*args, **kwargs): return []
from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
from app.core.qdrant_points import (
points_for_chunks,
points_for_note,
points_for_edges,
upsert_batch,
)
from app.services.embeddings_client import EmbeddingsClient
from app.services.edge_registry import registry as edge_registry
from app.services.llm_service import LLMService
logger = logging.getLogger(__name__)
# --- Global Helpers ---
def extract_json_from_response(text: str) -> Any:
"""
Extrahiert JSON-Daten und bereinigt LLM-Steuerzeichen (Mistral/Llama).
Entfernt <s>, [OUT], [/OUT] und Markdown-Blöcke für maximale Robustheit.
"""
if not text: return []
# 1. Entferne Mistral/Llama Steuerzeichen und Tags
clean = text.replace("<s>", "").replace("</s>", "")
clean = clean.replace("[OUT]", "").replace("[/OUT]", "")
clean = clean.strip()
# 2. Suche nach Markdown JSON-Blöcken (```json ... ```)
match = re.search(r"```(?:json)?\s*(.*?)\s*```", clean, re.DOTALL)
payload = match.group(1) if match else clean
try:
return json.loads(payload.strip())
except json.JSONDecodeError:
# 3. Recovery: Suche nach der ersten [ und letzten ] (Liste)
start = payload.find('[')
end = payload.rfind(']') + 1
if start != -1 and end > start:
try:
return json.loads(payload[start:end])
except: pass
# 4. Zweite Recovery: Suche nach der ersten { und letzten } (Objekt)
start_obj = payload.find('{')
end_obj = payload.rfind('}') + 1
if start_obj != -1 and end_obj > start_obj:
try:
return json.loads(payload[start_obj:end_obj])
except: pass
return []
def load_type_registry(custom_path: Optional[str] = None) -> dict:
"""Lädt die types.yaml zur Steuerung der typ-spezifischen Ingestion."""
import yaml
from app.config import get_settings
settings = get_settings()
path = custom_path or settings.MINDNET_TYPES_FILE
if not os.path.exists(path): return {}
try:
with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {}
except Exception: return {}
# --- Service Class ---
class IngestionService:
def __init__(self, collection_prefix: str = None):
from app.config import get_settings
self.settings = get_settings()
self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
self.cfg = QdrantConfig.from_env()
self.cfg.prefix = self.prefix
self.client = get_client(self.cfg)
self.dim = self.settings.VECTOR_SIZE
self.registry = load_type_registry()
self.embedder = EmbeddingsClient()
self.llm = LLMService()
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
try:
ensure_collections(self.client, self.prefix, self.dim)
ensure_payload_indexes(self.client, self.prefix)
except Exception as e:
logger.warning(f"DB init warning: {e}")
def _resolve_note_type(self, requested: Optional[str]) -> str:
"""Bestimmt den finalen Notiz-Typ (Fallback auf 'concept')."""
types = self.registry.get("types", {})
if requested and requested in types: return requested
return "concept"
def _get_chunk_config_by_profile(self, profile_name: str, note_type: str) -> Dict[str, Any]:
"""Holt die Chunker-Parameter für ein spezifisches Profil aus der Registry."""
profiles = self.registry.get("chunking_profiles", {})
if profile_name in profiles:
cfg = profiles[profile_name].copy()
if "overlap" in cfg and isinstance(cfg["overlap"], list):
cfg["overlap"] = tuple(cfg["overlap"])
return cfg
return get_chunk_config(note_type)
async def _perform_smart_edge_allocation(self, text: str, note_id: str) -> List[Dict]:
"""
WP-20: Nutzt das Hybrid LLM für die semantische Kanten-Extraktion.
Respektiert die Provider-Einstellung (OpenRouter Primary).
"""
provider = self.settings.MINDNET_LLM_PROVIDER
model = self.settings.OPENROUTER_MODEL if provider == "openrouter" else self.settings.GEMINI_MODEL
logger.info(f"🚀 [Ingestion] Turbo-Mode: Extracting edges for '{note_id}' using {model} on {provider}")
edge_registry.ensure_latest()
valid_types_str = ", ".join(sorted(list(edge_registry.valid_types)))
template = self.llm.get_prompt("edge_extraction", provider)
try:
# Sicherheits-Check: Formatierung des Templates gegen KeyError schützen
try:
# Nutzt die ersten 6000 Zeichen als Kontext-Fenster
prompt = template.format(
text=text[:6000],
note_id=note_id,
valid_types=valid_types_str
)
except KeyError as ke:
logger.error(f"❌ [Ingestion] Prompt-Template Fehler (Variable {ke} fehlt).")
return []
response_json = await self.llm.generate_raw_response(
prompt=prompt, priority="background", force_json=True,
provider=provider, model_override=model
)
# Nutzt den verbesserten Mistral-sicheren JSON-Extraktor
raw_data = extract_json_from_response(response_json)
# Recovery: Suche nach Listen in Dictionaries (z.B. {"edges": [...]})
if isinstance(raw_data, dict):
for k in ["edges", "links", "results", "kanten"]:
if k in raw_data and isinstance(raw_data[k], list):
raw_data = raw_data[k]
break
if not isinstance(raw_data, list):
logger.warning(f"⚠️ [Ingestion] LLM lieferte keine Liste für {note_id}")
return []
processed = []
for item in raw_data:
# Fix für 'str' object assignment error: Erkennt sowohl Dict als auch String ["kind:target"]
if isinstance(item, dict) and "to" in item:
item["provenance"] = "semantic_ai"
item["line"] = f"ai-{provider}"
processed.append(item)
elif isinstance(item, str) and ":" in item:
parts = item.split(":", 1)
processed.append({
"to": parts[1].strip(),
"kind": parts[0].strip(),
"provenance": "semantic_ai",
"line": f"ai-{provider}"
})
return processed
except Exception as e:
logger.warning(f"⚠️ [Ingestion] Smart Edge Allocation failed for {note_id}: {e}")
return []
async def process_file(
self, file_path: str, vault_root: str,
force_replace: bool = False, apply: bool = False, purge_before: bool = False,
note_scope_refs: bool = False, hash_source: str = "parsed", hash_normalize: str = "canonical"
) -> Dict[str, Any]:
"""Transformiert eine Markdown-Datei in den Graphen (Notes, Chunks, Edges)."""
result = {"path": file_path, "status": "skipped", "changed": False, "error": None}
# 1. Parse & Lifecycle Gate
try:
parsed = read_markdown(file_path)
if not parsed: return {**result, "error": "Empty file"}
fm = normalize_frontmatter(parsed.frontmatter)
validate_required_frontmatter(fm)
except Exception as e:
return {**result, "error": f"Validation failed: {str(e)}"}
status = fm.get("status", "draft").lower().strip()
if status in ["system", "template", "archive", "hidden"]:
return {**result, "status": "skipped", "reason": f"lifecycle_{status}"}
# 2. Config Resolution & Payload Construction
note_type = self._resolve_note_type(fm.get("type"))
fm["type"] = note_type
try:
note_pl = make_note_payload(parsed, vault_root=vault_root, hash_normalize=hash_normalize, hash_source=hash_source, file_path=file_path)
note_id = note_pl["note_id"]
except Exception as e:
return {**result, "error": f"Payload failed: {str(e)}"}
# 3. Change Detection (Strikte DoD Umsetzung: Kein Shortcut)
old_payload = None if force_replace else self._fetch_note_payload(note_id)
check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}"
old_hash = (old_payload or {}).get("hashes", {}).get(check_key)
new_hash = note_pl.get("hashes", {}).get(check_key)
# Prüfung auf fehlende Artefakte in Qdrant
chunks_missing, edges_missing = self._artifacts_missing(note_id)
should_write = force_replace or (not old_payload) or (old_hash != new_hash) or chunks_missing or edges_missing
if not should_write:
return {**result, "status": "unchanged", "note_id": note_id}
if not apply:
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
# 4. Processing (Chunking, Embedding, AI Edges)
try:
body_text = getattr(parsed, "body", "") or ""
edge_registry.ensure_latest()
# Profil-gesteuertes Chunking
profile = fm.get("chunk_profile") or fm.get("chunking_profile") or "sliding_standard"
chunk_cfg = self._get_chunk_config_by_profile(profile, note_type)
chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_cfg)
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)
# Vektorisierung
vecs = []
if chunk_pls:
texts = [c.get("window") or c.get("text") or "" for c in chunk_pls]
vecs = await self.embedder.embed_documents(texts)
# Kanten-Extraktion
edges = []
context = {"file": file_path, "note_id": note_id}
# A. Explizite Kanten (User)
for e in extract_edges_with_context(parsed):
e["kind"] = edge_registry.resolve(edge_type=e["kind"], provenance="explicit", context={**context, "line": e.get("line")})
edges.append(e)
# B. KI Kanten (Turbo)
ai_edges = await self._perform_smart_edge_allocation(body_text, note_id)
for e in ai_edges:
valid_kind = edge_registry.resolve(edge_type=e.get("kind"), provenance="semantic_ai", context={**context, "line": e.get("line")})
e["kind"] = valid_kind
edges.append(e)
# C. System Kanten (Struktur)
try:
sys_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []), include_note_scope_refs=note_scope_refs)
except:
sys_edges = build_edges_for_note(note_id, chunk_pls)
for e in sys_edges:
valid_kind = edge_registry.resolve(edge_type=e.get("kind", "belongs_to"), provenance="structure", context={**context, "line": "system"})
if valid_kind:
e["kind"] = valid_kind
edges.append(e)
except Exception as e:
logger.error(f"Processing failed for {file_path}: {e}", exc_info=True)
return {**result, "error": f"Processing failed: {str(e)}"}
# 5. DB Upsert
try:
if purge_before and old_payload: self._purge_artifacts(note_id)
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
upsert_batch(self.client, n_name, n_pts)
if chunk_pls and vecs:
c_name, c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)
upsert_batch(self.client, c_name, c_pts)
if edges:
e_name, e_pts = points_for_edges(self.prefix, edges)
upsert_batch(self.client, e_name, e_pts)
return {"path": file_path, "status": "success", "changed": True, "note_id": note_id, "chunks_count": len(chunk_pls), "edges_count": len(edges)}
except Exception as e:
return {**result, "error": f"DB Upsert failed: {e}"}
def _fetch_note_payload(self, note_id: str) -> Optional[dict]:
from qdrant_client.http import models as rest
try:
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
pts, _ = self.client.scroll(collection_name=f"{self.prefix}_notes", scroll_filter=f, limit=1, with_payload=True)
return pts[0].payload if pts else None
except: return None
def _artifacts_missing(self, note_id: str) -> Tuple[bool, bool]:
"""Prüft Qdrant aktiv auf vorhandene Chunks und Edges (Kein Shortcut)."""
from qdrant_client.http import models as rest
try:
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
c_pts, _ = self.client.scroll(collection_name=f"{self.prefix}_chunks", scroll_filter=f, limit=1)
e_pts, _ = self.client.scroll(collection_name=f"{self.prefix}_edges", scroll_filter=f, limit=1)
return (not bool(c_pts)), (not bool(e_pts))
except: return True, True
def _purge_artifacts(self, note_id: str):
from qdrant_client.http import models as rest
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
for suffix in ["chunks", "edges"]:
try: self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=rest.FilterSelector(filter=f))
except: pass
async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
"""Hilfsmethode zur Erstellung einer Note aus einem Textstream."""
target_dir = os.path.join(vault_root, folder)
os.makedirs(target_dir, exist_ok=True)
file_path = os.path.join(target_dir, filename)
with open(file_path, "w", encoding="utf-8") as f:
f.write(markdown_content)
await asyncio.sleep(0.1)
return await self.process_file(file_path=file_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)