Update ingestion_db.py and ingestion_processor.py: Refine documentation and enhance logging mechanisms. Improve edge validation logic with robust ID resolution and clarify comments for better understanding. Version updates to 2.2.1 and 3.2.1 respectively.

This commit is contained in:
Lars 2026-01-09 22:35:04 +01:00
parent 00264a9653
commit 4318395c83
2 changed files with 25 additions and 18 deletions

View File

@ -4,7 +4,6 @@ DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung.
WP-14: Umstellung auf zentrale database-Infrastruktur. WP-14: Umstellung auf zentrale database-Infrastruktur.
WP-24c: Implementierung der herkunftsbasierten Lösch-Logik (Origin-Purge). WP-24c: Implementierung der herkunftsbasierten Lösch-Logik (Origin-Purge).
Verhindert das versehentliche Löschen von inversen Kanten beim Re-Import. Verhindert das versehentliche Löschen von inversen Kanten beim Re-Import.
VERSION v2.2.0: Integration der Authority-Prüfung für Point-IDs.
VERSION: 2.2.0 (WP-24c: Protected Purge & Authority Lookup) VERSION: 2.2.0 (WP-24c: Protected Purge & Authority Lookup)
STATUS: Active STATUS: Active
""" """
@ -50,6 +49,7 @@ def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) ->
""" """
_, _, edges_col = collection_names(prefix) _, _, edges_col = collection_names(prefix)
try: try:
# retrieve erwartet eine Liste von IDs
res = client.retrieve( res = client.retrieve(
collection_name=edges_col, collection_name=edges_col,
ids=[edge_id], ids=[edge_id],
@ -83,7 +83,7 @@ def purge_artifacts(client: QdrantClient, prefix: str, note_id: str):
# Dies umfasst: # Dies umfasst:
# - Alle ausgehenden Kanten (A -> B) # - Alle ausgehenden Kanten (A -> B)
# - Alle inversen Kanten, die diese Note in anderen Notizen "deponiert" hat (B -> A) # - Alle inversen Kanten, die diese Note in anderen Notizen "deponiert" hat (B -> A)
# Fremde virtuelle Kanten (C -> A) bleiben erhalten, da deren origin_note_id == C ist. # Fremde inverse Kanten (C -> A) bleiben erhalten.
edges_filter = rest.Filter(must=[ edges_filter = rest.Filter(must=[
rest.FieldCondition(key="origin_note_id", match=rest.MatchValue(value=note_id)) rest.FieldCondition(key="origin_note_id", match=rest.MatchValue(value=note_id))
]) ])

View File

@ -5,9 +5,9 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-25a: Integration der Mixture of Experts (MoE) Architektur.
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
AUDIT v3.2.0: Fix für KeyError 'target_id', TypeError (Sync-Check) AUDIT v3.2.1: Fix für ID-Kanonisierung in Phase 1 & 2,
und Business-Centric Logging. robuster Smart-Edge-Logger und Business-Logging.
VERSION: 3.2.0 (WP-24c: Stability & Full Feature Set) VERSION: 3.2.1 (WP-24c: Canonical Authority Protection)
STATUS: Active STATUS: Active
""" """
import logging import logging
@ -56,8 +56,8 @@ class IngestionService:
from app.config import get_settings from app.config import get_settings
self.settings = get_settings() self.settings = get_settings()
# --- LOGGING CLEANUP --- # --- LOGGING CLEANUP (Business Focus) ---
# Unterdrückt Bibliotheks-Lärm in Konsole und Datei # Unterdrückt Bibliotheks-Lärm in Konsole und Datei (via tee)
logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("qdrant_client").setLevel(logging.WARNING) logging.getLogger("qdrant_client").setLevel(logging.WARNING)
@ -99,18 +99,18 @@ class IngestionService:
if not text or len(text.strip()) < 2: if not text or len(text.strip()) < 2:
return False return False
# Symmetrie-Filter gegen Typ-Strings # Nur System-Kanten (Symmetrie) filtern wir gegen die Typ-Blacklist
if provenance != "explicit": if provenance != "explicit":
blacklisted = {"insight", "event", "source", "task", "project", "person", "concept", "related_to", "referenced_by"} blacklisted = {"insight", "event", "source", "task", "project", "person", "concept", "related_to", "referenced_by"}
if text.lower().strip() in blacklisted: if text.lower().strip() in blacklisted:
return False return False
if len(text) > 150: return False # Wahrscheinlich kein Titel if len(text) > 150: return False # Vermutlich ein ganzer Satz
return True return True
async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]: async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]:
""" """
WP-15b: Implementiert den Two-Pass Ingestion Workflow. WP-15b: Two-Pass Ingestion Workflow.
Pass 1: Pre-Scan füllt den Context-Cache. Pass 1: Pre-Scan füllt den Context-Cache.
Pass 2: Verarbeitung nutzt den Cache für die semantische Prüfung. Pass 2: Verarbeitung nutzt den Cache für die semantische Prüfung.
""" """
@ -122,7 +122,7 @@ class IngestionService:
# Übergabe der Registry für dynamische Scan-Tiefe # Übergabe der Registry für dynamische Scan-Tiefe
ctx = pre_scan_markdown(path, registry=self.registry) ctx = pre_scan_markdown(path, registry=self.registry)
if ctx: if ctx:
# Mehrfache Indizierung für robusten Look-up # Mehrfache Indizierung für robusten Look-up (ID, Titel, Dateiname)
self.batch_cache[ctx.note_id] = ctx self.batch_cache[ctx.note_id] = ctx
self.batch_cache[ctx.title] = ctx self.batch_cache[ctx.title] = ctx
fname = os.path.splitext(os.path.basename(path))[0] fname = os.path.splitext(os.path.basename(path))[0]
@ -174,6 +174,7 @@ class IngestionService:
) )
note_id = note_pl["note_id"] note_id = note_pl["note_id"]
# BUSINESS LOG: Aktuelle Notiz
logger.info(f"📄 Bearbeite: '{note_id}' (Typ: {note_type})") logger.info(f"📄 Bearbeite: '{note_id}' (Typ: {note_type})")
old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id) old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id)
@ -208,9 +209,9 @@ class IngestionService:
is_valid = await validate_edge_candidate( is_valid = await validate_edge_candidate(
ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator" ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator"
) )
# Fix v3.2.0: Sicherer Zugriff via .get() verhindert Crash bei fehlender target_id # Fix v3.2.1: Robuste ID-Auflösung für den Logger
t_id = cand.get('target_id') or cand.get('note_id') or "Unknown" t_label = cand.get('target_id') or cand.get('note_id') or cand.get('to') or "Unknown"
logger.info(f" 🧠 [SMART EDGE] {t_id} -> {'✅ OK' if is_valid else '❌ SKIP'}") logger.info(f" 🧠 [SMART EDGE] {t_label} -> {'✅ OK' if is_valid else '❌ SKIP'}")
if is_valid: new_pool.append(cand) if is_valid: new_pool.append(cand)
else: else:
new_pool.append(cand) new_pool.append(cand)
@ -245,8 +246,12 @@ class IngestionService:
e["virtual"] = False e["virtual"] = False
e["confidence"] = e.get("confidence", 1.0) e["confidence"] = e.get("confidence", 1.0)
# Registrierung für Batch-Authority # Fix v3.2.1: Kanonisierung der Target-ID vor der Registrierung!
edge_id = _mk_edge_id(resolved_kind, note_id, target_raw, e.get("scope", "note")) # Nur wenn wir hier die echte Note-ID nutzen, erkennt Phase 2 die Kollision.
t_ctx = self.batch_cache.get(target_raw)
t_canonical = t_ctx.note_id if t_ctx else target_raw
edge_id = _mk_edge_id(resolved_kind, note_id, t_canonical, e.get("scope", "note"))
self.processed_explicit_ids.add(edge_id) self.processed_explicit_ids.add(edge_id)
final_edges.append(e) final_edges.append(e)
@ -260,11 +265,12 @@ class IngestionService:
target_id = target_ctx.note_id if target_ctx else target_raw target_id = target_ctx.note_id if target_ctx else target_raw
if (inv_kind and target_id and target_id != note_id and self._is_valid_note_id(target_id, provenance="structure")): if (inv_kind and target_id and target_id != note_id and self._is_valid_note_id(target_id, provenance="structure")):
# ID der potenziellen virtuellen Kante
potential_id = _mk_edge_id(inv_kind, target_id, note_id, e.get("scope", "note")) potential_id = _mk_edge_id(inv_kind, target_id, note_id, e.get("scope", "note"))
is_in_batch = potential_id in self.processed_explicit_ids is_in_batch = potential_id in self.processed_explicit_ids
# Real-Time DB Check (Ohne 'await', da sync) # Real-Time DB Check (Sync)
is_in_db = False is_in_db = False
if not is_in_batch: if not is_in_batch:
is_in_db = is_explicit_edge_present(self.client, self.prefix, potential_id) is_in_db = is_explicit_edge_present(self.client, self.prefix, potential_id)
@ -312,6 +318,7 @@ class IngestionService:
"""Erstellt eine Note aus einem Textstream.""" """Erstellt eine Note aus einem Textstream."""
target_path = os.path.join(vault_root, folder, filename) target_path = os.path.join(vault_root, folder, filename)
os.makedirs(os.path.dirname(target_path), exist_ok=True) os.makedirs(os.path.dirname(target_path), exist_ok=True)
with open(target_path, "w", encoding="utf-8") as f: f.write(markdown_content) with open(target_path, "w", encoding="utf-8") as f:
f.write(markdown_content)
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True) return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)