Update qdrant_points.py, graph_utils.py, ingestion_db.py, ingestion_processor.py, and import_markdown.py: Enhance UUID generation for edge IDs, improve error handling, and refine documentation for clarity. Implement atomic consistency in batch upserts and ensure strict phase separation in the ingestion workflow. Update versioning to reflect changes in functionality and maintain compatibility with the ingestion service.
116 lines
4.4 KiB
Python
116 lines
4.4 KiB
Python
"""
|
|
FILE: app/core/ingestion/ingestion_db.py
|
|
DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung.
|
|
WP-14: Umstellung auf zentrale database-Infrastruktur.
|
|
WP-24c: Integration der Authority-Prüfung für Point-IDs.
|
|
Ermöglicht dem Prozessor die Unterscheidung zwischen
|
|
manueller Nutzer-Autorität und virtuellen Symmetrien.
|
|
VERSION: 2.2.0 (WP-24c: Authority Lookup Integration)
|
|
STATUS: Active
|
|
"""
|
|
import logging
|
|
from typing import Optional, Tuple, List
|
|
from qdrant_client import QdrantClient
|
|
from qdrant_client.http import models as rest
|
|
|
|
# Import der modularisierten Namen-Logik zur Sicherstellung der Konsistenz
|
|
from app.core.database import collection_names
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def fetch_note_payload(client: QdrantClient, prefix: str, note_id: str) -> Optional[dict]:
|
|
"""
|
|
Holt die Metadaten einer Note aus Qdrant via Scroll-API.
|
|
Wird primär für die Change-Detection (Hash-Vergleich) genutzt.
|
|
"""
|
|
notes_col, _, _ = collection_names(prefix)
|
|
try:
|
|
f = rest.Filter(must=[
|
|
rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))
|
|
])
|
|
pts, _ = client.scroll(
|
|
collection_name=notes_col,
|
|
scroll_filter=f,
|
|
limit=1,
|
|
with_payload=True
|
|
)
|
|
return pts[0].payload if pts else None
|
|
except Exception as e:
|
|
logger.debug(f"Note {note_id} not found or error during fetch: {e}")
|
|
return None
|
|
|
|
def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[bool, bool]:
|
|
"""
|
|
Prüft Qdrant aktiv auf vorhandene Chunks und Edges für eine Note.
|
|
Gibt (chunks_missing, edges_missing) als Boolean-Tupel zurück.
|
|
"""
|
|
_, chunks_col, edges_col = collection_names(prefix)
|
|
try:
|
|
# Filter für die note_id Suche
|
|
f = rest.Filter(must=[
|
|
rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))
|
|
])
|
|
c_pts, _ = client.scroll(collection_name=chunks_col, scroll_filter=f, limit=1)
|
|
e_pts, _ = client.scroll(collection_name=edges_col, scroll_filter=f, limit=1)
|
|
return (not bool(c_pts)), (not bool(e_pts))
|
|
except Exception as e:
|
|
logger.error(f"Error checking artifacts for {note_id}: {e}")
|
|
return True, True
|
|
|
|
def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) -> bool:
|
|
"""
|
|
WP-24c: Prüft via Point-ID, ob bereits eine explizite Kante existiert.
|
|
Wird vom IngestionProcessor in Phase 2 genutzt, um das Überschreiben
|
|
von manuellem Wissen durch virtuelle Symmetrie-Kanten zu verhindern.
|
|
|
|
Args:
|
|
edge_id: Die deterministisch berechnete UUID der Kante.
|
|
Returns:
|
|
True, wenn eine physische Kante (virtual=False) existiert.
|
|
"""
|
|
if not edge_id:
|
|
return False
|
|
|
|
_, _, edges_col = collection_names(prefix)
|
|
try:
|
|
# retrieve ist die effizienteste Methode für den Zugriff via ID
|
|
res = client.retrieve(
|
|
collection_name=edges_col,
|
|
ids=[edge_id],
|
|
with_payload=True
|
|
)
|
|
|
|
if res and len(res) > 0:
|
|
# Wir prüfen das 'virtual' Flag im Payload
|
|
is_virtual = res[0].payload.get("virtual", False)
|
|
if not is_virtual:
|
|
return True # Es ist eine explizite Nutzer-Kante
|
|
|
|
return False
|
|
except Exception as e:
|
|
logger.debug(f"Authority check failed for ID {edge_id}: {e}")
|
|
return False
|
|
|
|
def purge_artifacts(client: QdrantClient, prefix: str, note_id: str):
|
|
"""
|
|
Löscht verwaiste Chunks und Edges einer Note vor einem Re-Import.
|
|
Stellt sicher, dass keine Duplikate bei Inhaltsänderungen entstehen.
|
|
"""
|
|
_, chunks_col, edges_col = collection_names(prefix)
|
|
try:
|
|
f = rest.Filter(must=[
|
|
rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))
|
|
])
|
|
# Chunks löschen
|
|
client.delete(
|
|
collection_name=chunks_col,
|
|
points_selector=rest.FilterSelector(filter=f)
|
|
)
|
|
# Edges löschen
|
|
client.delete(
|
|
collection_name=edges_col,
|
|
points_selector=rest.FilterSelector(filter=f)
|
|
)
|
|
logger.info(f"🧹 [PURGE] Local artifacts for '{note_id}' cleared.")
|
|
except Exception as e:
|
|
logger.error(f"❌ [PURGE ERROR] Failed to clear artifacts for {note_id}: {e}") |