Refactor ingestion_db.py and ingestion_processor.py: Simplify comments and documentation for clarity, enhance artifact purging logic to protect against accidental deletions, and improve symmetry injection process descriptions. Update versioning to reflect changes in functionality and maintainability.
This commit is contained in:
parent
114cea80de
commit
29e334625e
|
|
@ -1,10 +1,7 @@
|
|||
"""
|
||||
FILE: app/core/ingestion/ingestion_db.py
|
||||
DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung.
|
||||
WP-14: Umstellung auf zentrale database-Infrastruktur.
|
||||
WP-24c: Implementierung der herkunftsbasierten Lösch-Logik (Origin-Purge).
|
||||
Verhindert das versehentliche Löschen von inversen Kanten beim Re-Import.
|
||||
VERSION v2.2.0: Integration der Authority-Prüfung für Point-IDs.
|
||||
VERSION: 2.2.0 (WP-24c: Protected Purge & Authority Lookup)
|
||||
STATUS: Active
|
||||
"""
|
||||
|
|
@ -12,14 +9,12 @@ import logging
|
|||
from typing import Optional, Tuple, List
|
||||
from qdrant_client import QdrantClient
|
||||
from qdrant_client.http import models as rest
|
||||
|
||||
# Import der modularisierten Namen-Logik zur Sicherstellung der Konsistenz
|
||||
from app.core.database import collection_names
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def fetch_note_payload(client: QdrantClient, prefix: str, note_id: str) -> Optional[dict]:
|
||||
"""Holt die Metadaten einer Note aus Qdrant via Scroll."""
|
||||
"""Holt die Metadaten einer Note aus Qdrant."""
|
||||
notes_col, _, _ = collection_names(prefix)
|
||||
try:
|
||||
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
||||
|
|
@ -30,10 +25,9 @@ def fetch_note_payload(client: QdrantClient, prefix: str, note_id: str) -> Optio
|
|||
return None
|
||||
|
||||
def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[bool, bool]:
|
||||
"""Prüft Qdrant aktiv auf vorhandene Chunks und Edges für eine Note."""
|
||||
"""Prüft auf vorhandene Chunks und Edges."""
|
||||
_, chunks_col, edges_col = collection_names(prefix)
|
||||
try:
|
||||
# Filter für die Existenz-Prüfung (Klassisch via note_id)
|
||||
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
||||
c_pts, _ = client.scroll(collection_name=chunks_col, scroll_filter=f, limit=1)
|
||||
e_pts, _ = client.scroll(collection_name=edges_col, scroll_filter=f, limit=1)
|
||||
|
|
@ -45,17 +39,12 @@ def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[
|
|||
def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) -> bool:
|
||||
"""
|
||||
WP-24c: Prüft, ob eine Kante mit der gegebenen ID bereits als 'explizit' existiert.
|
||||
Wird vom IngestionProcessor genutzt, um das Überschreiben von manuellem Wissen
|
||||
durch virtuelle Symmetrie-Kanten zu verhindern.
|
||||
Verhindert das Überschreiben von manuellem Wissen durch Symmetrie-Kanten.
|
||||
"""
|
||||
_, _, edges_col = collection_names(prefix)
|
||||
try:
|
||||
# retrieve erwartet eine Liste von IDs
|
||||
res = client.retrieve(
|
||||
collection_name=edges_col,
|
||||
ids=[edge_id],
|
||||
with_payload=True
|
||||
)
|
||||
# retrieve ist der schnellste Weg, um einen Punkt via ID zu laden
|
||||
res = client.retrieve(collection_name=edges_col, ids=[edge_id], with_payload=True)
|
||||
if res and not res[0].payload.get("virtual", False):
|
||||
return True
|
||||
return False
|
||||
|
|
@ -63,37 +52,15 @@ def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) ->
|
|||
return False
|
||||
|
||||
def purge_artifacts(client: QdrantClient, prefix: str, note_id: str):
    """Delete the stored artifacts owned by a note before it is re-imported.

    Two deletions are issued, in this order:

    1. Chunks whose ``note_id`` payload field equals ``note_id``.
    2. Edges whose ``origin_note_id`` payload field equals ``note_id``.

    Selecting edges by origin (rather than by ``note_id``) is what, per the
    original inline comment, protects inverse edges that were created by
    other notes from being removed.

    Args:
        client: Qdrant client used to issue the delete requests.
        prefix: Collection prefix passed to ``collection_names``.
        note_id: Identifier of the note whose artifacts are purged.

    Returns:
        None. Any exception raised while deleting is logged and swallowed, so
        a failure of the first deletion also skips the second one.
    """
    _, chunks_col, edges_col = collection_names(prefix)

    # (collection, payload key) pairs; chunks are handled first, then edges.
    purge_targets = (
        (chunks_col, "note_id"),
        (edges_col, "origin_note_id"),
    )

    try:
        for collection, owner_key in purge_targets:
            owner_match = rest.FieldCondition(
                key=owner_key,
                match=rest.MatchValue(value=note_id),
            )
            selector = rest.FilterSelector(filter=rest.Filter(must=[owner_match]))
            client.delete(collection_name=collection, points_selector=selector)

        logger.info(f"🧹 [PURGE] Global artifacts owned by '{note_id}' cleared.")
    except Exception as e:
        logger.error(f"❌ [PURGE ERROR] Failed to clear artifacts for {note_id}: {e}")
|
||||
|
|
@ -22,7 +22,7 @@ from app.core.parser import (
|
|||
validate_required_frontmatter, NoteContext
|
||||
)
|
||||
from app.core.chunking import assemble_chunks
|
||||
# WP-24c: Import für die deterministische UUID-Vorabberechnung
|
||||
# WP-24c: Import für die deterministische ID-Vorabberechnung
|
||||
from app.core.graph.graph_utils import _mk_edge_id
|
||||
|
||||
# Datenbank-Ebene (Modularisierte database-Infrastruktur)
|
||||
|
|
@ -83,7 +83,7 @@ class IngestionService:
|
|||
# WP-15b: Kontext-Gedächtnis für ID-Auflösung
|
||||
self.batch_cache: Dict[str, NoteContext] = {}
|
||||
|
||||
# WP-24c: Puffer für Phase 2 (Symmetrie-Injektion)
|
||||
# WP-24c: Puffer für Phase 2 (Symmetrie-Injektion nach Persistierung)
|
||||
self.symmetry_buffer: List[Dict[str, Any]] = []
|
||||
|
||||
try:
|
||||
|
|
@ -143,7 +143,7 @@ class IngestionService:
|
|||
success_count += 1
|
||||
|
||||
# 3. Schritt: SYMMETRY INJECTION (PHASE 2)
|
||||
# Erst jetzt, wo alle manuellen Kanten in Qdrant liegen, prüfen wir die Symmetrien.
|
||||
# Erst jetzt, wo alle manuellen Kanten in Qdrant liegen, schreiben wir die Symmetrien.
|
||||
if self.symmetry_buffer:
|
||||
logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrie-Kanten gegen Live-DB...")
|
||||
final_virtuals = []
|
||||
|
|
@ -244,7 +244,6 @@ class IngestionService:
|
|||
is_valid = await validate_edge_candidate(
|
||||
ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator"
|
||||
)
|
||||
# Fix (v3.3.2): Sicherer Zugriff via .get() verhindert Crash
|
||||
t_id = cand.get('target_id') or cand.get('note_id') or "Unknown"
|
||||
logger.info(f" 🧠 [SMART EDGE] {t_id} -> {'✅ OK' if is_valid else '❌ SKIP'}")
|
||||
if is_valid: new_pool.append(cand)
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user