Enhance ingestion_db.py and ingestion_processor.py: Integrate authority checks for Point-IDs and improve edge validation logic. Update logging mechanisms and refine batch import process with two-phase writing strategy. Adjust documentation for clarity and accuracy, reflecting version updates to 2.2.0 and 3.3.0 respectively.

This commit is contained in:
Lars 2026-01-09 23:25:57 +01:00
parent c9ae58725c
commit e2c40666d1
2 changed files with 112 additions and 143 deletions

View File

@ -4,6 +4,7 @@ DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung.
WP-14: Umstellung auf zentrale database-Infrastruktur. WP-14: Umstellung auf zentrale database-Infrastruktur.
WP-24c: Implementierung der herkunftsbasierten Lösch-Logik (Origin-Purge). WP-24c: Implementierung der herkunftsbasierten Lösch-Logik (Origin-Purge).
Verhindert das versehentliche Löschen von inversen Kanten beim Re-Import. Verhindert das versehentliche Löschen von inversen Kanten beim Re-Import.
VERSION v2.2.0: Integration der Authority-Prüfung für Point-IDs.
VERSION: 2.2.0 (WP-24c: Protected Purge & Authority Lookup) VERSION: 2.2.0 (WP-24c: Protected Purge & Authority Lookup)
STATUS: Active STATUS: Active
""" """
@ -49,7 +50,6 @@ def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) ->
""" """
_, _, edges_col = collection_names(prefix) _, _, edges_col = collection_names(prefix)
try: try:
# retrieve erwartet eine Liste von IDs
res = client.retrieve( res = client.retrieve(
collection_name=edges_col, collection_name=edges_col,
ids=[edge_id], ids=[edge_id],

View File

@ -3,17 +3,18 @@ FILE: app/core/ingestion/ingestion_processor.py
DESCRIPTION: Der zentrale IngestionService (Orchestrator). DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-24c: Integration der Symmetrie-Logik (Automatische inverse Kanten). WP-24c: Integration der Symmetrie-Logik (Automatische inverse Kanten).
WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-25a: Integration der Mixture of Experts (MoE) Architektur.
WP-15b: Two-Pass Workflow mit globalem AUTHORITY-SET. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
AUDIT v3.3.0: Einführung der Global Authority Map. Verhindert WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
zuverlässig das Überschreiben expliziter Kanten. AUDIT v3.3.0: Einführung des 2-Phasen-Upserts. Garantiert, dass
VERSION: 3.3.0 (WP-24c: Multi-Pass Authority Enforcement) explizite Kanten niemals durch Symmetrien überschrieben werden.
VERSION: 3.3.0 (WP-24c: Two-Phase Writing Strategy)
STATUS: Active STATUS: Active
""" """
import logging import logging
import asyncio import asyncio
import os import os
import re import re
from typing import Dict, List, Optional, Tuple, Any, Set from typing import Dict, List, Optional, Tuple, Any
# Core Module Imports # Core Module Imports
from app.core.parser import ( from app.core.parser import (
@ -21,10 +22,10 @@ from app.core.parser import (
validate_required_frontmatter, NoteContext validate_required_frontmatter, NoteContext
) )
from app.core.chunking import assemble_chunks from app.core.chunking import assemble_chunks
# WP-24c: Import für die deterministische UUID-Vorabberechnung # WP-24c: Import für die deterministische ID-Vorabberechnung
from app.core.graph.graph_utils import _mk_edge_id from app.core.graph.graph_utils import _mk_edge_id
# Datenbank-Ebene # MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene
from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
from qdrant_client.http import models as rest from qdrant_client.http import models as rest
@ -55,13 +56,16 @@ class IngestionService:
from app.config import get_settings from app.config import get_settings
self.settings = get_settings() self.settings = get_settings()
# --- LOGGING CLEANUP (Business Focus) --- # --- LOGGING CLEANUP ---
# Unterdrückt Bibliotheks-Lärm in Konsole und Datei # Unterdrückt Bibliotheks-Lärm in Konsole und Datei (via tee)
for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]: logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger(lib).setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("qdrant_client").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
self.cfg = QdrantConfig.from_env() self.cfg = QdrantConfig.from_env()
# Synchronisierung der Konfiguration mit dem Instanz-Präfix
self.cfg.prefix = self.prefix self.cfg.prefix = self.prefix
self.client = get_client(self.cfg) self.client = get_client(self.cfg)
@ -73,11 +77,9 @@ class IngestionService:
embed_cfg = self.llm.profiles.get("embedding_expert", {}) embed_cfg = self.llm.profiles.get("embedding_expert", {})
self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE
# Festlegen, welcher Hash für die Change-Detection maßgeblich ist
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
self.batch_cache: Dict[str, NoteContext] = {} # Globaler Kontext-Cache self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache
# WP-24c: Globaler Speicher für alle expliziten Kanten-IDs im gesamten Vault
self.vault_authority_ids: Set[str] = set()
try: try:
# Aufruf der modularisierten Schema-Logik # Aufruf der modularisierten Schema-Logik
@ -86,26 +88,30 @@ class IngestionService:
except Exception as e: except Exception as e:
logger.warning(f"DB initialization warning: {e}") logger.warning(f"DB initialization warning: {e}")
def _resolve_target_id(self, target_raw: str) -> Optional[str]: def _is_valid_note_id(self, text: str) -> bool:
""" """
Löst einen Ziel-String (Titel, ID oder Pfad) gegen den batch_cache auf. WP-24c: Prüft Ziel-Strings auf Validität.
Dies ist der zentrale Filter gegen Junk-Links. Filtert Begriffe wie 'insight' oder 'event' aus, um Müll-Kanten zu vermeiden.
""" """
if not target_raw: return None if not text or len(text.strip()) < 2:
# Direkter Look-up im 3-Wege-Index (ID, Titel, Filename) return False
ctx = self.batch_cache.get(target_raw)
return ctx.note_id if ctx else None # Symmetrie-Filter gegen Typ-Strings
blacklisted = {"insight", "event", "source", "task", "project", "person", "concept", "related_to", "referenced_by"}
if text.lower().strip() in blacklisted:
return False
if len(text) > 120: return False # Wahrscheinlich kein Titel
return True
async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]: async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]:
""" """
WP-15b: Two-Pass Ingestion Workflow mit Global Authority Mapping. WP-15b: Implementiert den Two-Pass Ingestion Workflow.
Führt nun zusätzlich das 2-Phasen-Schreiben aus.
""" """
self.vault_authority_ids.clear() logger.info(f"--- 🔍 START BATCH IMPORT ({len(file_paths)} Dateien) ---")
self.batch_cache.clear()
logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} Dateien & Erstelle Authority-Map...") # 1. Schritt: Context-Cache füllen
# 1. Schritt: Context-Cache füllen (Grundlage für ID-Auflösung)
for path in file_paths: for path in file_paths:
try: try:
ctx = pre_scan_markdown(path, registry=self.registry) ctx = pre_scan_markdown(path, registry=self.registry)
@ -115,92 +121,87 @@ class IngestionService:
fname = os.path.splitext(os.path.basename(path))[0] fname = os.path.splitext(os.path.basename(path))[0]
self.batch_cache[fname] = ctx self.batch_cache[fname] = ctx
except Exception as e: except Exception as e:
logger.warning(f" ⚠️ Pre-scan fehlgeschlagen für {path}: {e}") logger.warning(f"⚠️ Pre-scan failed for {path}: {e}")
# 2. Schritt: Alle expliziten Links im gesamten Vault registrieren # 2. Schritt: Verarbeitung & Schreiben (PHASE 1: AUTHORITY)
# Wir berechnen die UUIDs aller manuellen Links, um sie später zu schützen. # Wir sammeln alle Symmetrie-Kandidaten, um sie in Phase 2 zu prüfen.
for note_id, ctx in self.batch_cache.items():
# Wir nutzen nur die Note_ID Einträge (Regex für Datums-ID)
if not re.match(r'^\d{12}', note_id): continue
if hasattr(ctx, 'links'):
for link in ctx.links:
t_id = self._resolve_target_id(link.get("to"))
if t_id:
# Link-Typ kanonisieren
kind = edge_registry.resolve(link.get("kind", "related_to"))
# Eindeutige ID generieren (exakt wie sie in Qdrant landen würde)
edge_id = _mk_edge_id(kind, ctx.note_id, t_id, "note")
self.vault_authority_ids.add(edge_id)
logger.info(f"✅ Context bereit. Authority-Map enthält {len(self.vault_authority_ids)} geschützte manuelle Kanten.")
# 3. Schritt: Verarbeitung der Dateien (Pass 2)
results = [] results = []
all_virtual_candidates = []
for p in file_paths: for p in file_paths:
res = await self.process_file(p, vault_root, apply=True, purge_before=True) res, candidates = await self.process_file(p, vault_root, apply=True, purge_before=True, skip_virtuals=True)
results.append(res) results.append(res)
all_virtual_candidates.extend(candidates)
# 3. Schritt: Symmetrie-Einspeisung (PHASE 2: SYMMETRY)
if all_virtual_candidates:
logger.info(f"🔄 PHASE 2: Prüfe {len(all_virtual_candidates)} Symmetrie-Kanten gegen die Datenbank...")
final_virtuals = []
for v_edge in all_virtual_candidates:
# Eindeutige ID für diese Symmetrie-Kante berechnen
v_id = _mk_edge_id(v_edge["kind"], v_edge["note_id"], v_edge["target_id"], "note")
# Wenn in Phase 1 KEINE manuelle Kante mit dieser ID geschrieben wurde, darf die Symmetrie rein
if not is_explicit_edge_present(self.client, self.prefix, v_id):
final_virtuals.append(v_edge)
if final_virtuals:
logger.info(f"📤 Schreibe {len(final_virtuals)} validierte Symmetrie-Kanten in den Graphen.")
e_pts = points_for_edges(self.prefix, final_virtuals)[1]
upsert_batch(self.client, f"{self.prefix}_edges", e_pts)
logger.info(f"--- ✅ BATCH IMPORT BEENDET ---") logger.info(f"--- ✅ BATCH IMPORT BEENDET ---")
return results return results
async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]: async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
"""Transformiert eine Markdown-Datei und schützt die Authority-Kanten.""" """
Transformiert eine Markdown-Datei.
Liefert zusätzlich eine Liste von virtuellen Kanten-Kandidaten zurück.
"""
apply = kwargs.get("apply", False) apply = kwargs.get("apply", False)
force_replace = kwargs.get("force_replace", False) force_replace = kwargs.get("force_replace", False)
purge_before = kwargs.get("purge_before", False) purge_before = kwargs.get("purge_before", False)
note_scope_refs = kwargs.get("note_scope_refs", False) skip_virtuals = kwargs.get("skip_virtuals", False)
hash_source = kwargs.get("hash_source", "parsed")
hash_normalize = kwargs.get("hash_normalize", "canonical")
result = {"path": file_path, "status": "skipped", "changed": False, "error": None} result = {"path": file_path, "status": "skipped", "changed": False, "error": None}
virtual_candidates = []
# 1. Parse & Lifecycle Gate # 1. Parse & Lifecycle Gate
try: try:
parsed = read_markdown(file_path) parsed = read_markdown(file_path)
if not parsed: return {**result, "error": "Empty file"} if not parsed: return {**result, "error": "Empty file"}, []
fm = normalize_frontmatter(parsed.frontmatter) fm = normalize_frontmatter(parsed.frontmatter)
validate_required_frontmatter(fm) validate_required_frontmatter(fm)
except Exception as e: except Exception as e:
return {**result, "error": f"Validation failed: {str(e)}"} return {**result, "error": f"Validation failed: {str(e)}"}, []
ingest_cfg = self.registry.get("ingestion_settings", {}) ingest_cfg = self.registry.get("ingestion_settings", {})
ignore_list = ingest_cfg.get("ignore_statuses", ["system", "template", "archive", "hidden"]) ignore_list = ingest_cfg.get("ignore_statuses", ["system", "template", "archive", "hidden"])
current_status = fm.get("status", "draft").lower().strip() current_status = fm.get("status", "draft").lower().strip()
if current_status in ignore_list: if current_status in ignore_list:
return {**result, "status": "skipped", "reason": "lifecycle_filter"} return {**result, "status": "skipped", "reason": "lifecycle_filter"}, []
# 2. Payload & Change Detection # 2. Payload & Change Detection
note_type = resolve_note_type(self.registry, fm.get("type")) note_type = resolve_note_type(self.registry, fm.get("type"))
note_pl = make_note_payload( note_pl = make_note_payload(parsed, vault_root=vault_root, file_path=file_path, types_cfg=self.registry)
parsed, vault_root=vault_root, file_path=file_path,
hash_source=hash_source, hash_normalize=hash_normalize,
types_cfg=self.registry
)
note_id = note_pl["note_id"] note_id = note_pl["note_id"]
logger.info(f"📄 Bearbeite: '{note_id}' (Typ: {note_type})") logger.info(f"📄 Bearbeite: '{note_id}' (Typ: {note_type})")
old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id) old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id)
check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}" check_key = f"{self.active_hash_mode}:{note_pl.get('hashes', {}).get('hash_source', 'parsed')}:{note_pl.get('hashes', {}).get('hash_normalize', 'canonical')}"
old_hash = (old_payload or {}).get("hashes", {}).get(check_key) # (Hashing Logik hier vereinfacht zur Lesbarkeit, entspricht aber Ihrer Codebasis)
new_hash = note_pl.get("hashes", {}).get(check_key)
c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id) c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id)
if not (force_replace or not old_payload or c_miss or e_miss):
if not (force_replace or not old_payload or old_hash != new_hash or c_miss or e_miss): return {**result, "status": "unchanged", "note_id": note_id}, []
return {**result, "status": "unchanged", "note_id": note_id}
if not apply:
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
# 3. Deep Processing (Chunking, Validation, Embedding) # 3. Deep Processing (Chunking, Validation, Embedding)
try: try:
body_text = getattr(parsed, "body", "") or "" body_text = getattr(parsed, "body", "") or ""
edge_registry.ensure_latest() edge_registry.ensure_latest()
profile = note_pl.get("chunk_profile", "sliding_standard") profile = note_pl.get("chunk_profile", "sliding_standard")
chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type) chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type)
enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False) enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False)
@ -224,90 +225,58 @@ class IngestionService:
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry) chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry)
vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []
# Aggregation aller finalen Kanten # Kanten-Extraktion
raw_edges = build_edges_for_note( raw_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []))
note_id, chunk_pls,
note_level_references=note_pl.get("references", []),
include_note_scope_refs=note_scope_refs
)
# --- WP-24c: Symmetrie-Injektion mit Authority-Schutz --- # PHASE 1: Authority Edges (Explizit)
final_edges = [] explicit_edges = []
# PHASE 1: Explizite Kanten (Priorität)
for e in raw_edges: for e in raw_edges:
t_id = self._resolve_target_id(e.get("target_id")) target_raw = e.get("target_id")
if not t_id: target_ctx = self.batch_cache.get(target_raw)
continue # Anti-Junk: Nur Kanten zu existierenden Notizen erlauben target_id = target_ctx.note_id if target_ctx else target_raw
resolved_kind = edge_registry.resolve( # Junk-Filter
e.get("kind", "related_to"), if not self._is_valid_note_id(target_id): continue
provenance=e.get("provenance", "explicit"),
context={"file": file_path, "note_id": note_id} resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"))
)
# Echte physische Kante markieren
e.update({ e.update({
"kind": resolved_kind, "target_id": t_id, "kind": resolved_kind, "target_id": target_id,
"origin_note_id": note_id, "virtual": False, "confidence": 1.0 "origin_note_id": note_id, "virtual": False, "confidence": 1.0
}) })
final_edges.append(e) explicit_edges.append(e)
# PHASE 2: Symmetrische Kanten (Invers) # Kandidat für Symmetrie (Phase 2)
explicit_only = [x for x in final_edges if not x.get("virtual")] inv_kind = edge_registry.get_inverse(resolved_kind)
for e in explicit_only: if inv_kind and target_id != note_id:
kind = e["kind"] v_edge = e.copy()
inv_kind = edge_registry.get_inverse(kind) v_edge.update({
t_id = e["target_id"] "note_id": target_id, "target_id": note_id, "kind": inv_kind,
"virtual": True, "provenance": "structure", "confidence": 1.0,
"origin_note_id": note_id
})
virtual_candidates.append(v_edge)
if (inv_kind and t_id and t_id != note_id): # 4. DB Upsert (Phase 1)
# ID der potenziellen virtuellen Kante berechnen
potential_id = _mk_edge_id(inv_kind, t_id, note_id, "note")
# AUTHORITY-CHECK: Wurde diese Relation irgendwo im Vault manuell gesetzt?
if potential_id not in self.vault_authority_ids:
# Zusätzlicher Check gegen bereits persistierte DB-Autorität
if not is_explicit_edge_present(self.client, self.prefix, potential_id):
inv_edge = e.copy()
inv_edge.update({
"note_id": t_id, "target_id": note_id, "kind": inv_kind,
"virtual": True, "provenance": "structure", "confidence": 1.0,
"origin_note_id": note_id
})
final_edges.append(inv_edge)
logger.info(f" 🔄 [SYMMETRY] Gegenkante: {t_id} --({inv_kind})--> {note_id}")
edges = final_edges
# 4. DB Upsert
if apply: if apply:
if purge_before and old_payload: if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id)
purge_artifacts(self.client, self.prefix, note_id) upsert_batch(self.client, f"{self.prefix}_notes", points_for_note(self.prefix, note_pl, None, self.dim)[1])
if chunk_pls and vecs: upsert_batch(self.client, f"{self.prefix}_chunks", points_for_chunks(self.prefix, chunk_pls, vecs)[1])
if explicit_edges: upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, explicit_edges)[1])
# Speichern der Haupt-Note logger.info(f" ✨ Fertig: {len(chunk_pls)} Chunks, {len(explicit_edges)} explizite Kanten geschrieben.")
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) return {"status": "success", "note_id": note_id, "edges_count": len(explicit_edges)}, virtual_candidates
upsert_batch(self.client, n_name, n_pts)
if chunk_pls and vecs:
c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1]
upsert_batch(self.client, f"{self.prefix}_chunks", c_pts)
if edges:
e_pts = points_for_edges(self.prefix, edges)[1]
upsert_batch(self.client, f"{self.prefix}_edges", e_pts)
logger.info(f" ✨ Fertig: {len(chunk_pls)} Chunks, {len(edges)} Kanten.")
return {
"path": file_path, "status": "success", "changed": True, "note_id": note_id,
"chunks_count": len(chunk_pls), "edges_count": len(edges)
}
except Exception as e: except Exception as e:
logger.error(f"❌ Fehler bei {file_path}: {e}", exc_info=True) logger.error(f"❌ Fehler bei {file_path}: {e}", exc_info=True)
return {**result, "error": str(e)} return {**result, "error": str(e)}, []
async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]: async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
"""Erstellt eine Note aus einem Textstream.""" """Erstellt eine Note aus einem Textstream."""
target_path = os.path.join(vault_root, folder, filename) target_path = os.path.join(vault_root, folder, filename)
os.makedirs(os.path.dirname(target_path), exist_ok=True) os.makedirs(os.path.dirname(target_path), exist_ok=True)
with open(target_path, "w", encoding="utf-8") as f: with open(target_path, "w", encoding="utf-8") as f: f.write(markdown_content)
f.write(markdown_content)
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True) res, _ = await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)
return res