""" FILE: app/core/database/qdrant_points.py DESCRIPTION: Object-Mapper für Qdrant. Konvertiert JSON-Payloads (Notes, Chunks, Edges) in PointStructs und generiert deterministische UUIDs. VERSION: 4.0.0 (WP-24c: Gold-Standard Identity - 4-Parameter-ID) STATUS: Active DEPENDENCIES: qdrant_client, uuid, os, app.core.graph.graph_utils LAST_ANALYSIS: 2026-01-10 """ from __future__ import annotations import os import uuid from typing import List, Tuple, Iterable, Optional, Dict, Any from qdrant_client.http import models as rest from qdrant_client import QdrantClient # WP-24c: Import der zentralen Identitäts-Logik zur Vermeidung von ID-Drift from app.core.graph.graph_utils import _mk_edge_id # --------------------- ID helpers --------------------- def _to_uuid(stable_key: str) -> str: """ Erzeugt eine deterministische UUIDv5 basierend auf einem stabilen Schlüssel. Härtung v1.5.2: Guard gegen leere Schlüssel zur Vermeidung von Pydantic-Fehlern. """ if not stable_key: raise ValueError("UUID generation failed: stable_key is empty or None") return str(uuid.uuid5(uuid.NAMESPACE_URL, str(stable_key))) def _names(prefix: str) -> Tuple[str, str, str]: """Interne Auflösung der Collection-Namen basierend auf dem Präfix.""" return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges" # --------------------- Points builders --------------------- def points_for_note(prefix: str, note_payload: dict, note_vec: List[float] | None, dim: int) -> Tuple[str, List[rest.PointStruct]]: """Konvertiert Note-Metadaten in Qdrant Points.""" notes_col, _, _ = _names(prefix) # Nutzt Null-Vektor als Fallback, falls kein Embedding vorhanden ist vector = note_vec if note_vec is not None else [0.0] * int(dim) raw_note_id = note_payload.get("note_id") or note_payload.get("id") or "missing-note-id" point_id = _to_uuid(raw_note_id) pt = rest.PointStruct( id=point_id, vector=vector, payload=note_payload ) return notes_col, [pt] def points_for_chunks(prefix: str, chunk_payloads: List[dict], vectors: List[List[float]]) -> Tuple[str, List[rest.PointStruct]]: """Konvertiert Chunks und deren Vektoren in Qdrant Points.""" _, chunks_col, _ = _names(prefix) points: List[rest.PointStruct] = [] for i, (pl, vec) in enumerate(zip(chunk_payloads, vectors), start=1): chunk_id = pl.get("chunk_id") or pl.get("id") if not chunk_id: note_id = pl.get("note_id") or pl.get("parent_note_id") or "missing-note" chunk_id = f"{note_id}#{i}" pl["chunk_id"] = chunk_id point_id = _to_uuid(chunk_id) points.append(rest.PointStruct( id=point_id, vector=vec, payload=pl )) return chunks_col, points def _normalize_edge_payload(pl: dict) -> dict: """Normalisiert Edge-Felder und sichert Schema-Konformität.""" kind = pl.get("kind") or pl.get("edge_type") or "edge" source_id = pl.get("source_id") or pl.get("src_id") or "unknown-src" target_id = pl.get("target_id") or pl.get("dst_id") or "unknown-tgt" seq = pl.get("seq") or pl.get("order") or pl.get("index") # WP-Fix: target_section explizit durchreichen target_section = pl.get("target_section") pl.setdefault("kind", kind) pl.setdefault("source_id", source_id) pl.setdefault("target_id", target_id) if seq is not None and "seq" not in pl: pl["seq"] = seq if target_section is not None: pl["target_section"] = target_section return pl def points_for_edges(prefix: str, edge_payloads: List[dict]) -> Tuple[str, List[rest.PointStruct]]: """ Konvertiert Kanten-Payloads in PointStructs. WP-24c v4.0.0: Nutzt die zentrale _mk_edge_id Funktion aus graph_utils. Dies eliminiert den ID-Drift zwischen manuellen und virtuellen Kanten. GOLD-STANDARD v4.0.0: Die ID-Generierung verwendet STRICT nur die 4 Parameter (kind, source_id, target_id, scope). rule_id und variant werden ignoriert. """ _, _, edges_col = _names(prefix) points: List[rest.PointStruct] = [] for raw in edge_payloads: pl = _normalize_edge_payload(raw) # Extraktion der Identitäts-Parameter (GOLD-STANDARD v4.0.0: nur 4 Parameter) kind = pl.get("kind", "edge") s = pl.get("source_id", "unknown-src") t = pl.get("target_id", "unknown-tgt") scope = pl.get("scope", "note") # Hinweis: rule_id und variant werden im Payload gespeichert, # fließen aber NICHT in die ID-Generierung ein (v4.0.0 Standard) try: # Aufruf der Single-Source-of-Truth für IDs # GOLD-STANDARD v4.0.0: Nur 4 Parameter werden verwendet point_id = _mk_edge_id( kind=kind, s=s, t=t, scope=scope ) # Synchronisierung des Payloads mit der berechneten ID pl["edge_id"] = point_id points.append(rest.PointStruct( id=point_id, vector=[0.0], payload=pl )) except ValueError as e: # Fehlerhaft definierte Kanten werden übersprungen, um Pydantic-Crashes zu vermeiden continue return edges_col, points # --------------------- Vector schema & overrides --------------------- def _preferred_name(candidates: List[str]) -> str: """Ermittelt den primären Vektor-Namen aus einer Liste von Kandidaten.""" for k in ("text", "default", "embedding", "content"): if k in candidates: return k return sorted(candidates)[0] def _env_override_for_collection(collection: str) -> Optional[str]: """ Prüft auf Umgebungsvariablen-Overrides für Vektor-Namen. Returns: - "__single__" für erzwungenen Single-Vector Modus - Name (str) für spezifischen Named-Vector - None für automatische Erkennung """ base = os.getenv("MINDNET_VECTOR_NAME") if collection.endswith("_notes"): base = os.getenv("NOTES_VECTOR_NAME", base) elif collection.endswith("_chunks"): base = os.getenv("CHUNKS_VECTOR_NAME", base) elif collection.endswith("_edges"): base = os.getenv("EDGES_VECTOR_NAME", base) if not base: return None val = base.strip() if val.lower() in ("__single__", "single"): return "__single__" return val def _get_vector_schema(client: QdrantClient, collection_name: str) -> dict: """Ermittelt das Vektor-Schema einer existierenden Collection via API.""" try: info = client.get_collection(collection_name=collection_name) vecs = getattr(info, "vectors", None) # Prüfung auf Single-Vector Konfiguration if hasattr(vecs, "size") and isinstance(vecs.size, int): return {"kind": "single", "size": vecs.size} # Prüfung auf Named-Vectors Konfiguration cfg = getattr(vecs, "config", None) if isinstance(cfg, dict) and cfg: names = list(cfg.keys()) if names: return {"kind": "named", "names": names, "primary": _preferred_name(names)} except Exception: pass return {"kind": "single", "size": None} def _as_named(points: List[rest.PointStruct], name: str) -> List[rest.PointStruct]: """Transformiert PointStructs in das Named-Vector Format.""" out: List[rest.PointStruct] = [] for pt in points: vec = getattr(pt, "vector", None) if isinstance(vec, dict): if name in vec: out.append(pt) else: fallback_vec = None try: fallback_vec = list(next(iter(vec.values()))) except Exception: fallback_vec = [0.0] out.append(rest.PointStruct(id=pt.id, vector={name: fallback_vec}, payload=pt.payload)) elif vec is not None: out.append(rest.PointStruct(id=pt.id, vector={name: vec}, payload=pt.payload)) else: out.append(pt) return out # --------------------- Qdrant ops --------------------- def upsert_batch(client: QdrantClient, collection: str, points: List[rest.PointStruct], wait: bool = True) -> None: """ Schreibt Points hocheffizient in eine Collection. Unterstützt automatische Schema-Erkennung und Named-Vector Transformation. WP-Fix: 'wait=True' ist Default für Datenkonsistenz zwischen den Ingest-Phasen. """ if not points: return # 1) ENV overrides prüfen override = _env_override_for_collection(collection) if override == "__single__": client.upsert(collection_name=collection, points=points, wait=wait) return elif isinstance(override, str): client.upsert(collection_name=collection, points=_as_named(points, override), wait=wait) return # 2) Automatische Schema-Erkennung (Live-Check) schema = _get_vector_schema(client, collection) if schema.get("kind") == "named": name = schema.get("primary") or _preferred_name(schema.get("names") or []) client.upsert(collection_name=collection, points=_as_named(points, name), wait=wait) return # 3) Fallback: Single-Vector Upsert client.upsert(collection_name=collection, points=points, wait=wait) # --- Optional search helpers --- def _filter_any(field: str, values: Iterable[str]) -> rest.Filter: """Hilfsfunktion für händische Filter-Konstruktion (Logical OR).""" return rest.Filter(should=[rest.FieldCondition(key=field, match=rest.MatchValue(value=v)) for v in values]) def _merge_filters(*filters: Optional[rest.Filter]) -> Optional[rest.Filter]: """Führt mehrere Filter-Objekte zu einem konsolidierten Filter zusammen.""" fs = [f for f in filters if f is not None] if not fs: return None if len(fs) == 1: return fs[0] must = [] for f in fs: if getattr(f, "must", None): must.extend(f.must) if getattr(f, "should", None): must.append(rest.Filter(should=f.should)) return rest.Filter(must=must) def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter]: """Konvertiert ein Python-Dict in ein Qdrant-Filter Objekt.""" if not filters: return None parts = [] for k, v in filters.items(): if isinstance(v, (list, tuple, set)): parts.append(_filter_any(k, [str(x) for x in v])) else: parts.append(rest.Filter(must=[rest.FieldCondition(key=k, match=rest.MatchValue(value=v))])) return _merge_filters(*parts) def search_chunks_by_vector(client: QdrantClient, prefix: str, vector: List[float], top: int = 10, filters: Optional[Dict[str, Any]] = None) -> List[Tuple[str, float, dict]]: """Sucht semantisch ähnliche Chunks in der Vektordatenbank.""" _, chunks_col, _ = _names(prefix) flt = _filter_from_dict(filters) res = client.search( collection_name=chunks_col, query_vector=vector, limit=top, with_payload=True, with_vectors=False, query_filter=flt ) out: List[Tuple[str, float, dict]] = [] for r in res: out.append((str(r.id), float(r.score), dict(r.payload or {}))) return out # --- Edge retrieval helper --- def get_edges_for_sources( client: QdrantClient, prefix: str, source_ids: Iterable[str], edge_types: Optional[Iterable[str]] = None, limit: int = 2048, ) -> List[Dict[str, Any]]: """Ruft alle Kanten ab, die von einer Menge von Quell-Notizen ausgehen.""" source_ids = list(source_ids) if not source_ids or limit <= 0: return [] # Namen der Edges-Collection auflösen _, _, edges_col = _names(prefix) # Filter-Bau: source_id IN source_ids src_filter = _filter_any("source_id", [str(s) for s in source_ids]) # Optionaler Filter auf den Kanten-Typ kind_filter = None if edge_types: kind_filter = _filter_any("kind", [str(k) for k in edge_types]) flt = _merge_filters(src_filter, kind_filter) out: List[Dict[str, Any]] = [] next_page = None remaining = int(limit) # Paginated Scroll API (NUR Payload, keine Vektoren) while remaining > 0: batch_limit = min(256, remaining) res, next_page = client.scroll( collection_name=edges_col, scroll_filter=flt, limit=batch_limit, with_payload=True, with_vectors=False, offset=next_page, ) if not res: break for r in res: out.append(dict(r.payload or {})) remaining -= 1 if remaining <= 0: break if next_page is None or remaining <= 0: break return out