"""
|
|
FILE: app/core/database/qdrant_points.py
|
|
DESCRIPTION: Object-Mapper für Qdrant. Konvertiert JSON-Payloads (Notes, Chunks, Edges) in PointStructs und generiert deterministische UUIDs.
|
|
VERSION: 1.5.1 (WP-Fix: Explicit Target Section Support)
|
|
STATUS: Active
|
|
DEPENDENCIES: qdrant_client, uuid, os
|
|
LAST_ANALYSIS: 2025-12-29
|
|
"""
|
|
from __future__ import annotations
|
|
import os
|
|
import uuid
|
|
from typing import List, Tuple, Iterable, Optional, Dict, Any
|
|
|
|
from qdrant_client.http import models as rest
|
|
from qdrant_client import QdrantClient
|
|
|
|
# --------------------- ID helpers ---------------------
|
|
|
|
def _to_uuid(stable_key: str) -> str:
|
|
return str(uuid.uuid5(uuid.NAMESPACE_URL, stable_key))
|
|
|
|
def _names(prefix: str) -> Tuple[str, str, str]:
|
|
return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"
|
|
|
|
# --------------------- Points builders ---------------------
|
|
|
|
def points_for_note(prefix: str, note_payload: dict, note_vec: List[float] | None, dim: int) -> Tuple[str, List[rest.PointStruct]]:
    """Build the single PointStruct for a note.

    A zero vector of length *dim* stands in when no embedding is supplied.
    The point id is the deterministic UUID of the note's stable id.

    Returns:
        (notes_collection_name, [point]).
    """
    collection, _, _ = _names(prefix)
    if note_vec is None:
        vector: List[float] = [0.0] * int(dim)
    else:
        vector = note_vec
    stable_id = note_payload.get("note_id") or note_payload.get("id") or "missing-note-id"
    point = rest.PointStruct(id=_to_uuid(stable_id), vector=vector, payload=note_payload)
    return collection, [point]
|
|
|
|
def points_for_chunks(prefix: str, chunk_payloads: List[dict], vectors: List[List[float]]) -> Tuple[str, List[rest.PointStruct]]:
    """Pair chunk payloads with their vectors and build PointStructs.

    A payload without a chunk id gets a synthetic one, "<note_id>#<position>"
    (1-based position in the batch); the resolved id is written back into the
    payload so downstream consumers always see "chunk_id".

    Returns:
        (chunks_collection_name, points).
    """
    _, collection, _ = _names(prefix)
    result: List[rest.PointStruct] = []
    for position, (payload, vector) in enumerate(zip(chunk_payloads, vectors), start=1):
        cid = payload.get("chunk_id") or payload.get("id")
        if not cid:
            parent = payload.get("note_id") or payload.get("parent_note_id") or "missing-note"
            cid = f"{parent}#{position}"
        payload["chunk_id"] = cid
        result.append(rest.PointStruct(id=_to_uuid(cid), vector=vector, payload=payload))
    return collection, result
|
|
|
|
def _normalize_edge_payload(pl: dict) -> dict:
|
|
"""Normalisiert Edge-Felder und sichert Schema-Konformität."""
|
|
kind = pl.get("kind") or pl.get("edge_type") or "edge"
|
|
source_id = pl.get("source_id") or pl.get("src_id") or "unknown-src"
|
|
target_id = pl.get("target_id") or pl.get("dst_id") or "unknown-tgt"
|
|
seq = pl.get("seq") or pl.get("order") or pl.get("index")
|
|
|
|
# WP-Fix: target_section explizit durchreichen
|
|
target_section = pl.get("target_section")
|
|
|
|
pl.setdefault("kind", kind)
|
|
pl.setdefault("source_id", source_id)
|
|
pl.setdefault("target_id", target_id)
|
|
|
|
if seq is not None and "seq" not in pl:
|
|
pl["seq"] = seq
|
|
|
|
if target_section is not None:
|
|
pl["target_section"] = target_section
|
|
|
|
return pl
|
|
|
|
def points_for_edges(prefix: str, edge_payloads: List[dict]) -> Tuple[str, List[rest.PointStruct]]:
    """Convert edge payloads into PointStructs with a dummy 1-d vector.

    Edges carry no embedding; a [0.0] vector satisfies Qdrant's vector
    requirement. Missing edge_ids are synthesized deterministically as
    "<kind>:<source>-><target>#<seq>" and written back into the payload so
    ids stay stable across re-ingestion.

    Returns:
        (edges_collection_name, points).
    """
    _, _, edges_col = _names(prefix)
    points: List[rest.PointStruct] = []
    for raw in edge_payloads:
        pl = _normalize_edge_payload(raw)
        edge_id = pl.get("edge_id")
        if not edge_id:
            kind = pl.get("kind", "edge")
            src = pl.get("source_id", "unknown-src")
            tgt = pl.get("target_id", "unknown-tgt")
            # Bugfix: seq=0 is a valid sequence number; only a truly missing
            # seq maps to "" (previously `or ""` collapsed 0 into the
            # missing-seq id, colliding with seq-less edges).
            seq = pl.get("seq")
            seq_part = "" if seq is None else seq
            edge_id = f"{kind}:{src}->{tgt}#{seq_part}"
        pl["edge_id"] = edge_id
        points.append(rest.PointStruct(id=_to_uuid(edge_id), vector=[0.0], payload=pl))
    return edges_col, points
