Refactor ingestion_db.py and ingestion_processor.py: Enhance documentation for clarity, improve symmetry injection logic, and refine the artifact purging process. Bump version to 3.3.5 to reflect the changes in functionality and maintainability, ensuring robust handling of explicit edges and authority checks.

This commit is contained in:
Lars 2026-01-10 07:25:43 +01:00
parent 29e334625e
commit 3f528f2184
2 changed files with 85 additions and 146 deletions

View File

@ -14,7 +14,7 @@ from app.core.database import collection_names
logger = logging.getLogger(__name__)
def fetch_note_payload(client: QdrantClient, prefix: str, note_id: str) -> Optional[dict]:
"""Holt die Metadaten einer Note aus Qdrant."""
"""Holt die Metadaten einer Note aus Qdrant via Scroll."""
notes_col, _, _ = collection_names(prefix)
try:
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
@ -25,7 +25,7 @@ def fetch_note_payload(client: QdrantClient, prefix: str, note_id: str) -> Optio
return None
def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[bool, bool]:
"""Prüft auf vorhandene Chunks und Edges."""
"""Prüft Qdrant aktiv auf vorhandene Chunks und Edges für eine Note."""
_, chunks_col, edges_col = collection_names(prefix)
try:
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
@ -38,12 +38,11 @@ def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[
def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) -> bool:
"""
WP-24c: Prüft, ob eine Kante mit der gegebenen ID bereits als 'explizit' existiert.
Verhindert das Überschreiben von manuellem Wissen durch Symmetrie-Kanten.
WP-24c: Prüft via Point-ID, ob bereits eine explizite Kante existiert.
Verhindert das Überschreiben von manuellem Wissen durch Symmetrien.
"""
_, _, edges_col = collection_names(prefix)
try:
# retrieve ist der schnellste Weg, um einen Punkt via ID zu laden
res = client.retrieve(collection_name=edges_col, ids=[edge_id], with_payload=True)
if res and not res[0].payload.get("virtual", False):
return True
@ -52,13 +51,12 @@ def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) ->
return False
def purge_artifacts(client: QdrantClient, prefix: str, note_id: str):
"""Löscht Artefakte basierend auf ihrer Herkunft (Origin)."""
"""Löscht Artefakte basierend auf ihrer Herkunft (Origin-Purge)."""
_, chunks_col, edges_col = collection_names(prefix)
try:
chunks_filter = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
client.delete(collection_name=chunks_col, points_selector=rest.FilterSelector(filter=chunks_filter))
# Origin-basiertes Löschen schützt fremde inverse Kanten
edges_filter = rest.Filter(must=[rest.FieldCondition(key="origin_note_id", match=rest.MatchValue(value=note_id))])
client.delete(collection_name=edges_col, points_selector=rest.FilterSelector(filter=edges_filter))
logger.info(f"🧹 [PURGE] Global artifacts owned by '{note_id}' cleared.")

View File

