mindnet/app/core/ingestion/ingestion_processor.py
Lars 9b0d8c18cb Implement LLM validation for candidate edges in ingestion_processor.py
Enhance the edge validation process by introducing logic to validate edges with rule IDs starting with "candidate:". This includes extracting target IDs, validating against the entire note text, and updating rule IDs upon successful validation. Rejected edges are logged for traceability, improving the overall handling of edge data during ingestion.
2026-01-11 21:27:07 +01:00

410 lines
21 KiB
Python

"""
FILE: app/core/ingestion/ingestion_processor.py
DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
AUDIT v4.2.4:
- GOLD-STANDARD v4.2.4: Hash-basierte Change-Detection (MINDNET_CHANGE_DETECTION_MODE).
- Wiederherstellung des iterativen Abgleichs basierend auf Inhalts-Hashes.
- Phase 2 verwendet exakt dieselbe ID-Generierung wie Phase 1 (inkl. target_section).
- Authority-Check in Phase 2 prüft mit konsistenter ID-Generierung.
- Eliminiert Duplikate durch inkonsistente ID-Generierung (Steinzeitaxt-Problem).
VERSION: 4.2.4 (WP-24c: Hash-Integrität)
STATUS: Active
"""
import logging
import asyncio
import os
import re
from typing import Dict, List, Optional, Tuple, Any
# Core Module Imports
from app.core.parser import (
read_markdown, pre_scan_markdown, normalize_frontmatter,
validate_required_frontmatter, NoteContext
)
from app.core.chunking import assemble_chunks
# WP-24c: Import der zentralen Identitäts-Logik
from app.core.graph.graph_utils import _mk_edge_id
# Datenbank-Ebene (Modularisierte database-Infrastruktur)
from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
from qdrant_client.http import models as rest
# Services
from app.services.embeddings_client import EmbeddingsClient
from app.services.edge_registry import registry as edge_registry
from app.services.llm_service import LLMService
# Package-Interne Imports (Refactoring WP-14)
from .ingestion_utils import load_type_registry, resolve_note_type, get_chunk_config_by_profile
from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts, is_explicit_edge_present
from .ingestion_validation import validate_edge_candidate
from .ingestion_note_payload import make_note_payload
from .ingestion_chunk_payload import make_chunk_payloads
# Fallback für Edges (Struktur-Verknüpfung)
try:
from app.core.graph.graph_derive_edges import build_edges_for_note
except ImportError:
def build_edges_for_note(*args, **kwargs): return []
logger = logging.getLogger(__name__)
class IngestionService:
def __init__(self, collection_prefix: str = None):
"""Initialisiert den Service und nutzt die neue database-Infrastruktur."""
from app.config import get_settings
self.settings = get_settings()
# --- LOGGING CLEANUP ---
# Unterdrückt Bibliotheks-Lärm, erhält aber inhaltliche Service-Logs
for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]:
logging.getLogger(lib).setLevel(logging.WARNING)
self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
self.cfg = QdrantConfig.from_env()
self.cfg.prefix = self.prefix
self.client = get_client(self.cfg)
self.registry = load_type_registry()
self.embedder = EmbeddingsClient()
self.llm = LLMService()
# WP-25a: Auflösung der Dimension über das Embedding-Profil (MoE)
embed_cfg = self.llm.profiles.get("embedding_expert", {})
self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
# WP-15b: Kontext-Gedächtnis für ID-Auflösung (Globaler Cache)
self.batch_cache: Dict[str, NoteContext] = {}
# WP-24c: Puffer für Phase 2 (Symmetrie-Injektion am Ende des gesamten Imports)
self.symmetry_buffer: List[Dict[str, Any]] = []
try:
ensure_collections(self.client, self.prefix, self.dim)
ensure_payload_indexes(self.client, self.prefix)
except Exception as e:
logger.warning(f"DB initialization warning: {e}")
def _is_valid_id(self, text: Optional[str]) -> bool:
"""WP-24c: Prüft IDs auf fachliche Validität (Ghost-ID Schutz)."""
if not text or not isinstance(text, str) or len(text.strip()) < 2:
return False
blacklisted = {"none", "unknown", "insight", "source", "task", "project", "person", "concept"}
if text.lower().strip() in blacklisted:
return False
return True
async def run_batch(self, file_paths: List[str], vault_root: str) -> Dict[str, Any]:
"""
WP-15b: Phase 1 des Two-Pass Workflows.
Verarbeitet Batches und schreibt NUR Nutzer-Autorität (explizite Kanten).
"""
self.batch_cache.clear()
logger.info(f"--- 🔍 START BATCH PHASE 1 ({len(file_paths)} Dateien) ---")
# 1. Schritt: Pre-Scan (Context-Cache füllen)
for path in file_paths:
try:
ctx = pre_scan_markdown(path, registry=self.registry)
if ctx:
self.batch_cache[ctx.note_id] = ctx
self.batch_cache[ctx.title] = ctx
self.batch_cache[os.path.splitext(os.path.basename(path))[0]] = ctx
except Exception as e:
logger.warning(f" ⚠️ Pre-scan fehlgeschlagen für {path}: {e}")
# 2. Schritt: Batch Processing (Authority Only)
processed_count = 0
success_count = 0
for p in file_paths:
processed_count += 1
res = await self.process_file(p, vault_root, apply=True, purge_before=True)
if res.get("status") == "success":
success_count += 1
logger.info(f"--- ✅ Batch Phase 1 abgeschlossen ({success_count}/{processed_count}) ---")
return {
"status": "success",
"processed": processed_count,
"success": success_count,
"buffered_symmetries": len(self.symmetry_buffer)
}
async def commit_vault_symmetries(self) -> Dict[str, Any]:
"""
WP-24c: Führt PHASE 2 (Globale Symmetrie-Injektion) aus.
Wird am Ende des gesamten Imports aufgerufen.
"""
if not self.symmetry_buffer:
return {"status": "skipped", "reason": "buffer_empty"}
logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrien gegen Live-DB...")
final_virtuals = []
for v_edge in self.symmetry_buffer:
# WP-24c v4.1.0: Korrekte Extraktion der Identitäts-Parameter
src = v_edge.get("source_id") or v_edge.get("note_id") # source_id hat Priorität
tgt = v_edge.get("target_id")
kind = v_edge.get("kind")
scope = v_edge.get("scope", "note")
target_section = v_edge.get("target_section") # WP-24c v4.1.0: target_section berücksichtigen
if not all([src, tgt, kind]):
continue
# WP-24c v4.1.0: Nutzung der zentralisierten ID-Logik aus graph_utils
# GOLD-STANDARD v4.1.0: ID-Generierung muss absolut synchron zu Phase 1 sein
# - Wenn target_section vorhanden, muss es in die ID einfließen
# - Dies stellt sicher, dass der Authority-Check korrekt funktioniert
try:
v_id = _mk_edge_id(kind, src, tgt, scope, target_section=target_section)
except ValueError:
continue
# AUTHORITY-CHECK: Nur schreiben, wenn keine manuelle Kante existiert
# Prüft mit exakt derselben ID, die in Phase 1 verwendet wurde (inkl. target_section)
if not is_explicit_edge_present(self.client, self.prefix, v_id):
final_virtuals.append(v_edge)
section_info = f" (section: {target_section})" if target_section else ""
logger.info(f" 🔄 [SYMMETRY] Add inverse: {src} --({kind})--> {tgt}{section_info}")
else:
logger.info(f" 🛡️ [PROTECTED] Manuelle Kante gefunden. Symmetrie für {kind} unterdrückt.")
if final_virtuals:
col, pts = points_for_edges(self.prefix, final_virtuals)
upsert_batch(self.client, col, pts, wait=True)
count = len(final_virtuals)
self.symmetry_buffer.clear()
return {"status": "success", "added": count}
async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]:
"""
Transformiert eine Markdown-Datei (Phase 1).
Schreibt Notes/Chunks/Explicit Edges sofort.
"""
apply = kwargs.get("apply", False)
force_replace = kwargs.get("force_replace", False)
purge_before = kwargs.get("purge_before", False)
result = {"path": file_path, "status": "skipped", "changed": False, "error": None}
try:
# Ordner-Filter (.trash / .obsidian)
if ".trash" in file_path or any(part.startswith('.') for part in file_path.split(os.sep)):
return {**result, "status": "skipped", "reason": "ignored_folder"}
parsed = read_markdown(file_path)
if not parsed: return {**result, "error": "Empty file"}
fm = normalize_frontmatter(parsed.frontmatter)
validate_required_frontmatter(fm)
note_pl = make_note_payload(parsed, vault_root=vault_root, file_path=file_path, types_cfg=self.registry)
note_id = note_pl.get("note_id")
if not note_id:
return {**result, "status": "error", "error": "missing_id"}
logger.info(f"📄 Bearbeite: '{note_id}'")
# Change Detection (WP-24c v4.2.4: Hash-basierte Inhaltsprüfung)
old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id)
c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id)
content_changed = True
if old_payload and not force_replace:
# Nutzt die über MINDNET_CHANGE_DETECTION_MODE gesteuerte Genauigkeit
# Mapping: 'full' -> 'full:parsed:canonical', 'body' -> 'body:parsed:canonical'
h_key = f"{self.active_hash_mode or 'full'}:parsed:canonical"
new_h = note_pl.get("hashes", {}).get(h_key)
old_h = old_payload.get("hashes", {}).get(h_key)
if new_h and old_h and new_h == old_h:
content_changed = False
if not (force_replace or content_changed or not old_payload or c_miss or e_miss):
return {**result, "status": "unchanged", "note_id": note_id}
if not apply:
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
# Chunks & MoE
profile = note_pl.get("chunk_profile", "sliding_standard")
note_type = resolve_note_type(self.registry, fm.get("type"))
chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type)
enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False)
chunks = await assemble_chunks(note_id, getattr(parsed, "body", ""), note_type, config=chunk_cfg)
for ch in chunks:
new_pool = []
for cand in getattr(ch, "candidate_pool", []):
# WP-24c v4.4.1: Harmonisierung - akzeptiere sowohl "to" als auch "target_id"
# Der chunking_processor verwendet "to", daher muss die Validierung beide Keys unterstützen
t_id = cand.get('target_id') or cand.get('to') or cand.get('note_id')
if not self._is_valid_id(t_id): continue
# WP-24c v4.4.1: explicit:callout Kanten werden NICHT validiert (bereits präzise)
# Sie müssen den Pool passieren, damit sie in Phase 1 erkannt werden
if cand.get("provenance") == "global_pool" and enable_smart:
is_valid = await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm)
if is_valid: new_pool.append(cand)
else:
# WP-24c v4.4.1: Alle anderen Provenances (inkl. explicit:callout) passieren ohne Validierung
new_pool.append(cand)
ch.candidate_pool = new_pool
# chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry)
# v4.2.8 Fix C: Explizite Übergabe des Profil-Namens für den Chunk-Payload
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry, chunk_profile=profile)
vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []
# WP-24c v4.2.0: Kanten-Extraktion mit Note-Scope Zonen Support
# Übergabe des Original-Markdown-Texts für Note-Scope Zonen-Extraktion
markdown_body = getattr(parsed, "body", "")
raw_edges = build_edges_for_note(
note_id,
chunk_pls,
note_level_references=note_pl.get("references", []),
markdown_body=markdown_body
)
# WP-24c v4.5.8: Phase 3 - LLM-Validierung für candidate: Kanten
# Prüfe alle Kanten mit rule_id beginnend mit "candidate:"
# Verwende den gesamten Note-Text für die Validierung
note_text = markdown_body or " ".join([c.get("text", "") or c.get("window", "") for c in chunk_pls])
validated_edges = []
rejected_edges = []
for e in raw_edges:
rule_id = e.get("rule_id", "")
# WP-24c v4.5.8: Trigger-Logik basierend auf rule_id (nicht provenance)
if rule_id and rule_id.startswith("candidate:"):
# Extrahiere target_id für Validierung (aus verschiedenen möglichen Feldern)
target_id = e.get("target_id") or e.get("to")
if not target_id:
# Fallback: Versuche aus Payload zu extrahieren
payload = e.get("extra", {}) if isinstance(e.get("extra"), dict) else {}
target_id = payload.get("target_id") or payload.get("to")
if not target_id:
logger.warning(f"⚠️ [VALIDATION] Keine target_id gefunden für Kante: {e}")
rejected_edges.append(e)
continue
kind = e.get("kind", "related_to")
source_id = e.get("source_id", note_id)
# Erstelle Edge-Dict für Validierung (kompatibel mit validate_edge_candidate)
edge_for_validation = {
"kind": kind,
"to": target_id, # validate_edge_candidate erwartet "to"
"target_id": target_id,
"provenance": e.get("provenance", "explicit"),
"confidence": e.get("confidence", 0.9)
}
logger.info(f"🚀 [VALIDATION] Prüfe Kandidat: {source_id} --{kind}--> {target_id}")
# WP-24c v4.5.8: Validiere gegen den gesamten Note-Text
is_valid = await validate_edge_candidate(
chunk_text=note_text,
edge=edge_for_validation,
batch_cache=self.batch_cache,
llm_service=self.llm,
profile_name="ingest_validator"
)
if is_valid:
# WP-24c v4.5.8: Entferne candidate: Präfix (Kante wird zum Fakt)
new_rule_id = rule_id.replace("candidate:", "").strip()
if not new_rule_id:
new_rule_id = e.get("provenance", "explicit")
# Aktualisiere rule_id im Edge (die _edge Funktion merged extra direkt ins Haupt-Dict)
e["rule_id"] = new_rule_id
validated_edges.append(e)
logger.info(f"✅ [VALIDATION] Kandidat bestätigt: {source_id} --{kind}--> {target_id} -> rule_id: {new_rule_id}")
else:
# WP-24c v4.5.8: Kante ablehnen (nicht zu validated_edges hinzufügen)
rejected_edges.append(e)
logger.info(f"🚫 [VALIDATION] Kandidat abgelehnt: {source_id} --{kind}--> {target_id}")
else:
# WP-24c v4.5.8: Keine candidate: Kante -> direkt übernehmen
validated_edges.append(e)
# WP-24c v4.5.8: Verwende validated_edges statt raw_edges für weitere Verarbeitung
explicit_edges = []
for e in validated_edges:
t_raw = e.get("target_id")
t_ctx = self.batch_cache.get(t_raw)
t_id = t_ctx.note_id if t_ctx else t_raw
if not self._is_valid_id(t_id): continue
resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance="explicit")
# WP-24c v4.1.0: target_section aus dem Edge-Payload extrahieren und beibehalten
target_section = e.get("target_section")
e.update({
"kind": resolved_kind,
"relation": resolved_kind, # Konsistenz: kind und relation identisch
"target_id": t_id,
"source_id": e.get("source_id") or note_id, # Sicherstellen, dass source_id gesetzt ist
"origin_note_id": note_id,
"virtual": False
})
explicit_edges.append(e)
# Symmetrie puffern (WP-24c v4.1.0: Korrekte Symmetrie-Integrität)
inv_kind = edge_registry.get_inverse(resolved_kind)
if inv_kind and t_id != note_id:
# GOLD-STANDARD v4.1.0: Symmetrie-Integrität
v_edge = {
"note_id": t_id, # Besitzer-Wechsel: Symmetrie gehört zum Link-Ziel
"source_id": t_id, # Neue Quelle ist das Link-Ziel
"target_id": note_id, # Ziel ist die ursprüngliche Quelle
"kind": inv_kind, # Inverser Kanten-Typ
"relation": inv_kind, # Konsistenz: kind und relation identisch
"scope": "note", # Symmetrien sind immer Note-Level
"virtual": True,
"origin_note_id": note_id, # Tracking: Woher kommt die Symmetrie
}
# target_section beibehalten, falls vorhanden (für Section-Links)
if target_section:
v_edge["target_section"] = target_section
self.symmetry_buffer.append(v_edge)
# DB Upsert
if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id)
col_n, pts_n = points_for_note(self.prefix, note_pl, None, self.dim)
upsert_batch(self.client, col_n, pts_n, wait=True)
if chunk_pls and vecs:
col_c, pts_c = points_for_chunks(self.prefix, chunk_pls, vecs)
upsert_batch(self.client, col_c, pts_c, wait=True)
if explicit_edges:
col_e, pts_e = points_for_edges(self.prefix, explicit_edges)
upsert_batch(self.client, col_e, pts_e, wait=True)
logger.info(f" ✨ Phase 1 fertig: {len(explicit_edges)} explizite Kanten für '{note_id}'.")
return {"status": "success", "note_id": note_id}
except Exception as e:
logger.error(f"❌ Fehler bei {file_path}: {e}", exc_info=True)
return {**result, "status": "error", "error": str(e)}
async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
"""Erstellt eine Note aus einem Textstream."""
target_path = os.path.join(vault_root, folder, filename)
os.makedirs(os.path.dirname(target_path), exist_ok=True)
with open(target_path, "w", encoding="utf-8") as f:
f.write(markdown_content)
await asyncio.sleep(0.1)
return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)