|
|
|
|
# --------------------- Vector schema & overrides ---------------------
|
|
|
|
def _preferred_name(candidates: List[str]) -> str:
|
|
for k in ("text", "default", "embedding", "content"):
|
|
if k in candidates:
|
|
return k
|
|
return sorted(candidates)[0]
|
|
|
|
def _env_override_for_collection(collection: str) -> Optional[str]:
|
|
"""
|
|
Returns:
|
|
- "__single__" to force single-vector
|
|
- concrete name (str) to force named-vector with that name
|
|
- None to auto-detect
|
|
"""
|
|
base = os.getenv("MINDNET_VECTOR_NAME")
|
|
if collection.endswith("_notes"):
|
|
base = os.getenv("NOTES_VECTOR_NAME", base)
|
|
elif collection.endswith("_chunks"):
|
|
base = os.getenv("CHUNKS_VECTOR_NAME", base)
|
|
elif collection.endswith("_edges"):
|
|
base = os.getenv("EDGES_VECTOR_NAME", base)
|
|
|
|
if not base:
|
|
return None
|
|
val = base.strip()
|
|
if val.lower() in ("__single__", "single"):
|
|
return "__single__"
|
|
return val # concrete name
|
|
|
|
def _get_vector_schema(client: QdrantClient, collection_name: str) -> dict:
|
|
"""
|
|
Return {"kind": "single", "size": int} or {"kind": "named", "names": [...], "primary": str}.
|
|
"""
|
|
try:
|
|
info = client.get_collection(collection_name=collection_name)
|
|
vecs = getattr(info, "vectors", None)
|
|
# Single-vector config
|
|
if hasattr(vecs, "size") and isinstance(vecs.size, int):
|
|
return {"kind": "single", "size": vecs.size}
|
|
# Named-vectors config (dict-like in .config)
|
|
cfg = getattr(vecs, "config", None)
|
|
if isinstance(cfg, dict) and cfg:
|
|
names = list(cfg.keys())
|
|
if names:
|
|
return {"kind": "named", "names": names, "primary": _preferred_name(names)}
|
|
except Exception:
|
|
pass
|
|
return {"kind": "single", "size": None}
|
|
|
|
def _as_named(points: List[rest.PointStruct], name: str) -> List[rest.PointStruct]:
|
|
out: List[rest.PointStruct] = []
|
|
for pt in points:
|
|
vec = getattr(pt, "vector", None)
|
|
if isinstance(vec, dict):
|
|
if name in vec:
|
|
out.append(pt)
|
|
else:
|
|
# take any existing entry; if empty dict fallback to [0.0]
|
|
fallback_vec = None
|
|
try:
|
|
fallback_vec = list(next(iter(vec.values())))
|
|
except Exception:
|
|
fallback_vec = [0.0]
|
|
out.append(rest.PointStruct(id=pt.id, vector={name: fallback_vec}, payload=pt.payload))
|
|
elif vec is not None:
|
|
out.append(rest.PointStruct(id=pt.id, vector={name: vec}, payload=pt.payload))
|
|
else:
|
|
out.append(pt)
|
|
return out
|
|
|
|
# --------------------- Qdrant ops ---------------------
|
|
|
|
def upsert_batch(client: QdrantClient, collection: str, points: List[rest.PointStruct]) -> None:
    """Upsert *points* into *collection*, adapting to its vector schema.

    Resolution order:
      1. ENV override (_env_override_for_collection): forced single-vector
         mode, or a forced named-vector name.
      2. Auto-detected schema: named-vector collections get points rewrapped
         under the preferred name.
      3. Plain single-vector upsert as the fallback.
    All upserts block until acknowledged (wait=True). Empty batches are a no-op.
    """
    if not points:
        return

    def _send(batch: List[rest.PointStruct]) -> None:
        # Single funnel for the actual client call.
        client.upsert(collection_name=collection, points=batch, wait=True)

    # 1) ENV overrides take precedence over schema detection.
    override = _env_override_for_collection(collection)
    if override == "__single__":
        _send(points)
        return
    if override is not None:
        _send(_as_named(points, override))
        return

    # 2) Auto-detect the collection's vector layout.
    schema = _get_vector_schema(client, collection)
    if schema.get("kind") == "named":
        vec_name = schema.get("primary") or _preferred_name(schema.get("names") or [])
        _send(_as_named(points, vec_name))
        return

    # 3) Fallback: single-vector upsert.
    _send(points)
