""" FILE: app/core/graph/graph_db_adapter.py DESCRIPTION: Datenbeschaffung aus Qdrant für den Graphen. AUDIT v1.1.1: Volle Unterstützung für WP-15c Metadaten. Stellt sicher, dass 'target_section' und 'provenance' für die Super-Edge-Aggregation im Retriever geladen werden. """ from typing import List, Dict, Optional from qdrant_client import QdrantClient from qdrant_client.http import models as rest # Nutzt die zentrale Infrastruktur für konsistente Collection-Namen (WP-14) from app.core.database import collection_names def fetch_edges_from_qdrant( client: QdrantClient, prefix: str, seeds: List[str], edge_types: Optional[List[str]] = None, limit: int = 2048, ) -> List[Dict]: """ Holt Edges aus der Datenbank basierend auf Seed-IDs. WP-15c: Erhält alle Metadaten für das Note-Level Diversity Pooling. """ if not seeds or limit <= 0: return [] # Konsistente Namensauflösung via database-Paket # Rückgabe: (notes_col, chunks_col, edges_col) _, _, edges_col = collection_names(prefix) # Wir suchen Kanten, bei denen die Seed-IDs entweder Quelle, Ziel oder Kontext-Note sind. seed_conditions = [] for field in ("source_id", "target_id", "note_id"): for s in seeds: seed_conditions.append( rest.FieldCondition(key=field, match=rest.MatchValue(value=str(s))) ) seeds_filter = rest.Filter(should=seed_conditions) if seed_conditions else None # Optionaler Filter auf spezifische Kanten-Typen (z.B. für Intent-Routing) type_filter = None if edge_types: type_conds = [ rest.FieldCondition(key="kind", match=rest.MatchValue(value=str(k))) for k in edge_types ] type_filter = rest.Filter(should=type_conds) must = [] if seeds_filter: must.append(seeds_filter) if type_filter: must.append(type_filter) flt = rest.Filter(must=must) if must else None # Abfrage via Qdrant Scroll API # WICHTIG: with_payload=True lädt alle Metadaten (target_section, provenance etc.) pts, _ = client.scroll( collection_name=edges_col, scroll_filter=flt, limit=limit, with_payload=True, with_vectors=False, ) # Wir geben das vollständige Payload zurück, damit der Retriever # alle Signale für die Super-Edge-Aggregation und das Scoring hat. return [dict(p.payload) for p in pts if p.payload]