@ -4,10 +4,9 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-24c: Integration der Symmetrie-Logik (Automatische inverse Kanten).
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
AUDIT v3.3.2: 2-Phasen-Schreibstrategie & API-Kompatibilitäts Fix.
Garantiert Datenhoheit expliziter Kanten.
VERSION: 3.3.2 (WP-24c: Authority-First Batch Orchestration)
AUDIT v3.3.5: 2-Phasen-Strategie (Phase 2 erst nach allen Batches).
API-Fix für Dictionary-Rückgabe. Vollständiger Umfang.
VERSION: 3.3.5 (WP-24c: Global Symmetry Commitment)
STATUS: Active
"""
import logging
@ -22,7 +21,7 @@ from app.core.parser import (
validate_required_frontmatter, NoteContext
)
from app.core.chunking import assemble_chunks
# WP-24c: Import für die deterministische ID-Vorabberechnung
# WP-24c: Import für die deterministische UUID-Vorabberechnung
from app.core.graph.graph_utils import _mk_edge_id
# Datenbank-Ebene (Modularisierte database-Infrastruktur)
@ -52,12 +51,11 @@ logger = logging.getLogger(__name__)
class IngestionService:
def __init__(self, collection_prefix: str = None):
"""Initialisiert den Service und nutzt die neue database-Infrastruktur."""
"""Initialisiert den Service und bereinigt das Logging."""
from app.config import get_settings
self.settings = get_settings()
# --- LOGGING CLEANUP (Business Focus) ---
# Unterdrückt Bibliotheks-Lärm in Konsole und Datei (via tee)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("qdrant_client").setLevel(logging.WARNING)
@ -65,7 +63,6 @@ class IngestionService:
self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
self.cfg = QdrantConfig.from_env()
# Synchronisierung der Konfiguration mit dem Instanz-Präfix
self.cfg.prefix = self.prefix
self.client = get_client(self.cfg)
@ -73,58 +70,44 @@ class IngestionService:
self.embedder = EmbeddingsClient()
self.llm = LLMService()
# WP-25a: Auflösung der Dimension über das Embedding-Profil (MoE)
embed_cfg = self.llm.profiles.get("embedding_expert", {})
self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE
# Festlegen, welcher Hash für die Change-Detection maßgeblich ist
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
# WP-15b: Kontext-Gedächtnis für ID-Auflösung
self.batch_cache: Dict[str, NoteContext] = {}
# WP-24c: Puffer für Phase 2 (Symmetrie-Injektion nach Persistierung)
# WP-24c: Puffer für Phase 2 (Symmetrie-Injektion am Ende des gesamten Imports)
self.symmetry_buffer: List[Dict[str, Any]] = []
try:
# Aufruf der modularisierten Schema-Logik
ensure_collections(self.client, self.prefix, self.dim)
ensure_payload_indexes(self.client, self.prefix)
except Exception as e:
logger.warning(f"DB initialization warning: {e}")
def _is_valid_note_id(self, text: str) -> bool:
"""
WP-24c: Prüft Ziel-Strings auf fachliche Validität.
Verhindert Müll-Kanten zu System-Platzhaltern.
"""
if not text or len(text.strip()) < 2:
return False
# Blacklist für Begriffe, die keine echten Notizen sind
"""WP-24c: Verhindert Müll-Kanten zu System-Platzhaltern."""
if not text or len(text.strip()) < 2: return False
blacklisted = {"insight", "event", "source", "task", "project", "person", "concept", "related_to", "referenced_by"}
if text.lower().strip() in blacklisted:
return False
# Längere Titel zulassen (z.B. für Hubs), aber keine ganzen Sätze
if text.lower().strip() in blacklisted: return False
if len(text) > 200: return False
return True
async def run_batch(self, file_paths: List[str], vault_root: str) -> Dict[str, Any]:
"""
WP-15b: Two-Pass Ingestion Workflow mit 2-Phasen-Schreibstrategie.
WP-15b: Two-Pass Ingestion Workflow (PHASE 1).
Fix: Gibt Dictionary zurück, um Kompatibilität zum Importer-Script zu wahren.
"""
self.batch_cache.clear()
self.symmetry_buffer.clear()
logger.info(f"--- 🔍 START BATCH IMPORT ({len(file_paths)} Dateien) ---")
logger.info(f"--- 🔍 START BATCH (Phase 1) ---")
# 1. Schritt: Pre-Scan (Context-Cache füllen)
# 1. Pre-Scan (Context-Cache füllen)
for path in file_paths:
try:
ctx = pre_scan_markdown(path, registry=self.registry)
if ctx:
# Look-up Index für Note_IDs und Titel
self.batch_cache[ctx.note_id] = ctx
self.batch_cache[ctx.title] = ctx
fname = os.path.splitext(os.path.basename(path))[0]
@ -132,8 +115,7 @@ class IngestionService:
except Exception as e:
logger.warning(f" ⚠️ Pre-scan fehlgeschlagen für {path}: {e}")
# 2. Schritt: PROCESSING (PHASE 1: AUTHORITY)
# Verarbeitet alle Dateien und schreibt NUR explizite Kanten in die DB.
# 2. Schritt: PROCESSING (NUR AUTHORITY)
processed_count = 0
success_count = 0
for p in file_paths:
@ -142,108 +124,87 @@ class IngestionService:
if res.get("status") == "success":
success_count += 1
# 3. Schritt: SYMMETRY INJECTION (PHASE 2)
# Erst jetzt, wo alle manuellen Kanten in Qdrant liegen, schreiben wir die Symmetrien.
if self.symmetry_buffer:
logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrie-Kanten gegen Live-DB...")
final_virtuals = []
for v_edge in self.symmetry_buffer:
# Eindeutige ID der potenziellen Symmetrie-Kante berechnen
v_id = _mk_edge_id(v_edge["kind"], v_edge["note_id"], v_edge["target_id"], v_edge.get("scope", "note"))
# Nur schreiben, wenn Qdrant sagt: "Keine manuelle Kante für diese ID vorhanden"
if not is_explicit_edge_present(self.client, self.prefix, v_id):
final_virtuals.append(v_edge)
else:
logger.debug(f" 🛡️ Symmetrie unterdrückt (Manuelle Kante existiert): {v_id}")
if final_virtuals:
logger.info(f"📤 Schreibe {len(final_virtuals)} geschützte Symmetrie-Kanten.")
e_pts = points_for_edges(self.prefix, final_virtuals)[1]
upsert_batch(self.client, f"{self.prefix}_edges", e_pts)
logger.info(f"--- ✅ BATCH IMPORT BEENDET ---")
logger.info(f"--- ✅ Batch Phase 1 abgeschlossen ({success_count}/{processed_count}) ---")
return {
"status": "success",
"processed": processed_count,
"success": success_count,
"virtuals_added": len(self.symmetry_buffer)
"buffered_virtuals": len(self.symmetry_buffer)
}
async def commit_vault_symmetries(self) -> Dict[str, Any]:
"""
WP-24c: Führt PHASE 2 für den gesamten Vault aus.
Wird nach allen run_batch Aufrufen einmalig getriggert.
"""
if not self.symmetry_buffer:
return {"status": "skipped", "reason": "buffer_empty"}
logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrie-Kanten gegen die Instance-of-Truth...")
final_virtuals = []
for v_edge in self.symmetry_buffer:
# ID der potenziellen Symmetrie berechnen
v_id = _mk_edge_id(v_edge["kind"], v_edge["note_id"], v_edge["target_id"], v_edge.get("scope", "note"))
# Nur schreiben, wenn KEINE manuelle Kante in der DB existiert
if not is_explicit_edge_present(self.client, self.prefix, v_id):
final_virtuals.append(v_edge)
else:
logger.debug(f" 🛡️ Schutz: Manuelle Kante verhindert Symmetrie {v_id}")
added_count = 0
if final_virtuals:
logger.info(f"📤 Schreibe {len(final_virtuals)} geschützte Symmetrie-Kanten.")
e_pts = points_for_edges(self.prefix, final_virtuals)[1]
upsert_batch(self.client, f"{self.prefix}_edges", e_pts)
added_count = len(final_virtuals)
self.symmetry_buffer.clear() # Puffer leeren
return {"status": "success", "added": added_count}
async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]:
"""
Transformiert eine Markdown-Datei.
Schreibt Notes/Chunks/Explicit Edges sofort (Phase 1).
Befüllt den Symmetrie-Puffer für Phase 2.
"""
"""Transformiert Datei und befüllt den Symmetry-Buffer."""
apply = kwargs.get("apply", False)
force_replace = kwargs.get("force_replace", False)
purge_before = kwargs.get("purge_before", False)
note_scope_refs = kwargs.get("note_scope_refs", False)
hash_source = kwargs.get("hash_source", "parsed")
hash_normalize = kwargs.get("hash_normalize", "canonical")
result = {"path": file_path, "status": "skipped", "changed": False, "error": None}
# 1. Parse & Lifecycle Gate
try:
# --- ORDNER-FILTER (.trash) ---
if any(part.startswith('.') for part in file_path.split(os.sep)):
return {**result, "status": "skipped", "reason": "hidden_folder"}
ingest_cfg = self.registry.get("ingestion_settings", {})
ignore_folders = ingest_cfg.get("ignore_folders", [".trash", ".obsidian", "templates"])
if any(folder in file_path for folder in ignore_folders):
return {**result, "status": "skipped", "reason": "folder_blacklist"}
parsed = read_markdown(file_path)
if not parsed: return {**result, "error": "Empty file"}
fm = normalize_frontmatter(parsed.frontmatter)
validate_required_frontmatter(fm)
except Exception as e:
return {**result, "error": f"Validation failed: {str(e)}"}
note_type = resolve_note_type(self.registry, fm.get("type"))
note_pl = make_note_payload(parsed, vault_root=vault_root, file_path=file_path, types_cfg=self.registry)
note_id = note_pl["note_id"]
ingest_cfg = self.registry.get("ingestion_settings", {})
ignore_list = ingest_cfg.get("ignore_statuses", ["system", "template", "archive", "hidden"])
logger.info(f"📄 Bearbeite: '{note_id}' (Typ: {note_type})")
current_status = fm.get("status", "draft").lower().strip()
if current_status in ignore_list:
return {**result, "status": "skipped", "reason": "lifecycle_filter"}
# Change Detection
old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id)
c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id)
if not (force_replace or not old_payload or c_miss or e_miss):
return {**result, "status": "unchanged", "note_id": note_id}
# 2. Payload & Change Detection
note_type = resolve_note_type(self.registry, fm.get("type"))
note_pl = make_note_payload(
parsed, vault_root=vault_root, file_path=file_path,
hash_source=hash_source, hash_normalize=hash_normalize,
types_cfg=self.registry
)
note_id = note_pl["note_id"]
logger.info(f"📄 Bearbeite: '{note_id}' (Typ: {note_type})")
old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id)
check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}"
old_hash = (old_payload or {}).get("hashes", {}).get(check_key)
new_hash = note_pl.get("hashes", {}).get(check_key)
c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id)
if not (force_replace or not old_payload or old_hash != new_hash or c_miss or e_miss):
return {**result, "status": "unchanged", "note_id": note_id}
if not apply:
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
# 3. Deep Processing (Chunking, Validation, Embedding)
try:
body_text = getattr(parsed, "body", "") or ""
edge_registry.ensure_latest()
# Deep Processing & MoE
profile = note_pl.get("chunk_profile", "sliding_standard")
chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type)
enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False)
chunks = await assemble_chunks(note_id, getattr(parsed, "body", ""), note_type, config=chunk_cfg)
chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg)
# --- WP-25a: MoE Semantische Kanten-Validierung ---
for ch in chunks:
new_pool = []
for cand in getattr(ch, "candidate_pool", []):
if cand.get("provenance") == "global_pool" and enable_smart:
is_valid = await validate_edge_candidate(
ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator"
)
if cand.get("provenance") == "global_pool" and chunk_cfg.get("enable_smart_edge_allocation"):
is_valid = await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm)
t_id = cand.get('target_id') or cand.get('note_id') or "Unknown"
logger.info(f" 🧠 [SMART EDGE] {t_id} -> {'✅ OK' if is_valid else '❌ SKIP'}")
if is_valid: new_pool.append(cand)
@ -254,30 +215,20 @@ class IngestionService:
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry)
vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []
# Aggregation aller Kanten
raw_edges = build_edges_for_note(
note_id, chunk_pls,
note_level_references=note_pl.get("references", []),
include_note_scope_refs=note_scope_refs
)
# --- WP-24c: Symmetrie-Injektion (Authority Implementation) ---
# Kanten-Logik (Kanonisierung)
raw_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []))
explicit_edges = []
for e in raw_edges:
target_raw = e.get("target_id")
# ID-Resolution über den Context-Cache (Titel -> Note_ID)
target_ctx = self.batch_cache.get(target_raw)
target_id = target_ctx.note_id if target_ctx else target_raw
t_ctx = self.batch_cache.get(target_raw)
target_id = t_ctx.note_id if t_ctx else target_raw
if not self._is_valid_note_id(target_id): continue
resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"))
# Echte physische Kante markieren (Phase 1)
e.update({
"kind": resolved_kind, "target_id": target_id,
"origin_note_id": note_id, "virtual": False, "confidence": 1.0
})
e.update({"kind": resolved_kind, "target_id": target_id, "origin_note_id": note_id, "virtual": False, "confidence": 1.0})
explicit_edges.append(e)
# Symmetrie-Kandidat für Phase 2 puffern
@ -291,28 +242,19 @@ class IngestionService:
})
self.symmetry_buffer.append(v_edge)
# 4. DB Upsert (Phase 1: Authority)
# 4. DB Upsert (Phase 1: Authority Only)
if apply:
if purge_before and old_payload:
purge_artifacts(self.client, self.prefix, note_id)
# Speichern der Haupt-Note
if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id)
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
upsert_batch(self.client, n_name, n_pts)
if chunk_pls and vecs:
c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1]
upsert_batch(self.client, f"{self.prefix}_chunks", c_pts)
upsert_batch(self.client, f"{self.prefix}_chunks", points_for_chunks(self.prefix, chunk_pls, vecs)[1])
if explicit_edges:
e_pts = points_for_edges(self.prefix, explicit_edges)[1]
upsert_batch(self.client, f"{self.prefix}_edges", e_pts)
upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, explicit_edges)[1])
logger.info(f" ✨ Phase 1 fertig: {len(chunk_pls)} Chunks, {len(explicit_edges)} explizite Kanten.")
return {
"path": file_path, "status": "success", "changed": True, "note_id": note_id,
"chunks_count": len(chunk_pls), "edges_count": len(explicit_edges)
}
return {"status": "success", "note_id": note_id, "edges_count": len(explicit_edges)}
except Exception as e:
logger.error(f"❌ Fehler bei {file_path}: {e}", exc_info=True)
return {**result, "error": str(e)}
@ -321,7 +263,6 @@ class IngestionService:
"""Erstellt eine Note aus einem Textstream."""
target_path = os.path.join(vault_root, folder, filename)
os.makedirs(os.path.dirname(target_path), exist_ok=True)
with open(target_path, "w", encoding="utf-8") as f:
f.write(markdown_content)
with open(target_path, "w", encoding="utf-8") as f: f.write(markdown_content)
await asyncio.sleep(0.1)
return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)