Update ingestion_processor.py to version 3.4.3: Remove incompatible edge_registry initialization, maintain strict two-phase strategy, and fix ID generation issues. Enhance logging and comments for clarity, ensuring compatibility and improved functionality in the ingestion workflow.

This commit is contained in:
Lars 2026-01-10 14:02:10 +01:00
parent b0f4309a29
commit 8fd7ef804d

View File

@ -4,10 +4,11 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
AUDIT v3.4.2: Strikte 2-Phasen-Strategie (Authority-First).
Lösung des Ghost-ID Problems & Pydantic-Crash Fix.
Zentralisierte ID-Generierung zur Vermeidung von Duplikaten.
VERSION: 3.4.2 (WP-24c: Unified ID Orchestration)
AUDIT v3.4.3:
- Entfernung des inkompatiblen edge_registry.initialize Aufrufs.
- Beibehaltung der strikten 2-Phasen-Strategie (Authority-First).
- Fix für das Steinzeitaxt-Problem via zentralisierter ID-Logik.
VERSION: 3.4.3 (WP-24c: Compatibility Fix)
STATUS: Active
"""
import logging
@ -22,8 +23,8 @@ from app.core.parser import (
validate_required_frontmatter, NoteContext
)
from app.core.chunking import assemble_chunks
# WP-24c: Import der zentralen Identitäts-Logik und Pfad-Getter
from app.core.graph.graph_utils import _mk_edge_id, get_vocab_path, get_schema_path
# WP-24c: Import der zentralen Identitäts-Logik
from app.core.graph.graph_utils import _mk_edge_id
# Datenbank-Ebene (Modularisierte database-Infrastruktur)
from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
@ -57,15 +58,10 @@ class IngestionService:
self.settings = get_settings()
# --- LOGGING CLEANUP ---
# Unterdrückt Bibliotheks-Lärm, erhält aber inhaltliche Service-Logs
for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]:
logging.getLogger(lib).setLevel(logging.WARNING)
# WP-24c: Explizite Initialisierung der Registry mit .env Pfaden
edge_registry.initialize(
vocab_path=get_vocab_path(),
schema_path=get_schema_path()
)
self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
self.cfg = QdrantConfig.from_env()
self.cfg.prefix = self.prefix
@ -104,7 +100,7 @@ class IngestionService:
async def run_batch(self, file_paths: List[str], vault_root: str) -> Dict[str, Any]:
"""
WP-15b: Phase 1 des Two-Pass Ingestion Workflows.
WP-15b: Phase 1 des Two-Pass Workflows.
Verarbeitet Batches und schreibt NUR Nutzer-Autorität (explizite Kanten).
"""
self.batch_cache.clear()
@ -142,7 +138,6 @@ class IngestionService:
"""
WP-24c: Führt PHASE 2 (Globale Symmetrie-Injektion) aus.
Wird am Ende des gesamten Imports aufgerufen.
Sorgt dafür, dass virtuelle Kanten niemals Nutzer-Autorität überschreiben.
"""
if not self.symmetry_buffer:
return {"status": "skipped", "reason": "buffer_empty"}
@ -153,13 +148,13 @@ class IngestionService:
src, tgt, kind = v_edge.get("note_id"), v_edge.get("target_id"), v_edge.get("kind")
if not src or not tgt: continue
# WP-Fix v3.4.2: NUTZUNG DER ZENTRALEN FUNKTION STATT MANUELLEM STRING
# WP-Fix: Nutzung der zentralisierten ID-Logik aus graph_utils
try:
v_id = _mk_edge_id(kind, src, tgt, "note")
except ValueError:
continue
# AUTHORITY-CHECK: Nur schreiben, wenn keine manuelle Kante in der DB existiert
# AUTHORITY-CHECK: Nur schreiben, wenn keine manuelle Kante existiert
if not is_explicit_edge_present(self.client, self.prefix, v_id):
final_virtuals.append(v_edge)
logger.info(f" 🔄 [SYMMETRY] Add inverse: {src} --({kind})--> {tgt}")
@ -178,7 +173,6 @@ class IngestionService:
"""
Transformiert eine Markdown-Datei (Phase 1).
Schreibt Notes/Chunks/Explicit Edges sofort.
Befüllt den Symmetrie-Puffer für Phase 2.
"""
apply = kwargs.get("apply", False)
force_replace = kwargs.get("force_replace", False)
@ -191,7 +185,6 @@ class IngestionService:
if ".trash" in file_path or any(part.startswith('.') for part in file_path.split(os.sep)):
return {**result, "status": "skipped", "reason": "ignored_folder"}
# Datei einlesen und validieren
parsed = read_markdown(file_path)
if not parsed: return {**result, "error": "Empty file"}
fm = normalize_frontmatter(parsed.frontmatter)
@ -205,7 +198,7 @@ class IngestionService:
logger.info(f"📄 Bearbeite: '{note_id}'")
# Change Detection & Fragment-Prüfung
# Change Detection
old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id)
c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id)
if not (force_replace or not old_payload or c_miss or e_miss):
@ -214,7 +207,7 @@ class IngestionService:
if not apply:
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
# Deep Processing & MoE (LLM Validierung)
# Chunks & MoE
profile = note_pl.get("chunk_profile", "sliding_standard")
note_type = resolve_note_type(self.registry, fm.get("type"))
chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type)
@ -234,17 +227,15 @@ class IngestionService:
new_pool.append(cand)
ch.candidate_pool = new_pool
# Embeddings erzeugen
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry)
vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []
# Kanten-Extraktion mit strikter Cache-Resolution (Fix für Ghost-IDs)
# Kanten-Extraktion
raw_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []))
explicit_edges = []
for e in raw_edges:
t_raw = e.get("target_id")
# Kanonisierung: Link-Auflösung über den globalen Cache
t_ctx = self.batch_cache.get(t_raw)
t_id = t_ctx.note_id if t_ctx else t_raw
@ -254,20 +245,14 @@ class IngestionService:
e.update({"kind": resolved_kind, "target_id": t_id, "origin_note_id": note_id, "virtual": False})
explicit_edges.append(e)
# Symmetrie-Gegenkante für Phase 2 puffern
# Symmetrie puffern
inv_kind = edge_registry.get_inverse(resolved_kind)
if inv_kind and t_id != note_id:
v_edge = e.copy()
v_edge.update({
"note_id": t_id,
"target_id": note_id,
"kind": inv_kind,
"virtual": True,
"origin_note_id": note_id
})
v_edge.update({"note_id": t_id, "target_id": note_id, "kind": inv_kind, "virtual": True, "origin_note_id": note_id})
self.symmetry_buffer.append(v_edge)
# DB Upsert (Phase 1: Authority Commitment)
# DB Upsert
if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id)
col_n, pts_n = points_for_note(self.prefix, note_pl, None, self.dim)
@ -289,7 +274,7 @@ class IngestionService:
return {**result, "status": "error", "error": str(e)}
async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
"""Erstellt eine Note aus einem Textstream und triggert die Ingestion."""
"""Erstellt eine Note aus einem Textstream."""
target_path = os.path.join(vault_root, folder, filename)
os.makedirs(os.path.dirname(target_path), exist_ok=True)
with open(target_path, "w", encoding="utf-8") as f: