Update ingestion_processor.py to version 3.1.7: Enhance authority enforcement for explicit edges by implementing runtime ID protection and database checks to prevent overwriting. Refactor edge generation logic to ensure strict authority compliance and improve symmetry handling. Adjust versioning and documentation accordingly.

This commit is contained in:
Lars 2026-01-09 21:31:44 +01:00
parent 2c18f8b3de
commit 9cb08777fa

View File

@ -5,9 +5,9 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-25a: Integration der Mixture of Experts (MoE) Architektur.
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
AUDIT v3.1.6: ID-Kollisions-Schutz & Point-Authority Check gegen AUDIT v3.1.7: Explicit Authority Enforcement. Verhindert durch interne
Überschreiben expliziter Kanten. ID-Registry und DB-Abgleich das Überschreiben manueller Kanten.
VERSION: 3.1.6 (WP-24c: Deterministic ID Protection) VERSION: 3.1.7 (WP-24c: Strict Authority Protection)
STATUS: Active STATUS: Active
""" """
import logging import logging
@ -72,6 +72,9 @@ class IngestionService:
# Festlegen, welcher Hash für die Change-Detection maßgeblich ist # Festlegen, welcher Hash für die Change-Detection maßgeblich ist
self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE
self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache
# WP-24c: Laufzeit-Speicher für explizite Kanten-IDs im aktuellen Batch
self.processed_explicit_ids = set()
try: try:
# Aufruf der modularisierten Schema-Logik # Aufruf der modularisierten Schema-Logik
@ -86,6 +89,9 @@ class IngestionService:
Pass 1: Pre-Scan füllt den Context-Cache (3-Wege-Indexierung). Pass 1: Pre-Scan füllt den Context-Cache (3-Wege-Indexierung).
Pass 2: Verarbeitung nutzt den Cache für die semantische Prüfung. Pass 2: Verarbeitung nutzt den Cache für die semantische Prüfung.
""" """
# Reset der Authority-Registry für den neuen Batch
self.processed_explicit_ids.clear()
logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Context Cache...") logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Context Cache...")
for path in file_paths: for path in file_paths:
try: try:
@ -228,10 +234,11 @@ class IngestionService:
include_note_scope_refs=note_scope_refs include_note_scope_refs=note_scope_refs
) )
# --- WP-24c: Symmetrie-Injektion (Bidirektionale Graph-Logik) --- # --- WP-24c: Symmetrie-Injektion (Authority Implementation) ---
final_edges = [] final_edges = []
# PHASE 1: Alle expliziten Kanten vorverarbeiten und registrieren
for e in raw_edges: for e in raw_edges:
# 1. Primär-Kante kanonisieren & Owner setzen
resolved_kind = edge_registry.resolve( resolved_kind = edge_registry.resolve(
e.get("kind", "related_to"), e.get("kind", "related_to"),
provenance=e.get("provenance", "explicit"), provenance=e.get("provenance", "explicit"),
@ -240,11 +247,22 @@ class IngestionService:
e["kind"] = resolved_kind e["kind"] = resolved_kind
# Markierung der Herkunft für selektiven Purge # Markierung der Herkunft für selektiven Purge
e["origin_note_id"] = note_id e["origin_note_id"] = note_id
e["virtual"] = False # Explizite Kanten sind niemals virtuell e["virtual"] = False # Authority-Markierung für explizite Kanten
final_edges.append(e) e["confidence"] = e.get("confidence", 1.0) # Volle Gewichtung
# 2. Symmetrie-Ermittlung via Registry # Registrierung der ID im Laufzeit-Schutz (Authority)
inverse_kind = edge_registry.get_inverse(resolved_kind) edge_id = _mk_edge_id(resolved_kind, note_id, e.get("target_id"), e.get("scope", "note"))
self.processed_explicit_ids.add(edge_id)
final_edges.append(e)
# PHASE 2: Symmetrische Kanten (Invers) mit Authority-Schutz erzeugen
# Wir nutzen hierfür nur die expliziten Kanten aus Phase 1 als Basis
explicit_only = [x for x in final_edges if not x.get("virtual")]
for e in explicit_only:
kind = e["kind"]
inverse_kind = edge_registry.get_inverse(kind)
target_raw = e.get("target_id") target_raw = e.get("target_id")
# ID-Resolution: Finden der echten Note_ID im Cache # ID-Resolution: Finden der echten Note_ID im Cache
@ -254,52 +272,39 @@ class IngestionService:
# Validierung für Symmetrie-Erzeugung (Kein Self-Loop, Existenz der Inversen) # Validierung für Symmetrie-Erzeugung (Kein Self-Loop, Existenz der Inversen)
if (inverse_kind and target_canonical_id and target_canonical_id != note_id): if (inverse_kind and target_canonical_id and target_canonical_id != note_id):
# A. Lokale Redundanz: Hat der User in DIESER Note schon die Gegenrichtung definiert? # 1. ID der potenziellen virtuellen Kante berechnen
is_local_redundant = any( # Wir nutzen exakt die Parameter, die auch points_for_edges nutzt
ex.get("target_id") == target_canonical_id and potential_id = _mk_edge_id(inverse_kind, target_canonical_id, note_id, e.get("scope", "note"))
edge_registry.resolve(ex.get("kind")) == inverse_kind
for ex in raw_edges # 2. AUTHORITY-CHECK A: Wurde diese Kante bereits explizit im aktuellen Batch registriert?
) is_in_batch = potential_id in self.processed_explicit_ids
# 3. AUTHORITY-CHECK B: Existiert sie bereits als explizit in der Datenbank?
is_in_db = False
if not is_in_batch:
is_in_db = await self._is_explicit_edge_in_db(potential_id)
# B. Cross-Note Redundanz Check (v3.1.6): Schutz vor Point-Überschreibung # 4. Filter: Nur anlegen, wenn KEINE explizite Autorität vorliegt
is_cross_protected = False # Keine Abwertung der Confidence auf Wunsch des Nutzers
if not is_in_batch and not is_in_db:
# 1. Prüfung im Batch-Cache (für Notizen im gleichen Lauf) if (inverse_kind != kind or kind not in ["related_to", "references"]):
if target_ctx and hasattr(target_ctx, 'links'): inv_edge = e.copy()
for link in target_ctx.links:
link_to_id = self.batch_cache.get(link.get("to"), {}).note_id or link.get("to") # Richtungs-Umkehr
if link_to_id == note_id: inv_edge["note_id"] = target_canonical_id
planned_kind_in_target = edge_registry.resolve(link.get("kind", "related_to")) inv_edge["target_id"] = note_id
if planned_kind_in_target == inverse_kind: inv_edge["kind"] = inverse_kind
is_cross_protected = True
break # Metadaten für Struktur-Kante
inv_edge["virtual"] = True
# 2. Point-Authority Check (v3.1.6): ID berechnen und in DB prüfen inv_edge["provenance"] = "structure"
if not is_cross_protected: inv_edge["confidence"] = e.get("confidence", 1.0) # Gewichtung bleibt gleich
# Wir simulieren die ID, die diese Kante in Qdrant hätte
# Parameter: kind, source_id, target_id, scope # Lifecycle-Verankerung: Diese Kante gehört logisch zum Verursacher (Note A)
potential_id = _mk_edge_id(inverse_kind, target_canonical_id, note_id, e.get("scope", "note")) inv_edge["origin_note_id"] = note_id
is_cross_protected = await self._is_explicit_edge_in_db(potential_id)
final_edges.append(inv_edge)
# Nur anlegen, wenn keine Form von Redundanz/Schutz vorliegt logger.info(f"🔄 [SYMMETRY] Built inverse: {target_canonical_id} --({inverse_kind})--> {note_id}")
if not is_local_redundant and not is_cross_protected and (inverse_kind != resolved_kind or resolved_kind not in ["related_to", "references"]):
inv_edge = e.copy()
# Richtungs-Umkehr
inv_edge["note_id"] = target_canonical_id
inv_edge["target_id"] = note_id
inv_edge["kind"] = inverse_kind
# Metadaten für Struktur-Kante
inv_edge["virtual"] = True
inv_edge["provenance"] = "structure"
inv_edge["confidence"] = e.get("confidence", 0.9) * 0.9
# Lifecycle-Verankerung: Diese Kante gehört logisch zum Verursacher (Note A)
inv_edge["origin_note_id"] = note_id
final_edges.append(inv_edge)
logger.info(f"🔄 [SYMMETRY] Built inverse: {target_canonical_id} --({inverse_kind})--> {note_id}")
edges = final_edges edges = final_edges