# Changelog: the ingestion logging was extended to include the normalized
# file path and the note title, improving traceability and debugging of the
# ingestion workflow.
"""
|
|
FILE: app/core/ingestion/ingestion_processor.py
|
|
DESCRIPTION: Der zentrale IngestionService (Orchestrator).
|
|
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
|
|
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
|
|
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
|
|
AUDIT v4.2.4:
|
|
- GOLD-STANDARD v4.2.4: Hash-basierte Change-Detection (MINDNET_CHANGE_DETECTION_MODE).
|
|
- Wiederherstellung des iterativen Abgleichs basierend auf Inhalts-Hashes.
|
|
- Phase 2 verwendet exakt dieselbe ID-Generierung wie Phase 1 (inkl. target_section).
|
|
- Authority-Check in Phase 2 prüft mit konsistenter ID-Generierung.
|
|
- Eliminiert Duplikate durch inkonsistente ID-Generierung (Steinzeitaxt-Problem).
|
|
VERSION: 4.2.4 (WP-24c: Hash-Integrität)
|
|
STATUS: Active
|
|
"""
|
|
import logging
|
|
import asyncio
|
|
import os
|
|
import re
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
|
|
# Core Module Imports
|
|
from app.core.parser import (
|
|
read_markdown, pre_scan_markdown, normalize_frontmatter,
|
|
validate_required_frontmatter, NoteContext
|
|
)
|
|
from app.core.chunking import assemble_chunks
|
|
# WP-24c: Import der zentralen Identitäts-Logik
|
|
from app.core.graph.graph_utils import _mk_edge_id
|
|
|
|
# Datenbank-Ebene (Modularisierte database-Infrastruktur)
|
|
from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
|
|
from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
|
|
from qdrant_client.http import models as rest
|
|
|
|
# Services
|
|
from app.services.embeddings_client import EmbeddingsClient
|
|
from app.services.edge_registry import registry as edge_registry
|
|
from app.services.llm_service import LLMService
|
|
|
|
# Package-Interne Imports (Refactoring WP-14)
|
|
from .ingestion_utils import load_type_registry, resolve_note_type, get_chunk_config_by_profile
|
|
from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts, is_explicit_edge_present
|
|
from .ingestion_validation import validate_edge_candidate
|
|
from .ingestion_note_payload import make_note_payload
|
|
from .ingestion_chunk_payload import make_chunk_payloads
|
|
|
|
# Fallback für Edges (Struktur-Verknüpfung)
|
|
try:
|
|
from app.core.graph.graph_derive_edges import build_edges_for_note
|
|
except ImportError:
|
|
def build_edges_for_note(*args, **kwargs): return []
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class IngestionService:
|
|
def __init__(self, collection_prefix: str = None):
|
|
"""Initialisiert den Service und nutzt die neue database-Infrastruktur."""
|
|
from app.config import get_settings
|
|
self.settings = get_settings()
|
|
|
|
# --- LOGGING CLEANUP ---
|
|
# Unterdrückt Bibliotheks-Lärm, erhält aber inhaltliche Service-Logs
|
|
for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]:
|
|
logging.getLogger(lib).setLevel(logging.WARNING)
|
|
|
|
self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
|
|
self.cfg = QdrantConfig.from_env()
|
|
self.cfg.prefix = self.prefix
|
|
self.client = get_client(self.cfg)
|
|
|
|
self.registry = load_type_registry()
|
|
self.embedder = EmbeddingsClient()
|
|
self.llm = LLMService()
|
|
|
|
# WP-25a: Auflösung der Dimension über das Embedding-Profil (MoE)
|
|
embed_cfg = self.llm.profiles.get("embedding_expert", {})
|
|
self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE
|
|
|
|
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
|
|
|
|
# WP-15b: Kontext-Gedächtnis für ID-Auflösung (Globaler Cache)
|
|
self.batch_cache: Dict[str, NoteContext] = {}
|
|
|
|
# WP-24c: Puffer für Phase 2 (Symmetrie-Injektion am Ende des gesamten Imports)
|
|
self.symmetry_buffer: List[Dict[str, Any]] = []
|
|
|
|
try:
|
|
ensure_collections(self.client, self.prefix, self.dim)
|
|
ensure_payload_indexes(self.client, self.prefix)
|
|
except Exception as e:
|
|
logger.warning(f"DB initialization warning: {e}")
|
|
|
|
def _persist_rejected_edges(self, note_id: str, rejected_edges: List[Dict[str, Any]]) -> None:
|
|
"""
|
|
WP-24c v4.5.9: Persistiert abgelehnte Kanten für Audit-Zwecke.
|
|
|
|
Schreibt rejected_edges in eine JSONL-Datei im _system Ordner oder logs/rejected_edges.log.
|
|
Dies ermöglicht die Analyse der Ablehnungsgründe und Verbesserung der Validierungs-Logik.
|
|
|
|
Args:
|
|
note_id: ID der Note, zu der die abgelehnten Kanten gehören
|
|
rejected_edges: Liste von abgelehnten Edge-Dicts
|
|
"""
|
|
if not rejected_edges:
|
|
return
|
|
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
|
|
# WP-24c v4.5.9: Erstelle Log-Verzeichnis falls nicht vorhanden
|
|
log_dir = "logs"
|
|
if not os.path.exists(log_dir):
|
|
os.makedirs(log_dir)
|
|
|
|
log_file = os.path.join(log_dir, "rejected_edges.log")
|
|
|
|
# WP-24c v4.5.9: Schreibe als JSONL (eine Kante pro Zeile)
|
|
try:
|
|
with open(log_file, "a", encoding="utf-8") as f:
|
|
for edge in rejected_edges:
|
|
log_entry = {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"note_id": note_id,
|
|
"edge": {
|
|
"kind": edge.get("kind", "unknown"),
|
|
"source_id": edge.get("source_id", "unknown"),
|
|
"target_id": edge.get("target_id") or edge.get("to", "unknown"),
|
|
"scope": edge.get("scope", "unknown"),
|
|
"provenance": edge.get("provenance", "unknown"),
|
|
"rule_id": edge.get("rule_id", "unknown"),
|
|
"confidence": edge.get("confidence", 0.0),
|
|
"target_section": edge.get("target_section")
|
|
}
|
|
}
|
|
f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
|
|
|
|
logger.debug(f"📝 [AUDIT] {len(rejected_edges)} abgelehnte Kanten für '{note_id}' in {log_file} gespeichert")
|
|
except Exception as e:
|
|
logger.error(f"❌ [AUDIT] Fehler beim Speichern der rejected_edges: {e}")
|
|
|
|
def _is_valid_id(self, text: Optional[str]) -> bool:
|
|
"""WP-24c: Prüft IDs auf fachliche Validität (Ghost-ID Schutz)."""
|
|
if not text or not isinstance(text, str) or len(text.strip()) < 2:
|
|
return False
|
|
blacklisted = {"none", "unknown", "insight", "source", "task", "project", "person", "concept"}
|
|
if text.lower().strip() in blacklisted:
|
|
return False
|
|
return True
|
|
|
|
async def run_batch(self, file_paths: List[str], vault_root: str) -> Dict[str, Any]:
|
|
"""
|
|
WP-15b: Phase 1 des Two-Pass Workflows.
|
|
Verarbeitet Batches und schreibt NUR Nutzer-Autorität (explizite Kanten).
|
|
"""
|
|
self.batch_cache.clear()
|
|
logger.info(f"--- 🔍 START BATCH PHASE 1 ({len(file_paths)} Dateien) ---")
|
|
|
|
# 1. Schritt: Pre-Scan (Context-Cache füllen)
|
|
for path in file_paths:
|
|
try:
|
|
ctx = pre_scan_markdown(path, registry=self.registry)
|
|
if ctx:
|
|
self.batch_cache[ctx.note_id] = ctx
|
|
self.batch_cache[ctx.title] = ctx
|
|
self.batch_cache[os.path.splitext(os.path.basename(path))[0]] = ctx
|
|
except Exception as e:
|
|
logger.warning(f" ⚠️ Pre-scan fehlgeschlagen für {path}: {e}")
|
|
|
|
# 2. Schritt: Batch Processing (Authority Only)
|
|
processed_count = 0
|
|
success_count = 0
|
|
for p in file_paths:
|
|
processed_count += 1
|
|
res = await self.process_file(p, vault_root, apply=True, purge_before=True)
|
|
if res.get("status") == "success":
|
|
success_count += 1
|
|
|
|
logger.info(f"--- ✅ Batch Phase 1 abgeschlossen ({success_count}/{processed_count}) ---")
|
|
return {
|
|
"status": "success",
|
|
"processed": processed_count,
|
|
"success": success_count,
|
|
"buffered_symmetries": len(self.symmetry_buffer)
|
|
}
|
|
|
|
async def commit_vault_symmetries(self) -> Dict[str, Any]:
|
|
"""
|
|
WP-24c: Führt PHASE 2 (Globale Symmetrie-Injektion) aus.
|
|
Wird am Ende des gesamten Imports aufgerufen.
|
|
"""
|
|
if not self.symmetry_buffer:
|
|
return {"status": "skipped", "reason": "buffer_empty"}
|
|
|
|
logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrien gegen Live-DB...")
|
|
final_virtuals = []
|
|
for v_edge in self.symmetry_buffer:
|
|
# WP-24c v4.1.0: Korrekte Extraktion der Identitäts-Parameter
|
|
src = v_edge.get("source_id") or v_edge.get("note_id") # source_id hat Priorität
|
|
tgt = v_edge.get("target_id")
|
|
kind = v_edge.get("kind")
|
|
scope = v_edge.get("scope", "note")
|
|
target_section = v_edge.get("target_section") # WP-24c v4.1.0: target_section berücksichtigen
|
|
|
|
if not all([src, tgt, kind]):
|
|
continue
|
|
|
|
# WP-24c v4.1.0: Nutzung der zentralisierten ID-Logik aus graph_utils
|
|
# GOLD-STANDARD v4.1.0: ID-Generierung muss absolut synchron zu Phase 1 sein
|
|
# - Wenn target_section vorhanden, muss es in die ID einfließen
|
|
# - Dies stellt sicher, dass der Authority-Check korrekt funktioniert
|
|
try:
|
|
v_id = _mk_edge_id(kind, src, tgt, scope, target_section=target_section)
|
|
except ValueError:
|
|
continue
|
|
|
|
# AUTHORITY-CHECK: Nur schreiben, wenn keine manuelle Kante existiert
|
|
# Prüft mit exakt derselben ID, die in Phase 1 verwendet wurde (inkl. target_section)
|
|
if not is_explicit_edge_present(self.client, self.prefix, v_id):
|
|
final_virtuals.append(v_edge)
|
|
section_info = f" (section: {target_section})" if target_section else ""
|
|
logger.info(f" 🔄 [SYMMETRY] Add inverse: {src} --({kind})--> {tgt}{section_info}")
|
|
else:
|
|
logger.info(f" 🛡️ [PROTECTED] Manuelle Kante gefunden. Symmetrie für {kind} unterdrückt.")
|
|
|
|
if final_virtuals:
|
|
col, pts = points_for_edges(self.prefix, final_virtuals)
|
|
upsert_batch(self.client, col, pts, wait=True)
|
|
|
|
count = len(final_virtuals)
|
|
self.symmetry_buffer.clear()
|
|
return {"status": "success", "added": count}
|
|
|
|
async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]:
|
|
"""
|
|
Transformiert eine Markdown-Datei (Phase 1).
|
|
Schreibt Notes/Chunks/Explicit Edges sofort.
|
|
"""
|
|
apply = kwargs.get("apply", False)
|
|
force_replace = kwargs.get("force_replace", False)
|
|
purge_before = kwargs.get("purge_before", False)
|
|
|
|
result = {"path": file_path, "status": "skipped", "changed": False, "error": None}
|
|
|
|
try:
|
|
# Ordner-Filter (.trash / .obsidian)
|
|
if ".trash" in file_path or any(part.startswith('.') for part in file_path.split(os.sep)):
|
|
return {**result, "status": "skipped", "reason": "ignored_folder"}
|
|
|
|
# WP-24c v4.5.9: Path-Normalization für konsistente Hash-Prüfung
|
|
# Normalisiere file_path zu absolutem Pfad für konsistente Verarbeitung
|
|
normalized_file_path = os.path.abspath(file_path) if not os.path.isabs(file_path) else file_path
|
|
|
|
parsed = read_markdown(normalized_file_path)
|
|
if not parsed: return {**result, "error": "Empty file"}
|
|
fm = normalize_frontmatter(parsed.frontmatter)
|
|
validate_required_frontmatter(fm)
|
|
|
|
note_pl = make_note_payload(parsed, vault_root=vault_root, file_path=normalized_file_path, types_cfg=self.registry)
|
|
note_id = note_pl.get("note_id")
|
|
|
|
if not note_id:
|
|
return {**result, "status": "error", "error": "missing_id"}
|
|
|
|
logger.info(f"📄 Bearbeite: '{note_id}' | Pfad: {normalized_file_path} | Title: {note_pl.get('title', 'N/A')}")
|
|
|
|
# WP-24c v4.5.9: Strikte Change Detection (Hash-basierte Inhaltsprüfung)
|
|
# Prüft Hash VOR der Verarbeitung, um redundante Ingestion zu vermeiden
|
|
old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id)
|
|
|
|
# WP-24c v4.5.9-DEBUG: Erweiterte Diagnose-Logs für Change-Detection (INFO-Level für Sichtbarkeit)
|
|
logger.info(f"🔍 [CHANGE-DETECTION] Start für '{note_id}': force_replace={force_replace}, old_payload={old_payload is not None}")
|
|
|
|
content_changed = True
|
|
hash_match = False
|
|
if old_payload and not force_replace:
|
|
# Nutzt die über MINDNET_CHANGE_DETECTION_MODE gesteuerte Genauigkeit
|
|
# Mapping: 'full' -> 'full:parsed:canonical', 'body' -> 'body:parsed:canonical'
|
|
h_key = f"{self.active_hash_mode or 'full'}:parsed:canonical"
|
|
new_h = note_pl.get("hashes", {}).get(h_key)
|
|
old_h = old_payload.get("hashes", {}).get(h_key)
|
|
|
|
# WP-24c v4.5.9-DEBUG: Detaillierte Hash-Diagnose (INFO-Level)
|
|
logger.info(f"🔍 [CHANGE-DETECTION] Hash-Vergleich für '{note_id}':")
|
|
logger.info(f" -> Hash-Key: '{h_key}'")
|
|
logger.info(f" -> Active Hash-Mode: '{self.active_hash_mode or 'full'}'")
|
|
logger.info(f" -> New Hash vorhanden: {bool(new_h)}")
|
|
logger.info(f" -> Old Hash vorhanden: {bool(old_h)}")
|
|
if new_h:
|
|
logger.info(f" -> New Hash (erste 32 Zeichen): {new_h[:32]}...")
|
|
if old_h:
|
|
logger.info(f" -> Old Hash (erste 32 Zeichen): {old_h[:32]}...")
|
|
logger.info(f" -> Verfügbare Hash-Keys in new: {list(note_pl.get('hashes', {}).keys())}")
|
|
logger.info(f" -> Verfügbare Hash-Keys in old: {list(old_payload.get('hashes', {}).keys())}")
|
|
|
|
if new_h and old_h:
|
|
hash_match = (new_h == old_h)
|
|
if hash_match:
|
|
content_changed = False
|
|
logger.info(f"🔍 [CHANGE-DETECTION] ✅ Hash identisch für '{note_id}': {h_key} = {new_h[:16]}...")
|
|
else:
|
|
logger.warning(f"🔍 [CHANGE-DETECTION] ❌ Hash geändert für '{note_id}': alt={old_h[:16]}..., neu={new_h[:16]}...")
|
|
# Finde erste unterschiedliche Position
|
|
diff_pos = next((i for i, (a, b) in enumerate(zip(new_h, old_h)) if a != b), None)
|
|
if diff_pos is not None:
|
|
logger.info(f" -> Hash-Unterschied: Erste unterschiedliche Position: {diff_pos}")
|
|
else:
|
|
logger.info(f" -> Hash-Unterschied: Längen unterschiedlich (new={len(new_h)}, old={len(old_h)})")
|
|
|
|
# WP-24c v4.5.9-DEBUG: Logge Hash-Input für Diagnose
|
|
# WICHTIG: _get_hash_source_content benötigt ein Dictionary, nicht das ParsedNote-Objekt!
|
|
from app.core.ingestion.ingestion_note_payload import _get_hash_source_content, _as_dict
|
|
hash_mode = self.active_hash_mode or 'full'
|
|
# Konvertiere parsed zu Dictionary für _get_hash_source_content
|
|
parsed_dict = _as_dict(parsed)
|
|
hash_input = _get_hash_source_content(parsed_dict, hash_mode)
|
|
logger.info(f" -> Hash-Input (erste 200 Zeichen): {hash_input[:200]}...")
|
|
logger.info(f" -> Hash-Input Länge: {len(hash_input)}")
|
|
|
|
# WP-24c v4.5.9-DEBUG: Vergleiche auch Body-Länge und Frontmatter
|
|
# Verwende parsed.body statt note_pl.get("body")
|
|
new_body = str(getattr(parsed, "body", "") or "").strip()
|
|
old_body = str(old_payload.get("body", "")).strip() if old_payload else ""
|
|
logger.info(f" -> Body-Länge: new={len(new_body)}, old={len(old_body)}")
|
|
if len(new_body) != len(old_body):
|
|
logger.warning(f" -> ⚠️ Body-Länge unterschiedlich! Mögliche Ursache: Parsing-Unterschiede")
|
|
|
|
# Verwende parsed.frontmatter statt note_pl.get("frontmatter")
|
|
new_fm = getattr(parsed, "frontmatter", {}) or {}
|
|
old_fm = old_payload.get("frontmatter", {}) if old_payload else {}
|
|
logger.info(f" -> Frontmatter-Keys: new={sorted(new_fm.keys())}, old={sorted(old_fm.keys())}")
|
|
# Prüfe relevante Frontmatter-Felder
|
|
relevant_keys = ["title", "type", "status", "tags", "chunking_profile", "chunk_profile", "retriever_weight", "split_level", "strict_heading_split"]
|
|
for key in relevant_keys:
|
|
new_val = new_fm.get(key) if isinstance(new_fm, dict) else getattr(new_fm, key, None)
|
|
old_val = old_fm.get(key) if isinstance(old_fm, dict) else None
|
|
if new_val != old_val:
|
|
logger.warning(f" -> ⚠️ Frontmatter '{key}' unterschiedlich: new={new_val}, old={old_val}")
|
|
else:
|
|
# WP-24c v4.5.9: Wenn Hash fehlt, als geändert behandeln (Sicherheit)
|
|
logger.warning(f"⚠️ [CHANGE-DETECTION] Hash fehlt für '{note_id}': new_h={bool(new_h)}, old_h={bool(old_h)}")
|
|
logger.info(f" -> Grund: Hash wird als 'geändert' behandelt, da Hash-Werte fehlen")
|
|
else:
|
|
if force_replace:
|
|
logger.info(f"🔍 [CHANGE-DETECTION] '{note_id}': force_replace=True -> überspringe Hash-Check")
|
|
elif not old_payload:
|
|
logger.warning(f"🔍 [CHANGE-DETECTION] '{note_id}': ⚠️ Keine alte Payload gefunden -> erste Verarbeitung oder gelöscht")
|
|
|
|
# WP-24c v4.5.9: Strikte Logik - überspringe komplett wenn Hash identisch
|
|
# WICHTIG: Artifact-Check NACH Hash-Check, da purge_before die Artefakte löschen kann
|
|
# Wenn Hash identisch ist, sind die Artefakte entweder vorhanden oder werden gerade neu geschrieben
|
|
if not force_replace and hash_match and old_payload:
|
|
# WP-24c v4.5.9: Hash identisch -> überspringe komplett (auch wenn Artefakte nach PURGE fehlen)
|
|
# Der Hash ist die autoritative Quelle für "Inhalt unverändert"
|
|
# Artefakte werden beim nächsten normalen Import wieder erstellt, wenn nötig
|
|
logger.info(f"⏭️ [SKIP] '{note_id}' unverändert (Hash identisch - überspringe komplett, auch wenn Artefakte fehlen)")
|
|
return {**result, "status": "unchanged", "note_id": note_id, "reason": "hash_identical"}
|
|
elif not force_replace and old_payload and not hash_match:
|
|
# WP-24c v4.5.9-DEBUG: Hash geändert - erlaube Verarbeitung
|
|
logger.info(f"🔍 [CHANGE-DETECTION] '{note_id}': Hash geändert -> erlaube Verarbeitung")
|
|
|
|
# WP-24c v4.5.9: Hash geändert oder keine alte Payload - prüfe Artefakte für normale Verarbeitung
|
|
c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id)
|
|
logger.info(f"🔍 [CHANGE-DETECTION] '{note_id}': Artifact-Check: c_miss={c_miss}, e_miss={e_miss}")
|
|
|
|
if not apply:
|
|
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
|
|
|
|
# Chunks & MoE
|
|
profile = note_pl.get("chunk_profile", "sliding_standard")
|
|
note_type = resolve_note_type(self.registry, fm.get("type"))
|
|
chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type)
|
|
enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False)
|
|
chunks = await assemble_chunks(note_id, getattr(parsed, "body", ""), note_type, config=chunk_cfg)
|
|
|
|
# WP-24c v4.5.8: Validierung in Chunk-Schleife entfernt
|
|
# Alle candidate: Kanten werden jetzt in Phase 3 (nach build_edges_for_note) validiert
|
|
# Dies stellt sicher, dass auch Note-Scope Kanten aus LLM-Validierungs-Zonen geprüft werden
|
|
# Der candidate_pool wird unverändert weitergegeben, damit build_edges_for_note alle Kanten erkennt
|
|
# WP-24c v4.5.8: Nur ID-Validierung bleibt (Ghost-ID Schutz), keine LLM-Validierung mehr hier
|
|
for ch in chunks:
|
|
new_pool = []
|
|
for cand in getattr(ch, "candidate_pool", []):
|
|
# WP-24c v4.5.8: Nur ID-Validierung (Ghost-ID Schutz)
|
|
t_id = cand.get('target_id') or cand.get('to') or cand.get('note_id')
|
|
if not self._is_valid_id(t_id):
|
|
continue
|
|
# WP-24c v4.5.8: Alle Kanten gehen durch - LLM-Validierung erfolgt in Phase 3
|
|
new_pool.append(cand)
|
|
ch.candidate_pool = new_pool
|
|
|
|
# chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry)
|
|
# v4.2.8 Fix C: Explizite Übergabe des Profil-Namens für den Chunk-Payload
|
|
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry, chunk_profile=profile)
|
|
|
|
vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []
|
|
|
|
# WP-24c v4.2.0: Kanten-Extraktion mit Note-Scope Zonen Support
|
|
# Übergabe des Original-Markdown-Texts für Note-Scope Zonen-Extraktion
|
|
markdown_body = getattr(parsed, "body", "")
|
|
raw_edges = build_edges_for_note(
|
|
note_id,
|
|
chunk_pls,
|
|
note_level_references=note_pl.get("references", []),
|
|
markdown_body=markdown_body
|
|
)
|
|
|
|
# WP-24c v4.5.8: Phase 3 - Finaler Validierungs-Gate für candidate: Kanten
|
|
# Prüfe alle Kanten mit rule_id ODER provenance beginnend mit "candidate:"
|
|
# Dies schließt alle Kandidaten ein, unabhängig von ihrer Herkunft (global_pool, explicit:callout, etc.)
|
|
|
|
# WP-24c v4.5.8: Kontext-Optimierung für Note-Scope Kanten
|
|
# Aggregiere den gesamten Note-Text für bessere Validierungs-Entscheidungen
|
|
note_text = markdown_body or " ".join([c.get("text", "") or c.get("window", "") for c in chunk_pls])
|
|
# Erstelle eine Note-Summary aus den wichtigsten Chunks (für bessere Kontext-Qualität)
|
|
note_summary = " ".join([c.get("window", "") or c.get("text", "") for c in chunk_pls[:5]]) # Top 5 Chunks
|
|
|
|
validated_edges = []
|
|
rejected_edges = []
|
|
|
|
for e in raw_edges:
|
|
rule_id = e.get("rule_id", "")
|
|
provenance = e.get("provenance", "")
|
|
|
|
# WP-24c v4.5.8: Trigger-Kriterium - rule_id ODER provenance beginnt mit "candidate:"
|
|
is_candidate = (rule_id and rule_id.startswith("candidate:")) or (provenance and provenance.startswith("candidate:"))
|
|
|
|
if is_candidate:
|
|
# Extrahiere target_id für Validierung (aus verschiedenen möglichen Feldern)
|
|
target_id = e.get("target_id") or e.get("to")
|
|
if not target_id:
|
|
# Fallback: Versuche aus Payload zu extrahieren
|
|
payload = e.get("extra", {}) if isinstance(e.get("extra"), dict) else {}
|
|
target_id = payload.get("target_id") or payload.get("to")
|
|
|
|
if not target_id:
|
|
logger.warning(f"⚠️ [PHASE 3] Keine target_id gefunden für Kante: {e}")
|
|
rejected_edges.append(e)
|
|
continue
|
|
|
|
kind = e.get("kind", "related_to")
|
|
source_id = e.get("source_id", note_id)
|
|
scope = e.get("scope", "chunk")
|
|
|
|
# WP-24c v4.5.8: Kontext-Optimierung für Note-Scope Kanten
|
|
# Für scope: note verwende Note-Summary oder gesamten Note-Text
|
|
# Für scope: chunk verwende den spezifischen Chunk-Text (falls verfügbar)
|
|
if scope == "note":
|
|
validation_text = note_summary or note_text
|
|
context_info = "Note-Scope (aggregiert)"
|
|
else:
|
|
# Für Chunk-Scope: Versuche Chunk-Text zu finden, sonst Note-Text
|
|
chunk_id = e.get("chunk_id") or source_id
|
|
chunk_text = None
|
|
for ch in chunk_pls:
|
|
if ch.get("chunk_id") == chunk_id or ch.get("id") == chunk_id:
|
|
chunk_text = ch.get("text") or ch.get("window", "")
|
|
break
|
|
validation_text = chunk_text or note_text
|
|
context_info = f"Chunk-Scope ({chunk_id})"
|
|
|
|
# Erstelle Edge-Dict für Validierung (kompatibel mit validate_edge_candidate)
|
|
edge_for_validation = {
|
|
"kind": kind,
|
|
"to": target_id, # validate_edge_candidate erwartet "to"
|
|
"target_id": target_id,
|
|
"provenance": provenance if not provenance.startswith("candidate:") else provenance.replace("candidate:", "").strip(),
|
|
"confidence": e.get("confidence", 0.9)
|
|
}
|
|
|
|
logger.info(f"🚀 [PHASE 3] Validierung: {source_id} -> {target_id} ({kind}) | Scope: {scope} | Kontext: {context_info}")
|
|
|
|
# WP-24c v4.5.8: Validiere gegen optimierten Kontext
|
|
is_valid = await validate_edge_candidate(
|
|
chunk_text=validation_text,
|
|
edge=edge_for_validation,
|
|
batch_cache=self.batch_cache,
|
|
llm_service=self.llm,
|
|
profile_name="ingest_validator"
|
|
)
|
|
|
|
if is_valid:
|
|
# WP-24c v4.5.8: Entferne candidate: Präfix (Kante wird zum Fakt)
|
|
new_rule_id = rule_id.replace("candidate:", "").strip() if rule_id else provenance.replace("candidate:", "").strip() if provenance.startswith("candidate:") else provenance
|
|
if not new_rule_id:
|
|
new_rule_id = e.get("provenance", "explicit").replace("candidate:", "").strip()
|
|
|
|
# Aktualisiere rule_id und provenance im Edge
|
|
e["rule_id"] = new_rule_id
|
|
if provenance.startswith("candidate:"):
|
|
e["provenance"] = provenance.replace("candidate:", "").strip()
|
|
|
|
validated_edges.append(e)
|
|
logger.info(f"✅ [PHASE 3] VERIFIED: {source_id} -> {target_id} ({kind}) | rule_id: {new_rule_id}")
|
|
else:
|
|
# WP-24c v4.5.8: Kante ablehnen (nicht zu validated_edges hinzufügen)
|
|
rejected_edges.append(e)
|
|
logger.info(f"🚫 [PHASE 3] REJECTED: {source_id} -> {target_id} ({kind})")
|
|
else:
|
|
# WP-24c v4.5.8: Keine candidate: Kante -> direkt übernehmen
|
|
validated_edges.append(e)
|
|
|
|
# WP-24c v4.5.8: Phase 3 abgeschlossen - rejected_edges werden NICHT weiterverarbeitet
|
|
# WP-24c v4.5.9: Persistierung von rejected_edges für Audit-Zwecke
|
|
if rejected_edges:
|
|
logger.info(f"🚫 [PHASE 3] {len(rejected_edges)} Kanten abgelehnt und werden nicht in die DB geschrieben")
|
|
self._persist_rejected_edges(note_id, rejected_edges)
|
|
|
|
# WP-24c v4.5.8: Verwende validated_edges statt raw_edges für weitere Verarbeitung
|
|
# Nur verified Kanten (ohne candidate: Präfix) werden in Phase 2 (Symmetrie) verarbeitet
|
|
explicit_edges = []
|
|
for e in validated_edges:
|
|
t_raw = e.get("target_id")
|
|
t_ctx = self.batch_cache.get(t_raw)
|
|
t_id = t_ctx.note_id if t_ctx else t_raw
|
|
|
|
if not self._is_valid_id(t_id): continue
|
|
|
|
resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance="explicit")
|
|
# WP-24c v4.1.0: target_section aus dem Edge-Payload extrahieren und beibehalten
|
|
target_section = e.get("target_section")
|
|
e.update({
|
|
"kind": resolved_kind,
|
|
"relation": resolved_kind, # Konsistenz: kind und relation identisch
|
|
"target_id": t_id,
|
|
"source_id": e.get("source_id") or note_id, # Sicherstellen, dass source_id gesetzt ist
|
|
"origin_note_id": note_id,
|
|
"virtual": False
|
|
})
|
|
explicit_edges.append(e)
|
|
|
|
# Symmetrie puffern (WP-24c v4.1.0: Korrekte Symmetrie-Integrität)
|
|
inv_kind = edge_registry.get_inverse(resolved_kind)
|
|
if inv_kind and t_id != note_id:
|
|
# GOLD-STANDARD v4.1.0: Symmetrie-Integrität
|
|
v_edge = {
|
|
"note_id": t_id, # Besitzer-Wechsel: Symmetrie gehört zum Link-Ziel
|
|
"source_id": t_id, # Neue Quelle ist das Link-Ziel
|
|
"target_id": note_id, # Ziel ist die ursprüngliche Quelle
|
|
"kind": inv_kind, # Inverser Kanten-Typ
|
|
"relation": inv_kind, # Konsistenz: kind und relation identisch
|
|
"scope": "note", # Symmetrien sind immer Note-Level
|
|
"virtual": True,
|
|
"origin_note_id": note_id, # Tracking: Woher kommt die Symmetrie
|
|
}
|
|
# target_section beibehalten, falls vorhanden (für Section-Links)
|
|
if target_section:
|
|
v_edge["target_section"] = target_section
|
|
self.symmetry_buffer.append(v_edge)
|
|
|
|
# DB Upsert
|
|
if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id)
|
|
|
|
col_n, pts_n = points_for_note(self.prefix, note_pl, None, self.dim)
|
|
upsert_batch(self.client, col_n, pts_n, wait=True)
|
|
|
|
if chunk_pls and vecs:
|
|
col_c, pts_c = points_for_chunks(self.prefix, chunk_pls, vecs)
|
|
upsert_batch(self.client, col_c, pts_c, wait=True)
|
|
|
|
if explicit_edges:
|
|
col_e, pts_e = points_for_edges(self.prefix, explicit_edges)
|
|
upsert_batch(self.client, col_e, pts_e, wait=True)
|
|
|
|
logger.info(f" ✨ Phase 1 fertig: {len(explicit_edges)} explizite Kanten für '{note_id}'.")
|
|
return {"status": "success", "note_id": note_id}
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Fehler bei {file_path}: {e}", exc_info=True)
|
|
return {**result, "status": "error", "error": str(e)}
|
|
|
|
async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
|
|
"""Erstellt eine Note aus einem Textstream."""
|
|
target_path = os.path.join(vault_root, folder, filename)
|
|
os.makedirs(os.path.dirname(target_path), exist_ok=True)
|
|
with open(target_path, "w", encoding="utf-8") as f:
|
|
f.write(markdown_content)
|
|
await asyncio.sleep(0.1)
|
|
return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True) |