diff --git a/app/core/graph/graph_utils.py b/app/core/graph/graph_utils.py index 54acd36..565c21f 100644 --- a/app/core/graph/graph_utils.py +++ b/app/core/graph/graph_utils.py @@ -2,7 +2,7 @@ FILE: app/core/graph/graph_utils.py DESCRIPTION: Basale Werkzeuge, ID-Generierung und Provenance-Konfiguration für den Graphen. WP-24c: Integration der EdgeRegistry für dynamische Topologie-Defaults. - FIX v1.2.0: Umstellung auf deterministische UUIDs (Qdrant Kompatibilität). + FIX v1.2.0: Umstellung auf deterministische UUIDs für Qdrant-Kompatibilität. VERSION: 1.2.0 STATUS: Active """ diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 5e07213..ef3724b 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -5,8 +5,8 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator). WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v3.1.9: Fix für TypeError (Sync-Check), ID-Validierung und UUID-Support. -VERSION: 3.1.9 (WP-24c: Robust Symmetry & Sync Fix) + AUDIT v3.1.9: Vollständiges Script mit Business-Logging, UUIDs und Edge-Fix. +VERSION: 3.1.9 (WP-24c: Robust Orchestration & Full Feature Set) STATUS: Active """ import logging @@ -51,13 +51,18 @@ logger = logging.getLogger(__name__) class IngestionService: def __init__(self, collection_prefix: str = None): - """Initialisiert den Service und nutzt die neue database-Infrastruktur.""" + """Initialisiert den Service und bereinigt das Logging von technischem Lärm.""" from app.config import get_settings self.settings = get_settings() + # --- LOGGING CLEANUP (Business Focus) --- + logging.getLogger("httpx").setLevel(logging.WARNING) + logging.getLogger("httpcore").setLevel(logging.WARNING) + logging.getLogger("qdrant_client").setLevel(logging.WARNING) + logging.getLogger("urllib3").setLevel(logging.WARNING) + self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX self.cfg = QdrantConfig.from_env() - # Synchronisierung der Konfiguration mit dem Instanz-Präfix self.cfg.prefix = self.prefix self.client = get_client(self.cfg) @@ -69,48 +74,40 @@ class IngestionService: embed_cfg = self.llm.profiles.get("embedding_expert", {}) self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE - # Festlegen, welcher Hash für die Change-Detection maßgeblich ist self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache - - # WP-24c: Laufzeit-Speicher für explizite Kanten-IDs im aktuellen Batch self.processed_explicit_ids = set() try: - # Aufruf der modularisierten Schema-Logik ensure_collections(self.client, self.prefix, self.dim) ensure_payload_indexes(self.client, self.prefix) except Exception as e: logger.warning(f"DB initialization warning: {e}") - def _is_valid_note_id(self, text: str) -> bool: + def _is_valid_note_id(self, text: str, provenance: str = "explicit") -> bool: """ - WP-24c: Prüft, ob ein String eine plausible Note-ID oder ein gültiger Titel ist. - Verhindert Symmetrie-Kanten zu Meta-Begriffen wie 'insight' oder 'event'. + WP-24c: Prüft Ziel-Strings auf Validität. + User-Authority (explicit) wird weniger gefiltert als System-Strukturen. """ - if not text or len(text.strip()) < 3: + if not text or len(text.strip()) < 2: return False - blacklisted = { - "insight", "event", "source", "task", "project", - "person", "concept", "value", "principle", "lesson", - "decision", "requirement", "related_to", "referenced_by" - } - if text.lower().strip() in blacklisted: - return False - - if len(text) > 100: - return False + # Nur System-Kanten (Symmetrie) filtern wir gegen die Typ-Blacklist + if provenance != "explicit": + blacklisted = {"insight", "event", "source", "task", "project", "person", "concept", "related_to", "referenced_by"} + if text.lower().strip() in blacklisted: + return False + if len(text) > 150: return False # Vermutlich ein ganzer Satz return True async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]: """ - WP-15b: Implementiert den Two-Pass Ingestion Workflow. + WP-15b: Two-Pass Ingestion Workflow. """ self.processed_explicit_ids.clear() + logger.info(f"--- 🔍 START BATCH IMPORT ({len(file_paths)} Dateien) ---") - logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Context Cache...") for path in file_paths: try: ctx = pre_scan_markdown(path, registry=self.registry) @@ -122,8 +119,13 @@ class IngestionService: except Exception as e: logger.warning(f"⚠️ Pre-scan failed for {path}: {e}") - logger.info(f"🚀 [Pass 2] Semantic Processing of {len(file_paths)} files...") - return [await self.process_file(p, vault_root, apply=True, purge_before=True) for p in file_paths] + results = [] + for p in file_paths: + res = await self.process_file(p, vault_root, apply=True, purge_before=True) + results.append(res) + + logger.info(f"--- ✅ BATCH IMPORT BEENDET ---") + return results async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]: """Transformiert eine Markdown-Datei in den Graphen.""" @@ -161,6 +163,8 @@ class IngestionService: ) note_id = note_pl["note_id"] + logger.info(f"📄 Bearbeite: '{note_id}' (Typ: {note_type})") + old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id) check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}" old_hash = (old_payload or {}).get("hashes", {}).get(check_key) @@ -174,7 +178,7 @@ class IngestionService: if not apply: return {**result, "status": "dry-run", "changed": True, "note_id": note_id} - # 3. Deep Processing + # 3. Deep Processing (Chunking, Validation, Embedding) try: body_text = getattr(parsed, "body", "") or "" edge_registry.ensure_latest() @@ -185,6 +189,7 @@ class IngestionService: chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg) + # --- WP-25a: MoE Semantische Kanten-Validierung --- for ch in chunks: new_pool = [] for cand in getattr(ch, "candidate_pool", []): @@ -192,6 +197,7 @@ class IngestionService: is_valid = await validate_edge_candidate( ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator" ) + logger.info(f" 🧠 [SMART EDGE] {cand['target_id']} -> {'✅ OK' if is_valid else '❌ SKIP'}") if is_valid: new_pool.append(cand) else: new_pool.append(cand) @@ -212,7 +218,9 @@ class IngestionService: # PHASE 1: Alle expliziten Kanten registrieren for e in raw_edges: target_raw = e.get("target_id") - if not self._is_valid_note_id(target_raw): continue + if not self._is_valid_note_id(target_raw, provenance="explicit"): + logger.warning(f" ⚠️ Ignoriere Kante zu '{target_raw}' (Ungültige ID)") + continue resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit")) e.update({ @@ -233,12 +241,12 @@ class IngestionService: target_ctx = self.batch_cache.get(target_raw) target_id = target_ctx.note_id if target_ctx else target_raw - if (inv_kind and target_id and target_id != note_id and self._is_valid_note_id(target_id)): + if (inv_kind and target_id and target_id != note_id and self._is_valid_note_id(target_id, provenance="structure")): potential_id = _mk_edge_id(inv_kind, target_id, note_id, e.get("scope", "note")) is_in_batch = potential_id in self.processed_explicit_ids - # FIX v3.1.9: Kein 'await' verwenden, da die DB-Funktion synchron ist! + # Real-Time DB Check (Sync) is_in_db = False if not is_in_batch: is_in_db = is_explicit_edge_present(self.client, self.prefix, potential_id) @@ -252,39 +260,30 @@ class IngestionService: "origin_note_id": note_id }) final_edges.append(inv_edge) - logger.info(f"🔄 [SYMMETRY] Built inverse: {target_id} --({inv_kind})--> {note_id}") + logger.info(f" 🔄 [SYMMETRY] Gegenkante: {target_id} --({inv_kind})--> {note_id}") edges = final_edges # 4. DB Upsert if apply: - if purge_before and old_payload: - purge_artifacts(self.client, self.prefix, note_id) - - n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) - upsert_batch(self.client, n_name, n_pts) + if purge_before: purge_artifacts(self.client, self.prefix, note_id) + upsert_batch(self.client, f"{self.prefix}_notes", points_for_note(self.prefix, note_pl, None, self.dim)[1]) if chunk_pls and vecs: - c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1] - upsert_batch(self.client, f"{self.prefix}_chunks", c_pts) - + upsert_batch(self.client, f"{self.prefix}_chunks", points_for_chunks(self.prefix, chunk_pls, vecs)[1]) if edges: - e_pts = points_for_edges(self.prefix, edges)[1] - upsert_batch(self.client, f"{self.prefix}_edges", e_pts) + upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, edges)[1]) - return { - "path": file_path, "status": "success", "changed": True, "note_id": note_id, - "chunks_count": len(chunk_pls), "edges_count": len(edges) - } + logger.info(f" ✨ Fertig: {len(chunk_pls)} Chunks, {len(edges)} Kanten.") + return {"status": "success", "note_id": note_id, "edges_count": len(edges)} except Exception as e: - logger.error(f"Processing failed: {e}", exc_info=True) + logger.error(f"❌ Fehler bei {file_path}: {e}", exc_info=True) return {**result, "error": str(e)} async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]: """Erstellt eine Note aus einem Textstream.""" target_path = os.path.join(vault_root, folder, filename) os.makedirs(os.path.dirname(target_path), exist_ok=True) - with open(target_path, "w", encoding="utf-8") as f: - f.write(markdown_content) + with open(target_path, "w", encoding="utf-8") as f: f.write(markdown_content) await asyncio.sleep(0.1) return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True) \ No newline at end of file