Update qdrant_points.py, graph_utils.py, ingestion_db.py, ingestion_processor.py, and import_markdown.py: Enhance UUID generation for edge IDs, improve error handling, and refine documentation for clarity. Implement atomic consistency in batch upserts and ensure strict phase separation in the ingestion workflow. Update versioning to reflect changes in functionality and maintain compatibility with the ingestion service.
303 lines · 14 KiB · Python
"""
FILE: app/core/ingestion/ingestion_processor.py

DESCRIPTION: The central IngestionService (orchestrator).

WP-25a: Integration of the Mixture of Experts (MoE) architecture.
WP-15b: Two-pass workflow with a global context cache.
WP-20/22: Cloud resilience and content lifecycle integrated.
AUDIT v3.4.1: Strict two-phase strategy (authority-first).
             Solves the ghost-ID problem via cache resolution.
             Fix for the Pydantic 'None'-ID crash.

VERSION: 3.4.1 (WP-24c: Robust Global Orchestration)
STATUS: Active
"""
|
|
import logging
|
|
import asyncio
|
|
import os
|
|
import re
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
|
|
# Core Module Imports
|
|
from app.core.parser import (
|
|
read_markdown, pre_scan_markdown, normalize_frontmatter,
|
|
validate_required_frontmatter, NoteContext
|
|
)
|
|
from app.core.chunking import assemble_chunks
|
|
# WP-24c: Import für die deterministische ID-Vorabberechnung aus graph_utils
|
|
from app.core.graph.graph_utils import _mk_edge_id
|
|
|
|
# Datenbank-Ebene (Modularisierte database-Infrastruktur)
|
|
from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
|
|
from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
|
|
from qdrant_client.http import models as rest
|
|
|
|
# Services
|
|
from app.services.embeddings_client import EmbeddingsClient
|
|
from app.services.edge_registry import registry as edge_registry
|
|
from app.services.llm_service import LLMService
|
|
|
|
# Package-Interne Imports (Refactoring WP-14)
|
|
from .ingestion_utils import load_type_registry, resolve_note_type, get_chunk_config_by_profile
|
|
from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts, is_explicit_edge_present
|
|
from .ingestion_validation import validate_edge_candidate
|
|
from .ingestion_note_payload import make_note_payload
|
|
from .ingestion_chunk_payload import make_chunk_payloads
|
|
|
|
# Fallback for edges (structural linking): if the graph package is not
# installed/importable, degrade gracefully instead of failing the import.
try:
    from app.core.graph.graph_derive_edges import build_edges_for_note
except ImportError:
    # Stub with a compatible call signature; derives no edges, so ingestion
    # still works (notes/chunks only) without the graph module present.
    def build_edges_for_note(*args, **kwargs): return []

