From 29e334625efb39da3011a7c66464929e5d155f60 Mon Sep 17 00:00:00 2001 From: Lars Date: Sat, 10 Jan 2026 06:54:11 +0100 Subject: [PATCH] Refactor ingestion_db.py and ingestion_processor.py: Simplify comments and documentation for clarity, enhance artifact purging logic to protect against accidental deletions, and improve symmetry injection process descriptions. Update versioning to reflect changes in functionality and maintainability. --- app/core/ingestion/ingestion_db.py | 55 +++++------------------ app/core/ingestion/ingestion_processor.py | 7 ++- 2 files changed, 14 insertions(+), 48 deletions(-) diff --git a/app/core/ingestion/ingestion_db.py b/app/core/ingestion/ingestion_db.py index c90fdfa..74f22f8 100644 --- a/app/core/ingestion/ingestion_db.py +++ b/app/core/ingestion/ingestion_db.py @@ -1,10 +1,7 @@ """ FILE: app/core/ingestion/ingestion_db.py DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung. - WP-14: Umstellung auf zentrale database-Infrastruktur. WP-24c: Implementierung der herkunftsbasierten Lösch-Logik (Origin-Purge). - Verhindert das versehentliche Löschen von inversen Kanten beim Re-Import. - VERSION v2.2.0: Integration der Authority-Prüfung für Point-IDs. VERSION: 2.2.0 (WP-24c: Protected Purge & Authority Lookup) STATUS: Active """ @@ -12,14 +9,12 @@ import logging from typing import Optional, Tuple, List from qdrant_client import QdrantClient from qdrant_client.http import models as rest - -# Import der modularisierten Namen-Logik zur Sicherstellung der Konsistenz from app.core.database import collection_names logger = logging.getLogger(__name__) def fetch_note_payload(client: QdrantClient, prefix: str, note_id: str) -> Optional[dict]: - """Holt die Metadaten einer Note aus Qdrant via Scroll.""" + """Holt die Metadaten einer Note aus Qdrant.""" notes_col, _, _ = collection_names(prefix) try: f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) @@ -30,10 +25,9 @@ def fetch_note_payload(client: QdrantClient, prefix: str, note_id: str) -> Optio return None def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[bool, bool]: - """Prüft Qdrant aktiv auf vorhandene Chunks und Edges für eine Note.""" + """Prüft auf vorhandene Chunks und Edges.""" _, chunks_col, edges_col = collection_names(prefix) try: - # Filter für die Existenz-Prüfung (Klassisch via note_id) f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) c_pts, _ = client.scroll(collection_name=chunks_col, scroll_filter=f, limit=1) e_pts, _ = client.scroll(collection_name=edges_col, scroll_filter=f, limit=1) @@ -45,17 +39,12 @@ def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[ def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) -> bool: """ WP-24c: Prüft, ob eine Kante mit der gegebenen ID bereits als 'explizit' existiert. - Wird vom IngestionProcessor genutzt, um das Überschreiben von manuellem Wissen - durch virtuelle Symmetrie-Kanten zu verhindern. + Verhindert das Überschreiben von manuellem Wissen durch Symmetrie-Kanten. """ _, _, edges_col = collection_names(prefix) try: - # retrieve erwartet eine Liste von IDs - res = client.retrieve( - collection_name=edges_col, - ids=[edge_id], - with_payload=True - ) + # retrieve ist der schnellste Weg, um einen Punkt via ID zu laden + res = client.retrieve(collection_name=edges_col, ids=[edge_id], with_payload=True) if res and not res[0].payload.get("virtual", False): return True return False @@ -63,37 +52,15 @@ def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) -> return False def purge_artifacts(client: QdrantClient, prefix: str, note_id: str): - """ - WP-24c: Selektives Löschen von Artefakten vor einem Re-Import. - Implementiert das Origin-Purge-Prinzip zur Sicherung der bidirektionalen Graph-Integrität. - """ + """Löscht Artefakte basierend auf ihrer Herkunft (Origin).""" _, chunks_col, edges_col = collection_names(prefix) - try: - # 1. Chunks löschen (immer fest an die note_id gebunden) - chunks_filter = rest.Filter(must=[ - rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id)) - ]) - client.delete( - collection_name=chunks_col, - points_selector=rest.FilterSelector(filter=chunks_filter) - ) - - # 2. WP-24c: Kanten löschen (HERKUNFTS-BASIERT) - # Wir löschen alle Kanten, die von DIESER Note erzeugt wurden (origin_note_id). - # Dies umfasst: - # - Alle ausgehenden Kanten (A -> B) - # - Alle inversen Kanten, die diese Note in anderen Notizen "deponiert" hat (B -> A) - # Fremde inverse Kanten (C -> A), die von anderen Notizen stammen, bleiben erhalten. - edges_filter = rest.Filter(must=[ - rest.FieldCondition(key="origin_note_id", match=rest.MatchValue(value=note_id)) - ]) - client.delete( - collection_name=edges_col, - points_selector=rest.FilterSelector(filter=edges_filter) - ) + chunks_filter = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) + client.delete(collection_name=chunks_col, points_selector=rest.FilterSelector(filter=chunks_filter)) + # Origin-basiertes Löschen schützt fremde inverse Kanten + edges_filter = rest.Filter(must=[rest.FieldCondition(key="origin_note_id", match=rest.MatchValue(value=note_id))]) + client.delete(collection_name=edges_col, points_selector=rest.FilterSelector(filter=edges_filter)) logger.info(f"🧹 [PURGE] Global artifacts owned by '{note_id}' cleared.") - except Exception as e: logger.error(f"❌ [PURGE ERROR] Failed to clear artifacts for {note_id}: {e}") \ No newline at end of file diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 0ad0e1c..d1ae98a 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -22,7 +22,7 @@ from app.core.parser import ( validate_required_frontmatter, NoteContext ) from app.core.chunking import assemble_chunks -# WP-24c: Import für die deterministische UUID-Vorabberechnung +# WP-24c: Import für die deterministische ID-Vorabberechnung from app.core.graph.graph_utils import _mk_edge_id # Datenbank-Ebene (Modularisierte database-Infrastruktur) @@ -83,7 +83,7 @@ class IngestionService: # WP-15b: Kontext-Gedächtnis für ID-Auflösung self.batch_cache: Dict[str, NoteContext] = {} - # WP-24c: Puffer für Phase 2 (Symmetrie-Injektion) + # WP-24c: Puffer für Phase 2 (Symmetrie-Injektion nach Persistierung) self.symmetry_buffer: List[Dict[str, Any]] = [] try: @@ -143,7 +143,7 @@ class IngestionService: success_count += 1 # 3. Schritt: SYMMETRY INJECTION (PHASE 2) - # Erst jetzt, wo alle manuellen Kanten in Qdrant liegen, prüfen wir die Symmetrien. + # Erst jetzt, wo alle manuellen Kanten in Qdrant liegen, schreiben wir die Symmetrien. if self.symmetry_buffer: logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrie-Kanten gegen Live-DB...") final_virtuals = [] @@ -244,7 +244,6 @@ class IngestionService: is_valid = await validate_edge_candidate( ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator" ) - # Fix (v3.3.2): Sicherer Zugriff via .get() verhindert Crash t_id = cand.get('target_id') or cand.get('note_id') or "Unknown" logger.info(f" 🧠 [SMART EDGE] {t_id} -> {'✅ OK' if is_valid else '❌ SKIP'}") if is_valid: new_pool.append(cand)