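Update ingestion_processor.py to version 3.3.8: fix the Ghost-ID problem (edges whose link target cannot be resolved through the batch cache are now skipped), harden the Pydantic guard (a note_id that is missing or the literal string "None" is now rejected), and improve logging clarity. Refine the symmetry-injection logic and ensure strict phase separation for authority checks. Adjust comments for better understanding and maintainability.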

This commit is contained in:
Lars 2026-01-10 08:32:59 +01:00
parent ec89d83916
commit 7e00344b84

View File

@ -4,11 +4,10 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-24c: Integration der Symmetrie-Logik (Automatische inverse Kanten). WP-24c: Integration der Symmetrie-Logik (Automatische inverse Kanten).
WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-25a: Integration der Mixture of Experts (MoE) Architektur.
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. AUDIT v3.3.8: Lösung des Ghost-ID Problems & Pydantic-Crash Fix.
AUDIT v3.3.7: Strikte globale Phasentrennung. Strikte Phasentrennung (Phase 2 global am Ende).
Fix für Pydantic Crash (None-ID Guard Clauses). Wiederherstellung der LLM-Logging-Transparenz.
Erzwingung der Konsistenz (wait=True). VERSION: 3.3.8 (WP-24c: Robust Authority Enforcement)
VERSION: 3.3.7 (WP-24c: Strict Authority Commitment)
STATUS: Active STATUS: Active
""" """
import logging import logging
@ -23,10 +22,9 @@ from app.core.parser import (
validate_required_frontmatter, NoteContext validate_required_frontmatter, NoteContext
) )
from app.core.chunking import assemble_chunks from app.core.chunking import assemble_chunks
# WP-24c: Import für die deterministische UUID-Vorabberechnung
from app.core.graph.graph_utils import _mk_edge_id from app.core.graph.graph_utils import _mk_edge_id
# MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene # Datenbank-Ebene (Modularisierte database-Infrastruktur)
from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
from qdrant_client.http import models as rest from qdrant_client.http import models as rest
@ -43,7 +41,7 @@ from .ingestion_validation import validate_edge_candidate
from .ingestion_note_payload import make_note_payload from .ingestion_note_payload import make_note_payload
from .ingestion_chunk_payload import make_chunk_payloads from .ingestion_chunk_payload import make_chunk_payloads
# Fallback für Edges (Struktur-Verknüpfung) # Fallback für Edges
try: try:
from app.core.graph.graph_derive_edges import build_edges_for_note from app.core.graph.graph_derive_edges import build_edges_for_note
except ImportError: except ImportError:
@ -53,12 +51,11 @@ logger = logging.getLogger(__name__)
class IngestionService: class IngestionService:
def __init__(self, collection_prefix: str = None): def __init__(self, collection_prefix: str = None):
"""Initialisiert den Service und nutzt die neue database-Infrastruktur.""" """Initialisiert den Service und bereinigt das technische Logging."""
from app.config import get_settings from app.config import get_settings
self.settings = get_settings() self.settings = get_settings()
# --- LOGGING CLEANUP (Business Focus) --- # --- LOGGING CLEANUP (Header-Noise unterdrücken, Business erhalten) ---
# Unterdrückt technische Bibliotheks-Header, erhält aber inhaltliche Service-Logs
for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]: for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]:
logging.getLogger(lib).setLevel(logging.WARNING) logging.getLogger(lib).setLevel(logging.WARNING)
@ -71,49 +68,41 @@ class IngestionService:
self.embedder = EmbeddingsClient() self.embedder = EmbeddingsClient()
self.llm = LLMService() self.llm = LLMService()
# WP-25a: Auflösung der Dimension über das Embedding-Profil (MoE)
embed_cfg = self.llm.profiles.get("embedding_expert", {}) embed_cfg = self.llm.profiles.get("embedding_expert", {})
self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
# WP-15b: Kontext-Gedächtnis für ID-Auflösung # Kontext-Gedächtnis für ID-Auflösung
self.batch_cache: Dict[str, NoteContext] = {} self.batch_cache: Dict[str, NoteContext] = {}
# WP-24c: Puffer für Phase 2 (Symmetrie-Injektion nach dem gesamten Import) # Puffer für Phase 2 (Symmetrie-Injektion am Ende des gesamten Imports)
self.symmetry_buffer: List[Dict[str, Any]] = [] self.symmetry_buffer: List[Dict[str, Any]] = []
try: try:
# Aufruf der modularisierten Schema-Logik
ensure_collections(self.client, self.prefix, self.dim) ensure_collections(self.client, self.prefix, self.dim)
ensure_payload_indexes(self.client, self.prefix) ensure_payload_indexes(self.client, self.prefix)
except Exception as e: except Exception as e:
logger.warning(f"DB initialization warning: {e}") logger.warning(f"DB initialization warning: {e}")
def _is_valid_note_id(self, text: Optional[str]) -> bool: def _is_valid_note_id(self, text: Optional[str]) -> bool:
""" """WP-24c: Fachliche Validitätsprüfung gegen Junk-Kanten."""
WP-24c: Prüft Ziel-Strings auf fachliche Validität.
Verhindert Müll-Kanten zu System-Platzhaltern.
"""
if not text or not isinstance(text, str) or len(text.strip()) < 2: if not text or not isinstance(text, str) or len(text.strip()) < 2:
return False return False
blacklisted = {"insight", "event", "source", "task", "project", "person", "concept", "related_to", "referenced_by", "none", "unknown"} blacklisted = {"insight", "event", "source", "task", "project", "person", "concept", "related_to", "referenced_by", "none", "unknown"}
if text.lower().strip() in blacklisted: if text.lower().strip() in blacklisted:
return False return False
if len(text) > 200: return False if len(text) > 200: return False
return True return True
async def run_batch(self, file_paths: List[str], vault_root: str) -> Dict[str, Any]: async def run_batch(self, file_paths: List[str], vault_root: str) -> Dict[str, Any]:
""" """
WP-15b: Two-Pass Ingestion Workflow (PHASE 1). WP-15b: Phase 1 des Two-Phase Ingestion Workflows.
Verarbeitet Batches und schreibt NUR Nutzer-Autorität in die DB. Verarbeitet Batches und schreibt NUR Nutzer-Autorität (physische Kanten) in die DB.
""" """
self.batch_cache.clear() self.batch_cache.clear()
logger.info(f"--- 🔍 START BATCH PHASE 1 ({len(file_paths)} Dateien) ---") logger.info(f"--- 🔍 START BATCH PHASE 1 ({len(file_paths)} Dateien) ---")
# 1. Schritt: Pre-Scan (Context-Cache füllen) # 1. Pre-Scan (ID-Gedächtnis füllen)
for path in file_paths: for path in file_paths:
try: try:
ctx = pre_scan_markdown(path, registry=self.registry) ctx = pre_scan_markdown(path, registry=self.registry)
@ -125,7 +114,7 @@ class IngestionService:
except Exception as e: except Exception as e:
logger.warning(f" ⚠️ Pre-scan fehlgeschlagen für {path}: {e}") logger.warning(f" ⚠️ Pre-scan fehlgeschlagen für {path}: {e}")
# 2. Schritt: PROCESSING # 2. Schritt: Batch-Verarbeitung (Explicit Authority)
processed_count = 0 processed_count = 0
success_count = 0 success_count = 0
for p in file_paths: for p in file_paths:
@ -134,7 +123,7 @@ class IngestionService:
if res.get("status") == "success": if res.get("status") == "success":
success_count += 1 success_count += 1
logger.info(f"--- ✅ Batch Phase 1 beendet ({success_count}/{processed_count}) ---") logger.info(f"--- ✅ Batch Phase 1 abgeschlossen ({success_count}/{processed_count}) ---")
return { return {
"status": "success", "status": "success",
"processed": processed_count, "processed": processed_count,
@ -144,46 +133,40 @@ class IngestionService:
async def commit_vault_symmetries(self) -> Dict[str, Any]: async def commit_vault_symmetries(self) -> Dict[str, Any]:
""" """
WP-24c: Führt PHASE 2 (Globale Symmetrie-Injektion) aus. WP-24c: Globale Symmetrie-Injektion (Phase 2).
Wird einmalig am Ende des gesamten Imports aufgerufen. Prüft gepufferte Kanten gegen die Instance-of-Truth in Qdrant.
Sorgt dafür, dass virtuelle Kanten erst NACH der Nutzer-Autorität geschrieben werden.
""" """
if not self.symmetry_buffer: if not self.symmetry_buffer:
logger.info("⏭️ Symmetrie-Puffer leer. Keine Aktion erforderlich.") logger.info("⏭️ Symmetrie-Puffer leer.")
return {"status": "skipped", "reason": "buffer_empty"} return {"status": "skipped", "reason": "buffer_empty"}
logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrie-Vorschläge gegen Live-DB...") logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrien gegen Live-DB...")
final_virtuals = [] final_virtuals = []
for v_edge in self.symmetry_buffer: for v_edge in self.symmetry_buffer:
# Sicherheits-Check: Keine Kanten ohne Ziele zulassen if not v_edge.get("target_id") or v_edge.get("target_id") == "None": continue
if not v_edge.get("target_id") or v_edge.get("target_id") == "None":
continue
# ID der potenziellen Symmetrie berechnen
v_id = _mk_edge_id(v_edge["kind"], v_edge["note_id"], v_edge["target_id"], v_edge.get("scope", "note")) v_id = _mk_edge_id(v_edge["kind"], v_edge["note_id"], v_edge["target_id"], v_edge.get("scope", "note"))
# AUTHORITY-CHECK: Nur schreiben, wenn KEINE manuelle Kante in der DB existiert # Schutz der Nutzer-Autorität
if not is_explicit_edge_present(self.client, self.prefix, v_id): if not is_explicit_edge_present(self.client, self.prefix, v_id):
final_virtuals.append(v_edge) final_virtuals.append(v_edge)
logger.info(f" 🔄 [SYMMETRY] Erzeuge Gegenkante: {v_edge['note_id']} --({v_edge['kind']})--> {v_edge['target_id']}") logger.info(f" 🔄 [SYMMETRY] Add inverse: {v_edge['note_id']} --({v_edge['kind']})--> {v_edge['target_id']}")
else: else:
logger.debug(f" 🛡️ Schutz: Manuelle Kante belegt ID {v_id}. Symmetrie verworfen.") logger.debug(f" 🛡️ Schutz: Manuelle Kante belegt ID {v_id}. Symmetrie verworfen.")
added_count = 0
if final_virtuals: if final_virtuals:
logger.info(f"📤 Schreibe {len(final_virtuals)} geschützte Symmetrie-Kanten in Qdrant.")
e_pts = points_for_edges(self.prefix, final_virtuals)[1] e_pts = points_for_edges(self.prefix, final_virtuals)[1]
# wait=True garantiert, dass der nächste Lauf diese Kanten sofort sieht
upsert_batch(self.client, f"{self.prefix}_edges", e_pts, wait=True) upsert_batch(self.client, f"{self.prefix}_edges", e_pts, wait=True)
added_count = len(final_virtuals)
self.symmetry_buffer.clear() # Puffer leeren added = len(final_virtuals)
return {"status": "success", "added": added_count} self.symmetry_buffer.clear()
return {"status": "success", "added": added}
async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]: async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]:
""" """
Transformiert eine Markdown-Datei. Transformiert eine Note.
Schreibt Notes/Chunks/Explicit Edges sofort (Phase 1). Implementiert strikte ID-Kanonisierung und Pydantic-Safety.
Befüllt den Symmetrie-Puffer für die globale Phase 2.
""" """
apply = kwargs.get("apply", False) apply = kwargs.get("apply", False)
force_replace = kwargs.get("force_replace", False) force_replace = kwargs.get("force_replace", False)
@ -192,7 +175,7 @@ class IngestionService:
result = {"path": file_path, "status": "skipped", "changed": False, "error": None} result = {"path": file_path, "status": "skipped", "changed": False, "error": None}
try: try:
# --- ORDNER-FILTER (.trash) --- # Ordner-Filter
if any(part.startswith('.') for part in file_path.split(os.sep)): if any(part.startswith('.') for part in file_path.split(os.sep)):
return {**result, "status": "skipped", "reason": "hidden_folder"} return {**result, "status": "skipped", "reason": "hidden_folder"}
@ -210,14 +193,14 @@ class IngestionService:
note_pl = make_note_payload(parsed, vault_root=vault_root, file_path=file_path, types_cfg=self.registry) note_pl = make_note_payload(parsed, vault_root=vault_root, file_path=file_path, types_cfg=self.registry)
note_id = note_pl.get("note_id") note_id = note_pl.get("note_id")
# --- GUARD CLAUSE: Fehlende IDs verhindern PointStruct-Crash --- # --- HARD GUARD: Verhindert Pydantic-Crashes bei unvollständigen Notizen ---
if not note_id: if not note_id or note_id == "None":
logger.warning(f" ⚠️ Fehlende note_id in '{file_path}'. Datei wird ignoriert.") logger.warning(f" ⚠️ Ungültige note_id in '{file_path}'. Überspringe.")
return {**result, "status": "error", "error": "missing_note_id"} return {**result, "status": "error", "error": "invalid_note_id"}
logger.info(f"📄 Bearbeite: '{note_id}' (Typ: {note_type})") logger.info(f"📄 Bearbeite: '{note_id}' (Typ: {note_type})")
# Change Detection & Fragment-Prüfung # Change Detection
old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id) old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id)
c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id) c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id)
if not (force_replace or not old_payload or c_miss or e_miss): if not (force_replace or not old_payload or c_miss or e_miss):
@ -226,7 +209,7 @@ class IngestionService:
if not apply: if not apply:
return {**result, "status": "dry-run", "changed": True, "note_id": note_id} return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
# Deep Processing & MoE (LLM Validierung) # LLM Validierung (Expert-MoE)
profile = note_pl.get("chunk_profile", "sliding_standard") profile = note_pl.get("chunk_profile", "sliding_standard")
chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type) chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type)
enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False) enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False)
@ -235,12 +218,11 @@ class IngestionService:
for ch in chunks: for ch in chunks:
new_pool = [] new_pool = []
for cand in getattr(ch, "candidate_pool", []): for cand in getattr(ch, "candidate_pool", []):
# --- GUARD: Ungültige Ziele im Candidate-Pool filtern ---
t_id = cand.get('target_id') or cand.get('note_id') t_id = cand.get('target_id') or cand.get('note_id')
if not self._is_valid_note_id(t_id): if not self._is_valid_note_id(t_id): continue
continue
if cand.get("provenance") == "global_pool" and enable_smart: if cand.get("provenance") == "global_pool" and enable_smart:
# LLM Logging
logger.info(f" ⚖️ [VALIDATING] Relation to '{t_id}' via Expert-LLM...") logger.info(f" ⚖️ [VALIDATING] Relation to '{t_id}' via Expert-LLM...")
is_valid = await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm) is_valid = await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm)
logger.info(f" 🧠 [SMART EDGE] {t_id} -> {'✅ OK' if is_valid else '❌ SKIP'}") logger.info(f" 🧠 [SMART EDGE] {t_id} -> {'✅ OK' if is_valid else '❌ SKIP'}")
@ -252,28 +234,31 @@ class IngestionService:
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry) chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry)
vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []
# Kanten-Logik (Kanonisierung via batch_cache) # --- KANTEN-LOGIK MIT STRIKTER KANONISIERUNG (FIX FÜR STEINZEITAXT) ---
raw_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", [])) raw_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []))
explicit_edges = [] explicit_edges = []
for e in raw_edges: for e in raw_edges:
target_raw = e.get("target_id") target_raw = e.get("target_id")
# ID-Resolution über den Context-Cache
t_ctx = self.batch_cache.get(target_raw) t_ctx = self.batch_cache.get(target_raw)
target_id = t_ctx.note_id if t_ctx else target_raw
# Wenn das Ziel nicht im Cache ist, haben wir keine stabile note_id -> Überspringen (Ghost-ID Schutz)
if not t_ctx:
logger.debug(f" ⚠️ Linkziel '{target_raw}' nicht im Cache. Überspringe Kante.")
continue
target_id = t_ctx.note_id
if not self._is_valid_note_id(target_id): continue if not self._is_valid_note_id(target_id): continue
resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit")) resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"))
# Echte physische Kante markieren (Phase 1 Authority) # Echte physische Kante markieren (Phase 1)
e.update({ e.update({
"kind": resolved_kind, "target_id": target_id, "kind": resolved_kind, "target_id": target_id,
"origin_note_id": note_id, "virtual": False, "confidence": 1.0 "origin_note_id": note_id, "virtual": False, "confidence": 1.0
}) })
explicit_edges.append(e) explicit_edges.append(e)
# Symmetrie-Kandidat puffern # Symmetrie puffern
inv_kind = edge_registry.get_inverse(resolved_kind) inv_kind = edge_registry.get_inverse(resolved_kind)
if inv_kind and target_id != note_id: if inv_kind and target_id != note_id:
v_edge = e.copy() v_edge = e.copy()
@ -284,7 +269,7 @@ class IngestionService:
}) })
self.symmetry_buffer.append(v_edge) self.symmetry_buffer.append(v_edge)
# 4. DB Upsert (Phase 1: Authority Commitment) # 4. DB Commit (Phase 1)
if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id) if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id)
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
@ -292,7 +277,7 @@ class IngestionService:
if chunk_pls and vecs: if chunk_pls and vecs:
upsert_batch(self.client, f"{self.prefix}_chunks", points_for_chunks(self.prefix, chunk_pls, vecs)[1]) upsert_batch(self.client, f"{self.prefix}_chunks", points_for_chunks(self.prefix, chunk_pls, vecs)[1])
if explicit_edges: if explicit_edges:
# Wichtig: wait=True stellt sicher, dass die Kanten in Phase 2 searchable sind # WICHTIG: wait=True für Phase-1 Konsistenz
upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, explicit_edges)[1], wait=True) upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, explicit_edges)[1], wait=True)
logger.info(f" ✨ Phase 1 fertig: {len(chunk_pls)} Chunks, {len(explicit_edges)} explizite Kanten.") logger.info(f" ✨ Phase 1 fertig: {len(chunk_pls)} Chunks, {len(explicit_edges)} explizite Kanten.")