diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 7307d59..c10e6ed 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -4,11 +4,10 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator). WP-24c: Integration der Symmetrie-Logik (Automatische inverse Kanten). WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. - WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v3.3.7: Strikte globale Phasentrennung. - Fix für Pydantic Crash (None-ID Guard Clauses). - Erzwingung der Konsistenz (wait=True). -VERSION: 3.3.7 (WP-24c: Strict Authority Commitment) + AUDIT v3.3.8: Lösung des Ghost-ID Problems & Pydantic-Crash Fix. + Strikte Phasentrennung (Phase 2 global am Ende). + Wiederherstellung der LLM-Logging-Transparenz. +VERSION: 3.3.8 (WP-24c: Robust Authority Enforcement) STATUS: Active """ import logging @@ -23,10 +22,9 @@ from app.core.parser import ( validate_required_frontmatter, NoteContext ) from app.core.chunking import assemble_chunks -# WP-24c: Import für die deterministische UUID-Vorabberechnung from app.core.graph.graph_utils import _mk_edge_id -# MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene +# Datenbank-Ebene (Modularisierte database-Infrastruktur) from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch from qdrant_client.http import models as rest @@ -43,7 +41,7 @@ from .ingestion_validation import validate_edge_candidate from .ingestion_note_payload import make_note_payload from .ingestion_chunk_payload import make_chunk_payloads -# Fallback für Edges (Struktur-Verknüpfung) +# Fallback für Edges try: from app.core.graph.graph_derive_edges import build_edges_for_note except ImportError: @@ -53,12 +51,11 @@ logger = logging.getLogger(__name__) class IngestionService: def __init__(self, collection_prefix: str = None): - """Initialisiert den Service und nutzt die neue database-Infrastruktur.""" + """Initialisiert den Service und bereinigt das technische Logging.""" from app.config import get_settings self.settings = get_settings() - # --- LOGGING CLEANUP (Business Focus) --- - # Unterdrückt technische Bibliotheks-Header, erhält aber inhaltliche Service-Logs + # --- LOGGING CLEANUP (Header-Noise unterdrücken, Business erhalten) --- for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]: logging.getLogger(lib).setLevel(logging.WARNING) @@ -71,49 +68,41 @@ class IngestionService: self.embedder = EmbeddingsClient() self.llm = LLMService() - # WP-25a: Auflösung der Dimension über das Embedding-Profil (MoE) embed_cfg = self.llm.profiles.get("embedding_expert", {}) self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE - self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE - # WP-15b: Kontext-Gedächtnis für ID-Auflösung + # Kontext-Gedächtnis für ID-Auflösung self.batch_cache: Dict[str, NoteContext] = {} - # WP-24c: Puffer für Phase 2 (Symmetrie-Injektion nach dem gesamten Import) + # Puffer für Phase 2 (Symmetrie-Injektion am Ende des gesamten Imports) self.symmetry_buffer: List[Dict[str, Any]] = [] try: - # Aufruf der modularisierten Schema-Logik ensure_collections(self.client, self.prefix, self.dim) ensure_payload_indexes(self.client, self.prefix) except Exception as e: logger.warning(f"DB initialization warning: {e}") def _is_valid_note_id(self, text: Optional[str]) -> bool: - """ - WP-24c: Prüft Ziel-Strings auf fachliche Validität. - Verhindert Müll-Kanten zu System-Platzhaltern. - """ + """WP-24c: Fachliche Validitätsprüfung gegen Junk-Kanten.""" if not text or not isinstance(text, str) or len(text.strip()) < 2: return False - blacklisted = {"insight", "event", "source", "task", "project", "person", "concept", "related_to", "referenced_by", "none", "unknown"} if text.lower().strip() in blacklisted: return False - if len(text) > 200: return False return True async def run_batch(self, file_paths: List[str], vault_root: str) -> Dict[str, Any]: """ - WP-15b: Two-Pass Ingestion Workflow (PHASE 1). - Verarbeitet Batches und schreibt NUR Nutzer-Autorität in die DB. + WP-15b: Phase 1 des Two-Phase Ingestion Workflows. + Verarbeitet Batches und schreibt NUR Nutzer-Autorität (physische Kanten) in die DB. """ self.batch_cache.clear() logger.info(f"--- 🔍 START BATCH PHASE 1 ({len(file_paths)} Dateien) ---") - # 1. Schritt: Pre-Scan (Context-Cache füllen) + # 1. Pre-Scan (ID-Gedächtnis füllen) for path in file_paths: try: ctx = pre_scan_markdown(path, registry=self.registry) @@ -125,7 +114,7 @@ class IngestionService: except Exception as e: logger.warning(f" ⚠️ Pre-scan fehlgeschlagen für {path}: {e}") - # 2. Schritt: PROCESSING + # 2. Schritt: Batch-Verarbeitung (Explicit Authority) processed_count = 0 success_count = 0 for p in file_paths: @@ -134,7 +123,7 @@ class IngestionService: if res.get("status") == "success": success_count += 1 - logger.info(f"--- ✅ Batch Phase 1 beendet ({success_count}/{processed_count}) ---") + logger.info(f"--- ✅ Batch Phase 1 abgeschlossen ({success_count}/{processed_count}) ---") return { "status": "success", "processed": processed_count, @@ -144,46 +133,40 @@ class IngestionService: async def commit_vault_symmetries(self) -> Dict[str, Any]: """ - WP-24c: Führt PHASE 2 (Globale Symmetrie-Injektion) aus. - Wird einmalig am Ende des gesamten Imports aufgerufen. - Sorgt dafür, dass virtuelle Kanten erst NACH der Nutzer-Autorität geschrieben werden. + WP-24c: Globale Symmetrie-Injektion (Phase 2). + Prüft gepufferte Kanten gegen die Instance-of-Truth in Qdrant. """ if not self.symmetry_buffer: - logger.info("⏭️ Symmetrie-Puffer leer. Keine Aktion erforderlich.") + logger.info("⏭️ Symmetrie-Puffer leer.") return {"status": "skipped", "reason": "buffer_empty"} - logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrie-Vorschläge gegen Live-DB...") + logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrien gegen Live-DB...") final_virtuals = [] for v_edge in self.symmetry_buffer: - # Sicherheits-Check: Keine Kanten ohne Ziele zulassen - if not v_edge.get("target_id") or v_edge.get("target_id") == "None": - continue + if not v_edge.get("target_id") or v_edge.get("target_id") == "None": continue - # ID der potenziellen Symmetrie berechnen v_id = _mk_edge_id(v_edge["kind"], v_edge["note_id"], v_edge["target_id"], v_edge.get("scope", "note")) - # AUTHORITY-CHECK: Nur schreiben, wenn KEINE manuelle Kante in der DB existiert + # Schutz der Nutzer-Autorität if not is_explicit_edge_present(self.client, self.prefix, v_id): final_virtuals.append(v_edge) - logger.info(f" 🔄 [SYMMETRY] Erzeuge Gegenkante: {v_edge['note_id']} --({v_edge['kind']})--> {v_edge['target_id']}") + logger.info(f" 🔄 [SYMMETRY] Add inverse: {v_edge['note_id']} --({v_edge['kind']})--> {v_edge['target_id']}") else: logger.debug(f" 🛡️ Schutz: Manuelle Kante belegt ID {v_id}. Symmetrie verworfen.") - added_count = 0 if final_virtuals: - logger.info(f"📤 Schreibe {len(final_virtuals)} geschützte Symmetrie-Kanten in Qdrant.") e_pts = points_for_edges(self.prefix, final_virtuals)[1] + # wait=True garantiert, dass der nächste Lauf diese Kanten sofort sieht upsert_batch(self.client, f"{self.prefix}_edges", e_pts, wait=True) - added_count = len(final_virtuals) - self.symmetry_buffer.clear() # Puffer leeren - return {"status": "success", "added": added_count} + added = len(final_virtuals) + self.symmetry_buffer.clear() + return {"status": "success", "added": added} async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]: """ - Transformiert eine Markdown-Datei. - Schreibt Notes/Chunks/Explicit Edges sofort (Phase 1). - Befüllt den Symmetrie-Puffer für die globale Phase 2. + Transformiert eine Note. + Implementiert strikte ID-Kanonisierung und Pydantic-Safety. """ apply = kwargs.get("apply", False) force_replace = kwargs.get("force_replace", False) @@ -192,7 +175,7 @@ class IngestionService: result = {"path": file_path, "status": "skipped", "changed": False, "error": None} try: - # --- ORDNER-FILTER (.trash) --- + # Ordner-Filter if any(part.startswith('.') for part in file_path.split(os.sep)): return {**result, "status": "skipped", "reason": "hidden_folder"} @@ -210,14 +193,14 @@ class IngestionService: note_pl = make_note_payload(parsed, vault_root=vault_root, file_path=file_path, types_cfg=self.registry) note_id = note_pl.get("note_id") - # --- GUARD CLAUSE: Fehlende IDs verhindern PointStruct-Crash --- - if not note_id: - logger.warning(f" ⚠️ Fehlende note_id in '{file_path}'. Datei wird ignoriert.") - return {**result, "status": "error", "error": "missing_note_id"} + # --- HARD GUARD: Verhindert Pydantic-Crashes bei unvollständigen Notizen --- + if not note_id or note_id == "None": + logger.warning(f" ⚠️ Ungültige note_id in '{file_path}'. Überspringe.") + return {**result, "status": "error", "error": "invalid_note_id"} logger.info(f"📄 Bearbeite: '{note_id}' (Typ: {note_type})") - # Change Detection & Fragment-Prüfung + # Change Detection old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id) c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id) if not (force_replace or not old_payload or c_miss or e_miss): @@ -226,7 +209,7 @@ class IngestionService: if not apply: return {**result, "status": "dry-run", "changed": True, "note_id": note_id} - # Deep Processing & MoE (LLM Validierung) + # LLM Validierung (Expert-MoE) profile = note_pl.get("chunk_profile", "sliding_standard") chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type) enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False) @@ -235,12 +218,11 @@ class IngestionService: for ch in chunks: new_pool = [] for cand in getattr(ch, "candidate_pool", []): - # --- GUARD: Ungültige Ziele im Candidate-Pool filtern --- t_id = cand.get('target_id') or cand.get('note_id') - if not self._is_valid_note_id(t_id): - continue + if not self._is_valid_note_id(t_id): continue if cand.get("provenance") == "global_pool" and enable_smart: + # LLM Logging logger.info(f" ⚖️ [VALIDATING] Relation to '{t_id}' via Expert-LLM...") is_valid = await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm) logger.info(f" 🧠 [SMART EDGE] {t_id} -> {'✅ OK' if is_valid else '❌ SKIP'}") @@ -252,28 +234,31 @@ class IngestionService: chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry) vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] - # Kanten-Logik (Kanonisierung via batch_cache) + # --- KANTEN-LOGIK MIT STRIKTER KANONISIERUNG (FIX FÜR STEINZEITAXT) --- raw_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", [])) - explicit_edges = [] for e in raw_edges: target_raw = e.get("target_id") - # ID-Resolution über den Context-Cache t_ctx = self.batch_cache.get(target_raw) - target_id = t_ctx.note_id if t_ctx else target_raw + # Wenn das Ziel nicht im Cache ist, haben wir keine stabile note_id -> Überspringen (Ghost-ID Schutz) + if not t_ctx: + logger.debug(f" ⚠️ Linkziel '{target_raw}' nicht im Cache. Überspringe Kante.") + continue + + target_id = t_ctx.note_id if not self._is_valid_note_id(target_id): continue resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit")) - # Echte physische Kante markieren (Phase 1 Authority) + # Echte physische Kante markieren (Phase 1) e.update({ "kind": resolved_kind, "target_id": target_id, "origin_note_id": note_id, "virtual": False, "confidence": 1.0 }) explicit_edges.append(e) - # Symmetrie-Kandidat puffern + # Symmetrie puffern inv_kind = edge_registry.get_inverse(resolved_kind) if inv_kind and target_id != note_id: v_edge = e.copy() @@ -284,7 +269,7 @@ class IngestionService: }) self.symmetry_buffer.append(v_edge) - # 4. DB Upsert (Phase 1: Authority Commitment) + # 4. DB Commit (Phase 1) if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id) n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) @@ -292,7 +277,7 @@ class IngestionService: if chunk_pls and vecs: upsert_batch(self.client, f"{self.prefix}_chunks", points_for_chunks(self.prefix, chunk_pls, vecs)[1]) if explicit_edges: - # Wichtig: wait=True stellt sicher, dass die Kanten in Phase 2 searchable sind + # WICHTIG: wait=True für Phase-1 Konsistenz upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, explicit_edges)[1], wait=True) logger.info(f" ✨ Phase 1 fertig: {len(chunk_pls)} Chunks, {len(explicit_edges)} explizite Kanten.")