From 5e2a07401924af2c3d94af8a56312612c210f629 Mon Sep 17 00:00:00 2001 From: Lars Date: Fri, 9 Jan 2026 14:41:50 +0100 Subject: [PATCH] Implement origin-based purge logic in ingestion_db.py to prevent accidental deletion of inverse edges during re-imports. Enhance logging for error handling and artifact checks. Update ingestion_processor.py to support redundancy checks and improve symmetry logic for edge generation, ensuring bidirectional graph integrity. Version bump to 3.1.2. --- app/core/ingestion/ingestion_db.py | 57 ++++++++++++++++--- app/core/ingestion/ingestion_processor.py | 68 ++++++++++++++--------- 2 files changed, 89 insertions(+), 36 deletions(-) diff --git a/app/core/ingestion/ingestion_db.py b/app/core/ingestion/ingestion_db.py index 64cd57f..e36801d 100644 --- a/app/core/ingestion/ingestion_db.py +++ b/app/core/ingestion/ingestion_db.py @@ -2,7 +2,12 @@ FILE: app/core/ingestion/ingestion_db.py DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung. WP-14: Umstellung auf zentrale database-Infrastruktur. + WP-24c: Implementierung der herkunftsbasierten Lösch-Logik (Origin-Purge). + Verhindert das versehentliche Löschen von inversen Kanten beim Re-Import. +VERSION: 2.1.0 (WP-24c: Protected Purge Logic) +STATUS: Active """ +import logging from typing import Optional, Tuple from qdrant_client import QdrantClient from qdrant_client.http import models as rest @@ -10,6 +15,8 @@ from qdrant_client.http import models as rest # Import der modularisierten Namen-Logik zur Sicherstellung der Konsistenz from app.core.database import collection_names +logger = logging.getLogger(__name__) + def fetch_note_payload(client: QdrantClient, prefix: str, note_id: str) -> Optional[dict]: """Holt die Metadaten einer Note aus Qdrant via Scroll.""" notes_col, _, _ = collection_names(prefix) @@ -17,23 +24,55 @@ def fetch_note_payload(client: QdrantClient, prefix: str, note_id: str) -> Optio f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) pts, _ = client.scroll(collection_name=notes_col, scroll_filter=f, limit=1, with_payload=True) return pts[0].payload if pts else None - except: return None + except Exception as e: + logger.debug(f"Note {note_id} not found: {e}") + return None def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[bool, bool]: - """Prüft Qdrant aktiv auf vorhandene Chunks und Edges.""" + """Prüft Qdrant aktiv auf vorhandene Chunks und Edges für eine Note.""" _, chunks_col, edges_col = collection_names(prefix) try: + # Filter für die Existenz-Prüfung (Klassisch via note_id) f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) c_pts, _ = client.scroll(collection_name=chunks_col, scroll_filter=f, limit=1) e_pts, _ = client.scroll(collection_name=edges_col, scroll_filter=f, limit=1) return (not bool(c_pts)), (not bool(e_pts)) - except: return True, True + except Exception as e: + logger.error(f"Error checking artifacts for {note_id}: {e}") + return True, True def purge_artifacts(client: QdrantClient, prefix: str, note_id: str): - """Löscht verwaiste Chunks/Edges vor einem Re-Import.""" + """ + WP-24c: Selektives Löschen von Artefakten vor einem Re-Import. + Implementiert das Origin-Purge-Prinzip zur Sicherung der bidirektionalen Graph-Integrität. + """ _, chunks_col, edges_col = collection_names(prefix) - f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - # Iteration über die nun zentral verwalteten Collection-Namen - for col in [chunks_col, edges_col]: - try: client.delete(collection_name=col, points_selector=rest.FilterSelector(filter=f)) - except: pass \ No newline at end of file + + try: + # 1. Chunks löschen (immer fest an die note_id gebunden) + chunks_filter = rest.Filter(must=[ + rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id)) + ]) + client.delete( + collection_name=chunks_col, + points_selector=rest.FilterSelector(filter=chunks_filter) + ) + + # 2. WP-24c: Kanten löschen (HERKUNFTS-BASIERT) + # Wir löschen alle Kanten, die von DIESER Note erzeugt wurden (origin_note_id). + # Dies umfasst: + # - Alle ausgehenden Kanten (A -> B) + # - Alle inversen Kanten, die diese Note in anderen Notizen "deponiert" hat (B -> A) + # Fremde inverse Kanten (C -> A) bleiben erhalten. + edges_filter = rest.Filter(must=[ + rest.FieldCondition(key="origin_note_id", match=rest.MatchValue(value=note_id)) + ]) + client.delete( + collection_name=edges_col, + points_selector=rest.FilterSelector(filter=edges_filter) + ) + + logger.info(f"🧹 [PURGE] Global artifacts owned by '{note_id}' cleared.") + + except Exception as e: + logger.error(f"❌ [PURGE ERROR] Failed to clear artifacts for {note_id}: {e}") \ No newline at end of file diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index dd3d78e..c9a3b7d 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -5,8 +5,8 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator). WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v3.1.0: Korrektur der bidirektionalen Graph-Injektion für Qdrant. -VERSION: 3.1.0 (WP-24c: Symmetric Edge Injection Fix) + AUDIT v3.1.2: Redundanz-Check, ID-Resolution & Origin-Tracking. +VERSION: 3.1.2 (WP-24c: Redundancy-Aware Symmetric Ingestion) STATUS: Active """ import logging @@ -33,7 +33,6 @@ from app.services.llm_service import LLMService # Package-Interne Imports from .ingestion_utils import load_type_registry, resolve_note_type, get_chunk_config_by_profile from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts -# WP-24c: Wir nutzen die Basis-Validierung; die Symmetrie wird im Prozessor injiziert from .ingestion_validation import validate_edge_candidate from .ingestion_note_payload import make_note_payload from .ingestion_chunk_payload import make_chunk_payloads @@ -205,48 +204,63 @@ class IngestionService: include_note_scope_refs=note_scope_refs ) - # --- WP-24c: Symmetrie-Injektion (Invers-Kanten Fix) --- - # Wir bauen die finalen Kanten-Objekte inklusive ihrer Gegenstücke + # --- WP-24c: Symmetrie-Injektion (Bidirektionale Graph-Logik) --- final_edges = [] for e in raw_edges: - # 1. Primär-Kante auflösen & kanonisieren + # 1. Primär-Kante kanonisieren & Owner setzen resolved_kind = edge_registry.resolve( e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"), - context={"file": file_path, "note_id": note_id, "line": e.get("line", "system")} + context={"file": file_path, "note_id": note_id} ) e["kind"] = resolved_kind + # Markierung der Herkunft für selektiven Purge + e["origin_note_id"] = note_id final_edges.append(e) - # 2. Symmetrie-Erzeugung via Registry + # 2. Symmetrie-Ermittlung via Registry inverse_kind = edge_registry.get_inverse(resolved_kind) - target_id = e.get("target_id") + target_raw = e.get("target_id") - # Wir erzeugen eine Inverse nur bei sinnvoller Symmetrie und existierendem Ziel - if inverse_kind and inverse_kind != resolved_kind and target_id: - # Deep Copy für die Inverse zur Vermeidung von Side-Effects - inv_edge = e.copy() + # ID-Resolution: Finden der echten Note_ID im Cache + target_ctx = self.batch_cache.get(target_raw) + target_canonical_id = target_ctx.note_id if target_ctx else target_raw + + # Validierung für Symmetrie-Erzeugung (Kein Self-Loop, Existenz der Inversen) + if (inverse_kind and target_canonical_id and target_canonical_id != note_id): - # Richtungs-Umkehr - inv_edge["note_id"] = target_id # Ursprung ist nun das Ziel - inv_edge["target_id"] = note_id # Ziel ist nun die Quelle - inv_edge["kind"] = inverse_kind + # REDUNDANZ-CHECK: Existiert bereits eine explizite Gegenrichtung? + is_redundant = any( + ex.get("target_id") == target_canonical_id and + edge_registry.resolve(ex.get("kind")) == inverse_kind + for ex in raw_edges + ) - # Metadaten-Anpassung - inv_edge["virtual"] = True - inv_edge["provenance"] = "structure" # Schutz durch Firewall - inv_edge["confidence"] = e.get("confidence", 0.9) * 0.9 # Leichte Dämpfung - - # Lifecycle-Verankerung: Die Inverse gehört logisch zur Quell-Note - inv_edge["origin_note_id"] = note_id - - final_edges.append(inv_edge) - logger.info(f"🔄 [SYMMETRY] Built inverse in payload: {target_id} --({inverse_kind})--> {note_id}") + # Nur anlegen, wenn nicht redundant und kein simpler related_to Loop + if not is_redundant and (inverse_kind != resolved_kind or resolved_kind not in ["related_to", "references"]): + inv_edge = e.copy() + + # Richtungs-Umkehr + inv_edge["note_id"] = target_canonical_id + inv_edge["target_id"] = note_id + inv_edge["kind"] = inverse_kind + + # Metadaten für Struktur-Kante + inv_edge["virtual"] = True + inv_edge["provenance"] = "structure" + inv_edge["confidence"] = e.get("confidence", 0.9) * 0.9 + + # Lifecycle-Verankerung: Diese Kante gehört logisch zum Verursacher (Note A) + inv_edge["origin_note_id"] = note_id + + final_edges.append(inv_edge) + logger.info(f"🔄 [SYMMETRY] Built inverse: {target_canonical_id} --({inverse_kind})--> {note_id}") edges = final_edges # 4. DB Upsert via modularisierter Points-Logik if purge_before and old_payload: + # Hinweis: purge_artifacts wird im nächsten Schritt auf origin_note_id umgestellt purge_artifacts(self.client, self.prefix, note_id) # Speichern der Haupt-Note