Update ingestion_db.py, ingestion_processor.py, and import_markdown.py: Enhance documentation and logging clarity, improve artifact purging and symmetry injection logic, and implement stricter authority checks. Update versioning to 2.6.0 and 3.3.7 to reflect changes in functionality and maintain compatibility with the ingestion service.

This commit is contained in:
Lars 2026-01-10 08:06:07 +01:00
parent 57656bbaaf
commit ec89d83916
3 changed files with 117 additions and 91 deletions

View File

@ -2,11 +2,11 @@
FILE: app/core/ingestion/ingestion_db.py
DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung.
WP-14: Umstellung auf zentrale database-Infrastruktur.
WP-20/22: Integration von Cloud-Resilienz und Fehlerbehandlung.
WP-20/22: Cloud-Resilienz und Fehlerbehandlung.
WP-24c: Implementierung der herkunftsbasierten Lösch-Logik (Origin-Purge).
Verhindert das versehentliche Löschen von inversen Kanten beim Re-Import.
Integration der Authority-Prüfung für Point-IDs zur Symmetrie-Validierung.
VERSION: 2.2.0 (WP-24c: Protected Purge & Authority Lookup)
VERSION: 2.2.1 (WP-24c: Robust Authority Lookup)
STATUS: Active
"""
import logging
@ -45,26 +45,57 @@ def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[
def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) -> bool:
"""
WP-24c: Prüft via Point-ID, ob bereits eine explizite Kante existiert.
Wird vom IngestionProcessor genutzt, um das Überschreiben von manuellem Wissen
durch virtuelle Symmetrie-Kanten zu verhindern.
Wird vom IngestionProcessor in Phase 2 genutzt, um das Überschreiben
von manuellem Wissen durch virtuelle Symmetrie-Kanten zu verhindern.
"""
if not edge_id: return False
_, _, edges_col = collection_names(prefix)
try:
res = client.retrieve(collection_name=edges_col, ids=[edge_id], with_payload=True)
if res and not res[0].payload.get("virtual", False):
# retrieve ist der schnellste Weg, um einen spezifischen Punkt via ID zu laden
res = client.retrieve(
collection_name=edges_col,
ids=[edge_id],
with_payload=True
)
# Wenn der Punkt existiert und NICHT virtuell ist, handelt es sich um eine Nutzer-Autorität
if res and len(res) > 0:
payload = res[0].payload
if not payload.get("virtual", False):
return True
return False
except Exception:
except Exception as e:
logger.debug(f"Authority check for {edge_id} failed: {e}")
return False
def purge_artifacts(client: QdrantClient, prefix: str, note_id: str):
"""WP-24c: Selektives Löschen von Artefakten (Origin-Purge)."""
"""
WP-24c: Selektives Löschen von Artefakten vor einem Re-Import.
Implementiert das Origin-Purge-Prinzip zur Sicherung der bidirektionalen Graph-Integrität.
"""
_, chunks_col, edges_col = collection_names(prefix)
try:
chunks_filter = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
client.delete(collection_name=chunks_col, points_selector=rest.FilterSelector(filter=chunks_filter))
edges_filter = rest.Filter(must=[rest.FieldCondition(key="origin_note_id", match=rest.MatchValue(value=note_id))])
client.delete(collection_name=edges_col, points_selector=rest.FilterSelector(filter=edges_filter))
# 1. Chunks löschen (immer fest an die note_id gebunden)
chunks_filter = rest.Filter(must=[
rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))
])
client.delete(
collection_name=chunks_col,
points_selector=rest.FilterSelector(filter=chunks_filter)
)
# 2. WP-24c: Kanten löschen (HERKUNFTS-BASIERT via origin_note_id)
# Wir löschen alle Kanten, die von DIESER Note erzeugt wurden.
edges_filter = rest.Filter(must=[
rest.FieldCondition(key="origin_note_id", match=rest.MatchValue(value=note_id))
])
client.delete(
collection_name=edges_col,
points_selector=rest.FilterSelector(filter=edges_filter)
)
logger.info(f"🧹 [PURGE] Global artifacts owned by '{note_id}' cleared.")
except Exception as e:
logger.error(f"❌ [PURGE ERROR] Failed to clear artifacts for {note_id}: {e}")

View File

@ -5,10 +5,10 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
AUDIT v3.3.6: Strikte Phasentrennung (Phase 2 global am Ende).
Fix für .trash-Folder und Pydantic 'None'-Crash.
Vollständige Wiederherstellung des Business-Loggings.
VERSION: 3.3.6 (WP-24c: Full Transparency Orchestration)
AUDIT v3.3.7: Strikte globale Phasentrennung.
Fix für Pydantic Crash (None-ID Guard Clauses).
Erzwingung der Konsistenz (wait=True).
VERSION: 3.3.7 (WP-24c: Strict Authority Commitment)
STATUS: Active
"""
import logging
@ -26,7 +26,7 @@ from app.core.chunking import assemble_chunks
# WP-24c: Import für die deterministische UUID-Vorabberechnung
from app.core.graph.graph_utils import _mk_edge_id
# Datenbank-Ebene (Modularisierte database-Infrastruktur)
# MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene
from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
from qdrant_client.http import models as rest
@ -53,12 +53,12 @@ logger = logging.getLogger(__name__)
class IngestionService:
def __init__(self, collection_prefix: str = None):
"""Initialisiert den Service und bereinigt das technische Logging."""
"""Initialisiert den Service und nutzt die neue database-Infrastruktur."""
from app.config import get_settings
self.settings = get_settings()
# --- LOGGING CLEANUP (Business Focus) ---
# Unterdrückt HTTP-Bibliotheks-Lärm, erhält aber inhaltliche Service-Logs
# Unterdrückt technische Bibliotheks-Header, erhält aber inhaltliche Service-Logs
for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]:
logging.getLogger(lib).setLevel(logging.WARNING)
@ -71,47 +71,49 @@ class IngestionService:
self.embedder = EmbeddingsClient()
self.llm = LLMService()
# WP-25a: Dimensionen über das LLM-Profil auflösen
# WP-25a: Auflösung der Dimension über das Embedding-Profil (MoE)
embed_cfg = self.llm.profiles.get("embedding_expert", {})
self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE
# Festlegen des Change-Detection Modus
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
# WP-15b: Kontext-Gedächtnis für ID-Auflösung (Global)
# WP-15b: Kontext-Gedächtnis für ID-Auflösung
self.batch_cache: Dict[str, NoteContext] = {}
# WP-24c: Puffer für Phase 2 (Symmetrie-Injektion am Ende des gesamten Imports)
# WP-24c: Puffer für Phase 2 (Symmetrie-Injektion nach dem gesamten Import)
self.symmetry_buffer: List[Dict[str, Any]] = []
try:
# Schema-Prüfung und Initialisierung
# Aufruf der modularisierten Schema-Logik
ensure_collections(self.client, self.prefix, self.dim)
ensure_payload_indexes(self.client, self.prefix)
except Exception as e:
logger.warning(f"DB initialization warning: {e}")
def _is_valid_note_id(self, text: str) -> bool:
def _is_valid_note_id(self, text: Optional[str]) -> bool:
"""
WP-24c: Prüft Ziel-Strings auf fachliche Validität.
Verhindert Müll-Kanten zu reinen System-Platzhaltern.
Verhindert Müll-Kanten zu System-Platzhaltern.
"""
if not text or len(text.strip()) < 2: return False
blacklisted = {"insight", "event", "source", "task", "project", "person", "concept", "related_to", "referenced_by"}
if text.lower().strip() in blacklisted: return False
if not text or not isinstance(text, str) or len(text.strip()) < 2:
return False
blacklisted = {"insight", "event", "source", "task", "project", "person", "concept", "related_to", "referenced_by", "none", "unknown"}
if text.lower().strip() in blacklisted:
return False
if len(text) > 200: return False
return True
async def run_batch(self, file_paths: List[str], vault_root: str) -> Dict[str, Any]:
"""
WP-15b: Two-Pass Ingestion Workflow (PHASE 1).
Füllt den Cache und verarbeitet Dateien batchweise.
Gibt ein Dictionary zurück, um Kompatibilität zum Orchestrator zu wahren.
Verarbeitet Batches und schreibt NUR Nutzer-Autorität in die DB.
"""
self.batch_cache.clear()
logger.info(f"--- 🔍 START BATCH PHASE 1 ({len(file_paths)} Dateien) ---")
# 1. Schritt: Pre-Scan (Context-Cache befüllen)
# 1. Schritt: Pre-Scan (Context-Cache füllen)
for path in file_paths:
try:
ctx = pre_scan_markdown(path, registry=self.registry)
@ -123,7 +125,7 @@ class IngestionService:
except Exception as e:
logger.warning(f" ⚠️ Pre-scan fehlgeschlagen für {path}: {e}")
# 2. Schritt: Batch-Verarbeitung (Authority Only)
# 2. Schritt: PROCESSING
processed_count = 0
success_count = 0
for p in file_paths:
@ -132,52 +134,56 @@ class IngestionService:
if res.get("status") == "success":
success_count += 1
logger.info(f"--- ✅ Batch Phase 1 abgeschlossen ({success_count}/{processed_count}) ---")
logger.info(f"--- ✅ Batch Phase 1 beendet ({success_count}/{processed_count}) ---")
return {
"status": "success",
"processed": processed_count,
"success": success_count,
"buffered_virtuals": len(self.symmetry_buffer)
"buffered_symmetries": len(self.symmetry_buffer)
}
async def commit_vault_symmetries(self) -> Dict[str, Any]:
"""
WP-24c: Führt PHASE 2 (Symmetrie-Injektion) für den gesamten Vault aus.
Wird nach Abschluss aller Batches einmalig aufgerufen.
Vergleicht gepufferte Kanten gegen die Instance-of-Truth in Qdrant.
WP-24c: Führt PHASE 2 (Globale Symmetrie-Injektion) aus.
Wird einmalig am Ende des gesamten Imports aufgerufen.
Sorgt dafür, dass virtuelle Kanten erst NACH der Nutzer-Autorität geschrieben werden.
"""
if not self.symmetry_buffer:
logger.info("⏭️ Symmetrie-Puffer ist leer. Keine Aktion erforderlich.")
logger.info("⏭️ Symmetrie-Puffer leer. Keine Aktion erforderlich.")
return {"status": "skipped", "reason": "buffer_empty"}
logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrie-Kanten gegen die Instance-of-Truth...")
logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrie-Vorschläge gegen Live-DB...")
final_virtuals = []
for v_edge in self.symmetry_buffer:
# Deterministische ID der potenziellen Symmetrie berechnen
# Sicherheits-Check: Keine Kanten ohne Ziele zulassen
if not v_edge.get("target_id") or v_edge.get("target_id") == "None":
continue
# ID der potenziellen Symmetrie berechnen
v_id = _mk_edge_id(v_edge["kind"], v_edge["note_id"], v_edge["target_id"], v_edge.get("scope", "note"))
# AUTHORITY-CHECK: Nur schreiben, wenn KEINE manuelle Kante in der DB existiert
if not is_explicit_edge_present(self.client, self.prefix, v_id):
final_virtuals.append(v_edge)
# Detailliertes Logging für volle Transparenz
logger.info(f" 🔄 [SYMMETRY] Add inverse: {v_edge['note_id']} --({v_edge['kind']})--> {v_edge['target_id']}")
logger.info(f" 🔄 [SYMMETRY] Erzeuge Gegenkante: {v_edge['note_id']} --({v_edge['kind']})--> {v_edge['target_id']}")
else:
logger.debug(f" 🛡️ Schutz: Manuelle Kante belegt ID {v_id}. Symmetrie verworfen.")
added_count = 0
if final_virtuals:
logger.info(f"📤 Schreibe {len(final_virtuals)} validierte Symmetrie-Kanten in den Graphen.")
logger.info(f"📤 Schreibe {len(final_virtuals)} geschützte Symmetrie-Kanten in Qdrant.")
e_pts = points_for_edges(self.prefix, final_virtuals)[1]
upsert_batch(self.client, f"{self.prefix}_edges", e_pts)
upsert_batch(self.client, f"{self.prefix}_edges", e_pts, wait=True)
added_count = len(final_virtuals)
self.symmetry_buffer.clear() # Puffer nach erfolgreichem Commit leeren
self.symmetry_buffer.clear() # Puffer leeren
return {"status": "success", "added": added_count}
async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]:
"""
Transformiert eine Markdown-Datei in Phase 1 (Authority First).
Implementiert Ordner-Blacklists, Pydantic-Safety und MoE-Validierung.
Transformiert eine Markdown-Datei.
Schreibt Notes/Chunks/Explicit Edges sofort (Phase 1).
Befüllt den Symmetrie-Puffer für die globale Phase 2.
"""
apply = kwargs.get("apply", False)
force_replace = kwargs.get("force_replace", False)
@ -186,7 +192,7 @@ class IngestionService:
result = {"path": file_path, "status": "skipped", "changed": False, "error": None}
try:
# --- ORDNER-FILTER (Fix für .trash und .obsidian Junk) ---
# --- ORDNER-FILTER (.trash) ---
if any(part.startswith('.') for part in file_path.split(os.sep)):
return {**result, "status": "skipped", "reason": "hidden_folder"}
@ -195,7 +201,6 @@ class IngestionService:
if any(folder in file_path for folder in ignore_folders):
return {**result, "status": "skipped", "reason": "folder_blacklist"}
# Datei einlesen und validieren
parsed = read_markdown(file_path)
if not parsed: return {**result, "error": "Empty file"}
fm = normalize_frontmatter(parsed.frontmatter)
@ -205,7 +210,7 @@ class IngestionService:
note_pl = make_note_payload(parsed, vault_root=vault_root, file_path=file_path, types_cfg=self.registry)
note_id = note_pl.get("note_id")
# --- FIX: Guard Clause gegen 'None' IDs (Verhindert Pydantic Crash) ---
# --- GUARD CLAUSE: Fehlende IDs verhindern PointStruct-Crash ---
if not note_id:
logger.warning(f" ⚠️ Fehlende note_id in '{file_path}'. Datei wird ignoriert.")
return {**result, "status": "error", "error": "missing_note_id"}
@ -221,7 +226,7 @@ class IngestionService:
if not apply:
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
# Chunks erzeugen und semantisch validieren (MoE)
# Deep Processing & MoE (LLM Validierung)
profile = note_pl.get("chunk_profile", "sliding_standard")
chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type)
enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False)
@ -230,45 +235,45 @@ class IngestionService:
for ch in chunks:
new_pool = []
for cand in getattr(ch, "candidate_pool", []):
# --- GUARD: Ungültige Ziele im Candidate-Pool filtern ---
t_id = cand.get('target_id') or cand.get('note_id')
if not self._is_valid_note_id(t_id):
continue
if cand.get("provenance") == "global_pool" and enable_smart:
# Detailliertes Business-Logging für LLM-Aktivitäten
target_label = cand.get('target_id') or cand.get('note_id') or "Unknown"
logger.info(f" ⚖️ [VALIDATING] Relation to '{target_label}' via Expert-LLM...")
logger.info(f" ⚖️ [VALIDATING] Relation to '{t_id}' via Expert-LLM...")
is_valid = await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm)
logger.info(f" 🧠 [SMART EDGE] {target_label} -> {'✅ OK' if is_valid else '❌ SKIP'}")
logger.info(f" 🧠 [SMART EDGE] {t_id} -> {'✅ OK' if is_valid else '❌ SKIP'}")
if is_valid: new_pool.append(cand)
else:
new_pool.append(cand)
ch.candidate_pool = new_pool
# Embeddings und Payloads
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry)
vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []
# Kanten-Extraktion mit ID-Kanonisierung
# Kanten-Logik (Kanonisierung via batch_cache)
raw_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []))
explicit_edges = []
for e in raw_edges:
target_raw = e.get("target_id")
# Auflösung von Titeln/Dateinamen zu echten IDs über den globalen Cache
target_ctx = self.batch_cache.get(target_raw)
target_id = target_ctx.note_id if target_ctx else target_raw
# ID-Resolution über den Context-Cache
t_ctx = self.batch_cache.get(target_raw)
target_id = t_ctx.note_id if t_ctx else target_raw
if not self._is_valid_note_id(target_id): continue
resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"))
# Echte physische Kante markieren (Phase 1 Autorität)
# Echte physische Kante markieren (Phase 1 Authority)
e.update({
"kind": resolved_kind, "target_id": target_id,
"origin_note_id": note_id, "virtual": False, "confidence": 1.0
})
explicit_edges.append(e)
# Symmetrie-Kandidat für die globale Phase 2 puffern
# Symmetrie-Kandidat puffern
inv_kind = edge_registry.get_inverse(resolved_kind)
if inv_kind and target_id != note_id:
v_edge = e.copy()
@ -287,7 +292,8 @@ class IngestionService:
if chunk_pls and vecs:
upsert_batch(self.client, f"{self.prefix}_chunks", points_for_chunks(self.prefix, chunk_pls, vecs)[1])
if explicit_edges:
upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, explicit_edges)[1])
# Wichtig: wait=True stellt sicher, dass die Kanten in Phase 2 searchable sind
upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, explicit_edges)[1], wait=True)
logger.info(f" ✨ Phase 1 fertig: {len(chunk_pls)} Chunks, {len(explicit_edges)} explizite Kanten.")
return {"status": "success", "note_id": note_id, "edges_count": len(explicit_edges)}
@ -300,7 +306,6 @@ class IngestionService:
"""Erstellt eine Note aus einem Textstream."""
target_path = os.path.join(vault_root, folder, filename)
os.makedirs(os.path.dirname(target_path), exist_ok=True)
with open(target_path, "w", encoding="utf-8") as f:
f.write(markdown_content)
with open(target_path, "w", encoding="utf-8") as f: f.write(markdown_content)
await asyncio.sleep(0.1)
return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)

