From 7e4ea670b1ee3afdcdcbed0c5c586accd0fe43b3 Mon Sep 17 00:00:00 2001 From: Lars Date: Fri, 9 Jan 2026 22:15:14 +0100 Subject: [PATCH] Update ingestion_processor.py to version 3.2.0: Enhance logging stability and improve edge validation by addressing KeyError risks. Implement batch import with symmetry memory and modularized schema logic for explicit edge handling. Adjust documentation and versioning for improved clarity and robustness. --- app/core/ingestion/ingestion_processor.py | 39 ++++++++++++++--------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index ef3724b..8d9179b 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -2,17 +2,19 @@ FILE: app/core/ingestion/ingestion_processor.py DESCRIPTION: Der zentrale IngestionService (Orchestrator). WP-24c: Integration der Symmetrie-Logik (Automatische inverse Kanten). - WP-25a: Integration der Mixture of Experts (MoE) Architektur. + WP-25a: Mixture of Experts (MoE) - LLM Edge Validation. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v3.1.9: Vollständiges Script mit Business-Logging, UUIDs und Edge-Fix. -VERSION: 3.1.9 (WP-24c: Robust Orchestration & Full Feature Set) + AUDIT v3.2.0: Fix für KeyError 'target_id', stabiles Logging + und Priorisierung expliziter User-Kanten. +VERSION: 3.2.0 (WP-24c: Stability & Business Logging) STATUS: Active """ import logging import asyncio import os import re +import sys from typing import Dict, List, Optional, Tuple, Any # Core Module Imports @@ -21,7 +23,7 @@ from app.core.parser import ( validate_required_frontmatter, NoteContext ) from app.core.chunking import assemble_chunks -# WP-24c: Import für die deterministische ID-Vorabberechnung (nun UUID-basiert) +# WP-24c: Import für die deterministische ID-Vorabberechnung (UUID-basiert) from app.core.graph.graph_utils import _mk_edge_id # MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene @@ -41,7 +43,7 @@ from .ingestion_validation import validate_edge_candidate from .ingestion_note_payload import make_note_payload from .ingestion_chunk_payload import make_chunk_payloads -# Fallback für Edges (Struktur-Verknüpfung) +# Fallback für Edges try: from app.core.graph.graph_derive_edges import build_edges_for_note except ImportError: @@ -56,10 +58,9 @@ class IngestionService: self.settings = get_settings() # --- LOGGING CLEANUP (Business Focus) --- - logging.getLogger("httpx").setLevel(logging.WARNING) - logging.getLogger("httpcore").setLevel(logging.WARNING) - logging.getLogger("qdrant_client").setLevel(logging.WARNING) - logging.getLogger("urllib3").setLevel(logging.WARNING) + # Unterdrückt technische Bibliotheks-Meldungen im Log-File und Konsole + for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]: + logging.getLogger(lib).setLevel(logging.WARNING) self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX self.cfg = QdrantConfig.from_env() @@ -76,9 +77,12 @@ class IngestionService: self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache + + # WP-24c: Laufzeit-Speicher für explizite Kanten-IDs im aktuellen Batch self.processed_explicit_ids = set() try: + # Aufruf der modularisierten Schema-Logik ensure_collections(self.client, self.prefix, self.dim) ensure_payload_indexes(self.client, self.prefix) except Exception as e: @@ -104,6 +108,7 @@ class IngestionService: async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]: """ WP-15b: Two-Pass Ingestion Workflow. + Implementiert Batch-Import mit Symmetrie-Gedächtnis. """ self.processed_explicit_ids.clear() logger.info(f"--- 🔍 START BATCH IMPORT ({len(file_paths)} Dateien) ---") @@ -112,6 +117,7 @@ class IngestionService: try: ctx = pre_scan_markdown(path, registry=self.registry) if ctx: + # Look-up Index für Note_IDs und Titel self.batch_cache[ctx.note_id] = ctx self.batch_cache[ctx.title] = ctx fname = os.path.splitext(os.path.basename(path))[0] @@ -197,7 +203,9 @@ class IngestionService: is_valid = await validate_edge_candidate( ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator" ) - logger.info(f" 🧠 [SMART EDGE] {cand['target_id']} -> {'✅ OK' if is_valid else '❌ SKIP'}") + # Fix (v3.2.0): Symmetrisches Logging ohne KeyError-Risiko + target_label = cand.get('target_id') or cand.get('note_id') or 'Unbekannt' + logger.info(f" 🧠 [SMART EDGE] {target_label} -> {'✅ OK' if is_valid else '❌ SKIP'}") if is_valid: new_pool.append(cand) else: new_pool.append(cand) @@ -206,6 +214,7 @@ class IngestionService: chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry) vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] + # Aggregation aller Kanten raw_edges = build_edges_for_note( note_id, chunk_pls, note_level_references=note_pl.get("references", []), @@ -219,7 +228,6 @@ class IngestionService: for e in raw_edges: target_raw = e.get("target_id") if not self._is_valid_note_id(target_raw, provenance="explicit"): - logger.warning(f" ⚠️ Ignoriere Kante zu '{target_raw}' (Ungültige ID)") continue resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit")) @@ -246,7 +254,6 @@ class IngestionService: is_in_batch = potential_id in self.processed_explicit_ids - # Real-Time DB Check (Sync) is_in_db = False if not is_in_batch: is_in_db = is_explicit_edge_present(self.client, self.prefix, potential_id) @@ -264,9 +271,10 @@ class IngestionService: edges = final_edges - # 4. DB Upsert + # 4. DB Upsert via modularisierter Points-Logik if apply: - if purge_before: purge_artifacts(self.client, self.prefix, note_id) + if purge_before and old_payload: + purge_artifacts(self.client, self.prefix, note_id) upsert_batch(self.client, f"{self.prefix}_notes", points_for_note(self.prefix, note_pl, None, self.dim)[1]) if chunk_pls and vecs: @@ -284,6 +292,7 @@ class IngestionService: """Erstellt eine Note aus einem Textstream.""" target_path = os.path.join(vault_root, folder, filename) os.makedirs(os.path.dirname(target_path), exist_ok=True) - with open(target_path, "w", encoding="utf-8") as f: f.write(markdown_content) + with open(target_path, "w", encoding="utf-8") as f: + f.write(markdown_content) await asyncio.sleep(0.1) return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True) \ No newline at end of file