From 57656bbaaf0c0f9adefdd392fafc74d20232e0bb Mon Sep 17 00:00:00 2001 From: Lars Date: Sat, 10 Jan 2026 07:45:43 +0100 Subject: [PATCH] Refactor ingestion_db.py and ingestion_processor.py: Enhance documentation and logging clarity, integrate cloud resilience and error handling, and improve artifact purging logic. Update versioning to 3.3.6 to reflect changes in functionality, including strict phase separation and authority checks for explicit edges. --- app/core/ingestion/ingestion_db.py | 12 +- app/core/ingestion/ingestion_processor.py | 130 ++++++++++++++-------- 2 files changed, 93 insertions(+), 49 deletions(-) diff --git a/app/core/ingestion/ingestion_db.py b/app/core/ingestion/ingestion_db.py index c136c1f..84fa2db 100644 --- a/app/core/ingestion/ingestion_db.py +++ b/app/core/ingestion/ingestion_db.py @@ -1,7 +1,11 @@ """ FILE: app/core/ingestion/ingestion_db.py DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung. + WP-14: Umstellung auf zentrale database-Infrastruktur. + WP-20/22: Integration von Cloud-Resilienz und Fehlerbehandlung. WP-24c: Implementierung der herkunftsbasierten Lösch-Logik (Origin-Purge). + Verhindert das versehentliche Löschen von inversen Kanten beim Re-Import. + Integration der Authority-Prüfung für Point-IDs zur Symmetrie-Validierung. VERSION: 2.2.0 (WP-24c: Protected Purge & Authority Lookup) STATUS: Active """ @@ -9,6 +13,8 @@ import logging from typing import Optional, Tuple, List from qdrant_client import QdrantClient from qdrant_client.http import models as rest + +# Import der modularisierten Namen-Logik zur Sicherstellung der Konsistenz from app.core.database import collection_names logger = logging.getLogger(__name__) @@ -39,7 +45,8 @@ def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[ def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) -> bool: """ WP-24c: Prüft via Point-ID, ob bereits eine explizite Kante existiert. - Verhindert das Überschreiben von manuellem Wissen durch Symmetrien. + Wird vom IngestionProcessor genutzt, um das Überschreiben von manuellem Wissen + durch virtuelle Symmetrie-Kanten zu verhindern. """ _, _, edges_col = collection_names(prefix) try: @@ -51,12 +58,11 @@ def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) -> return False def purge_artifacts(client: QdrantClient, prefix: str, note_id: str): - """Löscht Artefakte basierend auf ihrer Herkunft (Origin-Purge).""" + """WP-24c: Selektives Löschen von Artefakten (Origin-Purge).""" _, chunks_col, edges_col = collection_names(prefix) try: chunks_filter = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) client.delete(collection_name=chunks_col, points_selector=rest.FilterSelector(filter=chunks_filter)) - edges_filter = rest.Filter(must=[rest.FieldCondition(key="origin_note_id", match=rest.MatchValue(value=note_id))]) client.delete(collection_name=edges_col, points_selector=rest.FilterSelector(filter=edges_filter)) logger.info(f"🧹 [PURGE] Global artifacts owned by '{note_id}' cleared.") diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 249cca3..3a2f011 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -4,9 +4,11 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator). WP-24c: Integration der Symmetrie-Logik (Automatische inverse Kanten). WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. - AUDIT v3.3.5: 2-Phasen-Strategie (Phase 2 erst nach allen Batches). - API-Fix für Dictionary-Rückgabe. Vollständiger Umfang. -VERSION: 3.3.5 (WP-24c: Global Symmetry Commitment) + WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. + AUDIT v3.3.6: Strikte Phasentrennung (Phase 2 global am Ende). + Fix für .trash-Folder und Pydantic 'None'-Crash. + Vollständige Wiederherstellung des Business-Loggings. +VERSION: 3.3.6 (WP-24c: Full Transparency Orchestration) STATUS: Active """ import logging @@ -51,15 +53,14 @@ logger = logging.getLogger(__name__) class IngestionService: def __init__(self, collection_prefix: str = None): - """Initialisiert den Service und bereinigt das Logging.""" + """Initialisiert den Service und bereinigt das technische Logging.""" from app.config import get_settings self.settings = get_settings() # --- LOGGING CLEANUP (Business Focus) --- - logging.getLogger("httpx").setLevel(logging.WARNING) - logging.getLogger("httpcore").setLevel(logging.WARNING) - logging.getLogger("qdrant_client").setLevel(logging.WARNING) - logging.getLogger("urllib3").setLevel(logging.WARNING) + # Unterdrückt HTTP-Bibliotheks-Lärm, erhält aber inhaltliche Service-Logs + for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]: + logging.getLogger(lib).setLevel(logging.WARNING) self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX self.cfg = QdrantConfig.from_env() @@ -70,25 +71,31 @@ class IngestionService: self.embedder = EmbeddingsClient() self.llm = LLMService() + # WP-25a: Dimensionen über das LLM-Profil auflösen embed_cfg = self.llm.profiles.get("embedding_expert", {}) self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE + # Festlegen des Change-Detection Modus self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE - # WP-15b: Kontext-Gedächtnis für ID-Auflösung + # WP-15b: Kontext-Gedächtnis für ID-Auflösung (Global) self.batch_cache: Dict[str, NoteContext] = {} # WP-24c: Puffer für Phase 2 (Symmetrie-Injektion am Ende des gesamten Imports) self.symmetry_buffer: List[Dict[str, Any]] = [] try: + # Schema-Prüfung und Initialisierung ensure_collections(self.client, self.prefix, self.dim) ensure_payload_indexes(self.client, self.prefix) except Exception as e: logger.warning(f"DB initialization warning: {e}") def _is_valid_note_id(self, text: str) -> bool: - """WP-24c: Verhindert Müll-Kanten zu System-Platzhaltern.""" + """ + WP-24c: Prüft Ziel-Strings auf fachliche Validität. + Verhindert Müll-Kanten zu reinen System-Platzhaltern. + """ if not text or len(text.strip()) < 2: return False blacklisted = {"insight", "event", "source", "task", "project", "person", "concept", "related_to", "referenced_by"} if text.lower().strip() in blacklisted: return False @@ -98,12 +105,13 @@ class IngestionService: async def run_batch(self, file_paths: List[str], vault_root: str) -> Dict[str, Any]: """ WP-15b: Two-Pass Ingestion Workflow (PHASE 1). - Fix: Gibt Dictionary zurück, um Kompatibilität zum Importer-Script zu wahren. + Füllt den Cache und verarbeitet Dateien batchweise. + Gibt ein Dictionary zurück, um Kompatibilität zum Orchestrator zu wahren. """ self.batch_cache.clear() - logger.info(f"--- 🔍 START BATCH (Phase 1) ---") + logger.info(f"--- 🔍 START BATCH PHASE 1 ({len(file_paths)} Dateien) ---") - # 1. Pre-Scan (Context-Cache füllen) + # 1. Schritt: Pre-Scan (Context-Cache befüllen) for path in file_paths: try: ctx = pre_scan_markdown(path, registry=self.registry) @@ -115,7 +123,7 @@ class IngestionService: except Exception as e: logger.warning(f" ⚠️ Pre-scan fehlgeschlagen für {path}: {e}") - # 2. Schritt: PROCESSING (NUR AUTHORITY) + # 2. Schritt: Batch-Verarbeitung (Authority Only) processed_count = 0 success_count = 0 for p in file_paths: @@ -134,36 +142,43 @@ class IngestionService: async def commit_vault_symmetries(self) -> Dict[str, Any]: """ - WP-24c: Führt PHASE 2 für den gesamten Vault aus. - Wird nach allen run_batch Aufrufen einmalig getriggert. + WP-24c: Führt PHASE 2 (Symmetrie-Injektion) für den gesamten Vault aus. + Wird nach Abschluss aller Batches einmalig aufgerufen. + Vergleicht gepufferte Kanten gegen die Instance-of-Truth in Qdrant. """ if not self.symmetry_buffer: + logger.info("⏭️ Symmetrie-Puffer ist leer. Keine Aktion erforderlich.") return {"status": "skipped", "reason": "buffer_empty"} logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrie-Kanten gegen die Instance-of-Truth...") final_virtuals = [] for v_edge in self.symmetry_buffer: - # ID der potenziellen Symmetrie berechnen + # Deterministische ID der potenziellen Symmetrie berechnen v_id = _mk_edge_id(v_edge["kind"], v_edge["note_id"], v_edge["target_id"], v_edge.get("scope", "note")) - # Nur schreiben, wenn KEINE manuelle Kante in der DB existiert + # AUTHORITY-CHECK: Nur schreiben, wenn KEINE manuelle Kante in der DB existiert if not is_explicit_edge_present(self.client, self.prefix, v_id): final_virtuals.append(v_edge) + # Detailliertes Logging für volle Transparenz + logger.info(f" 🔄 [SYMMETRY] Add inverse: {v_edge['note_id']} --({v_edge['kind']})--> {v_edge['target_id']}") else: - logger.debug(f" 🛡️ Schutz: Manuelle Kante verhindert Symmetrie {v_id}") + logger.debug(f" 🛡️ Schutz: Manuelle Kante belegt ID {v_id}. Symmetrie verworfen.") added_count = 0 if final_virtuals: - logger.info(f"📤 Schreibe {len(final_virtuals)} geschützte Symmetrie-Kanten.") + logger.info(f"📤 Schreibe {len(final_virtuals)} validierte Symmetrie-Kanten in den Graphen.") e_pts = points_for_edges(self.prefix, final_virtuals)[1] upsert_batch(self.client, f"{self.prefix}_edges", e_pts) added_count = len(final_virtuals) - self.symmetry_buffer.clear() # Puffer leeren + self.symmetry_buffer.clear() # Puffer nach erfolgreichem Commit leeren return {"status": "success", "added": added_count} async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]: - """Transformiert Datei und befüllt den Symmetry-Buffer.""" + """ + Transformiert eine Markdown-Datei in Phase 1 (Authority First). + Implementiert Ordner-Blacklists, Pydantic-Safety und MoE-Validierung. + """ apply = kwargs.get("apply", False) force_replace = kwargs.get("force_replace", False) purge_before = kwargs.get("purge_before", False) @@ -171,7 +186,7 @@ class IngestionService: result = {"path": file_path, "status": "skipped", "changed": False, "error": None} try: - # --- ORDNER-FILTER (.trash) --- + # --- ORDNER-FILTER (Fix für .trash und .obsidian Junk) --- if any(part.startswith('.') for part in file_path.split(os.sep)): return {**result, "status": "skipped", "reason": "hidden_folder"} @@ -180,58 +195,80 @@ class IngestionService: if any(folder in file_path for folder in ignore_folders): return {**result, "status": "skipped", "reason": "folder_blacklist"} + # Datei einlesen und validieren parsed = read_markdown(file_path) if not parsed: return {**result, "error": "Empty file"} fm = normalize_frontmatter(parsed.frontmatter) + validate_required_frontmatter(fm) + note_type = resolve_note_type(self.registry, fm.get("type")) note_pl = make_note_payload(parsed, vault_root=vault_root, file_path=file_path, types_cfg=self.registry) - note_id = note_pl["note_id"] + note_id = note_pl.get("note_id") + + # --- FIX: Guard Clause gegen 'None' IDs (Verhindert Pydantic Crash) --- + if not note_id: + logger.warning(f" ⚠️ Fehlende note_id in '{file_path}'. Datei wird ignoriert.") + return {**result, "status": "error", "error": "missing_note_id"} logger.info(f"📄 Bearbeite: '{note_id}' (Typ: {note_type})") - # Change Detection + # Change Detection & Fragment-Prüfung old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id) c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id) if not (force_replace or not old_payload or c_miss or e_miss): return {**result, "status": "unchanged", "note_id": note_id} - # Deep Processing & MoE + if not apply: + return {**result, "status": "dry-run", "changed": True, "note_id": note_id} + + # Chunks erzeugen und semantisch validieren (MoE) profile = note_pl.get("chunk_profile", "sliding_standard") chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type) + enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False) chunks = await assemble_chunks(note_id, getattr(parsed, "body", ""), note_type, config=chunk_cfg) for ch in chunks: new_pool = [] for cand in getattr(ch, "candidate_pool", []): - if cand.get("provenance") == "global_pool" and chunk_cfg.get("enable_smart_edge_allocation"): + if cand.get("provenance") == "global_pool" and enable_smart: + # Detailliertes Business-Logging für LLM-Aktivitäten + target_label = cand.get('target_id') or cand.get('note_id') or "Unknown" + logger.info(f" ⚖️ [VALIDATING] Relation to '{target_label}' via Expert-LLM...") + is_valid = await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm) - t_id = cand.get('target_id') or cand.get('note_id') or "Unknown" - logger.info(f" 🧠 [SMART EDGE] {t_id} -> {'✅ OK' if is_valid else '❌ SKIP'}") + + logger.info(f" 🧠 [SMART EDGE] {target_label} -> {'✅ OK' if is_valid else '❌ SKIP'}") if is_valid: new_pool.append(cand) else: new_pool.append(cand) ch.candidate_pool = new_pool + # Embeddings und Payloads chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry) vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] - # Kanten-Logik (Kanonisierung) + # Kanten-Extraktion mit ID-Kanonisierung raw_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", [])) + explicit_edges = [] for e in raw_edges: target_raw = e.get("target_id") - t_ctx = self.batch_cache.get(target_raw) - target_id = t_ctx.note_id if t_ctx else target_raw + # Auflösung von Titeln/Dateinamen zu echten IDs über den globalen Cache + target_ctx = self.batch_cache.get(target_raw) + target_id = target_ctx.note_id if target_ctx else target_raw if not self._is_valid_note_id(target_id): continue resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit")) - # Echte physische Kante markieren (Phase 1) - e.update({"kind": resolved_kind, "target_id": target_id, "origin_note_id": note_id, "virtual": False, "confidence": 1.0}) + # Echte physische Kante markieren (Phase 1 Autorität) + e.update({ + "kind": resolved_kind, "target_id": target_id, + "origin_note_id": note_id, "virtual": False, "confidence": 1.0 + }) explicit_edges.append(e) - # Symmetrie-Kandidat für Phase 2 puffern + # Symmetrie-Kandidat für die globale Phase 2 puffern inv_kind = edge_registry.get_inverse(resolved_kind) if inv_kind and target_id != note_id: v_edge = e.copy() @@ -242,27 +279,28 @@ class IngestionService: }) self.symmetry_buffer.append(v_edge) - # 4. DB Upsert (Phase 1: Authority Only) - if apply: - if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id) - n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) - upsert_batch(self.client, n_name, n_pts) - if chunk_pls and vecs: - upsert_batch(self.client, f"{self.prefix}_chunks", points_for_chunks(self.prefix, chunk_pls, vecs)[1]) - if explicit_edges: - upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, explicit_edges)[1]) + # 4. DB Upsert (Phase 1: Authority Commitment) + if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id) + + n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) + upsert_batch(self.client, n_name, n_pts) + if chunk_pls and vecs: + upsert_batch(self.client, f"{self.prefix}_chunks", points_for_chunks(self.prefix, chunk_pls, vecs)[1]) + if explicit_edges: + upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, explicit_edges)[1]) logger.info(f" ✨ Phase 1 fertig: {len(chunk_pls)} Chunks, {len(explicit_edges)} explizite Kanten.") return {"status": "success", "note_id": note_id, "edges_count": len(explicit_edges)} except Exception as e: logger.error(f"❌ Fehler bei {file_path}: {e}", exc_info=True) - return {**result, "error": str(e)} + return {**result, "status": "error", "error": str(e)} async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]: """Erstellt eine Note aus einem Textstream.""" target_path = os.path.join(vault_root, folder, filename) os.makedirs(os.path.dirname(target_path), exist_ok=True) - with open(target_path, "w", encoding="utf-8") as f: f.write(markdown_content) + with open(target_path, "w", encoding="utf-8") as f: + f.write(markdown_content) await asyncio.sleep(0.1) return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True) \ No newline at end of file