Implement origin-based purge logic in ingestion_db.py to prevent accidental deletion of inverse edges during re-imports. Enhance logging for error handling and artifact checks. Update ingestion_processor.py to support redundancy checks and improve symmetry logic for edge generation, ensuring bidirectional graph integrity. Version bump to 3.1.2.
This commit is contained in:
parent
9b3fd7723e
commit
5e2a074019
|
|
@ -2,7 +2,12 @@
|
||||||
FILE: app/core/ingestion/ingestion_db.py
|
FILE: app/core/ingestion/ingestion_db.py
|
||||||
DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung.
|
DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung.
|
||||||
WP-14: Umstellung auf zentrale database-Infrastruktur.
|
WP-14: Umstellung auf zentrale database-Infrastruktur.
|
||||||
|
WP-24c: Implementierung der herkunftsbasierten Lösch-Logik (Origin-Purge).
|
||||||
|
Verhindert das versehentliche Löschen von inversen Kanten beim Re-Import.
|
||||||
|
VERSION: 2.1.0 (WP-24c: Protected Purge Logic)
|
||||||
|
STATUS: Active
|
||||||
"""
|
"""
|
||||||
|
import logging
|
||||||
from typing import Optional, Tuple
|
from typing import Optional, Tuple
|
||||||
from qdrant_client import QdrantClient
|
from qdrant_client import QdrantClient
|
||||||
from qdrant_client.http import models as rest
|
from qdrant_client.http import models as rest
|
||||||
|
|
@ -10,6 +15,8 @@ from qdrant_client.http import models as rest
|
||||||
# Import der modularisierten Namen-Logik zur Sicherstellung der Konsistenz
|
# Import der modularisierten Namen-Logik zur Sicherstellung der Konsistenz
|
||||||
from app.core.database import collection_names
|
from app.core.database import collection_names
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
def fetch_note_payload(client: QdrantClient, prefix: str, note_id: str) -> Optional[dict]:
|
def fetch_note_payload(client: QdrantClient, prefix: str, note_id: str) -> Optional[dict]:
|
||||||
"""Holt die Metadaten einer Note aus Qdrant via Scroll."""
|
"""Holt die Metadaten einer Note aus Qdrant via Scroll."""
|
||||||
notes_col, _, _ = collection_names(prefix)
|
notes_col, _, _ = collection_names(prefix)
|
||||||
|
|
@ -17,23 +24,55 @@ def fetch_note_payload(client: QdrantClient, prefix: str, note_id: str) -> Optio
|
||||||
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
||||||
pts, _ = client.scroll(collection_name=notes_col, scroll_filter=f, limit=1, with_payload=True)
|
pts, _ = client.scroll(collection_name=notes_col, scroll_filter=f, limit=1, with_payload=True)
|
||||||
return pts[0].payload if pts else None
|
return pts[0].payload if pts else None
|
||||||
except: return None
|
except Exception as e:
|
||||||
|
logger.debug(f"Note {note_id} not found: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[bool, bool]:
|
def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[bool, bool]:
|
||||||
"""Prüft Qdrant aktiv auf vorhandene Chunks und Edges."""
|
"""Prüft Qdrant aktiv auf vorhandene Chunks und Edges für eine Note."""
|
||||||
_, chunks_col, edges_col = collection_names(prefix)
|
_, chunks_col, edges_col = collection_names(prefix)
|
||||||
try:
|
try:
|
||||||
|
# Filter für die Existenz-Prüfung (Klassisch via note_id)
|
||||||
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
||||||
c_pts, _ = client.scroll(collection_name=chunks_col, scroll_filter=f, limit=1)
|
c_pts, _ = client.scroll(collection_name=chunks_col, scroll_filter=f, limit=1)
|
||||||
e_pts, _ = client.scroll(collection_name=edges_col, scroll_filter=f, limit=1)
|
e_pts, _ = client.scroll(collection_name=edges_col, scroll_filter=f, limit=1)
|
||||||
return (not bool(c_pts)), (not bool(e_pts))
|
return (not bool(c_pts)), (not bool(e_pts))
|
||||||
except: return True, True
|
except Exception as e:
|
||||||
|
logger.error(f"Error checking artifacts for {note_id}: {e}")
|
||||||
|
return True, True
|
||||||
|
|
||||||
def purge_artifacts(client: QdrantClient, prefix: str, note_id: str):
|
def purge_artifacts(client: QdrantClient, prefix: str, note_id: str):
|
||||||
"""Löscht verwaiste Chunks/Edges vor einem Re-Import."""
|
"""
|
||||||
|
WP-24c: Selektives Löschen von Artefakten vor einem Re-Import.
|
||||||
|
Implementiert das Origin-Purge-Prinzip zur Sicherung der bidirektionalen Graph-Integrität.
|
||||||
|
"""
|
||||||
_, chunks_col, edges_col = collection_names(prefix)
|
_, chunks_col, edges_col = collection_names(prefix)
|
||||||
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
|
|
||||||
# Iteration über die nun zentral verwalteten Collection-Namen
|
try:
|
||||||
for col in [chunks_col, edges_col]:
|
# 1. Chunks löschen (immer fest an die note_id gebunden)
|
||||||
try: client.delete(collection_name=col, points_selector=rest.FilterSelector(filter=f))
|
chunks_filter = rest.Filter(must=[
|
||||||
except: pass
|
rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))
|
||||||
|
])
|
||||||
|
client.delete(
|
||||||
|
collection_name=chunks_col,
|
||||||
|
points_selector=rest.FilterSelector(filter=chunks_filter)
|
||||||
|
)
|
||||||
|
|
||||||
|
# 2. WP-24c: Kanten löschen (HERKUNFTS-BASIERT)
|
||||||
|
# Wir löschen alle Kanten, die von DIESER Note erzeugt wurden (origin_note_id).
|
||||||
|
# Dies umfasst:
|
||||||
|
# - Alle ausgehenden Kanten (A -> B)
|
||||||
|
# - Alle inversen Kanten, die diese Note in anderen Notizen "deponiert" hat (B -> A)
|
||||||
|
# Fremde inverse Kanten (C -> A) bleiben erhalten.
|
||||||
|
edges_filter = rest.Filter(must=[
|
||||||
|
rest.FieldCondition(key="origin_note_id", match=rest.MatchValue(value=note_id))
|
||||||
|
])
|
||||||
|
client.delete(
|
||||||
|
collection_name=edges_col,
|
||||||
|
points_selector=rest.FilterSelector(filter=edges_filter)
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(f"🧹 [PURGE] Global artifacts owned by '{note_id}' cleared.")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"❌ [PURGE ERROR] Failed to clear artifacts for {note_id}: {e}")
|
||||||
|
|
@ -5,8 +5,8 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator).
|
||||||
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
|
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
|
||||||
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
|
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
|
||||||
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
|
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
|
||||||
AUDIT v3.1.0: Korrektur der bidirektionalen Graph-Injektion für Qdrant.
|
AUDIT v3.1.2: Redundanz-Check, ID-Resolution & Origin-Tracking.
|
||||||
VERSION: 3.1.0 (WP-24c: Symmetric Edge Injection Fix)
|
VERSION: 3.1.2 (WP-24c: Redundancy-Aware Symmetric Ingestion)
|
||||||
STATUS: Active
|
STATUS: Active
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
|
|
@ -33,7 +33,6 @@ from app.services.llm_service import LLMService
|
||||||
# Package-Interne Imports
|
# Package-Interne Imports
|
||||||
from .ingestion_utils import load_type_registry, resolve_note_type, get_chunk_config_by_profile
|
from .ingestion_utils import load_type_registry, resolve_note_type, get_chunk_config_by_profile
|
||||||
from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts
|
from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts
|
||||||
# WP-24c: Wir nutzen die Basis-Validierung; die Symmetrie wird im Prozessor injiziert
|
|
||||||
from .ingestion_validation import validate_edge_candidate
|
from .ingestion_validation import validate_edge_candidate
|
||||||
from .ingestion_note_payload import make_note_payload
|
from .ingestion_note_payload import make_note_payload
|
||||||
from .ingestion_chunk_payload import make_chunk_payloads
|
from .ingestion_chunk_payload import make_chunk_payloads
|
||||||
|
|
@ -205,48 +204,63 @@ class IngestionService:
|
||||||
include_note_scope_refs=note_scope_refs
|
include_note_scope_refs=note_scope_refs
|
||||||
)
|
)
|
||||||
|
|
||||||
# --- WP-24c: Symmetrie-Injektion (Invers-Kanten Fix) ---
|
# --- WP-24c: Symmetrie-Injektion (Bidirektionale Graph-Logik) ---
|
||||||
# Wir bauen die finalen Kanten-Objekte inklusive ihrer Gegenstücke
|
|
||||||
final_edges = []
|
final_edges = []
|
||||||
for e in raw_edges:
|
for e in raw_edges:
|
||||||
# 1. Primär-Kante auflösen & kanonisieren
|
# 1. Primär-Kante kanonisieren & Owner setzen
|
||||||
resolved_kind = edge_registry.resolve(
|
resolved_kind = edge_registry.resolve(
|
||||||
e.get("kind", "related_to"),
|
e.get("kind", "related_to"),
|
||||||
provenance=e.get("provenance", "explicit"),
|
provenance=e.get("provenance", "explicit"),
|
||||||
context={"file": file_path, "note_id": note_id, "line": e.get("line", "system")}
|
context={"file": file_path, "note_id": note_id}
|
||||||
)
|
)
|
||||||
e["kind"] = resolved_kind
|
e["kind"] = resolved_kind
|
||||||
|
# Markierung der Herkunft für selektiven Purge
|
||||||
|
e["origin_note_id"] = note_id
|
||||||
final_edges.append(e)
|
final_edges.append(e)
|
||||||
|
|
||||||
# 2. Symmetrie-Erzeugung via Registry
|
# 2. Symmetrie-Ermittlung via Registry
|
||||||
inverse_kind = edge_registry.get_inverse(resolved_kind)
|
inverse_kind = edge_registry.get_inverse(resolved_kind)
|
||||||
target_id = e.get("target_id")
|
target_raw = e.get("target_id")
|
||||||
|
|
||||||
# Wir erzeugen eine Inverse nur bei sinnvoller Symmetrie und existierendem Ziel
|
# ID-Resolution: Finden der echten Note_ID im Cache
|
||||||
if inverse_kind and inverse_kind != resolved_kind and target_id:
|
target_ctx = self.batch_cache.get(target_raw)
|
||||||
# Deep Copy für die Inverse zur Vermeidung von Side-Effects
|
target_canonical_id = target_ctx.note_id if target_ctx else target_raw
|
||||||
inv_edge = e.copy()
|
|
||||||
|
|
||||||
# Richtungs-Umkehr
|
# Validierung für Symmetrie-Erzeugung (Kein Self-Loop, Existenz der Inversen)
|
||||||
inv_edge["note_id"] = target_id # Ursprung ist nun das Ziel
|
if (inverse_kind and target_canonical_id and target_canonical_id != note_id):
|
||||||
inv_edge["target_id"] = note_id # Ziel ist nun die Quelle
|
|
||||||
inv_edge["kind"] = inverse_kind
|
|
||||||
|
|
||||||
# Metadaten-Anpassung
|
# REDUNDANZ-CHECK: Existiert bereits eine explizite Gegenrichtung?
|
||||||
inv_edge["virtual"] = True
|
is_redundant = any(
|
||||||
inv_edge["provenance"] = "structure" # Schutz durch Firewall
|
ex.get("target_id") == target_canonical_id and
|
||||||
inv_edge["confidence"] = e.get("confidence", 0.9) * 0.9 # Leichte Dämpfung
|
edge_registry.resolve(ex.get("kind")) == inverse_kind
|
||||||
|
for ex in raw_edges
|
||||||
|
)
|
||||||
|
|
||||||
# Lifecycle-Verankerung: Die Inverse gehört logisch zur Quell-Note
|
# Nur anlegen, wenn nicht redundant und kein simpler related_to Loop
|
||||||
inv_edge["origin_note_id"] = note_id
|
if not is_redundant and (inverse_kind != resolved_kind or resolved_kind not in ["related_to", "references"]):
|
||||||
|
inv_edge = e.copy()
|
||||||
|
|
||||||
final_edges.append(inv_edge)
|
# Richtungs-Umkehr
|
||||||
logger.info(f"🔄 [SYMMETRY] Built inverse in payload: {target_id} --({inverse_kind})--> {note_id}")
|
inv_edge["note_id"] = target_canonical_id
|
||||||
|
inv_edge["target_id"] = note_id
|
||||||
|
inv_edge["kind"] = inverse_kind
|
||||||
|
|
||||||
|
# Metadaten für Struktur-Kante
|
||||||
|
inv_edge["virtual"] = True
|
||||||
|
inv_edge["provenance"] = "structure"
|
||||||
|
inv_edge["confidence"] = e.get("confidence", 0.9) * 0.9
|
||||||
|
|
||||||
|
# Lifecycle-Verankerung: Diese Kante gehört logisch zum Verursacher (Note A)
|
||||||
|
inv_edge["origin_note_id"] = note_id
|
||||||
|
|
||||||
|
final_edges.append(inv_edge)
|
||||||
|
logger.info(f"🔄 [SYMMETRY] Built inverse: {target_canonical_id} --({inverse_kind})--> {note_id}")
|
||||||
|
|
||||||
edges = final_edges
|
edges = final_edges
|
||||||
|
|
||||||
# 4. DB Upsert via modularisierter Points-Logik
|
# 4. DB Upsert via modularisierter Points-Logik
|
||||||
if purge_before and old_payload:
|
if purge_before and old_payload:
|
||||||
|
# Hinweis: purge_artifacts wird im nächsten Schritt auf origin_note_id umgestellt
|
||||||
purge_artifacts(self.client, self.prefix, note_id)
|
purge_artifacts(self.client, self.prefix, note_id)
|
||||||
|
|
||||||
# Speichern der Haupt-Note
|
# Speichern der Haupt-Note
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user