mindnet/app/core/ingestion/ingestion_processor.py
Lars 7cb8fd6602 Enhance logging in ingestion_processor.py for improved change detection diagnostics
Add detailed debug and warning logs to the change detection process, providing insights into hash comparisons and artifact checks. This update aims to facilitate better traceability and debugging during ingestion, particularly when handling hash changes and missing hashes. The changes ensure that the ingestion workflow is more transparent and easier to troubleshoot.
2026-01-12 08:08:29 +01:00

545 lines
29 KiB
Python

"""
FILE: app/core/ingestion/ingestion_processor.py
DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
AUDIT v4.2.4:
- GOLD-STANDARD v4.2.4: Hash-basierte Change-Detection (MINDNET_CHANGE_DETECTION_MODE).
- Wiederherstellung des iterativen Abgleichs basierend auf Inhalts-Hashes.
- Phase 2 verwendet exakt dieselbe ID-Generierung wie Phase 1 (inkl. target_section).
- Authority-Check in Phase 2 prüft mit konsistenter ID-Generierung.
- Eliminiert Duplikate durch inkonsistente ID-Generierung (Steinzeitaxt-Problem).
VERSION: 4.2.4 (WP-24c: Hash-Integrität)
STATUS: Active
"""
import logging
import asyncio
import os
import re
from typing import Dict, List, Optional, Tuple, Any
# Core Module Imports
from app.core.parser import (
read_markdown, pre_scan_markdown, normalize_frontmatter,
validate_required_frontmatter, NoteContext
)
from app.core.chunking import assemble_chunks
# WP-24c: Import der zentralen Identitäts-Logik
from app.core.graph.graph_utils import _mk_edge_id
# Datenbank-Ebene (Modularisierte database-Infrastruktur)
from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
from qdrant_client.http import models as rest
# Services
from app.services.embeddings_client import EmbeddingsClient
from app.services.edge_registry import registry as edge_registry
from app.services.llm_service import LLMService
# Package-Interne Imports (Refactoring WP-14)
from .ingestion_utils import load_type_registry, resolve_note_type, get_chunk_config_by_profile
from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts, is_explicit_edge_present
from .ingestion_validation import validate_edge_candidate
from .ingestion_note_payload import make_note_payload
from .ingestion_chunk_payload import make_chunk_payloads
# Fallback für Edges (Struktur-Verknüpfung)
try:
from app.core.graph.graph_derive_edges import build_edges_for_note
except ImportError:
def build_edges_for_note(*args, **kwargs): return []
logger = logging.getLogger(__name__)
class IngestionService:
def __init__(self, collection_prefix: str = None):
"""Initialisiert den Service und nutzt die neue database-Infrastruktur."""
from app.config import get_settings
self.settings = get_settings()
# --- LOGGING CLEANUP ---
# Unterdrückt Bibliotheks-Lärm, erhält aber inhaltliche Service-Logs
for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]:
logging.getLogger(lib).setLevel(logging.WARNING)
self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
self.cfg = QdrantConfig.from_env()
self.cfg.prefix = self.prefix
self.client = get_client(self.cfg)
self.registry = load_type_registry()
self.embedder = EmbeddingsClient()
self.llm = LLMService()
# WP-25a: Auflösung der Dimension über das Embedding-Profil (MoE)
embed_cfg = self.llm.profiles.get("embedding_expert", {})
self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
# WP-15b: Kontext-Gedächtnis für ID-Auflösung (Globaler Cache)
self.batch_cache: Dict[str, NoteContext] = {}
# WP-24c: Puffer für Phase 2 (Symmetrie-Injektion am Ende des gesamten Imports)
self.symmetry_buffer: List[Dict[str, Any]] = []
try:
ensure_collections(self.client, self.prefix, self.dim)
ensure_payload_indexes(self.client, self.prefix)
except Exception as e:
logger.warning(f"DB initialization warning: {e}")
def _persist_rejected_edges(self, note_id: str, rejected_edges: List[Dict[str, Any]]) -> None:
"""
WP-24c v4.5.9: Persistiert abgelehnte Kanten für Audit-Zwecke.
Schreibt rejected_edges in eine JSONL-Datei im _system Ordner oder logs/rejected_edges.log.
Dies ermöglicht die Analyse der Ablehnungsgründe und Verbesserung der Validierungs-Logik.
Args:
note_id: ID der Note, zu der die abgelehnten Kanten gehören
rejected_edges: Liste von abgelehnten Edge-Dicts
"""
if not rejected_edges:
return
import json
import os
from datetime import datetime
# WP-24c v4.5.9: Erstelle Log-Verzeichnis falls nicht vorhanden
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
log_file = os.path.join(log_dir, "rejected_edges.log")
# WP-24c v4.5.9: Schreibe als JSONL (eine Kante pro Zeile)
try:
with open(log_file, "a", encoding="utf-8") as f:
for edge in rejected_edges:
log_entry = {
"timestamp": datetime.now().isoformat(),
"note_id": note_id,
"edge": {
"kind": edge.get("kind", "unknown"),
"source_id": edge.get("source_id", "unknown"),
"target_id": edge.get("target_id") or edge.get("to", "unknown"),
"scope": edge.get("scope", "unknown"),
"provenance": edge.get("provenance", "unknown"),
"rule_id": edge.get("rule_id", "unknown"),
"confidence": edge.get("confidence", 0.0),
"target_section": edge.get("target_section")
}
}
f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
logger.debug(f"📝 [AUDIT] {len(rejected_edges)} abgelehnte Kanten für '{note_id}' in {log_file} gespeichert")
except Exception as e:
logger.error(f"❌ [AUDIT] Fehler beim Speichern der rejected_edges: {e}")
def _is_valid_id(self, text: Optional[str]) -> bool:
"""WP-24c: Prüft IDs auf fachliche Validität (Ghost-ID Schutz)."""
if not text or not isinstance(text, str) or len(text.strip()) < 2:
return False
blacklisted = {"none", "unknown", "insight", "source", "task", "project", "person", "concept"}
if text.lower().strip() in blacklisted:
return False
return True
async def run_batch(self, file_paths: List[str], vault_root: str) -> Dict[str, Any]:
"""
WP-15b: Phase 1 des Two-Pass Workflows.
Verarbeitet Batches und schreibt NUR Nutzer-Autorität (explizite Kanten).
"""
self.batch_cache.clear()
logger.info(f"--- 🔍 START BATCH PHASE 1 ({len(file_paths)} Dateien) ---")
# 1. Schritt: Pre-Scan (Context-Cache füllen)
for path in file_paths:
try:
ctx = pre_scan_markdown(path, registry=self.registry)
if ctx:
self.batch_cache[ctx.note_id] = ctx
self.batch_cache[ctx.title] = ctx
self.batch_cache[os.path.splitext(os.path.basename(path))[0]] = ctx
except Exception as e:
logger.warning(f" ⚠️ Pre-scan fehlgeschlagen für {path}: {e}")
# 2. Schritt: Batch Processing (Authority Only)
processed_count = 0
success_count = 0
for p in file_paths:
processed_count += 1
res = await self.process_file(p, vault_root, apply=True, purge_before=True)
if res.get("status") == "success":
success_count += 1
logger.info(f"--- ✅ Batch Phase 1 abgeschlossen ({success_count}/{processed_count}) ---")
return {
"status": "success",
"processed": processed_count,
"success": success_count,
"buffered_symmetries": len(self.symmetry_buffer)
}
async def commit_vault_symmetries(self) -> Dict[str, Any]:
"""
WP-24c: Führt PHASE 2 (Globale Symmetrie-Injektion) aus.
Wird am Ende des gesamten Imports aufgerufen.
"""
if not self.symmetry_buffer:
return {"status": "skipped", "reason": "buffer_empty"}
logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrien gegen Live-DB...")
final_virtuals = []
for v_edge in self.symmetry_buffer:
# WP-24c v4.1.0: Korrekte Extraktion der Identitäts-Parameter
src = v_edge.get("source_id") or v_edge.get("note_id") # source_id hat Priorität
tgt = v_edge.get("target_id")
kind = v_edge.get("kind")
scope = v_edge.get("scope", "note")
target_section = v_edge.get("target_section") # WP-24c v4.1.0: target_section berücksichtigen
if not all([src, tgt, kind]):
continue
# WP-24c v4.1.0: Nutzung der zentralisierten ID-Logik aus graph_utils
# GOLD-STANDARD v4.1.0: ID-Generierung muss absolut synchron zu Phase 1 sein
# - Wenn target_section vorhanden, muss es in die ID einfließen
# - Dies stellt sicher, dass der Authority-Check korrekt funktioniert
try:
v_id = _mk_edge_id(kind, src, tgt, scope, target_section=target_section)
except ValueError:
continue
# AUTHORITY-CHECK: Nur schreiben, wenn keine manuelle Kante existiert
# Prüft mit exakt derselben ID, die in Phase 1 verwendet wurde (inkl. target_section)
if not is_explicit_edge_present(self.client, self.prefix, v_id):
final_virtuals.append(v_edge)
section_info = f" (section: {target_section})" if target_section else ""
logger.info(f" 🔄 [SYMMETRY] Add inverse: {src} --({kind})--> {tgt}{section_info}")
else:
logger.info(f" 🛡️ [PROTECTED] Manuelle Kante gefunden. Symmetrie für {kind} unterdrückt.")
if final_virtuals:
col, pts = points_for_edges(self.prefix, final_virtuals)
upsert_batch(self.client, col, pts, wait=True)
count = len(final_virtuals)
self.symmetry_buffer.clear()
return {"status": "success", "added": count}
async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]:
"""
Transformiert eine Markdown-Datei (Phase 1).
Schreibt Notes/Chunks/Explicit Edges sofort.
"""
apply = kwargs.get("apply", False)
force_replace = kwargs.get("force_replace", False)
purge_before = kwargs.get("purge_before", False)
result = {"path": file_path, "status": "skipped", "changed": False, "error": None}
try:
# Ordner-Filter (.trash / .obsidian)
if ".trash" in file_path or any(part.startswith('.') for part in file_path.split(os.sep)):
return {**result, "status": "skipped", "reason": "ignored_folder"}
# WP-24c v4.5.9: Path-Normalization für konsistente Hash-Prüfung
# Normalisiere file_path zu absolutem Pfad für konsistente Verarbeitung
normalized_file_path = os.path.abspath(file_path) if not os.path.isabs(file_path) else file_path
parsed = read_markdown(normalized_file_path)
if not parsed: return {**result, "error": "Empty file"}
fm = normalize_frontmatter(parsed.frontmatter)
validate_required_frontmatter(fm)
note_pl = make_note_payload(parsed, vault_root=vault_root, file_path=normalized_file_path, types_cfg=self.registry)
note_id = note_pl.get("note_id")
if not note_id:
return {**result, "status": "error", "error": "missing_id"}
logger.info(f"📄 Bearbeite: '{note_id}'")
# WP-24c v4.5.9: Strikte Change Detection (Hash-basierte Inhaltsprüfung)
# Prüft Hash VOR der Verarbeitung, um redundante Ingestion zu vermeiden
old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id)
# WP-24c v4.5.9-DEBUG: Erweiterte Diagnose-Logs für Change-Detection
logger.debug(f"🔍 [CHANGE-DETECTION] Start für '{note_id}': force_replace={force_replace}, old_payload={old_payload is not None}")
content_changed = True
hash_match = False
if old_payload and not force_replace:
# Nutzt die über MINDNET_CHANGE_DETECTION_MODE gesteuerte Genauigkeit
# Mapping: 'full' -> 'full:parsed:canonical', 'body' -> 'body:parsed:canonical'
h_key = f"{self.active_hash_mode or 'full'}:parsed:canonical"
new_h = note_pl.get("hashes", {}).get(h_key)
old_h = old_payload.get("hashes", {}).get(h_key)
# WP-24c v4.5.9-DEBUG: Detaillierte Hash-Diagnose
logger.debug(f"🔍 [CHANGE-DETECTION] Hash-Vergleich für '{note_id}':")
logger.debug(f" -> Hash-Key: '{h_key}'")
logger.debug(f" -> Active Hash-Mode: '{self.active_hash_mode or 'full'}'")
logger.debug(f" -> New Hash vorhanden: {bool(new_h)}")
logger.debug(f" -> Old Hash vorhanden: {bool(old_h)}")
if new_h:
logger.debug(f" -> New Hash (erste 32 Zeichen): {new_h[:32]}...")
if old_h:
logger.debug(f" -> Old Hash (erste 32 Zeichen): {old_h[:32]}...")
logger.debug(f" -> Verfügbare Hash-Keys in new: {list(note_pl.get('hashes', {}).keys())}")
logger.debug(f" -> Verfügbare Hash-Keys in old: {list(old_payload.get('hashes', {}).keys())}")
if new_h and old_h:
hash_match = (new_h == old_h)
if hash_match:
content_changed = False
logger.debug(f"🔍 [CHANGE-DETECTION] ✅ Hash identisch für '{note_id}': {h_key} = {new_h[:16]}...")
else:
logger.debug(f"🔍 [CHANGE-DETECTION] ❌ Hash geändert für '{note_id}': alt={old_h[:16]}..., neu={new_h[:16]}...")
logger.debug(f" -> Hash-Unterschied: Erste unterschiedliche Position: {next((i for i, (a, b) in enumerate(zip(new_h, old_h)) if a != b), 'keine')}")
else:
# WP-24c v4.5.9: Wenn Hash fehlt, als geändert behandeln (Sicherheit)
logger.warning(f"⚠️ [CHANGE-DETECTION] Hash fehlt für '{note_id}': new_h={bool(new_h)}, old_h={bool(old_h)}")
logger.debug(f" -> Grund: Hash wird als 'geändert' behandelt, da Hash-Werte fehlen")
else:
if force_replace:
logger.debug(f"🔍 [CHANGE-DETECTION] '{note_id}': force_replace=True -> überspringe Hash-Check")
elif not old_payload:
logger.debug(f"🔍 [CHANGE-DETECTION] '{note_id}': Keine alte Payload gefunden -> erste Verarbeitung oder gelöscht")
# WP-24c v4.5.9: Strikte Logik - überspringe komplett wenn Hash identisch
# WICHTIG: Artifact-Check NACH Hash-Check, da purge_before die Artefakte löschen kann
# Wenn Hash identisch ist, sind die Artefakte entweder vorhanden oder werden gerade neu geschrieben
if not force_replace and hash_match and old_payload:
# WP-24c v4.5.9: Hash identisch -> überspringe komplett (auch wenn Artefakte nach PURGE fehlen)
# Der Hash ist die autoritative Quelle für "Inhalt unverändert"
# Artefakte werden beim nächsten normalen Import wieder erstellt, wenn nötig
logger.info(f"⏭️ [SKIP] '{note_id}' unverändert (Hash identisch - überspringe komplett, auch wenn Artefakte fehlen)")
return {**result, "status": "unchanged", "note_id": note_id, "reason": "hash_identical"}
elif not force_replace and old_payload and not hash_match:
# WP-24c v4.5.9-DEBUG: Hash geändert - erlaube Verarbeitung
logger.debug(f"🔍 [CHANGE-DETECTION] '{note_id}': Hash geändert -> erlaube Verarbeitung")
# WP-24c v4.5.9: Hash geändert oder keine alte Payload - prüfe Artefakte für normale Verarbeitung
c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id)
logger.debug(f"🔍 [CHANGE-DETECTION] '{note_id}': Artifact-Check: c_miss={c_miss}, e_miss={e_miss}")
if not apply:
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
# Chunks & MoE
profile = note_pl.get("chunk_profile", "sliding_standard")
note_type = resolve_note_type(self.registry, fm.get("type"))
chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type)
enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False)
chunks = await assemble_chunks(note_id, getattr(parsed, "body", ""), note_type, config=chunk_cfg)
# WP-24c v4.5.8: Validierung in Chunk-Schleife entfernt
# Alle candidate: Kanten werden jetzt in Phase 3 (nach build_edges_for_note) validiert
# Dies stellt sicher, dass auch Note-Scope Kanten aus LLM-Validierungs-Zonen geprüft werden
# Der candidate_pool wird unverändert weitergegeben, damit build_edges_for_note alle Kanten erkennt
# WP-24c v4.5.8: Nur ID-Validierung bleibt (Ghost-ID Schutz), keine LLM-Validierung mehr hier
for ch in chunks:
new_pool = []
for cand in getattr(ch, "candidate_pool", []):
# WP-24c v4.5.8: Nur ID-Validierung (Ghost-ID Schutz)
t_id = cand.get('target_id') or cand.get('to') or cand.get('note_id')
if not self._is_valid_id(t_id):
continue
# WP-24c v4.5.8: Alle Kanten gehen durch - LLM-Validierung erfolgt in Phase 3
new_pool.append(cand)
ch.candidate_pool = new_pool
# chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry)
# v4.2.8 Fix C: Explizite Übergabe des Profil-Namens für den Chunk-Payload
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry, chunk_profile=profile)
vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []
# WP-24c v4.2.0: Kanten-Extraktion mit Note-Scope Zonen Support
# Übergabe des Original-Markdown-Texts für Note-Scope Zonen-Extraktion
markdown_body = getattr(parsed, "body", "")
raw_edges = build_edges_for_note(
note_id,
chunk_pls,
note_level_references=note_pl.get("references", []),
markdown_body=markdown_body
)
# WP-24c v4.5.8: Phase 3 - Finaler Validierungs-Gate für candidate: Kanten
# Prüfe alle Kanten mit rule_id ODER provenance beginnend mit "candidate:"
# Dies schließt alle Kandidaten ein, unabhängig von ihrer Herkunft (global_pool, explicit:callout, etc.)
# WP-24c v4.5.8: Kontext-Optimierung für Note-Scope Kanten
# Aggregiere den gesamten Note-Text für bessere Validierungs-Entscheidungen
note_text = markdown_body or " ".join([c.get("text", "") or c.get("window", "") for c in chunk_pls])
# Erstelle eine Note-Summary aus den wichtigsten Chunks (für bessere Kontext-Qualität)
note_summary = " ".join([c.get("window", "") or c.get("text", "") for c in chunk_pls[:5]]) # Top 5 Chunks
validated_edges = []
rejected_edges = []
for e in raw_edges:
rule_id = e.get("rule_id", "")
provenance = e.get("provenance", "")
# WP-24c v4.5.8: Trigger-Kriterium - rule_id ODER provenance beginnt mit "candidate:"
is_candidate = (rule_id and rule_id.startswith("candidate:")) or (provenance and provenance.startswith("candidate:"))
if is_candidate:
# Extrahiere target_id für Validierung (aus verschiedenen möglichen Feldern)
target_id = e.get("target_id") or e.get("to")
if not target_id:
# Fallback: Versuche aus Payload zu extrahieren
payload = e.get("extra", {}) if isinstance(e.get("extra"), dict) else {}
target_id = payload.get("target_id") or payload.get("to")
if not target_id:
logger.warning(f"⚠️ [PHASE 3] Keine target_id gefunden für Kante: {e}")
rejected_edges.append(e)
continue
kind = e.get("kind", "related_to")
source_id = e.get("source_id", note_id)
scope = e.get("scope", "chunk")
# WP-24c v4.5.8: Kontext-Optimierung für Note-Scope Kanten
# Für scope: note verwende Note-Summary oder gesamten Note-Text
# Für scope: chunk verwende den spezifischen Chunk-Text (falls verfügbar)
if scope == "note":
validation_text = note_summary or note_text
context_info = "Note-Scope (aggregiert)"
else:
# Für Chunk-Scope: Versuche Chunk-Text zu finden, sonst Note-Text
chunk_id = e.get("chunk_id") or source_id
chunk_text = None
for ch in chunk_pls:
if ch.get("chunk_id") == chunk_id or ch.get("id") == chunk_id:
chunk_text = ch.get("text") or ch.get("window", "")
break
validation_text = chunk_text or note_text
context_info = f"Chunk-Scope ({chunk_id})"
# Erstelle Edge-Dict für Validierung (kompatibel mit validate_edge_candidate)
edge_for_validation = {
"kind": kind,
"to": target_id, # validate_edge_candidate erwartet "to"
"target_id": target_id,
"provenance": provenance if not provenance.startswith("candidate:") else provenance.replace("candidate:", "").strip(),
"confidence": e.get("confidence", 0.9)
}
logger.info(f"🚀 [PHASE 3] Validierung: {source_id} -> {target_id} ({kind}) | Scope: {scope} | Kontext: {context_info}")
# WP-24c v4.5.8: Validiere gegen optimierten Kontext
is_valid = await validate_edge_candidate(
chunk_text=validation_text,
edge=edge_for_validation,
batch_cache=self.batch_cache,
llm_service=self.llm,
profile_name="ingest_validator"
)
if is_valid:
# WP-24c v4.5.8: Entferne candidate: Präfix (Kante wird zum Fakt)
new_rule_id = rule_id.replace("candidate:", "").strip() if rule_id else provenance.replace("candidate:", "").strip() if provenance.startswith("candidate:") else provenance
if not new_rule_id:
new_rule_id = e.get("provenance", "explicit").replace("candidate:", "").strip()
# Aktualisiere rule_id und provenance im Edge
e["rule_id"] = new_rule_id
if provenance.startswith("candidate:"):
e["provenance"] = provenance.replace("candidate:", "").strip()
validated_edges.append(e)
logger.info(f"✅ [PHASE 3] VERIFIED: {source_id} -> {target_id} ({kind}) | rule_id: {new_rule_id}")
else:
# WP-24c v4.5.8: Kante ablehnen (nicht zu validated_edges hinzufügen)
rejected_edges.append(e)
logger.info(f"🚫 [PHASE 3] REJECTED: {source_id} -> {target_id} ({kind})")
else:
# WP-24c v4.5.8: Keine candidate: Kante -> direkt übernehmen
validated_edges.append(e)
# WP-24c v4.5.8: Phase 3 abgeschlossen - rejected_edges werden NICHT weiterverarbeitet
# WP-24c v4.5.9: Persistierung von rejected_edges für Audit-Zwecke
if rejected_edges:
logger.info(f"🚫 [PHASE 3] {len(rejected_edges)} Kanten abgelehnt und werden nicht in die DB geschrieben")
self._persist_rejected_edges(note_id, rejected_edges)
# WP-24c v4.5.8: Verwende validated_edges statt raw_edges für weitere Verarbeitung
# Nur verified Kanten (ohne candidate: Präfix) werden in Phase 2 (Symmetrie) verarbeitet
explicit_edges = []
for e in validated_edges:
t_raw = e.get("target_id")
t_ctx = self.batch_cache.get(t_raw)
t_id = t_ctx.note_id if t_ctx else t_raw
if not self._is_valid_id(t_id): continue
resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance="explicit")
# WP-24c v4.1.0: target_section aus dem Edge-Payload extrahieren und beibehalten
target_section = e.get("target_section")
e.update({
"kind": resolved_kind,
"relation": resolved_kind, # Konsistenz: kind und relation identisch
"target_id": t_id,
"source_id": e.get("source_id") or note_id, # Sicherstellen, dass source_id gesetzt ist
"origin_note_id": note_id,
"virtual": False
})
explicit_edges.append(e)
# Symmetrie puffern (WP-24c v4.1.0: Korrekte Symmetrie-Integrität)
inv_kind = edge_registry.get_inverse(resolved_kind)
if inv_kind and t_id != note_id:
# GOLD-STANDARD v4.1.0: Symmetrie-Integrität
v_edge = {
"note_id": t_id, # Besitzer-Wechsel: Symmetrie gehört zum Link-Ziel
"source_id": t_id, # Neue Quelle ist das Link-Ziel
"target_id": note_id, # Ziel ist die ursprüngliche Quelle
"kind": inv_kind, # Inverser Kanten-Typ
"relation": inv_kind, # Konsistenz: kind und relation identisch
"scope": "note", # Symmetrien sind immer Note-Level
"virtual": True,
"origin_note_id": note_id, # Tracking: Woher kommt die Symmetrie
}
# target_section beibehalten, falls vorhanden (für Section-Links)
if target_section:
v_edge["target_section"] = target_section
self.symmetry_buffer.append(v_edge)
# DB Upsert
if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id)
col_n, pts_n = points_for_note(self.prefix, note_pl, None, self.dim)
upsert_batch(self.client, col_n, pts_n, wait=True)
if chunk_pls and vecs:
col_c, pts_c = points_for_chunks(self.prefix, chunk_pls, vecs)
upsert_batch(self.client, col_c, pts_c, wait=True)
if explicit_edges:
col_e, pts_e = points_for_edges(self.prefix, explicit_edges)
upsert_batch(self.client, col_e, pts_e, wait=True)
logger.info(f" ✨ Phase 1 fertig: {len(explicit_edges)} explizite Kanten für '{note_id}'.")
return {"status": "success", "note_id": note_id}
except Exception as e:
logger.error(f"❌ Fehler bei {file_path}: {e}", exc_info=True)
return {**result, "status": "error", "error": str(e)}
async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
"""Erstellt eine Note aus einem Textstream."""
target_path = os.path.join(vault_root, folder, filename)
os.makedirs(os.path.dirname(target_path), exist_ok=True)
with open(target_path, "w", encoding="utf-8") as f:
f.write(markdown_content)
await asyncio.sleep(0.1)
return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)