Update ingestion_processor.py to version 3.3.0: Integrate global authority mapping and enhance two-pass ingestion workflow. Improve logging mechanisms and edge validation logic, ensuring robust handling of explicit edges and authority protection. Adjust documentation for clarity and accuracy.

This commit is contained in:
Lars 2026-01-09 23:04:19 +01:00
parent 4318395c83
commit c9ae58725c

View File

@ -3,18 +3,17 @@ FILE: app/core/ingestion/ingestion_processor.py
DESCRIPTION: Der zentrale IngestionService (Orchestrator). DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-24c: Integration der Symmetrie-Logik (Automatische inverse Kanten). WP-24c: Integration der Symmetrie-Logik (Automatische inverse Kanten).
WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-25a: Integration der Mixture of Experts (MoE) Architektur.
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-15b: Two-Pass Workflow mit globalem AUTHORITY-SET.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. AUDIT v3.3.0: Einführung der Global Authority Map. Verhindert
AUDIT v3.2.1: Fix für ID-Kanonisierung in Phase 1 & 2, zuverlässig das Überschreiben expliziter Kanten.
robuster Smart-Edge-Logger und Business-Logging. VERSION: 3.3.0 (WP-24c: Multi-Pass Authority Enforcement)
VERSION: 3.2.1 (WP-24c: Canonical Authority Protection)
STATUS: Active STATUS: Active
""" """
import logging import logging
import asyncio import asyncio
import os import os
import re import re
from typing import Dict, List, Optional, Tuple, Any from typing import Dict, List, Optional, Tuple, Any, Set
# Core Module Imports # Core Module Imports
from app.core.parser import ( from app.core.parser import (
@ -22,10 +21,10 @@ from app.core.parser import (
validate_required_frontmatter, NoteContext validate_required_frontmatter, NoteContext
) )
from app.core.chunking import assemble_chunks from app.core.chunking import assemble_chunks
# WP-24c: Import für die deterministische ID-Vorabberechnung # WP-24c: Import für die deterministische UUID-Vorabberechnung
from app.core.graph.graph_utils import _mk_edge_id from app.core.graph.graph_utils import _mk_edge_id
# MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene # Datenbank-Ebene
from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
from qdrant_client.http import models as rest from qdrant_client.http import models as rest
@ -57,15 +56,12 @@ class IngestionService:
self.settings = get_settings() self.settings = get_settings()
# --- LOGGING CLEANUP (Business Focus) --- # --- LOGGING CLEANUP (Business Focus) ---
# Unterdrückt Bibliotheks-Lärm in Konsole und Datei (via tee) # Unterdrückt Bibliotheks-Lärm in Konsole und Datei
logging.getLogger("httpx").setLevel(logging.WARNING) for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]:
logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger(lib).setLevel(logging.WARNING)
logging.getLogger("qdrant_client").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
self.cfg = QdrantConfig.from_env() self.cfg = QdrantConfig.from_env()
# Synchronisierung der Konfiguration mit dem Instanz-Präfix
self.cfg.prefix = self.prefix self.cfg.prefix = self.prefix
self.client = get_client(self.cfg) self.client = get_client(self.cfg)
@ -77,12 +73,11 @@ class IngestionService:
embed_cfg = self.llm.profiles.get("embedding_expert", {}) embed_cfg = self.llm.profiles.get("embedding_expert", {})
self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE
# Festlegen, welcher Hash für die Change-Detection maßgeblich ist
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache self.batch_cache: Dict[str, NoteContext] = {} # Globaler Kontext-Cache
# WP-24c: Laufzeit-Speicher für explizite Kanten-IDs im aktuellen Batch # WP-24c: Globaler Speicher für alle expliziten Kanten-IDs im gesamten Vault
self.processed_explicit_ids = set() self.vault_authority_ids: Set[str] = set()
try: try:
# Aufruf der modularisierten Schema-Logik # Aufruf der modularisierten Schema-Logik
@ -91,45 +86,56 @@ class IngestionService:
except Exception as e: except Exception as e:
logger.warning(f"DB initialization warning: {e}") logger.warning(f"DB initialization warning: {e}")
def _is_valid_note_id(self, text: str, provenance: str = "explicit") -> bool: def _resolve_target_id(self, target_raw: str) -> Optional[str]:
""" """
WP-24c: Prüft Ziel-Strings auf Validität. Löst einen Ziel-String (Titel, ID oder Pfad) gegen den batch_cache auf.
User-Links (explicit) werden weniger gefiltert als System-Symmetrien. Dies ist der zentrale Filter gegen Junk-Links.
""" """
if not text or len(text.strip()) < 2: if not target_raw: return None
return False # Direkter Look-up im 3-Wege-Index (ID, Titel, Filename)
ctx = self.batch_cache.get(target_raw)
# Nur System-Kanten (Symmetrie) filtern wir gegen die Typ-Blacklist return ctx.note_id if ctx else None
if provenance != "explicit":
blacklisted = {"insight", "event", "source", "task", "project", "person", "concept", "related_to", "referenced_by"}
if text.lower().strip() in blacklisted:
return False
if len(text) > 150: return False # Vermutlich ein ganzer Satz
return True
async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]: async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]:
""" """
WP-15b: Two-Pass Ingestion Workflow. WP-15b: Two-Pass Ingestion Workflow mit Global Authority Mapping.
Pass 1: Pre-Scan füllt den Context-Cache.
Pass 2: Verarbeitung nutzt den Cache für die semantische Prüfung.
""" """
self.processed_explicit_ids.clear() self.vault_authority_ids.clear()
logger.info(f"--- 🔍 START BATCH IMPORT ({len(file_paths)} Dateien) ---") self.batch_cache.clear()
logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} Dateien & Erstelle Authority-Map...")
# 1. Schritt: Context-Cache füllen (Grundlage für ID-Auflösung)
for path in file_paths: for path in file_paths:
try: try:
# Übergabe der Registry für dynamische Scan-Tiefe
ctx = pre_scan_markdown(path, registry=self.registry) ctx = pre_scan_markdown(path, registry=self.registry)
if ctx: if ctx:
# Mehrfache Indizierung für robusten Look-up (ID, Titel, Dateiname)
self.batch_cache[ctx.note_id] = ctx self.batch_cache[ctx.note_id] = ctx
self.batch_cache[ctx.title] = ctx self.batch_cache[ctx.title] = ctx
fname = os.path.splitext(os.path.basename(path))[0] fname = os.path.splitext(os.path.basename(path))[0]
self.batch_cache[fname] = ctx self.batch_cache[fname] = ctx
except Exception as e: except Exception as e:
logger.warning(f"⚠️ Pre-scan failed for {path}: {e}") logger.warning(f" ⚠️ Pre-scan fehlgeschlagen für {path}: {e}")
# 2. Schritt: Alle expliziten Links im gesamten Vault registrieren
# Wir berechnen die UUIDs aller manuellen Links, um sie später zu schützen.
for note_id, ctx in self.batch_cache.items():
# Wir nutzen nur die Note_ID Einträge (Regex für Datums-ID)
if not re.match(r'^\d{12}', note_id): continue
if hasattr(ctx, 'links'):
for link in ctx.links:
t_id = self._resolve_target_id(link.get("to"))
if t_id:
# Link-Typ kanonisieren
kind = edge_registry.resolve(link.get("kind", "related_to"))
# Eindeutige ID generieren (exakt wie sie in Qdrant landen würde)
edge_id = _mk_edge_id(kind, ctx.note_id, t_id, "note")
self.vault_authority_ids.add(edge_id)
logger.info(f"✅ Context bereit. Authority-Map enthält {len(self.vault_authority_ids)} geschützte manuelle Kanten.")
# 3. Schritt: Verarbeitung der Dateien (Pass 2)
results = [] results = []
for p in file_paths: for p in file_paths:
res = await self.process_file(p, vault_root, apply=True, purge_before=True) res = await self.process_file(p, vault_root, apply=True, purge_before=True)
@ -139,7 +145,7 @@ class IngestionService:
return results return results
async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]: async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]:
"""Transformiert eine Markdown-Datei in den Graphen.""" """Transformiert eine Markdown-Datei und schützt die Authority-Kanten."""
apply = kwargs.get("apply", False) apply = kwargs.get("apply", False)
force_replace = kwargs.get("force_replace", False) force_replace = kwargs.get("force_replace", False)
purge_before = kwargs.get("purge_before", False) purge_before = kwargs.get("purge_before", False)
@ -174,7 +180,6 @@ class IngestionService:
) )
note_id = note_pl["note_id"] note_id = note_pl["note_id"]
# BUSINESS LOG: Aktuelle Notiz
logger.info(f"📄 Bearbeite: '{note_id}' (Typ: {note_type})") logger.info(f"📄 Bearbeite: '{note_id}' (Typ: {note_type})")
old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id) old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id)
@ -209,9 +214,8 @@ class IngestionService:
is_valid = await validate_edge_candidate( is_valid = await validate_edge_candidate(
ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator" ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator"
) )
# Fix v3.2.1: Robuste ID-Auflösung für den Logger label = cand.get('target_id') or cand.get('note_id') or "Unknown"
t_label = cand.get('target_id') or cand.get('note_id') or cand.get('to') or "Unknown" logger.info(f" 🧠 [SMART EDGE] {label} -> {'✅ OK' if is_valid else '❌ SKIP'}")
logger.info(f" 🧠 [SMART EDGE] {t_label} -> {'✅ OK' if is_valid else '❌ SKIP'}")
if is_valid: new_pool.append(cand) if is_valid: new_pool.append(cand)
else: else:
new_pool.append(cand) new_pool.append(cand)
@ -220,39 +224,31 @@ class IngestionService:
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry) chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry)
vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []
# Aggregation aller finalen Kanten (Edges) # Aggregation aller finalen Kanten
raw_edges = build_edges_for_note( raw_edges = build_edges_for_note(
note_id, chunk_pls, note_id, chunk_pls,
note_level_references=note_pl.get("references", []), note_level_references=note_pl.get("references", []),
include_note_scope_refs=note_scope_refs include_note_scope_refs=note_scope_refs
) )
# --- WP-24c: Symmetrie-Injektion (Authority Implementation) --- # --- WP-24c: Symmetrie-Injektion mit Authority-Schutz ---
final_edges = [] final_edges = []
# PHASE 1: Alle expliziten Kanten registrieren # PHASE 1: Explizite Kanten (Priorität)
for e in raw_edges: for e in raw_edges:
target_raw = e.get("target_id") t_id = self._resolve_target_id(e.get("target_id"))
if not self._is_valid_note_id(target_raw, provenance="explicit"): if not t_id:
continue continue # Anti-Junk: Nur Kanten zu existierenden Notizen erlauben
resolved_kind = edge_registry.resolve( resolved_kind = edge_registry.resolve(
e.get("kind", "related_to"), e.get("kind", "related_to"),
provenance=e.get("provenance", "explicit"), provenance=e.get("provenance", "explicit"),
context={"file": file_path, "note_id": note_id} context={"file": file_path, "note_id": note_id}
) )
e["kind"] = resolved_kind e.update({
e["origin_note_id"] = note_id "kind": resolved_kind, "target_id": t_id,
e["virtual"] = False "origin_note_id": note_id, "virtual": False, "confidence": 1.0
e["confidence"] = e.get("confidence", 1.0) })
# Fix v3.2.1: Kanonisierung der Target-ID vor der Registrierung!
# Nur wenn wir hier die echte Note-ID nutzen, erkennt Phase 2 die Kollision.
t_ctx = self.batch_cache.get(target_raw)
t_canonical = t_ctx.note_id if t_ctx else target_raw
edge_id = _mk_edge_id(resolved_kind, note_id, t_canonical, e.get("scope", "note"))
self.processed_explicit_ids.add(edge_id)
final_edges.append(e) final_edges.append(e)
# PHASE 2: Symmetrische Kanten (Invers) # PHASE 2: Symmetrische Kanten (Invers)
@ -260,40 +256,33 @@ class IngestionService:
for e in explicit_only: for e in explicit_only:
kind = e["kind"] kind = e["kind"]
inv_kind = edge_registry.get_inverse(kind) inv_kind = edge_registry.get_inverse(kind)
target_raw = e.get("target_id") t_id = e["target_id"]
target_ctx = self.batch_cache.get(target_raw)
target_id = target_ctx.note_id if target_ctx else target_raw
if (inv_kind and target_id and target_id != note_id and self._is_valid_note_id(target_id, provenance="structure")): if (inv_kind and t_id and t_id != note_id):
# ID der potenziellen virtuellen Kante # ID der potenziellen virtuellen Kante berechnen
potential_id = _mk_edge_id(inv_kind, target_id, note_id, e.get("scope", "note")) potential_id = _mk_edge_id(inv_kind, t_id, note_id, "note")
is_in_batch = potential_id in self.processed_explicit_ids # AUTHORITY-CHECK: Wurde diese Relation irgendwo im Vault manuell gesetzt?
if potential_id not in self.vault_authority_ids:
# Real-Time DB Check (Sync) # Zusätzlicher Check gegen bereits persistierte DB-Autorität
is_in_db = False if not is_explicit_edge_present(self.client, self.prefix, potential_id):
if not is_in_batch:
is_in_db = is_explicit_edge_present(self.client, self.prefix, potential_id)
if not is_in_batch and not is_in_db:
if (inv_kind != kind or kind not in ["related_to", "references"]):
inv_edge = e.copy() inv_edge = e.copy()
inv_edge.update({ inv_edge.update({
"note_id": target_id, "target_id": note_id, "kind": inv_kind, "note_id": t_id, "target_id": note_id, "kind": inv_kind,
"virtual": True, "provenance": "structure", "confidence": 1.0, "virtual": True, "provenance": "structure", "confidence": 1.0,
"origin_note_id": note_id "origin_note_id": note_id
}) })
final_edges.append(inv_edge) final_edges.append(inv_edge)
logger.info(f" 🔄 [SYMMETRY] Gegenkante: {target_id} --({inv_kind})--> {note_id}") logger.info(f" 🔄 [SYMMETRY] Gegenkante: {t_id} --({inv_kind})--> {note_id}")
edges = final_edges edges = final_edges
# 4. DB Upsert via modularisierter Points-Logik # 4. DB Upsert
if apply: if apply:
if purge_before and old_payload: if purge_before and old_payload:
purge_artifacts(self.client, self.prefix, note_id) purge_artifacts(self.client, self.prefix, note_id)
# Speichern # Speichern der Haupt-Note
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
upsert_batch(self.client, n_name, n_pts) upsert_batch(self.client, n_name, n_pts)