mindnet/app/core/graph/graph_db_adapter.py

71 lines
2.5 KiB
Python

"""
FILE: app/core/graph/graph_db_adapter.py
DESCRIPTION: Datenbeschaffung aus Qdrant für den Graphen.
AUDIT v1.1.1: Volle Unterstützung für WP-15c Metadaten.
Stellt sicher, dass 'target_section' und 'provenance' für die
Super-Edge-Aggregation im Retriever geladen werden.
"""
from typing import List, Dict, Optional
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
# Nutzt die zentrale Infrastruktur für konsistente Collection-Namen (WP-14)
from app.core.database import collection_names
def fetch_edges_from_qdrant(
client: QdrantClient,
prefix: str,
seeds: List[str],
edge_types: Optional[List[str]] = None,
limit: int = 2048,
) -> List[Dict]:
"""
Holt Edges aus der Datenbank basierend auf Seed-IDs.
WP-15c: Erhält alle Metadaten für das Note-Level Diversity Pooling.
"""
if not seeds or limit <= 0:
return []
# Konsistente Namensauflösung via database-Paket
# Rückgabe: (notes_col, chunks_col, edges_col)
_, _, edges_col = collection_names(prefix)
# Wir suchen Kanten, bei denen die Seed-IDs entweder Quelle, Ziel oder Kontext-Note sind.
seed_conditions = []
for field in ("source_id", "target_id", "note_id"):
for s in seeds:
seed_conditions.append(
rest.FieldCondition(key=field, match=rest.MatchValue(value=str(s)))
)
seeds_filter = rest.Filter(should=seed_conditions) if seed_conditions else None
# Optionaler Filter auf spezifische Kanten-Typen (z.B. für Intent-Routing)
type_filter = None
if edge_types:
type_conds = [
rest.FieldCondition(key="kind", match=rest.MatchValue(value=str(k)))
for k in edge_types
]
type_filter = rest.Filter(should=type_conds)
must = []
if seeds_filter:
must.append(seeds_filter)
if type_filter:
must.append(type_filter)
flt = rest.Filter(must=must) if must else None
# Abfrage via Qdrant Scroll API
# WICHTIG: with_payload=True lädt alle Metadaten (target_section, provenance etc.)
pts, _ = client.scroll(
collection_name=edges_col,
scroll_filter=flt,
limit=limit,
with_payload=True,
with_vectors=False,
)
# Wir geben das vollständige Payload zurück, damit der Retriever
# alle Signale für die Super-Edge-Aggregation und das Scoring hat.
return [dict(p.payload) for p in pts if p.payload]