mindnet/app/core/ingestion/ingestion_db.py
Lars 7cc823e2f4 NEUSTART von vorne mit frischer Codebasis
Update qdrant_points.py, graph_utils.py, ingestion_db.py, ingestion_processor.py, and import_markdown.py: Enhance UUID generation for edge IDs, improve error handling, and refine documentation for clarity. Implement atomic consistency in batch upserts and ensure strict phase separation in the ingestion workflow. Update versioning to reflect changes in functionality and maintain compatibility with the ingestion service.
2026-01-10 10:56:47 +01:00

116 lines
4.4 KiB
Python

"""
FILE: app/core/ingestion/ingestion_db.py
DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung.
WP-14: Umstellung auf zentrale database-Infrastruktur.
WP-24c: Integration der Authority-Prüfung für Point-IDs.
Ermöglicht dem Prozessor die Unterscheidung zwischen
manueller Nutzer-Autorität und virtuellen Symmetrien.
VERSION: 2.2.0 (WP-24c: Authority Lookup Integration)
STATUS: Active
"""
import logging
from typing import Optional, Tuple, List
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
# Import der modularisierten Namen-Logik zur Sicherstellung der Konsistenz
from app.core.database import collection_names
logger = logging.getLogger(__name__)
def fetch_note_payload(client: QdrantClient, prefix: str, note_id: str) -> Optional[dict]:
"""
Holt die Metadaten einer Note aus Qdrant via Scroll-API.
Wird primär für die Change-Detection (Hash-Vergleich) genutzt.
"""
notes_col, _, _ = collection_names(prefix)
try:
f = rest.Filter(must=[
rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))
])
pts, _ = client.scroll(
collection_name=notes_col,
scroll_filter=f,
limit=1,
with_payload=True
)
return pts[0].payload if pts else None
except Exception as e:
logger.debug(f"Note {note_id} not found or error during fetch: {e}")
return None
def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[bool, bool]:
"""
Prüft Qdrant aktiv auf vorhandene Chunks und Edges für eine Note.
Gibt (chunks_missing, edges_missing) als Boolean-Tupel zurück.
"""
_, chunks_col, edges_col = collection_names(prefix)
try:
# Filter für die note_id Suche
f = rest.Filter(must=[
rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))
])
c_pts, _ = client.scroll(collection_name=chunks_col, scroll_filter=f, limit=1)
e_pts, _ = client.scroll(collection_name=edges_col, scroll_filter=f, limit=1)
return (not bool(c_pts)), (not bool(e_pts))
except Exception as e:
logger.error(f"Error checking artifacts for {note_id}: {e}")
return True, True
def is_explicit_edge_present(client: QdrantClient, prefix: str, edge_id: str) -> bool:
"""
WP-24c: Prüft via Point-ID, ob bereits eine explizite Kante existiert.
Wird vom IngestionProcessor in Phase 2 genutzt, um das Überschreiben
von manuellem Wissen durch virtuelle Symmetrie-Kanten zu verhindern.
Args:
edge_id: Die deterministisch berechnete UUID der Kante.
Returns:
True, wenn eine physische Kante (virtual=False) existiert.
"""
if not edge_id:
return False
_, _, edges_col = collection_names(prefix)
try:
# retrieve ist die effizienteste Methode für den Zugriff via ID
res = client.retrieve(
collection_name=edges_col,
ids=[edge_id],
with_payload=True
)
if res and len(res) > 0:
# Wir prüfen das 'virtual' Flag im Payload
is_virtual = res[0].payload.get("virtual", False)
if not is_virtual:
return True # Es ist eine explizite Nutzer-Kante
return False
except Exception as e:
logger.debug(f"Authority check failed for ID {edge_id}: {e}")
return False
def purge_artifacts(client: QdrantClient, prefix: str, note_id: str):
"""
Löscht verwaiste Chunks und Edges einer Note vor einem Re-Import.
Stellt sicher, dass keine Duplikate bei Inhaltsänderungen entstehen.
"""
_, chunks_col, edges_col = collection_names(prefix)
try:
f = rest.Filter(must=[
rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))
])
# Chunks löschen
client.delete(
collection_name=chunks_col,
points_selector=rest.FilterSelector(filter=f)
)
# Edges löschen
client.delete(
collection_name=edges_col,
points_selector=rest.FilterSelector(filter=f)
)
logger.info(f"🧹 [PURGE] Local artifacts for '{note_id}' cleared.")
except Exception as e:
logger.error(f"❌ [PURGE ERROR] Failed to clear artifacts for {note_id}: {e}")