View File

@ -2,19 +2,11 @@
# -*- coding: utf-8 -*-
"""
FILE: scripts/import_markdown.py
VERSION: 2.5.0 (2026-01-10)
VERSION: 2.6.0 (2026-01-10)
STATUS: Active (Core)
COMPATIBILITY: IngestionProcessor v3.3.5+
Zweck:
-------
Hauptwerkzeug zum Importieren von Markdown-Dateien aus einem Vault in Qdrant.
Implementiert die globale 2-Phasen-Schreibstrategie.
Änderungen v2.5.0:
------------------
- Globale Phasentrennung: commit_vault_symmetries() wird erst am Ende aufgerufen.
- Erweiterter Ordner-Filter: Schließt .trash und andere Systemordner aus.
COMPATIBILITY: IngestionProcessor v3.3.7+
Zweck: Hauptwerkzeug zum Importieren von Markdown-Dateien.
Implementiert die globale 2-Phasen-Schreibstrategie.
"""
import asyncio
import os
@ -24,10 +16,8 @@ import sys
from pathlib import Path
from dotenv import load_dotenv
# Setzt das Level global auf INFO
# Root Logger Setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
# Stelle sicher, dass das Root-Verzeichnis im Python-Pfad ist
sys.path.append(os.getcwd())
from app.core.ingestion import IngestionService
@ -41,14 +31,13 @@ async def main_async(args):
logger.error(f"Vault path does not exist: {vault_path}")
return
# 1. Service initialisieren
logger.info(f"Initializing IngestionService (Prefix: {args.prefix})")
service = IngestionService(collection_prefix=args.prefix)
logger.info(f"Scanning {vault_path}...")
all_files = list(vault_path.rglob("*.md"))
# --- ORDNER-FILTER ---
# --- GLOBALER ORDNER-FILTER ---
files = []
ignore_folders = [".trash", ".obsidian", ".sync", "templates", "_system"]
for f in all_files:
@ -74,7 +63,7 @@ async def main_async(args):
except Exception: pass
# =========================================================================
# PHASE 1: Batch-Import (Explicit Edges only)
# PHASE 1: Batch-Import (Notes & Explicit Edges)
# =========================================================================
stats = {"processed": 0, "skipped": 0, "errors": 0}
sem = asyncio.Semaphore(5)
@ -82,6 +71,7 @@ async def main_async(args):
async def process_with_limit(f_path):
async with sem:
try:
# Nutzt process_file (v3.3.7)
return await service.process_file(
file_path=str(f_path), vault_root=str(vault_path),
force_replace=args.force, apply=args.apply, purge_before=True
@ -101,15 +91,15 @@ async def main_async(args):
else: stats["skipped"] += 1
# =========================================================================
# PHASE 2: Global Symmetry Injection
# PHASE 2: Global Symmetry Injection (Nach Abschluss aller Batches)
# =========================================================================
if args.apply:
logger.info(f"🔄 [Phase 2] Starting global symmetry injection...")
logger.info(f"🔄 [Phase 2] Starting global symmetry injection for the entire vault...")
sym_res = await service.commit_vault_symmetries()
if sym_res.get("status") == "success":
logger.info(f"Added {sym_res.get('added', 0)} protected symmetry edges.")
logger.info(f"Finished global symmetry injection. Added: {sym_res.get('added', 0)}")
logger.info(f"Done. Final Stats: {stats}")
logger.info(f"Final Stats: {stats}")
def main():
load_dotenv()