Refactor ingestion_processor.py for version 3.2.0: Integrate Mixture of Experts architecture, enhance logging stability, and improve edge validation. Update batch import process with symmetry memory and modularized schema logic. Adjust documentation for clarity and robustness.
This commit is contained in:
parent
7e4ea670b1
commit
00264a9653
|
|
@ -2,19 +2,18 @@
|
|||
FILE: app/core/ingestion/ingestion_processor.py
|
||||
DESCRIPTION: Der zentrale IngestionService (Orchestrator).
|
||||
WP-24c: Integration der Symmetrie-Logik (Automatische inverse Kanten).
|
||||
WP-25a: Mixture of Experts (MoE) - LLM Edge Validation.
|
||||
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
|
||||
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
|
||||
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
|
||||
AUDIT v3.2.0: Fix für KeyError 'target_id', stabiles Logging
|
||||
und Priorisierung expliziter User-Kanten.
|
||||
VERSION: 3.2.0 (WP-24c: Stability & Business Logging)
|
||||
AUDIT v3.2.0: Fix für KeyError 'target_id', TypeError (Sync-Check)
|
||||
und Business-Centric Logging.
|
||||
VERSION: 3.2.0 (WP-24c: Stability & Full Feature Set)
|
||||
STATUS: Active
|
||||
"""
|
||||
import logging
|
||||
import asyncio
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from typing import Dict, List, Optional, Tuple, Any
|
||||
|
||||
# Core Module Imports
|
||||
|
|
@ -23,7 +22,7 @@ from app.core.parser import (
|
|||
validate_required_frontmatter, NoteContext
|
||||
)
|
||||
from app.core.chunking import assemble_chunks
|
||||
# WP-24c: Import für die deterministische ID-Vorabberechnung (UUID-basiert)
|
||||
# WP-24c: Import für die deterministische ID-Vorabberechnung
|
||||
from app.core.graph.graph_utils import _mk_edge_id
|
||||
|
||||
# MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene
|
||||
|
|
@ -43,7 +42,7 @@ from .ingestion_validation import validate_edge_candidate
|
|||
from .ingestion_note_payload import make_note_payload
|
||||
from .ingestion_chunk_payload import make_chunk_payloads
|
||||
|
||||
# Fallback für Edges
|
||||
# Fallback für Edges (Struktur-Verknüpfung)
|
||||
try:
|
||||
from app.core.graph.graph_derive_edges import build_edges_for_note
|
||||
except ImportError:
|
||||
|
|
@ -53,17 +52,20 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
class IngestionService:
|
||||
def __init__(self, collection_prefix: str = None):
|
||||
"""Initialisiert den Service und bereinigt das Logging von technischem Lärm."""
|
||||
"""Initialisiert den Service und nutzt die neue database-Infrastruktur."""
|
||||
from app.config import get_settings
|
||||
self.settings = get_settings()
|
||||
|
||||
# --- LOGGING CLEANUP (Business Focus) ---
|
||||
# Unterdrückt technische Bibliotheks-Meldungen im Log-File und Konsole
|
||||
for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]:
|
||||
logging.getLogger(lib).setLevel(logging.WARNING)
|
||||
# --- LOGGING CLEANUP ---
|
||||
# Unterdrückt Bibliotheks-Lärm in Konsole und Datei
|
||||
logging.getLogger("httpx").setLevel(logging.WARNING)
|
||||
logging.getLogger("httpcore").setLevel(logging.WARNING)
|
||||
logging.getLogger("qdrant_client").setLevel(logging.WARNING)
|
||||
logging.getLogger("urllib3").setLevel(logging.WARNING)
|
||||
|
||||
self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
|
||||
self.cfg = QdrantConfig.from_env()
|
||||
# Synchronisierung der Konfiguration mit dem Instanz-Präfix
|
||||
self.cfg.prefix = self.prefix
|
||||
self.client = get_client(self.cfg)
|
||||
|
||||
|
|
@ -75,6 +77,7 @@ class IngestionService:
|
|||
embed_cfg = self.llm.profiles.get("embedding_expert", {})
|
||||
self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE
|
||||
|
||||
# Festlegen, welcher Hash für die Change-Detection maßgeblich ist
|
||||
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
|
||||
self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache
|
||||
|
||||
|
|
@ -91,33 +94,35 @@ class IngestionService:
|
|||
def _is_valid_note_id(self, text: str, provenance: str = "explicit") -> bool:
|
||||
"""
|
||||
WP-24c: Prüft Ziel-Strings auf Validität.
|
||||
User-Authority (explicit) wird weniger gefiltert als System-Strukturen.
|
||||
User-Links (explicit) werden weniger gefiltert als System-Symmetrien.
|
||||
"""
|
||||
if not text or len(text.strip()) < 2:
|
||||
return False
|
||||
|
||||
# Nur System-Kanten (Symmetrie) filtern wir gegen die Typ-Blacklist
|
||||
# Symmetrie-Filter gegen Typ-Strings
|
||||
if provenance != "explicit":
|
||||
blacklisted = {"insight", "event", "source", "task", "project", "person", "concept", "related_to", "referenced_by"}
|
||||
if text.lower().strip() in blacklisted:
|
||||
return False
|
||||
|
||||
if len(text) > 150: return False # Vermutlich ein ganzer Satz
|
||||
if len(text) > 150: return False # Wahrscheinlich kein Titel
|
||||
return True
|
||||
|
||||
async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
WP-15b: Two-Pass Ingestion Workflow.
|
||||
Implementiert Batch-Import mit Symmetrie-Gedächtnis.
|
||||
WP-15b: Implementiert den Two-Pass Ingestion Workflow.
|
||||
Pass 1: Pre-Scan füllt den Context-Cache.
|
||||
Pass 2: Verarbeitung nutzt den Cache für die semantische Prüfung.
|
||||
"""
|
||||
self.processed_explicit_ids.clear()
|
||||
logger.info(f"--- 🔍 START BATCH IMPORT ({len(file_paths)} Dateien) ---")
|
||||
|
||||
for path in file_paths:
|
||||
try:
|
||||
# Übergabe der Registry für dynamische Scan-Tiefe
|
||||
ctx = pre_scan_markdown(path, registry=self.registry)
|
||||
if ctx:
|
||||
# Look-up Index für Note_IDs und Titel
|
||||
# Mehrfache Indizierung für robusten Look-up
|
||||
self.batch_cache[ctx.note_id] = ctx
|
||||
self.batch_cache[ctx.title] = ctx
|
||||
fname = os.path.splitext(os.path.basename(path))[0]
|
||||
|
|
@ -203,9 +208,9 @@ class IngestionService:
|
|||
is_valid = await validate_edge_candidate(
|
||||
ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator"
|
||||
)
|
||||
# Fix (v3.2.0): Symmetrisches Logging ohne KeyError-Risiko
|
||||
target_label = cand.get('target_id') or cand.get('note_id') or 'Unbekannt'
|
||||
logger.info(f" 🧠 [SMART EDGE] {target_label} -> {'✅ OK' if is_valid else '❌ SKIP'}")
|
||||
# Fix v3.2.0: Sicherer Zugriff via .get() verhindert Crash bei fehlender target_id
|
||||
t_id = cand.get('target_id') or cand.get('note_id') or "Unknown"
|
||||
logger.info(f" 🧠 [SMART EDGE] {t_id} -> {'✅ OK' if is_valid else '❌ SKIP'}")
|
||||
if is_valid: new_pool.append(cand)
|
||||
else:
|
||||
new_pool.append(cand)
|
||||
|
|
@ -214,7 +219,7 @@ class IngestionService:
|
|||
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry)
|
||||
vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []
|
||||
|
||||
# Aggregation aller Kanten
|
||||
# Aggregation aller finalen Kanten (Edges)
|
||||
raw_edges = build_edges_for_note(
|
||||
note_id, chunk_pls,
|
||||
note_level_references=note_pl.get("references", []),
|
||||
|
|
@ -230,12 +235,17 @@ class IngestionService:
|
|||
if not self._is_valid_note_id(target_raw, provenance="explicit"):
|
||||
continue
|
||||
|
||||
resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"))
|
||||
e.update({
|
||||
"kind": resolved_kind, "origin_note_id": note_id,
|
||||
"virtual": False, "confidence": 1.0
|
||||
})
|
||||
resolved_kind = edge_registry.resolve(
|
||||
e.get("kind", "related_to"),
|
||||
provenance=e.get("provenance", "explicit"),
|
||||
context={"file": file_path, "note_id": note_id}
|
||||
)
|
||||
e["kind"] = resolved_kind
|
||||
e["origin_note_id"] = note_id
|
||||
e["virtual"] = False
|
||||
e["confidence"] = e.get("confidence", 1.0)
|
||||
|
||||
# Registrierung für Batch-Authority
|
||||
edge_id = _mk_edge_id(resolved_kind, note_id, target_raw, e.get("scope", "note"))
|
||||
self.processed_explicit_ids.add(edge_id)
|
||||
final_edges.append(e)
|
||||
|
|
@ -254,6 +264,7 @@ class IngestionService:
|
|||
|
||||
is_in_batch = potential_id in self.processed_explicit_ids
|
||||
|
||||
# Real-Time DB Check (Ohne 'await', da sync)
|
||||
is_in_db = False
|
||||
if not is_in_batch:
|
||||
is_in_db = is_explicit_edge_present(self.client, self.prefix, potential_id)
|
||||
|
|
@ -276,14 +287,23 @@ class IngestionService:
|
|||
if purge_before and old_payload:
|
||||
purge_artifacts(self.client, self.prefix, note_id)
|
||||
|
||||
upsert_batch(self.client, f"{self.prefix}_notes", points_for_note(self.prefix, note_pl, None, self.dim)[1])
|
||||
# Speichern
|
||||
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
|
||||
upsert_batch(self.client, n_name, n_pts)
|
||||
|
||||
if chunk_pls and vecs:
|
||||
upsert_batch(self.client, f"{self.prefix}_chunks", points_for_chunks(self.prefix, chunk_pls, vecs)[1])
|
||||
c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1]
|
||||
upsert_batch(self.client, f"{self.prefix}_chunks", c_pts)
|
||||
|
||||
if edges:
|
||||
upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, edges)[1])
|
||||
e_pts = points_for_edges(self.prefix, edges)[1]
|
||||
upsert_batch(self.client, f"{self.prefix}_edges", e_pts)
|
||||
|
||||
logger.info(f" ✨ Fertig: {len(chunk_pls)} Chunks, {len(edges)} Kanten.")
|
||||
return {"status": "success", "note_id": note_id, "edges_count": len(edges)}
|
||||
return {
|
||||
"path": file_path, "status": "success", "changed": True, "note_id": note_id,
|
||||
"chunks_count": len(chunk_pls), "edges_count": len(edges)
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Fehler bei {file_path}: {e}", exc_info=True)
|
||||
return {**result, "error": str(e)}
|
||||
|
|
@ -292,7 +312,6 @@ class IngestionService:
|
|||
"""Erstellt eine Note aus einem Textstream."""
|
||||
target_path = os.path.join(vault_root, folder, filename)
|
||||
os.makedirs(os.path.dirname(target_path), exist_ok=True)
|
||||
with open(target_path, "w", encoding="utf-8") as f:
|
||||
f.write(markdown_content)
|
||||
with open(target_path, "w", encoding="utf-8") as f: f.write(markdown_content)
|
||||
await asyncio.sleep(0.1)
|
||||
return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)
|
||||
Loading…
Reference in New Issue
Block a user