diff --git a/app/core/ingestion/ingestion_db.py b/app/core/ingestion/ingestion_db.py index e36801d..2405e8d 100644 --- a/app/core/ingestion/ingestion_db.py +++ b/app/core/ingestion/ingestion_db.py @@ -4,11 +4,12 @@ DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung. WP-14: Umstellung auf zentrale database-Infrastruktur. WP-24c: Implementierung der herkunftsbasierten Lösch-Logik (Origin-Purge). Verhindert das versehentliche Löschen von inversen Kanten beim Re-Import. -VERSION: 2.1.0 (WP-24c: Protected Purge Logic) + VERSION v2.2.0: Integration der Authority-Prüfung für Point-IDs. +VERSION: 2.2.0 (WP-24c: Protected Purge & Authority Lookup) STATUS: Active """ import logging -from typing import Optional, Tuple +from typing import Optional, Tuple, List from qdrant_client import QdrantClient from qdrant_client.http import models as rest @@ -41,6 +42,25 @@ def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[ logger.error(f"Error checking artifacts for {note_id}: {e}") return True, True +def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) -> bool: + """ + WP-24c: Prüft, ob eine Kante mit der gegebenen ID bereits als 'explizit' existiert. + Wird vom IngestionProcessor genutzt, um das Überschreiben von manuellem Wissen + durch virtuelle Symmetrie-Kanten zu verhindern. + """ + _, _, edges_col = collection_names(prefix) + try: + res = client.retrieve( + collection_name=edges_col, + ids=[edge_id], + with_payload=True + ) + if res and not res[0].payload.get("virtual", False): + return True + return False + except Exception: + return False + def purge_artifacts(client: QdrantClient, prefix: str, note_id: str): """ WP-24c: Selektives Löschen von Artefakten vor einem Re-Import. @@ -63,7 +83,7 @@ def purge_artifacts(client: QdrantClient, prefix: str, note_id: str): # Dies umfasst: # - Alle ausgehenden Kanten (A -> B) # - Alle inversen Kanten, die diese Note in anderen Notizen "deponiert" hat (B -> A) - # Fremde inverse Kanten (C -> A) bleiben erhalten. + # Fremde virtuelle Kanten (C -> A) bleiben erhalten, da deren origin_note_id == C ist. edges_filter = rest.Filter(must=[ rest.FieldCondition(key="origin_note_id", match=rest.MatchValue(value=note_id)) ]) diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 645de2c..884c846 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -5,9 +5,9 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator). WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v3.1.5: Datenbank-gestützter Redundanz-Check verhindert das - Überschreiben expliziter Kanten durch virtuelle Symmetrien. -VERSION: 3.1.5 (WP-24c: DB-Aware Redundancy Check) + AUDIT v3.1.6: ID-Kollisions-Schutz & Point-Authority Check gegen + Überschreiben expliziter Kanten. +VERSION: 3.1.6 (WP-24c: Deterministic ID Protection) STATUS: Active """ import logging @@ -21,11 +21,13 @@ from app.core.parser import ( validate_required_frontmatter, NoteContext ) from app.core.chunking import assemble_chunks +# WP-24c: Import für die deterministische ID-Vorabberechnung +from app.core.graph.graph_utils import _mk_edge_id # MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch -from qdrant_client.http import models as rest # WICHTIG: Für den Real-Time DB-Check +from qdrant_client.http import models as rest # Für Real-Time DB-Checks # Services from app.services.embeddings_client import EmbeddingsClient @@ -101,24 +103,23 @@ class IngestionService: logger.info(f"🚀 [Pass 2] Semantic Processing of {len(file_paths)} files...") return [await self.process_file(p, vault_root, apply=True, purge_before=True) for p in file_paths] - async def _check_db_for_explicit_edge(self, source_id: str, target_id: str, kind: str) -> bool: + async def _is_explicit_edge_in_db(self, edge_id: str) -> bool: """ - WP-24c: Real-Time Abfrage gegen Qdrant, ob bereits eine explizite Kante existiert. - Verhindert das Überschreiben korrekter 'origin_note_ids' durch virtuelle Symmetrien. + WP-24c: Prüft via Point-ID, ob bereits eine explizite (manuelle) Kante in Qdrant liegt. + Verhindert, dass virtuelle Symmetrien bestehendes Wissen überschreiben. """ edges_col = f"{self.prefix}_edges" try: - query_filter = rest.Filter( - must=[ - rest.FieldCondition(key="note_id", match=rest.MatchValue(value=source_id)), - rest.FieldCondition(key="target_id", match=rest.MatchValue(value=target_id)), - rest.FieldCondition(key="kind", match=rest.MatchValue(value=kind)), - rest.FieldCondition(key="virtual", match=rest.MatchValue(value=False)) # Nur echte Kanten - ] + # Direkte Punkt-Abfrage ist schneller als Scroll/Filter + res = self.client.retrieve( + collection_name=edges_col, + ids=[edge_id], + with_payload=True, + with_vectors=False ) - # Nutzt Scroll für eine effiziente Existenzprüfung - res, _ = self.client.scroll(collection_name=edges_col, scroll_filter=query_filter, limit=1) - return len(res) > 0 + if res and not res[0].payload.get("virtual", False): + return True # Punkt existiert und ist NICHT virtuell + return False except Exception: return False @@ -239,6 +240,7 @@ class IngestionService: e["kind"] = resolved_kind # Markierung der Herkunft für selektiven Purge e["origin_note_id"] = note_id + e["virtual"] = False # Explizite Kanten sind niemals virtuell final_edges.append(e) # 2. Symmetrie-Ermittlung via Registry @@ -259,8 +261,8 @@ class IngestionService: for ex in raw_edges ) - # B. Cross-Note Redundanz Check (v3.1.5): Prüfe Batch-Cache UND Datenbank - is_cross_redundant = False + # B. Cross-Note Redundanz Check (v3.1.6): Schutz vor Point-Überschreibung + is_cross_protected = False # 1. Prüfung im Batch-Cache (für Notizen im gleichen Lauf) if target_ctx and hasattr(target_ctx, 'links'): @@ -269,17 +271,18 @@ class IngestionService: if link_to_id == note_id: planned_kind_in_target = edge_registry.resolve(link.get("kind", "related_to")) if planned_kind_in_target == inverse_kind: - is_cross_redundant = True + is_cross_protected = True break - # 2. Prüfung in der Datenbank (für bereits existierende Notizen) - if not is_cross_redundant: - is_cross_redundant = await self._check_db_for_explicit_edge( - target_canonical_id, note_id, inverse_kind - ) + # 2. Point-Authority Check (v3.1.6): ID berechnen und in DB prüfen + if not is_cross_protected: + # Wir simulieren die ID, die diese Kante in Qdrant hätte + # Parameter: kind, source_id, target_id, scope + potential_id = _mk_edge_id(inverse_kind, target_canonical_id, note_id, e.get("scope", "note")) + is_cross_protected = await self._is_explicit_edge_in_db(potential_id) - # Nur anlegen, wenn keine semantische Redundanz vorliegt - if not is_local_redundant and not is_cross_redundant and (inverse_kind != resolved_kind or resolved_kind not in ["related_to", "references"]): + # Nur anlegen, wenn keine Form von Redundanz/Schutz vorliegt + if not is_local_redundant and not is_cross_protected and (inverse_kind != resolved_kind or resolved_kind not in ["related_to", "references"]): inv_edge = e.copy() # Richtungs-Umkehr