Update ingestion_processor.py to version 3.1.8: Enhance ID validation to prevent HTTP 400 errors and improve edge generation robustness by excluding known system types. Refactor edge processing logic to ensure valid note IDs and streamline database interactions. Adjust versioning and documentation accordingly.
This commit is contained in:
parent
9cb08777fa
commit
72cf71fa87
|
|
@ -5,14 +5,15 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator).
|
|||
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
|
||||
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
|
||||
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
|
||||
AUDIT v3.1.7: Explicit Authority Enforcement. Verhindert durch interne
|
||||
ID-Registry und DB-Abgleich das Überschreiben manueller Kanten.
|
||||
VERSION: 3.1.7 (WP-24c: Strict Authority Protection)
|
||||
AUDIT v3.1.8: Fix für HTTP 400 (Bad Request) durch ID-Validierung
|
||||
und Schutz vor System-Typ Kollisionen.
|
||||
VERSION: 3.1.8 (WP-24c: Robust Symmetry & ID Validation)
|
||||
STATUS: Active
|
||||
"""
|
||||
import logging
|
||||
import asyncio
|
||||
import os
|
||||
import re
|
||||
from typing import Dict, List, Optional, Tuple, Any
|
||||
|
||||
# Core Module Imports
|
||||
|
|
@ -27,7 +28,7 @@ from app.core.graph.graph_utils import _mk_edge_id
|
|||
# MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene
|
||||
from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
|
||||
from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
|
||||
from qdrant_client.http import models as rest # Für Real-Time DB-Checks
|
||||
from qdrant_client.http import models as rest
|
||||
|
||||
# Services
|
||||
from app.services.embeddings_client import EmbeddingsClient
|
||||
|
|
@ -36,7 +37,7 @@ from app.services.llm_service import LLMService
|
|||
|
||||
# Package-Interne Imports
|
||||
from .ingestion_utils import load_type_registry, resolve_note_type, get_chunk_config_by_profile
|
||||
from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts
|
||||
from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts, is_explicit_edge_present
|
||||
from .ingestion_validation import validate_edge_candidate
|
||||
from .ingestion_note_payload import make_note_payload
|
||||
from .ingestion_chunk_payload import make_chunk_payloads
|
||||
|
|
@ -83,10 +84,35 @@ class IngestionService:
|
|||
except Exception as e:
|
||||
logger.warning(f"DB initialization warning: {e}")
|
||||
|
||||
def _is_valid_note_id(self, text: str) -> bool:
|
||||
"""
|
||||
Prüft, ob ein String eine plausible Note-ID oder ein gültiger Titel ist.
|
||||
Verhindert Symmetrie-Kanten zu Typ-Strings wie 'insight', 'event' oder 'source'.
|
||||
"""
|
||||
if not text or len(text.strip()) < 3:
|
||||
return False
|
||||
|
||||
# 1. Bekannte System-Typen oder Meta-Daten Begriffe ausschließen
|
||||
# Diese landen oft durch fehlerhafte Frontmatter-Einträge in der Referenz-Liste
|
||||
blacklisted = {
|
||||
"insight", "event", "source", "task", "project",
|
||||
"person", "concept", "value", "principle", "trip",
|
||||
"lesson", "decision", "requirement", "related_to"
|
||||
}
|
||||
clean_text = text.lower().strip()
|
||||
if clean_text in blacklisted:
|
||||
return False
|
||||
|
||||
# 2. Ausschluss von zu langen Textfragmenten (wahrscheinlich kein Titel/ID)
|
||||
if len(text) > 120:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
WP-15b: Implementiert den Two-Pass Ingestion Workflow.
|
||||
Pass 1: Pre-Scan füllt den Context-Cache (3-Wege-Indexierung).
|
||||
Pass 1: Pre-Scan füllt den Context-Cache.
|
||||
Pass 2: Verarbeitung nutzt den Cache für die semantische Prüfung.
|
||||
"""
|
||||
# Reset der Authority-Registry für den neuen Batch
|
||||
|
|
@ -109,26 +135,6 @@ class IngestionService:
|
|||
logger.info(f"🚀 [Pass 2] Semantic Processing of {len(file_paths)} files...")
|
||||
return [await self.process_file(p, vault_root, apply=True, purge_before=True) for p in file_paths]
|
||||
|
||||
async def _is_explicit_edge_in_db(self, edge_id: str) -> bool:
|
||||
"""
|
||||
WP-24c: Prüft via Point-ID, ob bereits eine explizite (manuelle) Kante in Qdrant liegt.
|
||||
Verhindert, dass virtuelle Symmetrien bestehendes Wissen überschreiben.
|
||||
"""
|
||||
edges_col = f"{self.prefix}_edges"
|
||||
try:
|
||||
# Direkte Punkt-Abfrage ist schneller als Scroll/Filter
|
||||
res = self.client.retrieve(
|
||||
collection_name=edges_col,
|
||||
ids=[edge_id],
|
||||
with_payload=True,
|
||||
with_vectors=False
|
||||
)
|
||||
if res and not res[0].payload.get("virtual", False):
|
||||
return True # Punkt existiert und ist NICHT virtuell
|
||||
return False
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]:
|
||||
"""Transformiert eine Markdown-Datei in den Graphen."""
|
||||
apply = kwargs.get("apply", False)
|
||||
|
|
@ -149,7 +155,7 @@ class IngestionService:
|
|||
except Exception as e:
|
||||
return {**result, "error": f"Validation failed: {str(e)}"}
|
||||
|
||||
# Dynamischer Lifecycle-Filter aus der Registry (WP-14)
|
||||
# Dynamischer Lifecycle-Filter
|
||||
ingest_cfg = self.registry.get("ingestion_settings", {})
|
||||
ignore_list = ingest_cfg.get("ignore_statuses", ["system", "template", "archive", "hidden"])
|
||||
|
||||
|
|
@ -157,7 +163,7 @@ class IngestionService:
|
|||
if current_status in ignore_list:
|
||||
return {**result, "status": "skipped", "reason": "lifecycle_filter"}
|
||||
|
||||
# 2. Payload & Change Detection (Multi-Hash)
|
||||
# 2. Payload & Change Detection
|
||||
note_type = resolve_note_type(self.registry, fm.get("type"))
|
||||
note_pl = make_note_payload(
|
||||
parsed, vault_root=vault_root, file_path=file_path,
|
||||
|
|
@ -166,43 +172,37 @@ class IngestionService:
|
|||
)
|
||||
note_id = note_pl["note_id"]
|
||||
|
||||
# Abgleich mit der Datenbank (Qdrant)
|
||||
# Abgleich mit der Datenbank
|
||||
old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id)
|
||||
|
||||
# Prüfung gegen den konfigurierten Hash-Modus (body vs. full)
|
||||
# Prüfung gegen den konfigurierten Hash-Modus
|
||||
check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}"
|
||||
old_hash = (old_payload or {}).get("hashes", {}).get(check_key)
|
||||
new_hash = note_pl.get("hashes", {}).get(check_key)
|
||||
|
||||
# Check ob Chunks oder Kanten in der DB fehlen (Reparatur-Modus)
|
||||
c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id)
|
||||
|
||||
# Wenn Hash identisch und Artefakte vorhanden -> Skip
|
||||
if not (force_replace or not old_payload or old_hash != new_hash or c_miss or e_miss):
|
||||
return {**result, "status": "unchanged", "note_id": note_id}
|
||||
|
||||
if not apply:
|
||||
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
|
||||
|
||||
# 3. Deep Processing (Chunking, Validation, Embedding)
|
||||
# 3. Deep Processing
|
||||
try:
|
||||
body_text = getattr(parsed, "body", "") or ""
|
||||
edge_registry.ensure_latest()
|
||||
|
||||
# Profil-Auflösung via Registry
|
||||
profile = note_pl.get("chunk_profile", "sliding_standard")
|
||||
|
||||
chunk_cfg = get_chunk_config_by_profile(self.registry, profile, note_type)
|
||||
enable_smart = chunk_cfg.get("enable_smart_edge_allocation", False)
|
||||
|
||||
# WP-15b: Chunker-Aufruf bereitet den Candidate-Pool pro Chunk vor.
|
||||
chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg)
|
||||
|
||||
# Semantische Kanten-Validierung (Primärprüfung)
|
||||
# Semantische Kanten-Validierung
|
||||
for ch in chunks:
|
||||
new_pool = []
|
||||
for cand in getattr(ch, "candidate_pool", []):
|
||||
# WP-25a: Profilgesteuerte binäre Validierung
|
||||
if cand.get("provenance") == "global_pool" and enable_smart:
|
||||
is_valid = await validate_edge_candidate(
|
||||
ch.text,
|
||||
|
|
@ -214,20 +214,17 @@ class IngestionService:
|
|||
if is_valid:
|
||||
new_pool.append(cand)
|
||||
else:
|
||||
# Explizite Kanten (Wikilinks/Callouts) werden übernommen
|
||||
new_pool.append(cand)
|
||||
ch.candidate_pool = new_pool
|
||||
|
||||
# Payload-Erstellung für die Chunks
|
||||
chunk_pls = make_chunk_payloads(
|
||||
fm, note_pl["path"], chunks, file_path=file_path,
|
||||
types_cfg=self.registry
|
||||
)
|
||||
|
||||
# Vektorisierung der Fenster-Texte
|
||||
vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []
|
||||
|
||||
# Aggregation aller finalen Kanten (Edges)
|
||||
# Aggregation aller Kanten
|
||||
raw_edges = build_edges_for_note(
|
||||
note_id, chunk_pls,
|
||||
note_level_references=note_pl.get("references", []),
|
||||
|
|
@ -239,25 +236,29 @@ class IngestionService:
|
|||
|
||||
# PHASE 1: Alle expliziten Kanten vorverarbeiten und registrieren
|
||||
for e in raw_edges:
|
||||
target_raw = e.get("target_id")
|
||||
|
||||
# Robustheits-Check: Ist das Ziel eine valide Note-ID?
|
||||
if not self._is_valid_note_id(target_raw):
|
||||
continue
|
||||
|
||||
resolved_kind = edge_registry.resolve(
|
||||
e.get("kind", "related_to"),
|
||||
provenance=e.get("provenance", "explicit"),
|
||||
context={"file": file_path, "note_id": note_id}
|
||||
)
|
||||
e["kind"] = resolved_kind
|
||||
# Markierung der Herkunft für selektiven Purge
|
||||
e["origin_note_id"] = note_id
|
||||
e["virtual"] = False # Authority-Markierung für explizite Kanten
|
||||
e["virtual"] = False
|
||||
e["confidence"] = e.get("confidence", 1.0) # Volle Gewichtung
|
||||
|
||||
# Registrierung der ID im Laufzeit-Schutz (Authority)
|
||||
edge_id = _mk_edge_id(resolved_kind, note_id, e.get("target_id"), e.get("scope", "note"))
|
||||
edge_id = _mk_edge_id(resolved_kind, note_id, target_raw, e.get("scope", "note"))
|
||||
self.processed_explicit_ids.add(edge_id)
|
||||
|
||||
final_edges.append(e)
|
||||
|
||||
# PHASE 2: Symmetrische Kanten (Invers) mit Authority-Schutz erzeugen
|
||||
# Wir nutzen hierfür nur die expliziten Kanten aus Phase 1 als Basis
|
||||
explicit_only = [x for x in final_edges if not x.get("virtual")]
|
||||
|
||||
for e in explicit_only:
|
||||
|
|
@ -265,66 +266,56 @@ class IngestionService:
|
|||
inverse_kind = edge_registry.get_inverse(kind)
|
||||
target_raw = e.get("target_id")
|
||||
|
||||
# ID-Resolution: Finden der echten Note_ID im Cache
|
||||
# ID-Resolution
|
||||
target_ctx = self.batch_cache.get(target_raw)
|
||||
target_canonical_id = target_ctx.note_id if target_ctx else target_raw
|
||||
|
||||
# Validierung für Symmetrie-Erzeugung (Kein Self-Loop, Existenz der Inversen)
|
||||
if (inverse_kind and target_canonical_id and target_canonical_id != note_id):
|
||||
# Validierung für Symmetrie-Erzeugung (Kein Self-Loop, valide ID)
|
||||
if (inverse_kind and target_canonical_id and target_canonical_id != note_id and self._is_valid_note_id(target_canonical_id)):
|
||||
|
||||
# 1. ID der potenziellen virtuellen Kante berechnen
|
||||
# Wir nutzen exakt die Parameter, die auch points_for_edges nutzt
|
||||
potential_id = _mk_edge_id(inverse_kind, target_canonical_id, note_id, e.get("scope", "note"))
|
||||
|
||||
# 2. AUTHORITY-CHECK A: Wurde diese Kante bereits explizit im aktuellen Batch registriert?
|
||||
# AUTHORITY-CHECK: Batch-Gedächtnis oder Datenbank
|
||||
is_in_batch = potential_id in self.processed_explicit_ids
|
||||
|
||||
# 3. AUTHORITY-CHECK B: Existiert sie bereits als explizit in der Datenbank?
|
||||
is_in_db = False
|
||||
if not is_in_batch:
|
||||
is_in_db = await self._is_explicit_edge_in_db(potential_id)
|
||||
# Real-Time DB Check verhindert 400 Bad Request durch vorherige ID-Validierung
|
||||
is_in_db = await is_explicit_edge_present(self.client, self.prefix, potential_id)
|
||||
|
||||
# 4. Filter: Nur anlegen, wenn KEINE explizite Autorität vorliegt
|
||||
# Keine Abwertung der Confidence auf Wunsch des Nutzers
|
||||
if not is_in_batch and not is_in_db:
|
||||
if (inverse_kind != kind or kind not in ["related_to", "references"]):
|
||||
inv_edge = e.copy()
|
||||
|
||||
# Richtungs-Umkehr
|
||||
inv_edge["note_id"] = target_canonical_id
|
||||
inv_edge["target_id"] = note_id
|
||||
inv_edge["kind"] = inverse_kind
|
||||
|
||||
# Metadaten für Struktur-Kante
|
||||
inv_edge["virtual"] = True
|
||||
inv_edge["provenance"] = "structure"
|
||||
inv_edge["confidence"] = e.get("confidence", 1.0) # Gewichtung bleibt gleich
|
||||
|
||||
# Lifecycle-Verankerung: Diese Kante gehört logisch zum Verursacher (Note A)
|
||||
inv_edge["origin_note_id"] = note_id
|
||||
|
||||
inv_edge.update({
|
||||
"note_id": target_canonical_id,
|
||||
"target_id": note_id,
|
||||
"kind": inverse_kind,
|
||||
"virtual": True,
|
||||
"provenance": "structure",
|
||||
"confidence": 1.0, # Gewichtung bleibt gleich laut Nutzerwunsch
|
||||
"origin_note_id": note_id
|
||||
})
|
||||
final_edges.append(inv_edge)
|
||||
logger.info(f"🔄 [SYMMETRY] Built inverse: {target_canonical_id} --({inverse_kind})--> {note_id}")
|
||||
|
||||
edges = final_edges
|
||||
|
||||
# 4. DB Upsert via modularisierter Points-Logik
|
||||
if purge_before and old_payload:
|
||||
purge_artifacts(self.client, self.prefix, note_id)
|
||||
|
||||
# Speichern der Haupt-Note
|
||||
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
|
||||
upsert_batch(self.client, n_name, n_pts)
|
||||
|
||||
# Speichern der Chunks
|
||||
if chunk_pls and vecs:
|
||||
c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1]
|
||||
upsert_batch(self.client, f"{self.prefix}_chunks", c_pts)
|
||||
|
||||
# Speichern der Kanten (inklusive der virtuellen Inversen)
|
||||
if edges:
|
||||
e_pts = points_for_edges(self.prefix, edges)[1]
|
||||
upsert_batch(self.client, f"{self.prefix}_edges", e_pts)
|
||||
# 4. DB Upsert
|
||||
if apply:
|
||||
if purge_before and old_payload:
|
||||
purge_artifacts(self.client, self.prefix, note_id)
|
||||
|
||||
# Speichern
|
||||
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
|
||||
upsert_batch(self.client, n_name, n_pts)
|
||||
|
||||
if chunk_pls and vecs:
|
||||
c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1]
|
||||
upsert_batch(self.client, f"{self.prefix}_chunks", c_pts)
|
||||
|
||||
if edges:
|
||||
e_pts = points_for_edges(self.prefix, edges)[1]
|
||||
upsert_batch(self.client, f"{self.prefix}_edges", e_pts)
|
||||
|
||||
return {
|
||||
"path": file_path,
|
||||
|
|
@ -339,11 +330,10 @@ class IngestionService:
|
|||
return {**result, "error": str(e)}
|
||||
|
||||
async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
    """Create a note file from a text stream and ingest it right away.

    Args:
        markdown_content: Full markdown text written to the new file.
        filename: Name of the file to create.
        vault_root: Root directory of the vault.
        folder: Sub-folder of the vault that receives the file.

    Returns:
        The result dictionary returned by ``process_file`` for the new file.
    """
    destination = os.path.join(vault_root, folder, filename)
    parent_dir = os.path.dirname(destination)
    os.makedirs(parent_dir, exist_ok=True)

    with open(destination, "w", encoding="utf-8") as handle:
        handle.write(markdown_content)

    # Short pause before ingestion; presumably lets the filesystem settle — TODO confirm.
    await asyncio.sleep(0.1)

    # Immediate import: apply the changes, replace unconditionally and purge
    # previously stored artifacts first.
    ingest_options = {"apply": True, "force_replace": True, "purge_before": True}
    return await self.process_file(file_path=destination, vault_root=vault_root, **ingest_options)
|
||||
Loading…
Reference in New Issue
Block a user