diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 884c846..a9b656b 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -5,9 +5,9 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator). WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v3.1.6: ID-Kollisions-Schutz & Point-Authority Check gegen - Überschreiben expliziter Kanten. -VERSION: 3.1.6 (WP-24c: Deterministic ID Protection) + AUDIT v3.1.7: Explicit Authority Enforcement. Verhindert durch interne + ID-Registry und DB-Abgleich das Überschreiben manueller Kanten. +VERSION: 3.1.7 (WP-24c: Strict Authority Protection) STATUS: Active """ import logging @@ -72,6 +72,9 @@ class IngestionService: # Festlegen, welcher Hash für die Change-Detection maßgeblich ist self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache + + # WP-24c: Laufzeit-Speicher für explizite Kanten-IDs im aktuellen Batch + self.processed_explicit_ids = set() try: # Aufruf der modularisierten Schema-Logik @@ -86,6 +89,9 @@ class IngestionService: Pass 1: Pre-Scan füllt den Context-Cache (3-Wege-Indexierung). Pass 2: Verarbeitung nutzt den Cache für die semantische Prüfung. """ + # Reset der Authority-Registry für den neuen Batch + self.processed_explicit_ids.clear() + logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Context Cache...") for path in file_paths: try: @@ -228,10 +234,11 @@ class IngestionService: include_note_scope_refs=note_scope_refs ) - # --- WP-24c: Symmetrie-Injektion (Bidirektionale Graph-Logik) --- + # --- WP-24c: Symmetrie-Injektion (Authority Implementation) --- final_edges = [] + + # PHASE 1: Alle expliziten Kanten vorverarbeiten und registrieren for e in raw_edges: - # 1. Primär-Kante kanonisieren & Owner setzen resolved_kind = edge_registry.resolve( e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"), @@ -240,11 +247,22 @@ class IngestionService: e["kind"] = resolved_kind # Markierung der Herkunft für selektiven Purge e["origin_note_id"] = note_id - e["virtual"] = False # Explizite Kanten sind niemals virtuell - final_edges.append(e) + e["virtual"] = False # Authority-Markierung für explizite Kanten + e["confidence"] = e.get("confidence", 1.0) # Volle Gewichtung - # 2. Symmetrie-Ermittlung via Registry - inverse_kind = edge_registry.get_inverse(resolved_kind) + # Registrierung der ID im Laufzeit-Schutz (Authority) + edge_id = _mk_edge_id(resolved_kind, note_id, e.get("target_id"), e.get("scope", "note")) + self.processed_explicit_ids.add(edge_id) + + final_edges.append(e) + + # PHASE 2: Symmetrische Kanten (Invers) mit Authority-Schutz erzeugen + # Wir nutzen hierfür nur die expliziten Kanten aus Phase 1 als Basis + explicit_only = [x for x in final_edges if not x.get("virtual")] + + for e in explicit_only: + kind = e["kind"] + inverse_kind = edge_registry.get_inverse(kind) target_raw = e.get("target_id") # ID-Resolution: Finden der echten Note_ID im Cache @@ -254,52 +272,39 @@ class IngestionService: # Validierung für Symmetrie-Erzeugung (Kein Self-Loop, Existenz der Inversen) if (inverse_kind and target_canonical_id and target_canonical_id != note_id): - # A. Lokale Redundanz: Hat der User in DIESER Note schon die Gegenrichtung definiert? - is_local_redundant = any( - ex.get("target_id") == target_canonical_id and - edge_registry.resolve(ex.get("kind")) == inverse_kind - for ex in raw_edges - ) + # 1. ID der potenziellen virtuellen Kante berechnen + # Wir nutzen exakt die Parameter, die auch points_for_edges nutzt + potential_id = _mk_edge_id(inverse_kind, target_canonical_id, note_id, e.get("scope", "note")) + + # 2. AUTHORITY-CHECK A: Wurde diese Kante bereits explizit im aktuellen Batch registriert? + is_in_batch = potential_id in self.processed_explicit_ids + + # 3. AUTHORITY-CHECK B: Existiert sie bereits als explizit in der Datenbank? + is_in_db = False + if not is_in_batch: + is_in_db = await self._is_explicit_edge_in_db(potential_id) - # B. Cross-Note Redundanz Check (v3.1.6): Schutz vor Point-Überschreibung - is_cross_protected = False - - # 1. Prüfung im Batch-Cache (für Notizen im gleichen Lauf) - if target_ctx and hasattr(target_ctx, 'links'): - for link in target_ctx.links: - link_to_id = self.batch_cache.get(link.get("to"), {}).note_id or link.get("to") - if link_to_id == note_id: - planned_kind_in_target = edge_registry.resolve(link.get("kind", "related_to")) - if planned_kind_in_target == inverse_kind: - is_cross_protected = True - break - - # 2. Point-Authority Check (v3.1.6): ID berechnen und in DB prüfen - if not is_cross_protected: - # Wir simulieren die ID, die diese Kante in Qdrant hätte - # Parameter: kind, source_id, target_id, scope - potential_id = _mk_edge_id(inverse_kind, target_canonical_id, note_id, e.get("scope", "note")) - is_cross_protected = await self._is_explicit_edge_in_db(potential_id) - - # Nur anlegen, wenn keine Form von Redundanz/Schutz vorliegt - if not is_local_redundant and not is_cross_protected and (inverse_kind != resolved_kind or resolved_kind not in ["related_to", "references"]): - inv_edge = e.copy() - - # Richtungs-Umkehr - inv_edge["note_id"] = target_canonical_id - inv_edge["target_id"] = note_id - inv_edge["kind"] = inverse_kind - - # Metadaten für Struktur-Kante - inv_edge["virtual"] = True - inv_edge["provenance"] = "structure" - inv_edge["confidence"] = e.get("confidence", 0.9) * 0.9 - - # Lifecycle-Verankerung: Diese Kante gehört logisch zum Verursacher (Note A) - inv_edge["origin_note_id"] = note_id - - final_edges.append(inv_edge) - logger.info(f"🔄 [SYMMETRY] Built inverse: {target_canonical_id} --({inverse_kind})--> {note_id}") + # 4. Filter: Nur anlegen, wenn KEINE explizite Autorität vorliegt + # Keine Abwertung der Confidence auf Wunsch des Nutzers + if not is_in_batch and not is_in_db: + if (inverse_kind != kind or kind not in ["related_to", "references"]): + inv_edge = e.copy() + + # Richtungs-Umkehr + inv_edge["note_id"] = target_canonical_id + inv_edge["target_id"] = note_id + inv_edge["kind"] = inverse_kind + + # Metadaten für Struktur-Kante + inv_edge["virtual"] = True + inv_edge["provenance"] = "structure" + inv_edge["confidence"] = e.get("confidence", 1.0) # Gewichtung bleibt gleich + + # Lifecycle-Verankerung: Diese Kante gehört logisch zum Verursacher (Note A) + inv_edge["origin_note_id"] = note_id + + final_edges.append(inv_edge) + logger.info(f"🔄 [SYMMETRY] Built inverse: {target_canonical_id} --({inverse_kind})--> {note_id}") edges = final_edges