Refactor ingestion_db.py and ingestion_processor.py: improve documentation and logging clarity, integrate cloud resilience and error handling, and refine the artifact-purging logic. Bump the IngestionService orchestrator (ingestion_processor.py) to version 3.3.6 (ingestion_db.py remains at 2.2.0) to reflect the functional changes: strict separation of phase 1 and phase 2, and an authority check that protects explicit edges from being overwritten by symmetry edges.
This commit is contained in:
parent
7953acf3ee
commit
57656bbaaf
|
|
@@ -1,7 +1,11 @@
|
||||||
"""
|
"""
|
||||||
FILE: app/core/ingestion/ingestion_db.py
|
FILE: app/core/ingestion/ingestion_db.py
|
||||||
DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung.
|
DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung.
|
||||||
|
WP-14: Umstellung auf zentrale database-Infrastruktur.
|
||||||
|
WP-20/22: Integration von Cloud-Resilienz und Fehlerbehandlung.
|
||||||
WP-24c: Implementierung der herkunftsbasierten Lösch-Logik (Origin-Purge).
|
WP-24c: Implementierung der herkunftsbasierten Lösch-Logik (Origin-Purge).
|
||||||
|
Verhindert das versehentliche Löschen von inversen Kanten beim Re-Import.
|
||||||
|
Integration der Authority-Prüfung für Point-IDs zur Symmetrie-Validierung.
|
||||||
VERSION: 2.2.0 (WP-24c: Protected Purge & Authority Lookup)
|
VERSION: 2.2.0 (WP-24c: Protected Purge & Authority Lookup)
|
||||||
STATUS: Active
|
STATUS: Active
|
||||||
"""
|
"""
|
||||||
|
|
@@ -9,6 +13,8 @@ import logging
|
||||||
from typing import Optional, Tuple, List
|
from typing import Optional, Tuple, List
|
||||||
from qdrant_client import QdrantClient
|
from qdrant_client import QdrantClient
|
||||||
from qdrant_client.http import models as rest
|
from qdrant_client.http import models as rest
|
||||||
|
|
||||||
|
# Import der modularisierten Namen-Logik zur Sicherstellung der Konsistenz
|
||||||
from app.core.database import collection_names
|
from app.core.database import collection_names
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
@@ -39,7 +45,8 @@ def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[
|
||||||
def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) -> bool:
|
def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) -> bool:
|
||||||
"""
|
"""
|
||||||
WP-24c: Prüft via Point-ID, ob bereits eine explizite Kante existiert.
|
WP-24c: Prüft via Point-ID, ob bereits eine explizite Kante existiert.
|
||||||
Verhindert das Überschreiben von manuellem Wissen durch Symmetrien.
|
Wird vom IngestionProcessor genutzt, um das Überschreiben von manuellem Wissen
|
||||||
|
durch virtuelle Symmetrie-Kanten zu verhindern.
|
||||||
"""
|
"""
|
||||||
_, _, edges_col = collection_names(prefix)
|
_, _, edges_col = collection_names(prefix)
|
||||||
try:
|
try:
|
||||||
|
|
@@ -51,12 +58,11 @@ def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) ->
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def purge_artifacts(client: QdrantClient, prefix: str, note_id: str):
|
def purge_artifacts(client: QdrantClient, prefix: str, note_id: str):
|
||||||
"""Löscht Artefakte basierend auf ihrer Herkunft (Origin-Purge)."""
|
"""WP-24c: Selektives Löschen von Artefakten (Origin-Purge)."""
|
||||||
_, chunks_col, edges_col = collection_names(prefix)
|
_, chunks_col, edges_col = collection_names(prefix)
|
||||||
try:
|
try:
|
||||||
chunks_filter = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
chunks_filter = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
||||||
client.delete(collection_name=chunks_col, points_selector=rest.FilterSelector(filter=chunks_filter))
|
client.delete(collection_name=chunks_col, points_selector=rest.FilterSelector(filter=chunks_filter))
|
||||||
|
|
||||||
edges_filter = rest.Filter(must=[rest.FieldCondition(key="origin_note_id", match=rest.MatchValue(value=note_id))])
|
edges_filter = rest.Filter(must=[rest.FieldCondition(key="origin_note_id", match=rest.MatchValue(value=note_id))])
|
||||||
client.delete(collection_name=edges_col, points_selector=rest.FilterSelector(filter=edges_filter))
|
client.delete(collection_name=edges_col, points_selector=rest.FilterSelector(filter=edges_filter))
|
||||||
logger.info(f"🧹 [PURGE] Global artifacts owned by '{note_id}' cleared.")
|
logger.info(f"🧹 [PURGE] Global artifacts owned by '{note_id}' cleared.")
|
||||||
|
|
|
||||||
|
|
@@ -4,9 +4,11 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator).
|
||||||
WP-24c: Integration der Symmetrie-Logik (Automatische inverse Kanten).
|
WP-24c: Integration der Symmetrie-Logik (Automatische inverse Kanten).
|
||||||
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
|
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
|
||||||
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
|
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
|
||||||
AUDIT v3.3.5: 2-Phasen-Strategie (Phase 2 erst nach allen Batches).
|
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
|
||||||
API-Fix für Dictionary-Rückgabe. Vollständiger Umfang.
|
AUDIT v3.3.6: Strikte Phasentrennung (Phase 2 global am Ende).
|
||||||
VERSION: 3.3.5 (WP-24c: Global Symmetry Commitment)
|
Fix für .trash-Folder und Pydantic 'None'-Crash.
|
||||||
|
Vollständige Wiederherstellung des Business-Loggings.
|
||||||
|
VERSION: 3.3.6 (WP-24c: Full Transparency Orchestration)
|
||||||
STATUS: Active
|
STATUS: Active
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
|
|
@@ -51,15 +53,14 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class IngestionService:
|
class IngestionService:
|
||||||
def __init__(self, collection_prefix: str = None):
|
def __init__(self, collection_prefix: str = None):
|
||||||
"""Initialisiert den Service und bereinigt das Logging."""
|
"""Initialisiert den Service und bereinigt das technische Logging."""
|
||||||
from app.config import get_settings
|
from app.config import get_settings
|
||||||
self.settings = get_settings()
|
self.settings = get_settings()
|
||||||
|
|
||||||
# --- LOGGING CLEANUP (Business Focus) ---
|
# --- LOGGING CLEANUP (Business Focus) ---
|
||||||
logging.getLogger("httpx").setLevel(logging.WARNING)
|
# Unterdrückt HTTP-Bibliotheks-Lärm, erhält aber inhaltliche Service-Logs
|
||||||
logging.getLogger("httpcore").setLevel(logging.WARNING)
|
for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]:
|
||||||
logging.getLogger("qdrant_client").setLevel(logging.WARNING)
|
logging.getLogger(lib).setLevel(logging.WARNING)
|
||||||
logging.getLogger("urllib3").setLevel(logging.WARNING)
|
|
||||||
|
|
||||||
self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
|
self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
|
||||||
self.cfg = QdrantConfig.from_env()
|
self.cfg = QdrantConfig.from_env()
|
||||||
|
|
@@ -70,25 +71,31 @@ class IngestionService:
|
||||||
self.embedder = EmbeddingsClient()
|
self.embedder = EmbeddingsClient()
|
||||||
self.llm = LLMService()
|
self.llm = LLMService()
|
||||||
|
|
||||||
|
# WP-25a: Dimensionen über das LLM-Profil auflösen
|
||||||
embed_cfg = self.llm.profiles.get("embedding_expert", {})
|
embed_cfg = self.llm.profiles.get("embedding_expert", {})
|
||||||
self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE
|
self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE
|
||||||
|
|
||||||
|
# Festlegen des Change-Detection Modus
|
||||||
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
|
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
|
||||||
|
|
||||||
# WP-15b: Kontext-Gedächtnis für ID-Auflösung
|
# WP-15b: Kontext-Gedächtnis für ID-Auflösung (Global)
|
||||||
self.batch_cache: Dict[str, NoteContext] = {}
|
self.batch_cache: Dict[str, NoteContext] = {}
|
||||||
|
|
||||||
# WP-24c: Puffer für Phase 2 (Symmetrie-Injektion am Ende des gesamten Imports)
|
# WP-24c: Puffer für Phase 2 (Symmetrie-Injektion am Ende des gesamten Imports)
|
||||||
self.symmetry_buffer: List[Dict[str, Any]] = []
|
self.symmetry_buffer: List[Dict[str, Any]] = []
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# Schema-Prüfung und Initialisierung
|
||||||
ensure_collections(self.client, self.prefix, self.dim)
|
ensure_collections(self.client, self.prefix, self.dim)
|
||||||
ensure_payload_indexes(self.client, self.prefix)
|
ensure_payload_indexes(self.client, self.prefix)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"DB initialization warning: {e}")
|
logger.warning(f"DB initialization warning: {e}")
|
||||||
|
|
||||||
def _is_valid_note_id(self, text: str) -> bool:
|
def _is_valid_note_id(self, text: str) -> bool:
|
||||||
"""WP-24c: Verhindert Müll-Kanten zu System-Platzhaltern."""
|
"""
|
||||||
|
WP-24c: Prüft Ziel-Strings auf fachliche Validität.
|
||||||
|
Verhindert Müll-Kanten zu reinen System-Platzhaltern.
|
||||||
|
"""
|
||||||
if not text or len(text.strip()) < 2: return False
|
if not text or len(text.strip()) < 2: return False
|
||||||
blacklisted = {"insight", "event", "source", "task", "project", "person", "concept", "related_to", "referenced_by"}
|
blacklisted = {"insight", "event", "source", "task", "project", "person", "concept", "related_to", "referenced_by"}
|
||||||
if text.lower().strip() in blacklisted: return False
|
if text.lower().strip() in blacklisted: return False
|
||||||
|
|
@@ -98,12 +105,13 @@ class IngestionService:
|
||||||
async def run_batch(self, file_paths: List[str], vault_root: str) -> Dict[str, Any]:
|
async def run_batch(self, file_paths: List[str], vault_root: str) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
WP-15b: Two-Pass Ingestion Workflow (PHASE 1).
|
WP-15b: Two-Pass Ingestion Workflow (PHASE 1).
|
||||||
Fix: Gibt Dictionary zurück, um Kompatibilität zum Importer-Script zu wahren.
|
Füllt den Cache und verarbeitet Dateien batchweise.
|
||||||
|
Gibt ein Dictionary zurück, um Kompatibilität zum Orchestrator zu wahren.
|
||||||
"""
|
"""
|
||||||
self.batch_cache.clear()
|
self.batch_cache.clear()
|
||||||
logger.info(f"--- 🔍 START BATCH (Phase 1) ---")
|
logger.info(f"--- 🔍 START BATCH PHASE 1 ({len(file_paths)} Dateien) ---")
|
||||||
|
|
||||||
# 1. Pre-Scan (Context-Cache füllen)
|
# 1. Schritt: Pre-Scan (Context-Cache befüllen)
|
||||||
for path in file_paths:
|
for path in file_paths:
|
||||||
try:
|
try:
|
||||||
ctx = pre_scan_markdown(path, registry=self.registry)
|
ctx = pre_scan_markdown(path, registry=self.registry)
|
||||||
|
|
@@ -115,7 +123,7 @@ class IngestionService:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f" ⚠️ Pre-scan fehlgeschlagen für {path}: {e}")
|
logger.warning(f" ⚠️ Pre-scan fehlgeschlagen für {path}: {e}")
|
||||||
|
|
||||||
# 2. Schritt: PROCESSING (NUR AUTHORITY)
|
# 2. Schritt: Batch-Verarbeitung (Authority Only)
|
||||||
processed_count = 0
|
processed_count = 0
|
||||||
success_count = 0
|
success_count = 0
|
||||||
for p in file_paths:
|
for p in file_paths:
|
||||||
|
|
@@ -134,36 +142,43 @@ class IngestionService:
|
||||||
|
|
||||||
async def commit_vault_symmetries(self) -> Dict[str, Any]:
|
async def commit_vault_symmetries(self) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
WP-24c: Führt PHASE 2 für den gesamten Vault aus.
|
WP-24c: Führt PHASE 2 (Symmetrie-Injektion) für den gesamten Vault aus.
|
||||||
Wird nach allen run_batch Aufrufen einmalig getriggert.
|
Wird nach Abschluss aller Batches einmalig aufgerufen.
|
||||||
|
Vergleicht gepufferte Kanten gegen die Instance-of-Truth in Qdrant.
|
||||||
"""
|
"""
|
||||||
if not self.symmetry_buffer:
|
if not self.symmetry_buffer:
|
||||||
|
logger.info("⏭️ Symmetrie-Puffer ist leer. Keine Aktion erforderlich.")
|
||||||
return {"status": "skipped", "reason": "buffer_empty"}
|
return {"status": "skipped", "reason": "buffer_empty"}
|
||||||
|
|
||||||
logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrie-Kanten gegen die Instance-of-Truth...")
|
logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrie-Kanten gegen die Instance-of-Truth...")
|
||||||
final_virtuals = []
|
final_virtuals = []
|
||||||
for v_edge in self.symmetry_buffer:
|
for v_edge in self.symmetry_buffer:
|
||||||
# ID der potenziellen Symmetrie berechnen
|
# Deterministische ID der potenziellen Symmetrie berechnen
|
||||||
v_id = _mk_edge_id(v_edge["kind"], v_edge["note_id"], v_edge["target_id"], v_edge.get("scope", "note"))
|
v_id = _mk_edge_id(v_edge["kind"], v_edge["note_id"], v_edge["target_id"], v_edge.get("scope", "note"))
|
||||||
|
|
||||||
# Nur schreiben, wenn KEINE manuelle Kante in der DB existiert
|
# AUTHORITY-CHECK: Nur schreiben, wenn KEINE manuelle Kante in der DB existiert
|
||||||
if not is_explicit_edge_present(self.client, self.prefix, v_id):
|
if not is_explicit_edge_present(self.client, self.prefix, v_id):
|
||||||
final_virtuals.append(v_edge)
|
final_virtuals.append(v_edge)
|
||||||
|
# Detailliertes Logging für volle Transparenz
|
||||||
|
logger.info(f" 🔄 [SYMMETRY] Add inverse: {v_edge['note_id']} --({v_edge['kind']})--> {v_edge['target_id']}")
|
||||||
else:
|
else:
|
||||||
logger.debug(f" 🛡️ Schutz: Manuelle Kante verhindert Symmetrie {v_id}")
|
logger.debug(f" 🛡️ Schutz: Manuelle Kante belegt ID {v_id}. Symmetrie verworfen.")
|
||||||
|
|
||||||
added_count = 0
|
added_count = 0
|
||||||
if final_virtuals:
|
if final_virtuals:
|
||||||
logger.info(f"📤 Schreibe {len(final_virtuals)} geschützte Symmetrie-Kanten.")
|
logger.info(f"📤 Schreibe {len(final_virtuals)} validierte Symmetrie-Kanten in den Graphen.")
|
||||||
e_pts = points_for_edges(self.prefix, final_virtuals)[1]
|
e_pts = points_for_edges(self.prefix, final_virtuals)[1]
|
||||||
upsert_batch(self.client, f"{self.prefix}_edges", e_pts)
|
upsert_batch(self.client, f"{self.prefix}_edges", e_pts)
|
||||||
added_count = len(final_virtuals)
|
added_count = len(final_virtuals)
|
||||||
|
|
||||||
self.symmetry_buffer.clear() # Puffer leeren
|
self.symmetry_buffer.clear() # Puffer nach erfolgreichem Commit leeren
|
||||||
return {"status": "success", "added": added_count}
|
return {"status": "success", "added": added_count}
|
||||||
|
|
||||||
async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]:
|
async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]:
|
||||||
"""Transformiert Datei und befüllt den Symmetry-Buffer."""
|
"""
|
||||||
|
Transformiert eine Markdown-Datei in Phase 1 (Authority First).
|
||||||
|
Implementiert Ordner-Blacklists, Pydantic-Safety und MoE-Validierung.
|
||||||
|
"""
|
||||||
apply = kwargs.get("apply", False)
|
apply = kwargs.get("apply", False)
|
||||||
force_replace = kwargs.get("force_replace", False)
|
force_replace = kwargs.get("force_replace", False)
|
||||||
purge_before = kwargs.get("purge_before", False)
|
purge_before = kwargs.get("purge_before", False)
|
||||||
|
|
@@ -171,7 +186,7 @@ class IngestionService:
|
||||||
result = {"path": file_path, "status": "skipped", "changed": False, "error": None}
|
result = {"path": file_path, "status": "skipped", "changed": False, "error": None}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# --- ORDNER-FILTER (.trash) ---
|
# --- ORDNER-FILTER (Fix für .trash und .obsidian Junk) ---
|
||||||
if any(part.startswith('.') for part in file_path.split(os.sep)):
|
if any(part.startswith('.') for part in file_path.split(os.sep)):
|
||||||
return {**result, "status": "skipped", "reason": "hidden_folder"}
|
return {**result, "status": "skipped", "reason": "hidden_folder"}
|
||||||
|
|
||||||
|
|
@@ -180,58 +195,80 @@ class IngestionService:
|
||||||
if any(folder in file_path for folder in ignore_folders):
|
if any(folder in file_path for folder in ignore_folders):
|
||||||
return {**result, "status": "skipped", "reason": "folder_blacklist"}
|
return {**result, "status": "skipped", "reason": "folder_blacklist"}
|
||||||
|
|
||||||
|
# Datei einlesen und validieren
|
||||||
parsed = read_markdown(file_path)
|
parsed = read_markdown(file_path)
|
||||||
if not parsed: return {**result, "error": "Empty file"}
|
if not parsed: return {**result, "error": "Empty file"}
|
||||||
fm = normalize_frontmatter(parsed.frontmatter)
|
fm = normalize_frontmatter(parsed.frontmatter)
|
||||||
|
validate_required_frontmatter(fm)
|
||||||
|
|
||||||
note_type = resolve_note_type(self.registry, fm.get("type"))
|
note_type = resolve_note_type(self.registry, fm.get("type"))
|
||||||
note_pl = make_note_payload(parsed, vault_root=vault_root, file_path=file_path, types_cfg=self.registry)
|
note_pl = make_note_payload(parsed, vault_root=vault_root, file_path=file_path, types_cfg=self.registry)
|
||||||
note_id = note_pl["note_id"]
|
note_id = note_pl.get("note_id")
|
||||||
|
|
||||||
|
# --- FIX: Guard Clause gegen 'None' IDs (Verhindert Pydantic Crash) ---
|
||||||
|
if not note_id:
|
||||||
|
logger.warning(f" ⚠️ Fehlende note_id in '{file_path}'. Datei wird ignoriert.")
|
||||||
|
return {**result, "status": "error", "error": "missing_note_id"}
|
||||||
|
|
||||||
logger.info(f"📄 Bearbeite: '{note_id}' (Typ: {note_type})")
|
logger.info(f"📄 Bearbeite: '{note_id}' (Typ: {note_type})")
|
||||||
|
|
||||||
# Change Detection
|
# Change Detection & Fragment-Prüfung
|
||||||
old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id)
|
old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id)
|
||||||
c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id)
|
c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id)
|
||||||
if not (force_replace or not old_payload or c_miss or e_miss):
|
if not (force_replace or not old_payload or c_miss or e_miss):
|
||||||
return {**result, "status": "unchanged", "note_id": note_id}
|
return {**result, "status": "unchanged", "note_id": note_id}
|
||||||
|
|
||||||
# Deep Processing & MoE
|
if not apply:
|
||||||
|
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
|
||||||
|
|
||||||
|
# Chunks erzeugen und semantisch validieren (MoE)
|
||||||
profile = note_pl.get("chunk_profile", "sliding_standard")
|
profile = note_pl.get("chunk_profile", "sliding_standard")
|
||||||
chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type)
|
chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type)
|
||||||
|
enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False)
|
||||||
chunks = await assemble_chunks(note_id, getattr(parsed, "body", ""), note_type, config=chunk_cfg)
|
chunks = await assemble_chunks(note_id, getattr(parsed, "body", ""), note_type, config=chunk_cfg)
|
||||||
|
|
||||||
for ch in chunks:
|
for ch in chunks:
|
||||||
new_pool = []
|
new_pool = []
|
||||||
for cand in getattr(ch, "candidate_pool", []):
|
for cand in getattr(ch, "candidate_pool", []):
|
||||||
if cand.get("provenance") == "global_pool" and chunk_cfg.get("enable_smart_edge_allocation"):
|
if cand.get("provenance") == "global_pool" and enable_smart:
|
||||||
|
# Detailliertes Business-Logging für LLM-Aktivitäten
|
||||||
|
target_label = cand.get('target_id') or cand.get('note_id') or "Unknown"
|
||||||
|
logger.info(f" ⚖️ [VALIDATING] Relation to '{target_label}' via Expert-LLM...")
|
||||||
|
|
||||||
is_valid = await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm)
|
is_valid = await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm)
|
||||||
t_id = cand.get('target_id') or cand.get('note_id') or "Unknown"
|
|
||||||
logger.info(f" 🧠 [SMART EDGE] {t_id} -> {'✅ OK' if is_valid else '❌ SKIP'}")
|
logger.info(f" 🧠 [SMART EDGE] {target_label} -> {'✅ OK' if is_valid else '❌ SKIP'}")
|
||||||
if is_valid: new_pool.append(cand)
|
if is_valid: new_pool.append(cand)
|
||||||
else:
|
else:
|
||||||
new_pool.append(cand)
|
new_pool.append(cand)
|
||||||
ch.candidate_pool = new_pool
|
ch.candidate_pool = new_pool
|
||||||
|
|
||||||
|
# Embeddings und Payloads
|
||||||
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry)
|
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry)
|
||||||
vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []
|
vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []
|
||||||
|
|
||||||
# Kanten-Logik (Kanonisierung)
|
# Kanten-Extraktion mit ID-Kanonisierung
|
||||||
raw_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []))
|
raw_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []))
|
||||||
|
|
||||||
explicit_edges = []
|
explicit_edges = []
|
||||||
for e in raw_edges:
|
for e in raw_edges:
|
||||||
target_raw = e.get("target_id")
|
target_raw = e.get("target_id")
|
||||||
t_ctx = self.batch_cache.get(target_raw)
|
# Auflösung von Titeln/Dateinamen zu echten IDs über den globalen Cache
|
||||||
target_id = t_ctx.note_id if t_ctx else target_raw
|
target_ctx = self.batch_cache.get(target_raw)
|
||||||
|
target_id = target_ctx.note_id if target_ctx else target_raw
|
||||||
|
|
||||||
if not self._is_valid_note_id(target_id): continue
|
if not self._is_valid_note_id(target_id): continue
|
||||||
|
|
||||||
resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"))
|
resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"))
|
||||||
|
|
||||||
# Echte physische Kante markieren (Phase 1)
|
# Echte physische Kante markieren (Phase 1 Autorität)
|
||||||
e.update({"kind": resolved_kind, "target_id": target_id, "origin_note_id": note_id, "virtual": False, "confidence": 1.0})
|
e.update({
|
||||||
|
"kind": resolved_kind, "target_id": target_id,
|
||||||
|
"origin_note_id": note_id, "virtual": False, "confidence": 1.0
|
||||||
|
})
|
||||||
explicit_edges.append(e)
|
explicit_edges.append(e)
|
||||||
|
|
||||||
# Symmetrie-Kandidat für Phase 2 puffern
|
# Symmetrie-Kandidat für die globale Phase 2 puffern
|
||||||
inv_kind = edge_registry.get_inverse(resolved_kind)
|
inv_kind = edge_registry.get_inverse(resolved_kind)
|
||||||
if inv_kind and target_id != note_id:
|
if inv_kind and target_id != note_id:
|
||||||
v_edge = e.copy()
|
v_edge = e.copy()
|
||||||
|
|
@@ -242,27 +279,28 @@ class IngestionService:
|
||||||
})
|
})
|
||||||
self.symmetry_buffer.append(v_edge)
|
self.symmetry_buffer.append(v_edge)
|
||||||
|
|
||||||
# 4. DB Upsert (Phase 1: Authority Only)
|
# 4. DB Upsert (Phase 1: Authority Commitment)
|
||||||
if apply:
|
if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id)
|
||||||
if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id)
|
|
||||||
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
|
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
|
||||||
upsert_batch(self.client, n_name, n_pts)
|
upsert_batch(self.client, n_name, n_pts)
|
||||||
if chunk_pls and vecs:
|
if chunk_pls and vecs:
|
||||||
upsert_batch(self.client, f"{self.prefix}_chunks", points_for_chunks(self.prefix, chunk_pls, vecs)[1])
|
upsert_batch(self.client, f"{self.prefix}_chunks", points_for_chunks(self.prefix, chunk_pls, vecs)[1])
|
||||||
if explicit_edges:
|
if explicit_edges:
|
||||||
upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, explicit_edges)[1])
|
upsert_batch(self.client, f"{self.prefix}_edges", points_for_edges(self.prefix, explicit_edges)[1])
|
||||||
|
|
||||||
logger.info(f" ✨ Phase 1 fertig: {len(chunk_pls)} Chunks, {len(explicit_edges)} explizite Kanten.")
|
logger.info(f" ✨ Phase 1 fertig: {len(chunk_pls)} Chunks, {len(explicit_edges)} explizite Kanten.")
|
||||||
return {"status": "success", "note_id": note_id, "edges_count": len(explicit_edges)}
|
return {"status": "success", "note_id": note_id, "edges_count": len(explicit_edges)}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"❌ Fehler bei {file_path}: {e}", exc_info=True)
|
logger.error(f"❌ Fehler bei {file_path}: {e}", exc_info=True)
|
||||||
return {**result, "error": str(e)}
|
return {**result, "status": "error", "error": str(e)}
|
||||||
|
|
||||||
async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
|
async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
|
||||||
"""Erstellt eine Note aus einem Textstream."""
|
"""Erstellt eine Note aus einem Textstream."""
|
||||||
target_path = os.path.join(vault_root, folder, filename)
|
target_path = os.path.join(vault_root, folder, filename)
|
||||||
os.makedirs(os.path.dirname(target_path), exist_ok=True)
|
os.makedirs(os.path.dirname(target_path), exist_ok=True)
|
||||||
with open(target_path, "w", encoding="utf-8") as f: f.write(markdown_content)
|
with open(target_path, "w", encoding="utf-8") as f:
|
||||||
|
f.write(markdown_content)
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(0.1)
|
||||||
return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)
|
return await self.process_file(file_path=target_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)
|
||||||
Loading…
Reference in New Issue
Block a user