diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 4017fe0..645de2c 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -5,8 +5,9 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator). WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v3.1.4: Semantischer Cross-Note Redundanz-Check (Typ-spezifisch). -VERSION: 3.1.4 (WP-24c: Semantic Cross-Note Redundancy Fix) + AUDIT v3.1.5: Datenbank-gestützter Redundanz-Check verhindert das + Überschreiben expliziter Kanten durch virtuelle Symmetrien. +VERSION: 3.1.5 (WP-24c: DB-Aware Redundancy Check) STATUS: Active """ import logging @@ -24,6 +25,7 @@ from app.core.chunking import assemble_chunks # MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch +from qdrant_client.http import models as rest # WICHTIG: Für den Real-Time DB-Check # Services from app.services.embeddings_client import EmbeddingsClient @@ -99,6 +101,27 @@ class IngestionService: logger.info(f"🚀 [Pass 2] Semantic Processing of {len(file_paths)} files...") return [await self.process_file(p, vault_root, apply=True, purge_before=True) for p in file_paths] + async def _check_db_for_explicit_edge(self, source_id: str, target_id: str, kind: str) -> bool: + """ + WP-24c: Real-Time Abfrage gegen Qdrant, ob bereits eine explizite Kante existiert. + Verhindert das Überschreiben korrekter 'origin_note_ids' durch virtuelle Symmetrien. + """ + edges_col = f"{self.prefix}_edges" + try: + query_filter = rest.Filter( + must=[ + rest.FieldCondition(key="note_id", match=rest.MatchValue(value=source_id)), + rest.FieldCondition(key="target_id", match=rest.MatchValue(value=target_id)), + rest.FieldCondition(key="kind", match=rest.MatchValue(value=kind)), + rest.FieldCondition(key="virtual", match=rest.MatchValue(value=False)) # Nur echte Kanten + ] + ) + # Nutzt Scroll für eine effiziente Existenzprüfung + res, _ = self.client.scroll(collection_name=edges_col, scroll_filter=query_filter, limit=1) + return len(res) > 0 + except Exception: + return False + async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]: """Transformiert eine Markdown-Datei in den Graphen.""" apply = kwargs.get("apply", False) @@ -236,24 +259,26 @@ class IngestionService: for ex in raw_edges ) - # B. Cross-Note Redundanz Fix (v3.1.4): Prüfe auf identischen semantischen Beziehungstyp in der Ziel-Note + # B. Cross-Note Redundanz Check (v3.1.5): Prüfe Batch-Cache UND Datenbank is_cross_redundant = False + + # 1. Prüfung im Batch-Cache (für Notizen im gleichen Lauf) if target_ctx and hasattr(target_ctx, 'links'): for link in target_ctx.links: - link_to = link.get("to") - # Auflösung des Link-Ziels der anderen Note - link_to_ctx = self.batch_cache.get(link_to) - link_to_id = link_to_ctx.note_id if link_to_ctx else link_to - + link_to_id = self.batch_cache.get(link.get("to"), {}).note_id or link.get("to") if link_to_id == note_id: - # Wir prüfen nun, ob der Beziehungstyp in der Ziel-Note semantisch identisch - # mit der geplanten Symmetrie-Kante ist. planned_kind_in_target = edge_registry.resolve(link.get("kind", "related_to")) if planned_kind_in_target == inverse_kind: is_cross_redundant = True break - # Nur anlegen, wenn keine semantische Redundanz vorliegt und kein simpler Loop + # 2. Prüfung in der Datenbank (für bereits existierende Notizen) + if not is_cross_redundant: + is_cross_redundant = await self._check_db_for_explicit_edge( + target_canonical_id, note_id, inverse_kind + ) + + # Nur anlegen, wenn keine semantische Redundanz vorliegt if not is_local_redundant and not is_cross_redundant and (inverse_kind != resolved_kind or resolved_kind not in ["related_to", "references"]): inv_edge = e.copy()