Update ingestion_processor.py to version 3.3.2: Implement two-phase write strategy and API compatibility fix, ensuring data authority for explicit edges. Enhance logging clarity and adjust batch import process to maintain compatibility with importer script. Refine comments for improved understanding and maintainability.

This commit is contained in:
Lars 2026-01-10 06:43:31 +01:00
parent 981b0cba1f
commit 114cea80de

View File

@ -5,9 +5,9 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
AUDIT v3.3.1: Strikte Trennung von Explicit-Write (Phase 1) und
Symmetry-Validation (Phase 2). 100% Datenhoheit für den Nutzer.
VERSION: 3.3.1 (WP-24c: Authority-First Ingestion)
AUDIT v3.3.2: 2-Phasen-Schreibstrategie & API-Kompatibilitäts Fix.
Garantiert Datenhoheit expliziter Kanten.
VERSION: 3.3.2 (WP-24c: Authority-First Batch Orchestration)
STATUS: Active
"""
import logging
@ -25,7 +25,7 @@ from app.core.chunking import assemble_chunks
# WP-24c: Import für die deterministische UUID-Vorabberechnung
from app.core.graph.graph_utils import _mk_edge_id
# MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene
# Datenbank-Ebene (Modularisierte database-Infrastruktur)
from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
from qdrant_client.http import models as rest
@ -56,7 +56,7 @@ class IngestionService:
from app.config import get_settings
self.settings = get_settings()
# --- LOGGING CLEANUP ---
# --- LOGGING CLEANUP (Business Focus) ---
# Unterdrückt Bibliotheks-Lärm in Konsole und Datei (via tee)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
@ -79,7 +79,12 @@ class IngestionService:
# Festlegen, welcher Hash für die Change-Detection maßgeblich ist
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
self.batch_cache: Dict[str, NoteContext] = {} # Globaler Kontext-Cache (Pass 1)
# WP-15b: Kontext-Gedächtnis für ID-Auflösung
self.batch_cache: Dict[str, NoteContext] = {}
# WP-24c: Puffer für Phase 2 (Symmetrie-Injektion)
self.symmetry_buffer: List[Dict[str, Any]] = []
try:
# Aufruf der modularisierten Schema-Logik
@ -91,7 +96,7 @@ class IngestionService:
def _is_valid_note_id(self, text: str) -> bool:
"""
WP-24c: Prüft Ziel-Strings auf fachliche Validität.
Verhindert das Anlegen von Kanten zu reinen System-Platzhaltern.
Verhindert Müll-Kanten zu System-Platzhaltern.
"""
if not text or len(text.strip()) < 2:
return False
@ -101,21 +106,25 @@ class IngestionService:
if text.lower().strip() in blacklisted:
return False
if len(text) > 120: return False # Wahrscheinlich kein Titel
# Längere Titel zulassen (z.B. für Hubs), aber keine ganzen Sätze
if len(text) > 200: return False
return True
async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]:
async def run_batch(self, file_paths: List[str], vault_root: str) -> Dict[str, Any]:
"""
WP-15b: Two-Pass Ingestion Workflow mit 2-Phasen-Schreibstrategie.
Fix: Gibt Dictionary zurück, um Kompatibilität zum Importer-Script zu wahren.
"""
self.batch_cache.clear()
self.symmetry_buffer.clear()
logger.info(f"--- 🔍 START BATCH IMPORT ({len(file_paths)} Dateien) ---")
# SCHRITT 1: Pre-Scan (Context-Cache füllen)
# 1. Schritt: Pre-Scan (Context-Cache füllen)
for path in file_paths:
try:
ctx = pre_scan_markdown(path, registry=self.registry)
if ctx:
# Look-up Index für Note_IDs und Titel
self.batch_cache[ctx.note_id] = ctx
self.batch_cache[ctx.title] = ctx
fname = os.path.splitext(os.path.basename(path))[0]
@ -123,31 +132,30 @@ class IngestionService:
except Exception as e:
logger.warning(f" ⚠️ Pre-scan fehlgeschlagen für {path}: {e}")
# SCHRITT 2: PHASE 1 (Authority-Schreiben)
# Wir verarbeiten alle Dateien und schreiben NUR explizite Kanten in die DB.
results = []
all_virtual_candidates = []
# 2. Schritt: PROCESSING (PHASE 1: AUTHORITY)
# Verarbeitet alle Dateien und schreibt NUR explizite Kanten in die DB.
processed_count = 0
success_count = 0
for p in file_paths:
# process_file liefert in dieser Version (res, virtual_candidates) zurück
res, candidates = await self.process_file(p, vault_root, apply=True, purge_before=True)
results.append(res)
all_virtual_candidates.extend(candidates)
processed_count += 1
res = await self.process_file(p, vault_root, apply=True, purge_before=True)
if res.get("status") == "success":
success_count += 1
# SCHRITT 3: PHASE 2 (Symmetrie-Ergänzung)
# Nachdem alle expliziten Kanten fest in Qdrant liegen, prüfen wir die Inversen.
if all_virtual_candidates:
logger.info(f"🔄 PHASE 2: Validiere {len(all_virtual_candidates)} Symmetrie-Kandidaten gegen Live-DB...")
# 3. Schritt: SYMMETRY INJECTION (PHASE 2)
# Erst jetzt, wo alle manuellen Kanten in Qdrant liegen, prüfen wir die Symmetrien.
if self.symmetry_buffer:
logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrie-Kanten gegen Live-DB...")
final_virtuals = []
for v_edge in all_virtual_candidates:
# Eindeutige ID berechnen (muss exakt der ID in Phase 1 entsprechen)
v_id = _mk_edge_id(v_edge["kind"], v_edge["note_id"], v_edge["target_id"], "note")
for v_edge in self.symmetry_buffer:
# Eindeutige ID der potenziellen Symmetrie-Kante berechnen
v_id = _mk_edge_id(v_edge["kind"], v_edge["note_id"], v_edge["target_id"], v_edge.get("scope", "note"))
# Check: Liegt dort bereits eine manuelle Kante?
# Nur schreiben, wenn Qdrant sagt: "Keine manuelle Kante für diese ID vorhanden"
if not is_explicit_edge_present(self.client, self.prefix, v_id):
final_virtuals.append(v_edge)
else:
logger.debug(f" 🛡️ Symmetrie übersprungen (Manuelle Kante hat Vorrang): {v_id}")
logger.debug(f" 🛡️ Symmetrie unterdrückt (Manuelle Kante existiert): {v_id}")
if final_virtuals:
logger.info(f"📤 Schreibe {len(final_virtuals)} geschützte Symmetrie-Kanten.")
@ -155,13 +163,18 @@ class IngestionService:
upsert_batch(self.client, f"{self.prefix}_edges", e_pts)
logger.info(f"--- ✅ BATCH IMPORT BEENDET ---")
return results
return {
"status": "success",
"processed": processed_count,
"success": success_count,
"virtuals_added": len(self.symmetry_buffer)
}
async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]:
"""
Transformiert eine Markdown-Datei.
Schreibt Notes/Chunks/Explicit Edges sofort (Phase 1).
Gibt potenzielle Symmetrien für Phase 2 zurück.
Befüllt den Symmetrie-Puffer für Phase 2.
"""
apply = kwargs.get("apply", False)
force_replace = kwargs.get("force_replace", False)
@ -171,23 +184,22 @@ class IngestionService:
hash_normalize = kwargs.get("hash_normalize", "canonical")
result = {"path": file_path, "status": "skipped", "changed": False, "error": None}
virtual_candidates = []
# 1. Parse & Lifecycle Gate
try:
parsed = read_markdown(file_path)
if not parsed: return {**result, "error": "Empty file"}, []
if not parsed: return {**result, "error": "Empty file"}
fm = normalize_frontmatter(parsed.frontmatter)
validate_required_frontmatter(fm)
except Exception as e:
return {**result, "error": f"Validation failed: {str(e)}"}, []
return {**result, "error": f"Validation failed: {str(e)}"}
ingest_cfg = self.registry.get("ingestion_settings", {})
ignore_list = ingest_cfg.get("ignore_statuses", ["system", "template", "archive", "hidden"])
current_status = fm.get("status", "draft").lower().strip()
if current_status in ignore_list:
return {**result, "status": "skipped", "reason": "lifecycle_filter"}, []
return {**result, "status": "skipped", "reason": "lifecycle_filter"}
# 2. Payload & Change Detection
note_type = resolve_note_type(self.registry, fm.get("type"))
@ -208,10 +220,10 @@ class IngestionService:
c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id)
if not (force_replace or not old_payload or old_hash != new_hash or c_miss or e_miss):
return {**result, "status": "unchanged", "note_id": note_id}, []
return {**result, "status": "unchanged", "note_id": note_id}
if not apply:
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}, []
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
# 3. Deep Processing (Chunking, Validation, Embedding)
try:
@ -232,6 +244,7 @@ class IngestionService:
is_valid = await validate_edge_candidate(
ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator"
)
# Fix (v3.3.2): Sicherer Zugriff via .get() verhindert Crash
t_id = cand.get('target_id') or cand.get('note_id') or "Unknown"
logger.info(f" 🧠 [SMART EDGE] {t_id} -> {'✅ OK' if is_valid else '❌ SKIP'}")
if is_valid: new_pool.append(cand)
@ -249,7 +262,7 @@ class IngestionService:
include_note_scope_refs=note_scope_refs
)
# PHASE 1: Authority-Check & Kanonisierung
# --- WP-24c: Symmetrie-Injektion (Authority Implementation) ---
explicit_edges = []
for e in raw_edges:
target_raw = e.get("target_id")
@ -261,14 +274,14 @@ class IngestionService:
resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"))
# Echte explizite Kante für Phase 1
# Echte physische Kante markieren (Phase 1)
e.update({
"kind": resolved_kind, "target_id": target_id,
"origin_note_id": note_id, "virtual": False, "confidence": 1.0
})
explicit_edges.append(e)
# Symmetrie-Kandidat für Phase 2 vorbereiten
# Symmetrie-Kandidat für Phase 2 puffern
inv_kind = edge_registry.get_inverse(resolved_kind)
if inv_kind and target_id != note_id:
v_edge = e.copy()
@ -277,28 +290,33 @@ class IngestionService:
"virtual": True, "provenance": "structure", "confidence": 1.0,
"origin_note_id": note_id
})
virtual_candidates.append(v_edge)
self.symmetry_buffer.append(v_edge)
# 4. DB Upsert (Phase 1)
# 4. DB Upsert (Phase 1: Authority)
if apply:
if purge_before and old_payload:
purge_artifacts(self.client, self.prefix, note_id)
upsert_batch(self.client, f"{self.prefix}_notes", points_for_note(self.prefix, note_pl, None, self.dim)[1])
# Speichern der Haupt-Note
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
upsert_batch(self.client, n_name, n_pts)
if chunk_pls and vecs:
upsert_batch(self.client, f"{self.prefix}_chunks", points_for_chunks(self.prefix, chunk_pls, vecs)[1])
c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1]
upsert_batch(self.client, f"{self.prefix}_chunks", c_pts)
if explicit_edges:
upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, explicit_edges)[1])
e_pts = points_for_edges(self.prefix, explicit_edges)[1]
upsert_batch(self.client, f"{self.prefix}_edges", e_pts)
logger.info(f" ✨ Phase 1 fertig: {len(chunk_pls)} Chunks, {len(explicit_edges)} explizite Kanten.")
return {
"path": file_path, "status": "success", "changed": True, "note_id": note_id,
"chunks_count": len(chunk_pls), "edges_count": len(explicit_edges)
}, virtual_candidates
}
except Exception as e:
logger.error(f"❌ Fehler bei {file_path}: {e}", exc_info=True)
return {**result, "error": str(e)}, []
return {**result, "error": str(e)}
async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
"""Erstellt eine Note aus einem Textstream."""
@ -307,5 +325,4 @@ class IngestionService:
with open(target_path, "w", encoding="utf-8") as f:
f.write(markdown_content)
await asyncio.sleep(0.1)
res, _ = await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)
return res
return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)