logger = logging.getLogger(__name__)
|
|
|
|
class IngestionService:
    """Central orchestrator for the two-phase Markdown ingestion workflow.

    Phase 1 (``run_batch`` / ``process_file``): writes notes, chunks and
    explicit (user-authored) edges to Qdrant and buffers inverse edges.
    Phase 2 (``commit_vault_symmetries``): writes the buffered virtual
    inverse edges, but only where no explicit edge already exists, so
    user authority is never overwritten.
    """

    def __init__(self, collection_prefix: Optional[str] = None):
        """Initialisiert den Service und nutzt die neue database-Infrastruktur."""
        from app.config import get_settings
        self.settings = get_settings()

        # --- LOGGING CLEANUP (suppress header noise from HTTP/DB client
        # libraries while keeping business-level log output intact) ---
        for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]:
            logging.getLogger(lib).setLevel(logging.WARNING)

        # Collection prefix: explicit argument wins over configured default.
        self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
        self.cfg = QdrantConfig.from_env()
        self.cfg.prefix = self.prefix
        self.client = get_client(self.cfg)

        self.registry = load_type_registry()
        self.embedder = EmbeddingsClient()
        self.llm = LLMService()

        # WP-25a: resolve the vector dimension via the embedding profile (MoE);
        # falls back to the globally configured VECTOR_SIZE if unset.
        embed_cfg = self.llm.profiles.get("embedding_expert", {})
        self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE

        # Configure the change-detection mode used for re-ingestion checks.
        self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE

        # WP-15b: context memory for ID resolution (global cache).
        # Keyed by note_id, title AND bare filename (see run_batch pre-scan).
        self.batch_cache: Dict[str, NoteContext] = {}

        # WP-24c: buffer for Phase 2 (symmetry injection at the very end of
        # the whole import run, never per file).
        self.symmetry_buffer: List[Dict[str, Any]] = []

        try:
            # Invoke the modularised schema logic; failures are logged but
            # non-fatal so the service can still be constructed.
            ensure_collections(self.client, self.prefix, self.dim)
            ensure_payload_indexes(self.client, self.prefix)
        except Exception as e:
            logger.warning(f"DB initialization warning: {e}")

    def _is_valid_id(self, text: Optional[str]) -> bool:
        """WP-24c: Check an ID for domain validity (ghost-ID protection).

        Rejects None/non-strings, IDs shorter than 2 characters, and a
        blacklist of generic type words that LLMs tend to hallucinate as
        targets (e.g. "none", "task", "concept").
        """
        if not text or not isinstance(text, str) or len(text.strip()) < 2:
            return False
        blacklisted = {"none", "unknown", "insight", "source", "task", "project", "person", "concept"}
        if text.lower().strip() in blacklisted:
            return False
        return True

    async def run_batch(self, file_paths: List[str], vault_root: str) -> Dict[str, Any]:
        """WP-15b: Phase 1 of the two-pass ingestion workflow.

        Processes the batch and writes ONLY user authority (explicit edges);
        inverse edges are merely buffered for Phase 2.

        Args:
            file_paths: Markdown files to ingest.
            vault_root: Root directory of the vault (used for relative paths).

        Returns:
            Summary dict with processed/success counts and the current
            symmetry-buffer size.
        """
        self.batch_cache.clear()
        logger.info(f"--- 🔍 START BATCH PHASE 1 ({len(file_paths)} Dateien) ---")

        # Step 1: pre-scan to fill the context cache. Each note is indexed
        # under three aliases so wiki-links can resolve by id, title or
        # bare filename. Failures are logged and skipped (best effort).
        for path in file_paths:
            try:
                ctx = pre_scan_markdown(path, registry=self.registry)
                if ctx:
                    self.batch_cache[ctx.note_id] = ctx
                    self.batch_cache[ctx.title] = ctx
                    # Also make filenames without extension resolvable.
                    self.batch_cache[os.path.splitext(os.path.basename(path))[0]] = ctx
            except Exception as e:
                logger.warning(f" ⚠️ Pre-scan fehlgeschlagen für {path}: {e}")

        # Step 2: batch processing (authority only). Files are purged and
        # re-written (apply=True, purge_before=True).
        processed_count = 0
        success_count = 0
        for p in file_paths:
            processed_count += 1
            res = await self.process_file(p, vault_root, apply=True, purge_before=True)
            if res.get("status") == "success":
                success_count += 1

        logger.info(f"--- ✅ Batch Phase 1 abgeschlossen ({success_count}/{processed_count}) ---")
        return {
            "status": "success",
            "processed": processed_count,
            "success": success_count,
            "buffered_symmetries": len(self.symmetry_buffer)
        }

    async def commit_vault_symmetries(self) -> Dict[str, Any]:
        """WP-24c: Execute PHASE 2 (global symmetry injection).

        Called once at the end of the whole import. Ensures that virtual
        (inverse) edges never overwrite user authority: each buffered edge is
        checked against the live DB and only written if no explicit edge with
        the same deterministic ID exists.

        Returns:
            Status dict; "skipped" when the buffer is empty, otherwise
            "success" with the number of edges added.
        """
        if not self.symmetry_buffer:
            logger.info("⏭️ Symmetrie-Puffer leer. Keine Aktion erforderlich.")
            return {"status": "skipped", "reason": "buffer_empty"}

        logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrien gegen Live-DB...")
        final_virtuals = []
        for v_edge in self.symmetry_buffer:
            src, tgt, kind = v_edge.get("note_id"), v_edge.get("target_id"), v_edge.get("kind")
            if not src or not tgt: continue

            # Compute the deterministic edge ID (WP-24c standard); invalid
            # combinations raise ValueError and are skipped.
            try:
                v_id = _mk_edge_id(kind, src, tgt, "note")
            except ValueError:
                continue

            # AUTHORITY CHECK: only write if no manual edge with this ID
            # already exists in the database.
            if not is_explicit_edge_present(self.client, self.prefix, v_id):
                final_virtuals.append(v_edge)
                logger.info(f" 🔄 [SYMMETRY] Add inverse: {src} --({kind})--> {tgt}")
            else:
                logger.debug(f" 🛡️ Schutz: Manuelle Kante verhindert Symmetrie {v_id}")

        if final_virtuals:
            logger.info(f"📤 Schreibe {len(final_virtuals)} geschützte Symmetrie-Kanten in Qdrant.")
            col, pts = points_for_edges(self.prefix, final_virtuals)
            # Uses upsert_batch with wait=True for atomic consistency.
            upsert_batch(self.client, col, pts, wait=True)

        count = len(final_virtuals)
        self.symmetry_buffer.clear()  # Empty the buffer after commit.
        return {"status": "success", "added": count}

    async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]:
        """Transform a single Markdown file (Phase 1).

        Writes notes/chunks/explicit edges immediately and fills the
        symmetry buffer for Phase 2.

        Keyword Args:
            apply (bool): actually write to the DB; False yields a dry-run.
            force_replace (bool): skip change detection and rewrite.
            purge_before (bool): purge existing artifacts before writing.

        Returns:
            Result dict with at least "status"; on success also "note_id"
            and "edges_count".
        """
        apply = kwargs.get("apply", False)
        force_replace = kwargs.get("force_replace", False)
        purge_before = kwargs.get("purge_before", False)

        result = {"path": file_path, "status": "skipped", "changed": False, "error": None}

        try:
            # Folder filter: skip .trash and any dot-prefixed path segment
            # (e.g. .obsidian) anywhere in the path.
            if ".trash" in file_path or any(part.startswith('.') for part in file_path.split(os.sep)):
                return {**result, "status": "skipped", "reason": "ignored_folder"}

            # Read and validate the file; empty files are reported, and
            # frontmatter is normalised before the required-field check.
            parsed = read_markdown(file_path)
            if not parsed: return {**result, "error": "Empty file"}
            fm = normalize_frontmatter(parsed.frontmatter)
            validate_required_frontmatter(fm)

            note_pl = make_note_payload(parsed, vault_root=vault_root, file_path=file_path, types_cfg=self.registry)
            note_id = note_pl.get("note_id")

            if not note_id:
                logger.warning(f" ⚠️ Keine ID für {file_path}. Überspringe.")
                return {**result, "status": "error", "error": "missing_id"}

            logger.info(f"📄 Bearbeite: '{note_id}'")

            # Change detection & fragment check: re-ingest when forced, when
            # the note is new, or when chunk/edge artifacts are missing.
            old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id)
            c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id)
            if not (force_replace or not old_payload or c_miss or e_miss):
                return {**result, "status": "unchanged", "note_id": note_id}

            if not apply:
                return {**result, "status": "dry-run", "changed": True, "note_id": note_id}

            # Deep processing & MoE (LLM validation): resolve the chunking
            # profile and type-specific config, then build chunks.
            profile = note_pl.get("chunk_profile", "sliding_standard")
            note_type = resolve_note_type(self.registry, fm.get("type"))
            chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type)
            enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False)
            chunks = await assemble_chunks(note_id, getattr(parsed, "body", ""), note_type, config=chunk_cfg)

            # Filter each chunk's candidate pool: drop ghost IDs, and (when
            # smart edge allocation is enabled) let the LLM experts validate
            # candidates that came from the global pool.
            for ch in chunks:
                new_pool = []
                for cand in getattr(ch, "candidate_pool", []):
                    t_id = cand.get('target_id') or cand.get('note_id')
                    if not self._is_valid_id(t_id): continue

                    if cand.get("provenance") == "global_pool" and enable_smart:
                        # LLM logging around the expert validation call.
                        logger.info(f" ⚖️ [VALIDATING] Relation to '{t_id}' via Experts...")
                        is_valid = await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm)
                        logger.info(f" 🧠 [SMART EDGE] {t_id} -> {'✅ OK' if is_valid else '❌ SKIP'}")
                        if is_valid: new_pool.append(cand)
                    else:
                        new_pool.append(cand)
                ch.candidate_pool = new_pool

            # Generate embeddings over each chunk's "window" text.
            chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry)
            vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []

            # Edge extraction with strict cache resolution (fix for ghost IDs).
            raw_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []))

            explicit_edges = []
            for e in raw_edges:
                t_raw = e.get("target_id")
                # Canonicalisation: resolve links via the global cache
                # (title / filename aliases map back to the real note_id).
                t_ctx = self.batch_cache.get(t_raw)
                t_id = t_ctx.note_id if t_ctx else t_raw

                if not self._is_valid_id(t_id): continue

                resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance="explicit")
                e.update({"kind": resolved_kind, "target_id": t_id, "origin_note_id": note_id, "virtual": False})
                explicit_edges.append(e)

                # Buffer the symmetric counter-edge for Phase 2 (skip
                # self-references and kinds without a defined inverse).
                inv_kind = edge_registry.get_inverse(resolved_kind)
                if inv_kind and t_id != note_id:
                    v_edge = e.copy()
                    v_edge.update({
                        "note_id": t_id,
                        "target_id": note_id,
                        "kind": inv_kind,
                        "virtual": True,
                        "origin_note_id": note_id
                    })
                    self.symmetry_buffer.append(v_edge)

            # DB upsert (Phase 1: authority commitment). Purge old artifacts
            # first so stale chunks/edges do not linger.
            if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id)

            col_n, pts_n = points_for_note(self.prefix, note_pl, None, self.dim)
            upsert_batch(self.client, col_n, pts_n, wait=True)

            if chunk_pls and vecs:
                col_c, pts_c = points_for_chunks(self.prefix, chunk_pls, vecs)
                upsert_batch(self.client, col_c, pts_c, wait=True)

            if explicit_edges:
                col_e, pts_e = points_for_edges(self.prefix, explicit_edges)
                # IMPORTANT: wait=True guarantees the edges are indexed
                # before Phase 2 runs its authority check against them.
                upsert_batch(self.client, col_e, pts_e, wait=True)

            logger.info(f" ✨ Phase 1 fertig: {len(explicit_edges)} explizite Kanten für '{note_id}'.")
            return {"status": "success", "note_id": note_id, "edges_count": len(explicit_edges)}

        except Exception as e:
            # Top-level boundary: log with traceback and report as error
            # instead of aborting the whole batch.
            logger.error(f"❌ Fehler bei {file_path}: {e}", exc_info=True)
            return {**result, "status": "error", "error": str(e)}

    async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
        """Create a note from a text stream and trigger its ingestion.

        Writes the Markdown to ``vault_root/folder/filename`` and then runs
        a forced single-file ingestion on it.

        NOTE(review): the file write is synchronous inside an async method,
        and the sleep(0.1) appears to be a settle delay before processing —
        presumably for filesystem visibility; confirm before changing.
        """
        target_path = os.path.join(vault_root, folder, filename)
        os.makedirs(os.path.dirname(target_path), exist_ok=True)
        with open(target_path, "w", encoding="utf-8") as f:
            f.write(markdown_content)
        await asyncio.sleep(0.1)
        return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)