Update ingestion_db.py and ingestion_processor.py to version 2.2.0 and 3.1.6 respectively: Integrate authority checks for Point-IDs and enhance edge validation logic to prevent overwriting explicit edges by virtual symmetries. Introduce new function to verify explicit edge presence in the database, ensuring improved integrity in edge generation. Adjust versioning and documentation accordingly.

This commit is contained in:
Lars 2026-01-09 21:07:02 +01:00
parent d5d6987ce2
commit 2c18f8b3de
2 changed files with 53 additions and 30 deletions

View File

@ -4,11 +4,12 @@ DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung.
WP-14: Umstellung auf zentrale database-Infrastruktur. WP-14: Umstellung auf zentrale database-Infrastruktur.
WP-24c: Implementierung der herkunftsbasierten Lösch-Logik (Origin-Purge). WP-24c: Implementierung der herkunftsbasierten Lösch-Logik (Origin-Purge).
Verhindert das versehentliche Löschen von inversen Kanten beim Re-Import. Verhindert das versehentliche Löschen von inversen Kanten beim Re-Import.
VERSION: 2.1.0 (WP-24c: Protected Purge Logic) VERSION v2.2.0: Integration der Authority-Prüfung für Point-IDs.
VERSION: 2.2.0 (WP-24c: Protected Purge & Authority Lookup)
STATUS: Active STATUS: Active
""" """
import logging import logging
from typing import Optional, Tuple from typing import Optional, Tuple, List
from qdrant_client import QdrantClient from qdrant_client import QdrantClient
from qdrant_client.http import models as rest from qdrant_client.http import models as rest
@ -41,6 +42,25 @@ def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[
logger.error(f"Error checking artifacts for {note_id}: {e}") logger.error(f"Error checking artifacts for {note_id}: {e}")
return True, True return True, True
def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) -> bool:
"""
WP-24c: Prüft, ob eine Kante mit der gegebenen ID bereits als 'explizit' existiert.
Wird vom IngestionProcessor genutzt, um das Überschreiben von manuellem Wissen
durch virtuelle Symmetrie-Kanten zu verhindern.
"""
_, _, edges_col = collection_names(prefix)
try:
res = client.retrieve(
collection_name=edges_col,
ids=[edge_id],
with_payload=True
)
if res and not res[0].payload.get("virtual", False):
return True
return False
except Exception:
return False
def purge_artifacts(client: QdrantClient, prefix: str, note_id: str): def purge_artifacts(client: QdrantClient, prefix: str, note_id: str):
""" """
WP-24c: Selektives Löschen von Artefakten vor einem Re-Import. WP-24c: Selektives Löschen von Artefakten vor einem Re-Import.
@ -63,7 +83,7 @@ def purge_artifacts(client: QdrantClient, prefix: str, note_id: str):
# Dies umfasst: # Dies umfasst:
# - Alle ausgehenden Kanten (A -> B) # - Alle ausgehenden Kanten (A -> B)
# - Alle inversen Kanten, die diese Note in anderen Notizen "deponiert" hat (B -> A) # - Alle inversen Kanten, die diese Note in anderen Notizen "deponiert" hat (B -> A)
# Fremde inverse Kanten (C -> A) bleiben erhalten. # Fremde virtuelle Kanten (C -> A) bleiben erhalten, da deren origin_note_id == C ist.
edges_filter = rest.Filter(must=[ edges_filter = rest.Filter(must=[
rest.FieldCondition(key="origin_note_id", match=rest.MatchValue(value=note_id)) rest.FieldCondition(key="origin_note_id", match=rest.MatchValue(value=note_id))
]) ])

View File

@ -5,9 +5,9 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-25a: Integration der Mixture of Experts (MoE) Architektur.
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
AUDIT v3.1.5: Datenbank-gestützter Redundanz-Check verhindert das AUDIT v3.1.6: ID-Kollisions-Schutz & Point-Authority Check gegen
Überschreiben expliziter Kanten durch virtuelle Symmetrien. Überschreiben expliziter Kanten.
VERSION: 3.1.5 (WP-24c: DB-Aware Redundancy Check) VERSION: 3.1.6 (WP-24c: Deterministic ID Protection)
STATUS: Active STATUS: Active
""" """
import logging import logging
@ -21,11 +21,13 @@ from app.core.parser import (
validate_required_frontmatter, NoteContext validate_required_frontmatter, NoteContext
) )
from app.core.chunking import assemble_chunks from app.core.chunking import assemble_chunks
# WP-24c: Import für die deterministische ID-Vorabberechnung
from app.core.graph.graph_utils import _mk_edge_id
# MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene # MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene
from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
from qdrant_client.http import models as rest # WICHTIG: Für den Real-Time DB-Check from qdrant_client.http import models as rest # Für Real-Time DB-Checks
# Services # Services
from app.services.embeddings_client import EmbeddingsClient from app.services.embeddings_client import EmbeddingsClient
@ -101,24 +103,23 @@ class IngestionService:
logger.info(f"🚀 [Pass 2] Semantic Processing of {len(file_paths)} files...") logger.info(f"🚀 [Pass 2] Semantic Processing of {len(file_paths)} files...")
return [await self.process_file(p, vault_root, apply=True, purge_before=True) for p in file_paths] return [await self.process_file(p, vault_root, apply=True, purge_before=True) for p in file_paths]
async def _check_db_for_explicit_edge(self, source_id: str, target_id: str, kind: str) -> bool: async def _is_explicit_edge_in_db(self, edge_id: str) -> bool:
""" """
WP-24c: Real-Time Abfrage gegen Qdrant, ob bereits eine explizite Kante existiert. WP-24c: Prüft via Point-ID, ob bereits eine explizite (manuelle) Kante in Qdrant liegt.
Verhindert das Überschreiben korrekter 'origin_note_ids' durch virtuelle Symmetrien. Verhindert, dass virtuelle Symmetrien bestehendes Wissen überschreiben.
""" """
edges_col = f"{self.prefix}_edges" edges_col = f"{self.prefix}_edges"
try: try:
query_filter = rest.Filter( # Direkte Punkt-Abfrage ist schneller als Scroll/Filter
must=[ res = self.client.retrieve(
rest.FieldCondition(key="note_id", match=rest.MatchValue(value=source_id)), collection_name=edges_col,
rest.FieldCondition(key="target_id", match=rest.MatchValue(value=target_id)), ids=[edge_id],
rest.FieldCondition(key="kind", match=rest.MatchValue(value=kind)), with_payload=True,
rest.FieldCondition(key="virtual", match=rest.MatchValue(value=False)) # Nur echte Kanten with_vectors=False
]
) )
# Nutzt Scroll für eine effiziente Existenzprüfung if res and not res[0].payload.get("virtual", False):
res, _ = self.client.scroll(collection_name=edges_col, scroll_filter=query_filter, limit=1) return True # Punkt existiert und ist NICHT virtuell
return len(res) > 0 return False
except Exception: except Exception:
return False return False
@ -239,6 +240,7 @@ class IngestionService:
e["kind"] = resolved_kind e["kind"] = resolved_kind
# Markierung der Herkunft für selektiven Purge # Markierung der Herkunft für selektiven Purge
e["origin_note_id"] = note_id e["origin_note_id"] = note_id
e["virtual"] = False # Explizite Kanten sind niemals virtuell
final_edges.append(e) final_edges.append(e)
# 2. Symmetrie-Ermittlung via Registry # 2. Symmetrie-Ermittlung via Registry
@ -259,8 +261,8 @@ class IngestionService:
for ex in raw_edges for ex in raw_edges
) )
# B. Cross-Note Redundanz Check (v3.1.5): Prüfe Batch-Cache UND Datenbank # B. Cross-Note Redundanz Check (v3.1.6): Schutz vor Point-Überschreibung
is_cross_redundant = False is_cross_protected = False
# 1. Prüfung im Batch-Cache (für Notizen im gleichen Lauf) # 1. Prüfung im Batch-Cache (für Notizen im gleichen Lauf)
if target_ctx and hasattr(target_ctx, 'links'): if target_ctx and hasattr(target_ctx, 'links'):
@ -269,17 +271,18 @@ class IngestionService:
if link_to_id == note_id: if link_to_id == note_id:
planned_kind_in_target = edge_registry.resolve(link.get("kind", "related_to")) planned_kind_in_target = edge_registry.resolve(link.get("kind", "related_to"))
if planned_kind_in_target == inverse_kind: if planned_kind_in_target == inverse_kind:
is_cross_redundant = True is_cross_protected = True
break break
# 2. Prüfung in der Datenbank (für bereits existierende Notizen) # 2. Point-Authority Check (v3.1.6): ID berechnen und in DB prüfen
if not is_cross_redundant: if not is_cross_protected:
is_cross_redundant = await self._check_db_for_explicit_edge( # Wir simulieren die ID, die diese Kante in Qdrant hätte
target_canonical_id, note_id, inverse_kind # Parameter: kind, source_id, target_id, scope
) potential_id = _mk_edge_id(inverse_kind, target_canonical_id, note_id, e.get("scope", "note"))
is_cross_protected = await self._is_explicit_edge_in_db(potential_id)
# Nur anlegen, wenn keine semantische Redundanz vorliegt # Nur anlegen, wenn keine Form von Redundanz/Schutz vorliegt
if not is_local_redundant and not is_cross_redundant and (inverse_kind != resolved_kind or resolved_kind not in ["related_to", "references"]): if not is_local_redundant and not is_cross_protected and (inverse_kind != resolved_kind or resolved_kind not in ["related_to", "references"]):
inv_edge = e.copy() inv_edge = e.copy()
# Richtungs-Umkehr # Richtungs-Umkehr