Update ingestion_processor.py to version 3.1.5: Implement database-aware redundancy checks to prevent overwriting explicit edges by virtual symmetries. Enhance edge validation logic to include real-time database queries, ensuring improved integrity in edge generation. Adjust versioning and documentation accordingly.
This commit is contained in:
parent
61a319a049
commit
d5d6987ce2
|
|
@ -5,8 +5,9 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator).
|
||||||
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
|
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
|
||||||
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
|
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
|
||||||
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
|
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
|
||||||
AUDIT v3.1.4: Semantischer Cross-Note Redundanz-Check (Typ-spezifisch).
|
AUDIT v3.1.5: Datenbank-gestützter Redundanz-Check verhindert das
|
||||||
VERSION: 3.1.4 (WP-24c: Semantic Cross-Note Redundancy Fix)
|
Überschreiben expliziter Kanten durch virtuelle Symmetrien.
|
||||||
|
VERSION: 3.1.5 (WP-24c: DB-Aware Redundancy Check)
|
||||||
STATUS: Active
|
STATUS: Active
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
|
|
@ -24,6 +25,7 @@ from app.core.chunking import assemble_chunks
|
||||||
# MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene
|
# MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene
|
||||||
from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
|
from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
|
||||||
from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
|
from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
|
||||||
|
from qdrant_client.http import models as rest # WICHTIG: Für den Real-Time DB-Check
|
||||||
|
|
||||||
# Services
|
# Services
|
||||||
from app.services.embeddings_client import EmbeddingsClient
|
from app.services.embeddings_client import EmbeddingsClient
|
||||||
|
|
@ -99,6 +101,27 @@ class IngestionService:
|
||||||
logger.info(f"🚀 [Pass 2] Semantic Processing of {len(file_paths)} files...")
|
logger.info(f"🚀 [Pass 2] Semantic Processing of {len(file_paths)} files...")
|
||||||
return [await self.process_file(p, vault_root, apply=True, purge_before=True) for p in file_paths]
|
return [await self.process_file(p, vault_root, apply=True, purge_before=True) for p in file_paths]
|
||||||
|
|
||||||
|
async def _check_db_for_explicit_edge(self, source_id: str, target_id: str, kind: str) -> bool:
|
||||||
|
"""
|
||||||
|
WP-24c: Real-Time Abfrage gegen Qdrant, ob bereits eine explizite Kante existiert.
|
||||||
|
Verhindert das Überschreiben korrekter 'origin_note_ids' durch virtuelle Symmetrien.
|
||||||
|
"""
|
||||||
|
edges_col = f"{self.prefix}_edges"
|
||||||
|
try:
|
||||||
|
query_filter = rest.Filter(
|
||||||
|
must=[
|
||||||
|
rest.FieldCondition(key="note_id", match=rest.MatchValue(value=source_id)),
|
||||||
|
rest.FieldCondition(key="target_id", match=rest.MatchValue(value=target_id)),
|
||||||
|
rest.FieldCondition(key="kind", match=rest.MatchValue(value=kind)),
|
||||||
|
rest.FieldCondition(key="virtual", match=rest.MatchValue(value=False)) # Nur echte Kanten
|
||||||
|
]
|
||||||
|
)
|
||||||
|
# Nutzt Scroll für eine effiziente Existenzprüfung
|
||||||
|
res, _ = self.client.scroll(collection_name=edges_col, scroll_filter=query_filter, limit=1)
|
||||||
|
return len(res) > 0
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]:
|
async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]:
|
||||||
"""Transformiert eine Markdown-Datei in den Graphen."""
|
"""Transformiert eine Markdown-Datei in den Graphen."""
|
||||||
apply = kwargs.get("apply", False)
|
apply = kwargs.get("apply", False)
|
||||||
|
|
@ -236,24 +259,26 @@ class IngestionService:
|
||||||
for ex in raw_edges
|
for ex in raw_edges
|
||||||
)
|
)
|
||||||
|
|
||||||
# B. Cross-Note Redundanz Fix (v3.1.4): Prüfe auf identischen semantischen Beziehungstyp in der Ziel-Note
|
# B. Cross-Note Redundanz Check (v3.1.5): Prüfe Batch-Cache UND Datenbank
|
||||||
is_cross_redundant = False
|
is_cross_redundant = False
|
||||||
|
|
||||||
|
# 1. Prüfung im Batch-Cache (für Notizen im gleichen Lauf)
|
||||||
if target_ctx and hasattr(target_ctx, 'links'):
|
if target_ctx and hasattr(target_ctx, 'links'):
|
||||||
for link in target_ctx.links:
|
for link in target_ctx.links:
|
||||||
link_to = link.get("to")
|
link_to_id = self.batch_cache.get(link.get("to"), {}).note_id or link.get("to")
|
||||||
# Auflösung des Link-Ziels der anderen Note
|
|
||||||
link_to_ctx = self.batch_cache.get(link_to)
|
|
||||||
link_to_id = link_to_ctx.note_id if link_to_ctx else link_to
|
|
||||||
|
|
||||||
if link_to_id == note_id:
|
if link_to_id == note_id:
|
||||||
# Wir prüfen nun, ob der Beziehungstyp in der Ziel-Note semantisch identisch
|
|
||||||
# mit der geplanten Symmetrie-Kante ist.
|
|
||||||
planned_kind_in_target = edge_registry.resolve(link.get("kind", "related_to"))
|
planned_kind_in_target = edge_registry.resolve(link.get("kind", "related_to"))
|
||||||
if planned_kind_in_target == inverse_kind:
|
if planned_kind_in_target == inverse_kind:
|
||||||
is_cross_redundant = True
|
is_cross_redundant = True
|
||||||
break
|
break
|
||||||
|
|
||||||
# Nur anlegen, wenn keine semantische Redundanz vorliegt und kein simpler Loop
|
# 2. Prüfung in der Datenbank (für bereits existierende Notizen)
|
||||||
|
if not is_cross_redundant:
|
||||||
|
is_cross_redundant = await self._check_db_for_explicit_edge(
|
||||||
|
target_canonical_id, note_id, inverse_kind
|
||||||
|
)
|
||||||
|
|
||||||
|
# Nur anlegen, wenn keine semantische Redundanz vorliegt
|
||||||
if not is_local_redundant and not is_cross_redundant and (inverse_kind != resolved_kind or resolved_kind not in ["related_to", "references"]):
|
if not is_local_redundant and not is_cross_redundant and (inverse_kind != resolved_kind or resolved_kind not in ["related_to", "references"]):
|
||||||
inv_edge = e.copy()
|
inv_edge = e.copy()
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user