|
|
|
|
# --- Optional search helpers ---
|
|
|
|
def _filter_any(field: str, values: Iterable[str]) -> rest.Filter:
    """Build an OR filter: *field* matches any of *values*."""
    conditions = [
        rest.FieldCondition(key=field, match=rest.MatchValue(value=value))
        for value in values
    ]
    return rest.Filter(should=conditions)
|
|
|
|
def _merge_filters(*filters: Optional[rest.Filter]) -> Optional[rest.Filter]:
|
|
fs = [f for f in filters if f is not None]
|
|
if not fs:
|
|
return None
|
|
if len(fs) == 1:
|
|
return fs[0]
|
|
must = []
|
|
for f in fs:
|
|
if getattr(f, "must", None):
|
|
must.extend(f.must)
|
|
if getattr(f, "should", None):
|
|
must.append(rest.Filter(should=f.should))
|
|
return rest.Filter(must=must)
|
|
|
|
def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter]:
|
|
if not filters:
|
|
return None
|
|
parts = []
|
|
for k, v in filters.items():
|
|
if isinstance(v, (list, tuple, set)):
|
|
parts.append(_filter_any(k, [str(x) for x in v]))
|
|
else:
|
|
parts.append(rest.Filter(must=[rest.FieldCondition(key=k, match=rest.MatchValue(value=v))]))
|
|
return _merge_filters(*parts)
|
|
|
|
def search_chunks_by_vector(client: QdrantClient, prefix: str, vector: List[float], top: int = 10, filters: Optional[Dict[str, Any]] = None) -> List[Tuple[str, float, dict]]:
    """Vector-search the <prefix>_chunks collection.

    Args:
        client: QdrantClient instance.
        prefix: Mindnet collection prefix.
        vector: Query embedding.
        top: Maximum number of hits.
        filters: Optional payload filters (see _filter_from_dict).

    Returns:
        (point_id, score, payload) tuples, best matches first.
    """
    _, chunks_col, _ = _names(prefix)
    hits = client.search(
        collection_name=chunks_col,
        query_vector=vector,
        limit=top,
        with_payload=True,
        with_vectors=False,
        query_filter=_filter_from_dict(filters),
    )
    return [(str(hit.id), float(hit.score), dict(hit.payload or {})) for hit in hits]
|
|
|
|
|
|
# --- Edge retrieval helper ---
|
|
|
|
def get_edges_for_sources(
    client: QdrantClient,
    prefix: str,
    source_ids: Iterable[str],
    edge_types: Optional[Iterable[str]] = None,
    limit: int = 2048,
) -> List[Dict[str, Any]]:
    """Retrieve edge payloads from the <prefix>_edges collection.

    Args:
        client: QdrantClient instance.
        prefix: Mindnet collection prefix (e.g. "mindnet").
        source_ids: Iterable of source_id values (typically chunk_ids or
            note_ids).
        edge_types: Optional iterable of edge kinds (e.g. ["references",
            "depends_on"]). If None, all kinds are returned.
        limit: Maximum number of edge payloads to return.

    Returns:
        A list of edge payload dicts, e.g.:
        {
            "note_id": "...",
            "chunk_id": "...",
            "kind": "references" | "depends_on" | ...,
            "scope": "chunk",
            "source_id": "...",
            "target_id": "...",
            "rule_id": "...",
            "confidence": 0.7,
            ...
        }
    """
    wanted_sources = [str(s) for s in source_ids]
    if not wanted_sources or limit <= 0:
        return []

    _, _, edges_col = _names(prefix)

    # source_id IN wanted_sources, optionally ANDed with kind IN edge_types.
    filter_parts = [_filter_any("source_id", wanted_sources)]
    if edge_types:
        filter_parts.append(_filter_any("kind", [str(k) for k in edge_types]))
    combined = _merge_filters(*filter_parts)

    collected: List[Dict[str, Any]] = []
    cursor = None
    budget = int(limit)

    # Paginated scroll API; vectors are irrelevant for edges, payloads only.
    while budget > 0:
        page, cursor = client.scroll(
            collection_name=edges_col,
            scroll_filter=combined,
            limit=min(256, budget),
            with_payload=True,
            with_vectors=False,
            offset=cursor,
        )
        if not page:
            break
        for record in page:
            collected.append(dict(record.payload or {}))
            budget -= 1
            if budget <= 0:
                break
        if cursor is None:
            # No further pages available.
            break

    return collected