From 8fd7ef804d2d0f1527f54ce3eb32d71fe2f5d2e1 Mon Sep 17 00:00:00 2001 From: Lars Date: Sat, 10 Jan 2026 14:02:10 +0100 Subject: [PATCH] Update ingestion_processor.py to version 3.4.3: Remove incompatible edge_registry initialization, maintain strict two-phase strategy, and fix ID generation issues. Enhance logging and comments for clarity, ensuring compatibility and improved functionality in the ingestion workflow. --- app/core/ingestion/ingestion_processor.py | 51 ++++++++--------------- 1 file changed, 18 insertions(+), 33 deletions(-) diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index aa2423d..9977179 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -4,10 +4,11 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator). WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v3.4.2: Strikte 2-Phasen-Strategie (Authority-First). - Lösung des Ghost-ID Problems & Pydantic-Crash Fix. - Zentralisierte ID-Generierung zur Vermeidung von Duplikaten. -VERSION: 3.4.2 (WP-24c: Unified ID Orchestration) + AUDIT v3.4.3: + - Entfernung des inkompatiblen edge_registry.initialize Aufrufs. + - Beibehaltung der strikten 2-Phasen-Strategie (Authority-First). + - Fix für das Steinzeitaxt-Problem via zentralisierter ID-Logik. +VERSION: 3.4.3 (WP-24c: Compatibility Fix) STATUS: Active """ import logging @@ -22,8 +23,8 @@ from app.core.parser import ( validate_required_frontmatter, NoteContext ) from app.core.chunking import assemble_chunks -# WP-24c: Import der zentralen Identitäts-Logik und Pfad-Getter -from app.core.graph.graph_utils import _mk_edge_id, get_vocab_path, get_schema_path +# WP-24c: Import der zentralen Identitäts-Logik +from app.core.graph.graph_utils import _mk_edge_id # Datenbank-Ebene (Modularisierte database-Infrastruktur) from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes @@ -57,15 +58,10 @@ class IngestionService: self.settings = get_settings() # --- LOGGING CLEANUP --- + # Unterdrückt Bibliotheks-Lärm, erhält aber inhaltliche Service-Logs for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]: logging.getLogger(lib).setLevel(logging.WARNING) - # WP-24c: Explizite Initialisierung der Registry mit .env Pfaden - edge_registry.initialize( - vocab_path=get_vocab_path(), - schema_path=get_schema_path() - ) - self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX self.cfg = QdrantConfig.from_env() self.cfg.prefix = self.prefix @@ -104,7 +100,7 @@ class IngestionService: async def run_batch(self, file_paths: List[str], vault_root: str) -> Dict[str, Any]: """ - WP-15b: Phase 1 des Two-Pass Ingestion Workflows. + WP-15b: Phase 1 des Two-Pass Workflows. Verarbeitet Batches und schreibt NUR Nutzer-Autorität (explizite Kanten). """ self.batch_cache.clear() @@ -142,7 +138,6 @@ class IngestionService: """ WP-24c: Führt PHASE 2 (Globale Symmetrie-Injektion) aus. Wird am Ende des gesamten Imports aufgerufen. - Sorgt dafür, dass virtuelle Kanten niemals Nutzer-Autorität überschreiben. """ if not self.symmetry_buffer: return {"status": "skipped", "reason": "buffer_empty"} @@ -153,13 +148,13 @@ class IngestionService: src, tgt, kind = v_edge.get("note_id"), v_edge.get("target_id"), v_edge.get("kind") if not src or not tgt: continue - # WP-Fix v3.4.2: NUTZUNG DER ZENTRALEN FUNKTION STATT MANUELLEM STRING + # WP-Fix: Nutzung der zentralisierten ID-Logik aus graph_utils try: v_id = _mk_edge_id(kind, src, tgt, "note") except ValueError: continue - # AUTHORITY-CHECK: Nur schreiben, wenn keine manuelle Kante in der DB existiert + # AUTHORITY-CHECK: Nur schreiben, wenn keine manuelle Kante existiert if not is_explicit_edge_present(self.client, self.prefix, v_id): final_virtuals.append(v_edge) logger.info(f" 🔄 [SYMMETRY] Add inverse: {src} --({kind})--> {tgt}") @@ -178,7 +173,6 @@ class IngestionService: """ Transformiert eine Markdown-Datei (Phase 1). Schreibt Notes/Chunks/Explicit Edges sofort. - Befüllt den Symmetrie-Puffer für Phase 2. """ apply = kwargs.get("apply", False) force_replace = kwargs.get("force_replace", False) @@ -191,7 +185,6 @@ class IngestionService: if ".trash" in file_path or any(part.startswith('.') for part in file_path.split(os.sep)): return {**result, "status": "skipped", "reason": "ignored_folder"} - # Datei einlesen und validieren parsed = read_markdown(file_path) if not parsed: return {**result, "error": "Empty file"} fm = normalize_frontmatter(parsed.frontmatter) @@ -205,7 +198,7 @@ class IngestionService: logger.info(f"📄 Bearbeite: '{note_id}'") - # Change Detection & Fragment-Prüfung + # Change Detection old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id) c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id) if not (force_replace or not old_payload or c_miss or e_miss): @@ -214,7 +207,7 @@ class IngestionService: if not apply: return {**result, "status": "dry-run", "changed": True, "note_id": note_id} - # Deep Processing & MoE (LLM Validierung) + # Chunks & MoE profile = note_pl.get("chunk_profile", "sliding_standard") note_type = resolve_note_type(self.registry, fm.get("type")) chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type) @@ -234,17 +227,15 @@ class IngestionService: new_pool.append(cand) ch.candidate_pool = new_pool - # Embeddings erzeugen chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry) vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] - # Kanten-Extraktion mit strikter Cache-Resolution (Fix für Ghost-IDs) + # Kanten-Extraktion raw_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", [])) explicit_edges = [] for e in raw_edges: t_raw = e.get("target_id") - # Kanonisierung: Link-Auflösung über den globalen Cache t_ctx = self.batch_cache.get(t_raw) t_id = t_ctx.note_id if t_ctx else t_raw @@ -254,20 +245,14 @@ class IngestionService: e.update({"kind": resolved_kind, "target_id": t_id, "origin_note_id": note_id, "virtual": False}) explicit_edges.append(e) - # Symmetrie-Gegenkante für Phase 2 puffern + # Symmetrie puffern inv_kind = edge_registry.get_inverse(resolved_kind) if inv_kind and t_id != note_id: v_edge = e.copy() - v_edge.update({ - "note_id": t_id, - "target_id": note_id, - "kind": inv_kind, - "virtual": True, - "origin_note_id": note_id - }) + v_edge.update({"note_id": t_id, "target_id": note_id, "kind": inv_kind, "virtual": True, "origin_note_id": note_id}) self.symmetry_buffer.append(v_edge) - # DB Upsert (Phase 1: Authority Commitment) + # DB Upsert if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id) col_n, pts_n = points_for_note(self.prefix, note_pl, None, self.dim) @@ -289,7 +274,7 @@ class IngestionService: return {**result, "status": "error", "error": str(e)} async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]: - """Erstellt eine Note aus einem Textstream und triggert die Ingestion.""" + """Erstellt eine Note aus einem Textstream.""" target_path = os.path.join(vault_root, folder, filename) os.makedirs(os.path.dirname(target_path), exist_ok=True) with open(target_path, "w", encoding="utf-8") as f: