diff --git a/app/core/ingestion/ingestion_db.py b/app/core/ingestion/ingestion_db.py index 84fa2db..db6d5d2 100644 --- a/app/core/ingestion/ingestion_db.py +++ b/app/core/ingestion/ingestion_db.py @@ -2,11 +2,11 @@ FILE: app/core/ingestion/ingestion_db.py DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung. WP-14: Umstellung auf zentrale database-Infrastruktur. - WP-20/22: Integration von Cloud-Resilienz und Fehlerbehandlung. + WP-20/22: Cloud-Resilienz und Fehlerbehandlung. WP-24c: Implementierung der herkunftsbasierten Lösch-Logik (Origin-Purge). Verhindert das versehentliche Löschen von inversen Kanten beim Re-Import. Integration der Authority-Prüfung für Point-IDs zur Symmetrie-Validierung. -VERSION: 2.2.0 (WP-24c: Protected Purge & Authority Lookup) +VERSION: 2.2.1 (WP-24c: Robust Authority Lookup) STATUS: Active """ import logging @@ -45,26 +45,57 @@ def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[ def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) -> bool: """ WP-24c: Prüft via Point-ID, ob bereits eine explizite Kante existiert. - Wird vom IngestionProcessor genutzt, um das Überschreiben von manuellem Wissen - durch virtuelle Symmetrie-Kanten zu verhindern. + Wird vom IngestionProcessor in Phase 2 genutzt, um das Überschreiben + von manuellem Wissen durch virtuelle Symmetrie-Kanten zu verhindern. """ + if not edge_id: return False + _, _, edges_col = collection_names(prefix) try: - res = client.retrieve(collection_name=edges_col, ids=[edge_id], with_payload=True) - if res and not res[0].payload.get("virtual", False): - return True + # retrieve ist der schnellste Weg, um einen spezifischen Punkt via ID zu laden + res = client.retrieve( + collection_name=edges_col, + ids=[edge_id], + with_payload=True + ) + # Wenn der Punkt existiert und NICHT virtuell ist, handelt es sich um eine Nutzer-Autorität + if res and len(res) > 0: + payload = res[0].payload + if not payload.get("virtual", False): + return True return False - except Exception: + except Exception as e: + logger.debug(f"Authority check for {edge_id} failed: {e}") return False def purge_artifacts(client: QdrantClient, prefix: str, note_id: str): - """WP-24c: Selektives Löschen von Artefakten (Origin-Purge).""" + """ + WP-24c: Selektives Löschen von Artefakten vor einem Re-Import. + Implementiert das Origin-Purge-Prinzip zur Sicherung der bidirektionalen Graph-Integrität. + """ _, chunks_col, edges_col = collection_names(prefix) + try: - chunks_filter = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - client.delete(collection_name=chunks_col, points_selector=rest.FilterSelector(filter=chunks_filter)) - edges_filter = rest.Filter(must=[rest.FieldCondition(key="origin_note_id", match=rest.MatchValue(value=note_id))]) - client.delete(collection_name=edges_col, points_selector=rest.FilterSelector(filter=edges_filter)) + # 1. Chunks löschen (immer fest an die note_id gebunden) + chunks_filter = rest.Filter(must=[ + rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id)) + ]) + client.delete( + collection_name=chunks_col, + points_selector=rest.FilterSelector(filter=chunks_filter) + ) + + # 2. WP-24c: Kanten löschen (HERKUNFTS-BASIERT via origin_note_id) + # Wir löschen alle Kanten, die von DIESER Note erzeugt wurden. + edges_filter = rest.Filter(must=[ + rest.FieldCondition(key="origin_note_id", match=rest.MatchValue(value=note_id)) + ]) + client.delete( + collection_name=edges_col, + points_selector=rest.FilterSelector(filter=edges_filter) + ) + logger.info(f"🧹 [PURGE] Global artifacts owned by '{note_id}' cleared.") + except Exception as e: logger.error(f"❌ [PURGE ERROR] Failed to clear artifacts for {note_id}: {e}") \ No newline at end of file diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 3a2f011..7307d59 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -5,10 +5,10 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator). WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v3.3.6: Strikte Phasentrennung (Phase 2 global am Ende). - Fix für .trash-Folder und Pydantic 'None'-Crash. - Vollständige Wiederherstellung des Business-Loggings. -VERSION: 3.3.6 (WP-24c: Full Transparency Orchestration) + AUDIT v3.3.7: Strikte globale Phasentrennung. + Fix für Pydantic Crash (None-ID Guard Clauses). + Erzwingung der Konsistenz (wait=True). +VERSION: 3.3.7 (WP-24c: Strict Authority Commitment) STATUS: Active """ import logging @@ -26,7 +26,7 @@ from app.core.chunking import assemble_chunks # WP-24c: Import für die deterministische UUID-Vorabberechnung from app.core.graph.graph_utils import _mk_edge_id -# Datenbank-Ebene (Modularisierte database-Infrastruktur) +# MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch from qdrant_client.http import models as rest @@ -53,12 +53,12 @@ logger = logging.getLogger(__name__) class IngestionService: def __init__(self, collection_prefix: str = None): - """Initialisiert den Service und bereinigt das technische Logging.""" + """Initialisiert den Service und nutzt die neue database-Infrastruktur.""" from app.config import get_settings self.settings = get_settings() # --- LOGGING CLEANUP (Business Focus) --- - # Unterdrückt HTTP-Bibliotheks-Lärm, erhält aber inhaltliche Service-Logs + # Unterdrückt technische Bibliotheks-Header, erhält aber inhaltliche Service-Logs for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]: logging.getLogger(lib).setLevel(logging.WARNING) @@ -71,47 +71,49 @@ class IngestionService: self.embedder = EmbeddingsClient() self.llm = LLMService() - # WP-25a: Dimensionen über das LLM-Profil auflösen + # WP-25a: Auflösung der Dimension über das Embedding-Profil (MoE) embed_cfg = self.llm.profiles.get("embedding_expert", {}) self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE - # Festlegen des Change-Detection Modus self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE - # WP-15b: Kontext-Gedächtnis für ID-Auflösung (Global) + # WP-15b: Kontext-Gedächtnis für ID-Auflösung self.batch_cache: Dict[str, NoteContext] = {} - # WP-24c: Puffer für Phase 2 (Symmetrie-Injektion am Ende des gesamten Imports) + # WP-24c: Puffer für Phase 2 (Symmetrie-Injektion nach dem gesamten Import) self.symmetry_buffer: List[Dict[str, Any]] = [] try: - # Schema-Prüfung und Initialisierung + # Aufruf der modularisierten Schema-Logik ensure_collections(self.client, self.prefix, self.dim) ensure_payload_indexes(self.client, self.prefix) except Exception as e: logger.warning(f"DB initialization warning: {e}") - def _is_valid_note_id(self, text: str) -> bool: + def _is_valid_note_id(self, text: Optional[str]) -> bool: """ WP-24c: Prüft Ziel-Strings auf fachliche Validität. - Verhindert Müll-Kanten zu reinen System-Platzhaltern. + Verhindert Müll-Kanten zu System-Platzhaltern. """ - if not text or len(text.strip()) < 2: return False - blacklisted = {"insight", "event", "source", "task", "project", "person", "concept", "related_to", "referenced_by"} - if text.lower().strip() in blacklisted: return False + if not text or not isinstance(text, str) or len(text.strip()) < 2: + return False + + blacklisted = {"insight", "event", "source", "task", "project", "person", "concept", "related_to", "referenced_by", "none", "unknown"} + if text.lower().strip() in blacklisted: + return False + if len(text) > 200: return False return True async def run_batch(self, file_paths: List[str], vault_root: str) -> Dict[str, Any]: """ WP-15b: Two-Pass Ingestion Workflow (PHASE 1). - Füllt den Cache und verarbeitet Dateien batchweise. - Gibt ein Dictionary zurück, um Kompatibilität zum Orchestrator zu wahren. + Verarbeitet Batches und schreibt NUR Nutzer-Autorität in die DB. """ self.batch_cache.clear() logger.info(f"--- 🔍 START BATCH PHASE 1 ({len(file_paths)} Dateien) ---") - # 1. Schritt: Pre-Scan (Context-Cache befüllen) + # 1. Schritt: Pre-Scan (Context-Cache füllen) for path in file_paths: try: ctx = pre_scan_markdown(path, registry=self.registry) @@ -123,7 +125,7 @@ class IngestionService: except Exception as e: logger.warning(f" ⚠️ Pre-scan fehlgeschlagen für {path}: {e}") - # 2. Schritt: Batch-Verarbeitung (Authority Only) + # 2. Schritt: PROCESSING processed_count = 0 success_count = 0 for p in file_paths: @@ -132,52 +134,56 @@ class IngestionService: if res.get("status") == "success": success_count += 1 - logger.info(f"--- ✅ Batch Phase 1 abgeschlossen ({success_count}/{processed_count}) ---") + logger.info(f"--- ✅ Batch Phase 1 beendet ({success_count}/{processed_count}) ---") return { "status": "success", "processed": processed_count, "success": success_count, - "buffered_virtuals": len(self.symmetry_buffer) + "buffered_symmetries": len(self.symmetry_buffer) } async def commit_vault_symmetries(self) -> Dict[str, Any]: """ - WP-24c: Führt PHASE 2 (Symmetrie-Injektion) für den gesamten Vault aus. - Wird nach Abschluss aller Batches einmalig aufgerufen. - Vergleicht gepufferte Kanten gegen die Instance-of-Truth in Qdrant. + WP-24c: Führt PHASE 2 (Globale Symmetrie-Injektion) aus. + Wird einmalig am Ende des gesamten Imports aufgerufen. + Sorgt dafür, dass virtuelle Kanten erst NACH der Nutzer-Autorität geschrieben werden. """ if not self.symmetry_buffer: - logger.info("⏭️ Symmetrie-Puffer ist leer. Keine Aktion erforderlich.") + logger.info("⏭️ Symmetrie-Puffer leer. Keine Aktion erforderlich.") return {"status": "skipped", "reason": "buffer_empty"} - logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrie-Kanten gegen die Instance-of-Truth...") + logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrie-Vorschläge gegen Live-DB...") final_virtuals = [] for v_edge in self.symmetry_buffer: - # Deterministische ID der potenziellen Symmetrie berechnen + # Sicherheits-Check: Keine Kanten ohne Ziele zulassen + if not v_edge.get("target_id") or v_edge.get("target_id") == "None": + continue + + # ID der potenziellen Symmetrie berechnen v_id = _mk_edge_id(v_edge["kind"], v_edge["note_id"], v_edge["target_id"], v_edge.get("scope", "note")) # AUTHORITY-CHECK: Nur schreiben, wenn KEINE manuelle Kante in der DB existiert if not is_explicit_edge_present(self.client, self.prefix, v_id): final_virtuals.append(v_edge) - # Detailliertes Logging für volle Transparenz - logger.info(f" 🔄 [SYMMETRY] Add inverse: {v_edge['note_id']} --({v_edge['kind']})--> {v_edge['target_id']}") + logger.info(f" 🔄 [SYMMETRY] Erzeuge Gegenkante: {v_edge['note_id']} --({v_edge['kind']})--> {v_edge['target_id']}") else: logger.debug(f" 🛡️ Schutz: Manuelle Kante belegt ID {v_id}. Symmetrie verworfen.") added_count = 0 if final_virtuals: - logger.info(f"📤 Schreibe {len(final_virtuals)} validierte Symmetrie-Kanten in den Graphen.") + logger.info(f"📤 Schreibe {len(final_virtuals)} geschützte Symmetrie-Kanten in Qdrant.") e_pts = points_for_edges(self.prefix, final_virtuals)[1] - upsert_batch(self.client, f"{self.prefix}_edges", e_pts) + upsert_batch(self.client, f"{self.prefix}_edges", e_pts, wait=True) added_count = len(final_virtuals) - self.symmetry_buffer.clear() # Puffer nach erfolgreichem Commit leeren + self.symmetry_buffer.clear() # Puffer leeren return {"status": "success", "added": added_count} async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]: """ - Transformiert eine Markdown-Datei in Phase 1 (Authority First). - Implementiert Ordner-Blacklists, Pydantic-Safety und MoE-Validierung. + Transformiert eine Markdown-Datei. + Schreibt Notes/Chunks/Explicit Edges sofort (Phase 1). + Befüllt den Symmetrie-Puffer für die globale Phase 2. """ apply = kwargs.get("apply", False) force_replace = kwargs.get("force_replace", False) @@ -186,7 +192,7 @@ class IngestionService: result = {"path": file_path, "status": "skipped", "changed": False, "error": None} try: - # --- ORDNER-FILTER (Fix für .trash und .obsidian Junk) --- + # --- ORDNER-FILTER (.trash) --- if any(part.startswith('.') for part in file_path.split(os.sep)): return {**result, "status": "skipped", "reason": "hidden_folder"} @@ -195,7 +201,6 @@ class IngestionService: if any(folder in file_path for folder in ignore_folders): return {**result, "status": "skipped", "reason": "folder_blacklist"} - # Datei einlesen und validieren parsed = read_markdown(file_path) if not parsed: return {**result, "error": "Empty file"} fm = normalize_frontmatter(parsed.frontmatter) @@ -205,7 +210,7 @@ class IngestionService: note_pl = make_note_payload(parsed, vault_root=vault_root, file_path=file_path, types_cfg=self.registry) note_id = note_pl.get("note_id") - # --- FIX: Guard Clause gegen 'None' IDs (Verhindert Pydantic Crash) --- + # --- GUARD CLAUSE: Fehlende IDs verhindern PointStruct-Crash --- if not note_id: logger.warning(f" ⚠️ Fehlende note_id in '{file_path}'. Datei wird ignoriert.") return {**result, "status": "error", "error": "missing_note_id"} @@ -221,7 +226,7 @@ class IngestionService: if not apply: return {**result, "status": "dry-run", "changed": True, "note_id": note_id} - # Chunks erzeugen und semantisch validieren (MoE) + # Deep Processing & MoE (LLM Validierung) profile = note_pl.get("chunk_profile", "sliding_standard") chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type) enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False) @@ -230,45 +235,45 @@ class IngestionService: for ch in chunks: new_pool = [] for cand in getattr(ch, "candidate_pool", []): + # --- GUARD: Ungültige Ziele im Candidate-Pool filtern --- + t_id = cand.get('target_id') or cand.get('note_id') + if not self._is_valid_note_id(t_id): + continue + if cand.get("provenance") == "global_pool" and enable_smart: - # Detailliertes Business-Logging für LLM-Aktivitäten - target_label = cand.get('target_id') or cand.get('note_id') or "Unknown" - logger.info(f" ⚖️ [VALIDATING] Relation to '{target_label}' via Expert-LLM...") - + logger.info(f" ⚖️ [VALIDATING] Relation to '{t_id}' via Expert-LLM...") is_valid = await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm) - - logger.info(f" 🧠 [SMART EDGE] {target_label} -> {'✅ OK' if is_valid else '❌ SKIP'}") + logger.info(f" 🧠 [SMART EDGE] {t_id} -> {'✅ OK' if is_valid else '❌ SKIP'}") if is_valid: new_pool.append(cand) else: new_pool.append(cand) ch.candidate_pool = new_pool - # Embeddings und Payloads chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry) vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] - # Kanten-Extraktion mit ID-Kanonisierung + # Kanten-Logik (Kanonisierung via batch_cache) raw_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", [])) explicit_edges = [] for e in raw_edges: target_raw = e.get("target_id") - # Auflösung von Titeln/Dateinamen zu echten IDs über den globalen Cache - target_ctx = self.batch_cache.get(target_raw) - target_id = target_ctx.note_id if target_ctx else target_raw + # ID-Resolution über den Context-Cache + t_ctx = self.batch_cache.get(target_raw) + target_id = t_ctx.note_id if t_ctx else target_raw if not self._is_valid_note_id(target_id): continue resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit")) - # Echte physische Kante markieren (Phase 1 Autorität) + # Echte physische Kante markieren (Phase 1 Authority) e.update({ "kind": resolved_kind, "target_id": target_id, "origin_note_id": note_id, "virtual": False, "confidence": 1.0 }) explicit_edges.append(e) - # Symmetrie-Kandidat für die globale Phase 2 puffern + # Symmetrie-Kandidat puffern inv_kind = edge_registry.get_inverse(resolved_kind) if inv_kind and target_id != note_id: v_edge = e.copy() @@ -287,7 +292,8 @@ class IngestionService: if chunk_pls and vecs: upsert_batch(self.client, f"{self.prefix}_chunks", points_for_chunks(self.prefix, chunk_pls, vecs)[1]) if explicit_edges: - upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, explicit_edges)[1]) + # Wichtig: wait=True stellt sicher, dass die Kanten in Phase 2 searchable sind + upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, explicit_edges)[1], wait=True) logger.info(f" ✨ Phase 1 fertig: {len(chunk_pls)} Chunks, {len(explicit_edges)} explizite Kanten.") return {"status": "success", "note_id": note_id, "edges_count": len(explicit_edges)} @@ -300,7 +306,6 @@ class IngestionService: """Erstellt eine Note aus einem Textstream.""" target_path = os.path.join(vault_root, folder, filename) os.makedirs(os.path.dirname(target_path), exist_ok=True) - with open(target_path, "w", encoding="utf-8") as f: - f.write(markdown_content) + with open(target_path, "w", encoding="utf-8") as f: f.write(markdown_content) await asyncio.sleep(0.1) return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True) \ No newline at end of file diff --git a/scripts/import_markdown.py b/scripts/import_markdown.py index d616f3a..efeb765 100644 --- a/scripts/import_markdown.py +++ b/scripts/import_markdown.py @@ -2,19 +2,11 @@ # -*- coding: utf-8 -*- """ FILE: scripts/import_markdown.py -VERSION: 2.5.0 (2026-01-10) +VERSION: 2.6.0 (2026-01-10) STATUS: Active (Core) -COMPATIBILITY: IngestionProcessor v3.3.5+ - -Zweck: -------- -Hauptwerkzeug zum Importieren von Markdown-Dateien aus einem Vault in Qdrant. -Implementiert die globale 2-Phasen-Schreibstrategie. - -Änderungen v2.5.0: ------------------- -- Globale Phasentrennung: commit_vault_symmetries() wird erst am Ende aufgerufen. -- Erweiterter Ordner-Filter: Schließt .trash und andere Systemordner aus. +COMPATIBILITY: IngestionProcessor v3.3.7+ +Zweck: Hauptwerkzeug zum Importieren von Markdown-Dateien. + Implementiert die globale 2-Phasen-Schreibstrategie. """ import asyncio import os @@ -24,10 +16,8 @@ import sys from pathlib import Path from dotenv import load_dotenv -# Setzt das Level global auf INFO +# Root Logger Setup logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') - -# Stelle sicher, dass das Root-Verzeichnis im Python-Pfad ist sys.path.append(os.getcwd()) from app.core.ingestion import IngestionService @@ -41,14 +31,13 @@ async def main_async(args): logger.error(f"Vault path does not exist: {vault_path}") return - # 1. Service initialisieren logger.info(f"Initializing IngestionService (Prefix: {args.prefix})") service = IngestionService(collection_prefix=args.prefix) logger.info(f"Scanning {vault_path}...") all_files = list(vault_path.rglob("*.md")) - # --- ORDNER-FILTER --- + # --- GLOBALER ORDNER-FILTER --- files = [] ignore_folders = [".trash", ".obsidian", ".sync", "templates", "_system"] for f in all_files: @@ -74,7 +63,7 @@ async def main_async(args): except Exception: pass # ========================================================================= - # PHASE 1: Batch-Import (Explicit Edges only) + # PHASE 1: Batch-Import (Notes & Explicit Edges) # ========================================================================= stats = {"processed": 0, "skipped": 0, "errors": 0} sem = asyncio.Semaphore(5) @@ -82,6 +71,7 @@ async def main_async(args): async def process_with_limit(f_path): async with sem: try: + # Nutzt process_file (v3.3.7) return await service.process_file( file_path=str(f_path), vault_root=str(vault_path), force_replace=args.force, apply=args.apply, purge_before=True @@ -101,15 +91,15 @@ async def main_async(args): else: stats["skipped"] += 1 # ========================================================================= - # PHASE 2: Global Symmetry Injection + # PHASE 2: Global Symmetry Injection (Nach Abschluss aller Batches) # ========================================================================= if args.apply: - logger.info(f"🔄 [Phase 2] Starting global symmetry injection...") + logger.info(f"🔄 [Phase 2] Starting global symmetry injection for the entire vault...") sym_res = await service.commit_vault_symmetries() if sym_res.get("status") == "success": - logger.info(f"✅ Added {sym_res.get('added', 0)} protected symmetry edges.") + logger.info(f"✅ Finished global symmetry injection. Added: {sym_res.get('added', 0)}") - logger.info(f"Done. Final Stats: {stats}") + logger.info(f"Final Stats: {stats}") def main(): load_dotenv()