Update ingestion_processor.py to version 3.2.0: stabilize logging and harden edge validation against KeyError on missing 'target_id'. Add batch import with symmetry memory, modularize the schema logic, and prioritize explicit user-defined edges. Update documentation and version metadata accordingly.

This commit is contained in:
Lars 2026-01-09 22:15:14 +01:00
parent 008a470f02
commit 7e4ea670b1

View File

@ -2,17 +2,19 @@
FILE: app/core/ingestion/ingestion_processor.py
DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-24c: Integration der Symmetrie-Logik (Automatische inverse Kanten).
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
WP-25a: Mixture of Experts (MoE) - LLM Edge Validation.
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
AUDIT v3.1.9: Vollständiges Script mit Business-Logging, UUIDs und Edge-Fix.
VERSION: 3.1.9 (WP-24c: Robust Orchestration & Full Feature Set)
AUDIT v3.2.0: Fix für KeyError 'target_id', stabiles Logging
und Priorisierung expliziter User-Kanten.
VERSION: 3.2.0 (WP-24c: Stability & Business Logging)
STATUS: Active
"""
import logging
import asyncio
import os
import re
import sys
from typing import Dict, List, Optional, Tuple, Any
# Core Module Imports
@ -21,7 +23,7 @@ from app.core.parser import (
validate_required_frontmatter, NoteContext
)
from app.core.chunking import assemble_chunks
# WP-24c: Import für die deterministische ID-Vorabberechnung (nun UUID-basiert)
# WP-24c: Import für die deterministische ID-Vorabberechnung (UUID-basiert)
from app.core.graph.graph_utils import _mk_edge_id
# MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene
@ -41,7 +43,7 @@ from .ingestion_validation import validate_edge_candidate
from .ingestion_note_payload import make_note_payload
from .ingestion_chunk_payload import make_chunk_payloads
# Fallback für Edges (Struktur-Verknüpfung)
# Fallback für Edges
try:
from app.core.graph.graph_derive_edges import build_edges_for_note
except ImportError:
@ -56,10 +58,9 @@ class IngestionService:
self.settings = get_settings()
# --- LOGGING CLEANUP (Business Focus) ---
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("qdrant_client").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
# Unterdrückt technische Bibliotheks-Meldungen im Log-File und Konsole
for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]:
logging.getLogger(lib).setLevel(logging.WARNING)
self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
self.cfg = QdrantConfig.from_env()
@ -76,9 +77,12 @@ class IngestionService:
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache
# WP-24c: Laufzeit-Speicher für explizite Kanten-IDs im aktuellen Batch
self.processed_explicit_ids = set()
try:
# Aufruf der modularisierten Schema-Logik
ensure_collections(self.client, self.prefix, self.dim)
ensure_payload_indexes(self.client, self.prefix)
except Exception as e:
@ -104,6 +108,7 @@ class IngestionService:
async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]:
"""
WP-15b: Two-Pass Ingestion Workflow.
Implementiert Batch-Import mit Symmetrie-Gedächtnis.
"""
self.processed_explicit_ids.clear()
logger.info(f"--- 🔍 START BATCH IMPORT ({len(file_paths)} Dateien) ---")
@ -112,6 +117,7 @@ class IngestionService:
try:
ctx = pre_scan_markdown(path, registry=self.registry)
if ctx:
# Look-up Index für Note_IDs und Titel
self.batch_cache[ctx.note_id] = ctx
self.batch_cache[ctx.title] = ctx
fname = os.path.splitext(os.path.basename(path))[0]
@ -197,7 +203,9 @@ class IngestionService:
is_valid = await validate_edge_candidate(
ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator"
)
logger.info(f" 🧠 [SMART EDGE] {cand['target_id']} -> {'✅ OK' if is_valid else '❌ SKIP'}")
# Fix (v3.2.0): Symmetrisches Logging ohne KeyError-Risiko
target_label = cand.get('target_id') or cand.get('note_id') or 'Unbekannt'
logger.info(f" 🧠 [SMART EDGE] {target_label} -> {'✅ OK' if is_valid else '❌ SKIP'}")
if is_valid: new_pool.append(cand)
else:
new_pool.append(cand)
@ -206,6 +214,7 @@ class IngestionService:
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry)
vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []
# Aggregation aller Kanten
raw_edges = build_edges_for_note(
note_id, chunk_pls,
note_level_references=note_pl.get("references", []),
@ -219,7 +228,6 @@ class IngestionService:
for e in raw_edges:
target_raw = e.get("target_id")
if not self._is_valid_note_id(target_raw, provenance="explicit"):
logger.warning(f" ⚠️ Ignoriere Kante zu '{target_raw}' (Ungültige ID)")
continue
resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"))
@ -246,7 +254,6 @@ class IngestionService:
is_in_batch = potential_id in self.processed_explicit_ids
# Real-Time DB Check (Sync)
is_in_db = False
if not is_in_batch:
is_in_db = is_explicit_edge_present(self.client, self.prefix, potential_id)
@ -264,9 +271,10 @@ class IngestionService:
edges = final_edges
# 4. DB Upsert
# 4. DB Upsert via modularisierter Points-Logik
if apply:
if purge_before: purge_artifacts(self.client, self.prefix, note_id)
if purge_before and old_payload:
purge_artifacts(self.client, self.prefix, note_id)
upsert_batch(self.client, f"{self.prefix}_notes", points_for_note(self.prefix, note_pl, None, self.dim)[1])
if chunk_pls and vecs:
@ -284,6 +292,7 @@ class IngestionService:
"""Erstellt eine Note aus einem Textstream."""
target_path = os.path.join(vault_root, folder, filename)
os.makedirs(os.path.dirname(target_path), exist_ok=True)
with open(target_path, "w", encoding="utf-8") as f: f.write(markdown_content)
with open(target_path, "w", encoding="utf-8") as f:
f.write(markdown_content)
await asyncio.sleep(0.1)
return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)