WP15b #15

Merged
Lars merged 23 commits from WP15b into main 2025-12-27 22:15:27 +01:00
11 changed files with 659 additions and 485 deletions
Showing only changes of commit 19d899b277

View File

@ -0,0 +1,35 @@
"""
PACKAGE: app.core.database
DESCRIPTION: Central interface for all database operations (Qdrant).
Bundles client initialization and point conversion.
"""
from .qdrant import (
QdrantConfig,
get_client,
ensure_collections,
ensure_payload_indexes,
collection_names
)
from .qdrant_points import (
points_for_note,
points_for_chunks,
points_for_edges,
upsert_batch,
get_edges_for_sources,
search_chunks_by_vector
)
# Public exports for the overall system
__all__ = [
"QdrantConfig",
"get_client",
"ensure_collections",
"ensure_payload_indexes",
"collection_names",
"points_for_note",
"points_for_chunks",
"points_for_edges",
"upsert_batch",
"get_edges_for_sources",
"search_chunks_by_vector"
]

169
app/core/database/qdrant.py Normal file
View File

@ -0,0 +1,169 @@
"""
FILE: app/core/database/qdrant.py
DESCRIPTION: Qdrant client factory and schema management.
Creates collections and payload indexes.
MODULARIZATION: Moved into the database package for WP-14.
VERSION: 2.2.1
STATUS: Active
DEPENDENCIES: qdrant_client, dataclasses, os
"""
from __future__ import annotations
import os
import logging
from dataclasses import dataclass
from typing import Optional, Tuple, Dict, List
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
@dataclass
class QdrantConfig:
"""Konfigurationsobjekt für den Qdrant-Verbindungsaufbau."""
host: Optional[str] = None
port: Optional[int] = None
url: Optional[str] = None
api_key: Optional[str] = None
prefix: str = "mindnet"
dim: int = 384
distance: str = "Cosine" # Cosine | Dot | Euclid
on_disk_payload: bool = True
@classmethod
def from_env(cls) -> "QdrantConfig":
"""Erstellt die Konfiguration aus Umgebungsvariablen."""
# Entweder URL ODER Host/Port, API-Key optional
url = os.getenv("QDRANT_URL") or None
host = os.getenv("QDRANT_HOST") or None
port = os.getenv("QDRANT_PORT")
port = int(port) if port else None
api_key = os.getenv("QDRANT_API_KEY") or None
prefix = os.getenv("COLLECTION_PREFIX") or "mindnet"
dim = int(os.getenv("VECTOR_DIM") or 384)
distance = os.getenv("DISTANCE", "Cosine")
on_disk_payload = (os.getenv("ON_DISK_PAYLOAD", "true").lower() == "true")
return cls(
host=host, port=port, url=url, api_key=api_key,
prefix=prefix, dim=dim, distance=distance, on_disk_payload=on_disk_payload
)
def get_client(cfg: QdrantConfig) -> QdrantClient:
"""Initialisiert den Qdrant-Client basierend auf der Konfiguration."""
# QdrantClient akzeptiert entweder url=... oder host/port
if cfg.url:
return QdrantClient(url=cfg.url, api_key=cfg.api_key, timeout=60.0)
return QdrantClient(host=cfg.host or "127.0.0.1", port=cfg.port or 6333, api_key=cfg.api_key, timeout=60.0)
# ---------------------------------------------------------------------------
# Collections
# ---------------------------------------------------------------------------
def collection_names(prefix: str) -> Tuple[str, str, str]:
"""Gibt die standardisierten Collection-Namen zurück."""
return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"
def _vector_params(dim: int, distance: str) -> rest.VectorParams:
"""Erstellt Vektor-Parameter für das Collection-Schema."""
# Distance: "Cosine" | "Dot" | "Euclid"
dist = getattr(rest.Distance, distance.capitalize(), rest.Distance.COSINE)
return rest.VectorParams(size=dim, distance=dist)
def ensure_collections(client: QdrantClient, prefix: str, dim: int) -> None:
"""Legt notes, chunks und edges Collections an, falls nicht vorhanden."""
notes, chunks, edges = collection_names(prefix)
# notes
if not client.collection_exists(notes):
client.create_collection(
collection_name=notes,
vectors_config=_vector_params(dim, os.getenv("DISTANCE", "Cosine")),
on_disk_payload=True,
)
# chunks
if not client.collection_exists(chunks):
client.create_collection(
collection_name=chunks,
vectors_config=_vector_params(dim, os.getenv("DISTANCE", "Cosine")),
on_disk_payload=True,
)
# edges (dummy vector, since filtering happens primarily via payload)
if not client.collection_exists(edges):
client.create_collection(
collection_name=edges,
vectors_config=_vector_params(1, "Dot"),
on_disk_payload=True,
)
# ---------------------------------------------------------------------------
# Payload indexes
# ---------------------------------------------------------------------------
def _ensure_index(client: QdrantClient, collection: str, field: str, schema: rest.PayloadSchemaType) -> None:
"""Idempotentes Anlegen eines Payload-Indexes für ein spezifisches Feld."""
try:
client.create_payload_index(collection_name=collection, field_name=field, field_schema=schema, wait=True)
except Exception as e:
# Ignore the error if the index already exists
logger.debug(f"Index check for {field} in {collection}: {e}")
def ensure_payload_indexes(client: QdrantClient, prefix: str) -> None:
"""
Ensures that all payload indexes required for search exist.
- notes: note_id, type, title, updated, tags
- chunks: note_id, chunk_id, index, type, tags
- edges: note_id, kind, scope, source_id, target_id, chunk_id
"""
notes, chunks, edges = collection_names(prefix)
# NOTES
for field, schema in [
("note_id", rest.PayloadSchemaType.KEYWORD),
("type", rest.PayloadSchemaType.KEYWORD),
("title", rest.PayloadSchemaType.TEXT),
("updated", rest.PayloadSchemaType.INTEGER),
("tags", rest.PayloadSchemaType.KEYWORD),
]:
_ensure_index(client, notes, field, schema)
# CHUNKS
for field, schema in [
("note_id", rest.PayloadSchemaType.KEYWORD),
("chunk_id", rest.PayloadSchemaType.KEYWORD),
("index", rest.PayloadSchemaType.INTEGER),
("type", rest.PayloadSchemaType.KEYWORD),
("tags", rest.PayloadSchemaType.KEYWORD),
]:
_ensure_index(client, chunks, field, schema)
# EDGES
for field, schema in [
("note_id", rest.PayloadSchemaType.KEYWORD),
("kind", rest.PayloadSchemaType.KEYWORD),
("scope", rest.PayloadSchemaType.KEYWORD),
("source_id", rest.PayloadSchemaType.KEYWORD),
("target_id", rest.PayloadSchemaType.KEYWORD),
("chunk_id", rest.PayloadSchemaType.KEYWORD),
]:
_ensure_index(client, edges, field, schema)
__all__ = [
"QdrantConfig",
"get_client",
"ensure_collections",
"ensure_payload_indexes",
"collection_names",
]
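
A minimal bootstrap sketch for this module (illustrative only, not part of the commit; it assumes a locally reachable Qdrant instance and relies on the defaults in QdrantConfig):

import os
from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes

os.environ.setdefault("QDRANT_URL", "http://localhost:6333")   # hypothetical local instance
cfg = QdrantConfig.from_env()                                   # reads QDRANT_URL / QDRANT_HOST, VECTOR_DIM, ...
client = get_client(cfg)                                        # QdrantClient via url or host/port
ensure_collections(client, cfg.prefix, cfg.dim)                 # creates <prefix>_notes/_chunks/_edges if missing
ensure_payload_indexes(client, cfg.prefix)                      # idempotent payload indexes for filtering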

View File

@ -0,0 +1,296 @@
"""
FILE: app/core/database/qdrant_points.py
DESCRIPTION: Object-Mapper für Qdrant. Konvertiert JSON-Payloads (Notes, Chunks, Edges) in PointStructs und generiert deterministische UUIDs.
VERSION: 1.5.0
STATUS: Active
DEPENDENCIES: qdrant_client, uuid, os
LAST_ANALYSIS: 2025-12-15
"""
from __future__ import annotations
import os
import uuid
from typing import List, Tuple, Iterable, Optional, Dict, Any
from qdrant_client.http import models as rest
from qdrant_client import QdrantClient
# --------------------- ID helpers ---------------------
def _to_uuid(stable_key: str) -> str:
return str(uuid.uuid5(uuid.NAMESPACE_URL, stable_key))
def _names(prefix: str) -> Tuple[str, str, str]:
return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"
# --------------------- Points builders ---------------------
def points_for_note(prefix: str, note_payload: dict, note_vec: List[float] | None, dim: int) -> Tuple[str, List[rest.PointStruct]]:
notes_col, _, _ = _names(prefix)
vector = note_vec if note_vec is not None else [0.0] * int(dim)
raw_note_id = note_payload.get("note_id") or note_payload.get("id") or "missing-note-id"
point_id = _to_uuid(raw_note_id)
pt = rest.PointStruct(id=point_id, vector=vector, payload=note_payload)
return notes_col, [pt]
def points_for_chunks(prefix: str, chunk_payloads: List[dict], vectors: List[List[float]]) -> Tuple[str, List[rest.PointStruct]]:
_, chunks_col, _ = _names(prefix)
points: List[rest.PointStruct] = []
for i, (pl, vec) in enumerate(zip(chunk_payloads, vectors), start=1):
chunk_id = pl.get("chunk_id") or pl.get("id")
if not chunk_id:
note_id = pl.get("note_id") or pl.get("parent_note_id") or "missing-note"
chunk_id = f"{note_id}#{i}"
pl["chunk_id"] = chunk_id
point_id = _to_uuid(chunk_id)
points.append(rest.PointStruct(id=point_id, vector=vec, payload=pl))
return chunks_col, points
def _normalize_edge_payload(pl: dict) -> dict:
kind = pl.get("kind") or pl.get("edge_type") or "edge"
source_id = pl.get("source_id") or pl.get("src_id") or "unknown-src"
target_id = pl.get("target_id") or pl.get("dst_id") or "unknown-tgt"
seq = pl.get("seq") or pl.get("order") or pl.get("index")
pl.setdefault("kind", kind)
pl.setdefault("source_id", source_id)
pl.setdefault("target_id", target_id)
if seq is not None and "seq" not in pl:
pl["seq"] = seq
return pl
def points_for_edges(prefix: str, edge_payloads: List[dict]) -> Tuple[str, List[rest.PointStruct]]:
_, _, edges_col = _names(prefix)
points: List[rest.PointStruct] = []
for raw in edge_payloads:
pl = _normalize_edge_payload(raw)
edge_id = pl.get("edge_id")
if not edge_id:
kind = pl.get("kind", "edge")
s = pl.get("source_id", "unknown-src")
t = pl.get("target_id", "unknown-tgt")
seq = pl.get("seq") or ""
edge_id = f"{kind}:{s}->{t}#{seq}"
pl["edge_id"] = edge_id
point_id = _to_uuid(edge_id)
points.append(rest.PointStruct(id=point_id, vector=[0.0], payload=pl))
return edges_col, points
# --------------------- Vector schema & overrides ---------------------
def _preferred_name(candidates: List[str]) -> str:
for k in ("text", "default", "embedding", "content"):
if k in candidates:
return k
return sorted(candidates)[0]
def _env_override_for_collection(collection: str) -> Optional[str]:
"""
Returns:
- "__single__" to force single-vector
- concrete name (str) to force named-vector with that name
- None to auto-detect
"""
base = os.getenv("MINDNET_VECTOR_NAME")
if collection.endswith("_notes"):
base = os.getenv("NOTES_VECTOR_NAME", base)
elif collection.endswith("_chunks"):
base = os.getenv("CHUNKS_VECTOR_NAME", base)
elif collection.endswith("_edges"):
base = os.getenv("EDGES_VECTOR_NAME", base)
if not base:
return None
val = base.strip()
if val.lower() in ("__single__", "single"):
return "__single__"
return val # concrete name
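# Illustration (not part of the commit): with CHUNKS_VECTOR_NAME="text" every chunk point is
# upserted under the named vector "text"; MINDNET_VECTOR_NAME="__single__" (or "single") forces the
# plain single-vector form for all collections; with no variables set, the schema is auto-detected
# per collection in upsert_batch below.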
def _get_vector_schema(client: QdrantClient, collection_name: str) -> dict:
"""
Return {"kind": "single", "size": int} or {"kind": "named", "names": [...], "primary": str}.
"""
try:
info = client.get_collection(collection_name=collection_name)
vecs = getattr(info, "vectors", None)
# Single-vector config
if hasattr(vecs, "size") and isinstance(vecs.size, int):
return {"kind": "single", "size": vecs.size}
# Named-vectors config (dict-like in .config)
cfg = getattr(vecs, "config", None)
if isinstance(cfg, dict) and cfg:
names = list(cfg.keys())
if names:
return {"kind": "named", "names": names, "primary": _preferred_name(names)}
except Exception:
pass
return {"kind": "single", "size": None}
def _as_named(points: List[rest.PointStruct], name: str) -> List[rest.PointStruct]:
out: List[rest.PointStruct] = []
for pt in points:
vec = getattr(pt, "vector", None)
if isinstance(vec, dict):
if name in vec:
out.append(pt)
else:
# take any existing entry; if empty dict fallback to [0.0]
fallback_vec = None
try:
fallback_vec = list(next(iter(vec.values())))
except Exception:
fallback_vec = [0.0]
out.append(rest.PointStruct(id=pt.id, vector={name: fallback_vec}, payload=pt.payload))
elif vec is not None:
out.append(rest.PointStruct(id=pt.id, vector={name: vec}, payload=pt.payload))
else:
out.append(pt)
return out
# --------------------- Qdrant ops ---------------------
def upsert_batch(client: QdrantClient, collection: str, points: List[rest.PointStruct]) -> None:
if not points:
return
# 1) ENV overrides come first
override = _env_override_for_collection(collection)
if override == "__single__":
client.upsert(collection_name=collection, points=points, wait=True)
return
elif isinstance(override, str):
client.upsert(collection_name=collection, points=_as_named(points, override), wait=True)
return
# 2) Auto-detect schema
schema = _get_vector_schema(client, collection)
if schema.get("kind") == "named":
name = schema.get("primary") or _preferred_name(schema.get("names") or [])
client.upsert(collection_name=collection, points=_as_named(points, name), wait=True)
return
# 3) Fallback single-vector
client.upsert(collection_name=collection, points=points, wait=True)
# --- Optional search helpers ---
def _filter_any(field: str, values: Iterable[str]) -> rest.Filter:
return rest.Filter(should=[rest.FieldCondition(key=field, match=rest.MatchValue(value=v)) for v in values])
def _merge_filters(*filters: Optional[rest.Filter]) -> Optional[rest.Filter]:
fs = [f for f in filters if f is not None]
if not fs:
return None
if len(fs) == 1:
return fs[0]
must = []
for f in fs:
if getattr(f, "must", None):
must.extend(f.must)
if getattr(f, "should", None):
must.append(rest.Filter(should=f.should))
return rest.Filter(must=must)
def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter]:
if not filters:
return None
parts = []
for k, v in filters.items():
if isinstance(v, (list, tuple, set)):
parts.append(_filter_any(k, [str(x) for x in v]))
else:
parts.append(rest.Filter(must=[rest.FieldCondition(key=k, match=rest.MatchValue(value=v))]))
return _merge_filters(*parts)
def search_chunks_by_vector(client: QdrantClient, prefix: str, vector: List[float], top: int = 10, filters: Optional[Dict[str, Any]] = None) -> List[Tuple[str, float, dict]]:
_, chunks_col, _ = _names(prefix)
flt = _filter_from_dict(filters)
res = client.search(collection_name=chunks_col, query_vector=vector, limit=top, with_payload=True, with_vectors=False, query_filter=flt)
out: List[Tuple[str, float, dict]] = []
for r in res:
out.append((str(r.id), float(r.score), dict(r.payload or {})))
return out
# --- Edge retrieval helper ---
def get_edges_for_sources(
client: QdrantClient,
prefix: str,
source_ids: Iterable[str],
edge_types: Optional[Iterable[str]] = None,
limit: int = 2048,
) -> List[Dict[str, Any]]:
"""Retrieve edge payloads from the <prefix>_edges collection.
Args:
client: QdrantClient instance.
prefix: Mindnet collection prefix (e.g. "mindnet").
source_ids: Iterable of source_id values (typically chunk_ids or note_ids).
edge_types: Optional iterable of edge kinds (e.g. ["references", "depends_on"]). If None,
all kinds are returned.
limit: Maximum number of edge payloads to return.
Returns:
A list of edge payload dicts, e.g.:
{
"note_id": "...",
"chunk_id": "...",
"kind": "references" | "depends_on" | ...,
"scope": "chunk",
"source_id": "...",
"target_id": "...",
"rule_id": "...",
"confidence": 0.7,
...
}
"""
source_ids = list(source_ids)
if not source_ids or limit <= 0:
return []
# Resolve collection name
_, _, edges_col = _names(prefix)
# Build filter: source_id IN source_ids
src_filter = _filter_any("source_id", [str(s) for s in source_ids])
# Optional: kind IN edge_types
kind_filter = None
if edge_types:
kind_filter = _filter_any("kind", [str(k) for k in edge_types])
flt = _merge_filters(src_filter, kind_filter)
out: List[Dict[str, Any]] = []
next_page = None
remaining = int(limit)
# Use paginated scroll API; we don't need vectors, only payloads.
while remaining > 0:
batch_limit = min(256, remaining)
res, next_page = client.scroll(
collection_name=edges_col,
scroll_filter=flt,
limit=batch_limit,
with_payload=True,
with_vectors=False,
offset=next_page,
)
if not res:
break
for r in res:
out.append(dict(r.payload or {}))
remaining -= 1
if remaining <= 0:
break
if next_page is None or remaining <= 0:
break
return out
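
A short end-to-end sketch of the mapper functions above (illustrative only; the note and chunk payloads and the placeholder embeddings are invented for the example):

from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections
from app.core.database.qdrant_points import (
    points_for_note, points_for_chunks, upsert_batch, search_chunks_by_vector
)

cfg = QdrantConfig.from_env()
client = get_client(cfg)
ensure_collections(client, cfg.prefix, cfg.dim)

note = {"note_id": "note-001", "type": "concept", "title": "Demo"}
col, pts = points_for_note(cfg.prefix, note, None, cfg.dim)      # None -> zero vector of length dim
upsert_batch(client, col, pts)

chunks = [{"note_id": "note-001", "text": "first chunk"}]
vectors = [[0.1] * cfg.dim]                                       # placeholder embedding
col, pts = points_for_chunks(cfg.prefix, chunks, vectors)         # derives chunk_id "note-001#1"
upsert_batch(client, col, pts)

hits = search_chunks_by_vector(client, cfg.prefix, [0.1] * cfg.dim, top=5, filters={"note_id": "note-001"})
for point_id, score, payload in hits:
    print(point_id, score, payload.get("chunk_id"))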

View File

@ -1,9 +1,26 @@
""" """
FILE: app/core/ingestion/__init__.py FILE: app/core/ingestion/__init__.py
DESCRIPTION: Package-Einstiegspunkt für Ingestion. Exportiert den IngestionService. DESCRIPTION: Package-Einstiegspunkt für Ingestion. Exportiert den IngestionService.
VERSION: 2.13.0 AUDIT v2.13.10: Abschluss der Modularisierung (WP-14).
Bricht Zirkelbezüge durch Nutzung der neutralen registry.py auf.
VERSION: 2.13.10
""" """
# Der IngestionService ist der primäre Orchestrator für den Datenimport
from .ingestion_processor import IngestionService from .ingestion_processor import IngestionService
from .ingestion_utils import extract_json_from_response, load_type_registry
__all__ = ["IngestionService", "extract_json_from_response", "load_type_registry"] # Hilfswerkzeuge für JSON-Verarbeitung und Konfigurations-Management
# load_type_registry wird hier re-exportiert, um die Abwärtskompatibilität zu wahren,
# obwohl die Implementierung nun in app.core.registry liegt.
from .ingestion_utils import (
extract_json_from_response,
load_type_registry,
resolve_note_type
)
# Öffentliche API des Pakets
__all__ = [
"IngestionService",
"extract_json_from_response",
"load_type_registry",
"resolve_note_type"
]

View File

@ -1,33 +1,43 @@
""" """
FILE: app/core/ingestion/ingestion_chunk_payload.py FILE: app/core/ingestion/ingestion_chunk_payload.py
DESCRIPTION: Baut das JSON-Objekt für 'mindnet_chunks'. DESCRIPTION: Baut das JSON-Objekt für 'mindnet_chunks'.
Fix v2.4.2: Audit-Check (Cleanup pop, Config-Resolution Hierarchie). Fix v2.4.3: Integration der zentralen Registry (WP-14) für konsistente Defaults.
VERSION: 2.4.2 VERSION: 2.4.3
STATUS: Active STATUS: Active
""" """
from __future__ import annotations from __future__ import annotations
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
# ENTSCHEIDENDER FIX: Import der neutralen Registry-Logik zur Vermeidung von Circular Imports
from app.core.registry import load_type_registry
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Resolution Helpers (Audited) # Resolution Helpers (Audited)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _as_list(x): def _as_list(x):
"""Sichert die Listen-Integrität für Metadaten wie Tags."""
if x is None: return [] if x is None: return []
return x if isinstance(x, list) else [x] return x if isinstance(x, list) else [x]
def _resolve_val(note_type: str, reg: dict, key: str, default: Any) -> Any: def _resolve_val(note_type: str, reg: dict, key: str, default: Any) -> Any:
"""Hierarchische Suche: Type > Default.""" """
Hierarchische Suche in der Registry: Type-Spezifisch > Globaler Default.
WP-14: Erlaubt dynamische Konfiguration via types.yaml.
"""
types = reg.get("types", {}) types = reg.get("types", {})
if isinstance(types, dict): if isinstance(types, dict):
t_cfg = types.get(note_type, {}) t_cfg = types.get(note_type, {})
if isinstance(t_cfg, dict): if isinstance(t_cfg, dict):
val = t_cfg.get(key) or t_cfg.get(key.replace("ing", "")) # chunking_ vs chunk_ # Fallback für Key-Varianten (z.B. chunking_profile vs chunk_profile)
val = t_cfg.get(key) or t_cfg.get(key.replace("ing", ""))
if val is not None: return val if val is not None: return val
defs = reg.get("defaults", {}) or reg.get("global", {}) defs = reg.get("defaults", {}) or reg.get("global", {})
if isinstance(defs, dict): if isinstance(defs, dict):
val = defs.get(key) or defs.get(key.replace("ing", "")) val = defs.get(key) or defs.get(key.replace("ing", ""))
if val is not None: return val if val is not None: return val
return default return default
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -35,23 +45,34 @@ def _resolve_val(note_type: str, reg: dict, key: str, default: Any) -> Any:
# ---------------------------------------------------------------------------
def make_chunk_payloads(note: Dict[str, Any], note_path: str, chunks_from_chunker: List[Any], **kwargs) -> List[Dict[str, Any]]:
    """
    Builds the payloads for the chunks, including audit resolution.
    Now uses the central registry for all fallbacks.
    """
    if isinstance(note, dict) and "frontmatter" in note:
        fm = note["frontmatter"]
    else:
        fm = note or {}

    # WP-14 fix: uses the registry passed in, or loads it globally
    reg = kwargs.get("types_cfg") or load_type_registry()
    note_type = fm.get("type") or "concept"
    title = fm.get("title") or fm.get("id") or "Untitled"
    tags = _as_list(fm.get("tags") or [])

    # Audit: resolution hierarchy (frontmatter > registry)
    cp = fm.get("chunking_profile") or fm.get("chunk_profile")
    if not cp:
        cp = _resolve_val(note_type, reg, "chunking_profile", "sliding_standard")

    rw = fm.get("retriever_weight")
    if rw is None:
        rw = _resolve_val(note_type, reg, "retriever_weight", 1.0)
    try:
        rw = float(rw)
    except:
        rw = 1.0

    out: List[Dict[str, Any]] = []
    for idx, ch in enumerate(chunks_from_chunker):
@ -84,9 +105,10 @@ def make_chunk_payloads(note: Dict[str, Any], note_path: str, chunks_from_chunke
"chunk_profile": cp "chunk_profile": cp
} }
# Audit: Cleanup Pop (Alias Felder entfernen) # Audit: Cleanup Pop (Vermeidung von redundanten Alias-Feldern)
for alias in ("chunk_num", "Chunk_Number"): for alias in ("chunk_num", "Chunk_Number"):
pl.pop(alias, None) pl.pop(alias, None)
out.append(pl) out.append(pl)
return out return out
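
For orientation, a registry shape that _resolve_val and make_chunk_payloads would accept, inferred from the lookups above (the concrete type names and profile values are made up; the real types.yaml may carry more keys):

types_cfg = {
    "types": {
        "concept": {"chunking_profile": "sliding_standard", "retriever_weight": 1.0},
        "project": {"chunking_profile": "sliding_standard", "retriever_weight": 1.5},
    },
    "defaults": {"chunking_profile": "sliding_standard", "retriever_weight": 1.0},
}
# Resolution order: frontmatter first, then the type entry, then defaults, then the hard-coded default.
# _resolve_val("project", types_cfg, "retriever_weight", 1.0) -> 1.5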

View File

@ -1,31 +1,39 @@
""" """
FILE: app/core/ingestion/ingestion_db.py FILE: app/core/ingestion/ingestion_db.py
DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung. DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung.
WP-14: Umstellung auf zentrale database-Infrastruktur.
""" """
from typing import Optional, Tuple from typing import Optional, Tuple
from qdrant_client import QdrantClient from qdrant_client import QdrantClient
from qdrant_client.http import models as rest from qdrant_client.http import models as rest
# Import der modularisierten Namen-Logik zur Sicherstellung der Konsistenz
from app.core.database import collection_names
def fetch_note_payload(client: QdrantClient, prefix: str, note_id: str) -> Optional[dict]: def fetch_note_payload(client: QdrantClient, prefix: str, note_id: str) -> Optional[dict]:
"""Holt die Metadaten einer Note aus Qdrant via Scroll.""" """Holt die Metadaten einer Note aus Qdrant via Scroll."""
notes_col, _, _ = collection_names(prefix)
try: try:
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
pts, _ = client.scroll(collection_name=f"{prefix}_notes", scroll_filter=f, limit=1, with_payload=True) pts, _ = client.scroll(collection_name=notes_col, scroll_filter=f, limit=1, with_payload=True)
return pts[0].payload if pts else None return pts[0].payload if pts else None
except: return None except: return None
def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[bool, bool]: def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[bool, bool]:
"""Prüft Qdrant aktiv auf vorhandene Chunks und Edges.""" """Prüft Qdrant aktiv auf vorhandene Chunks und Edges."""
_, chunks_col, edges_col = collection_names(prefix)
try: try:
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
c_pts, _ = client.scroll(collection_name=f"{prefix}_chunks", scroll_filter=f, limit=1) c_pts, _ = client.scroll(collection_name=chunks_col, scroll_filter=f, limit=1)
e_pts, _ = client.scroll(collection_name=f"{prefix}_edges", scroll_filter=f, limit=1) e_pts, _ = client.scroll(collection_name=edges_col, scroll_filter=f, limit=1)
return (not bool(c_pts)), (not bool(e_pts)) return (not bool(c_pts)), (not bool(e_pts))
except: return True, True except: return True, True
def purge_artifacts(client: QdrantClient, prefix: str, note_id: str): def purge_artifacts(client: QdrantClient, prefix: str, note_id: str):
"""Löscht verwaiste Chunks/Edges vor einem Re-Import.""" """Löscht verwaiste Chunks/Edges vor einem Re-Import."""
_, chunks_col, edges_col = collection_names(prefix)
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
for suffix in ["chunks", "edges"]: # Iteration über die nun zentral verwalteten Collection-Namen
try: client.delete(collection_name=f"{prefix}_{suffix}", points_selector=rest.FilterSelector(filter=f)) for col in [chunks_col, edges_col]:
try: client.delete(collection_name=col, points_selector=rest.FilterSelector(filter=f))
except: pass except: pass
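
A small usage sketch for these helpers (illustrative; the prefix "mindnet" and the note id are placeholders):

from app.core.database import QdrantConfig, get_client
from app.core.ingestion.ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts

client = get_client(QdrantConfig.from_env())
payload = fetch_note_payload(client, "mindnet", "note-001")
chunks_missing, edges_missing = artifacts_missing(client, "mindnet", "note-001")
if payload and (chunks_missing or edges_missing):
    purge_artifacts(client, "mindnet", "note-001")   # clear partial artifacts before a re-import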

View File

@ -3,8 +3,8 @@ FILE: app/core/ingestion/ingestion_note_payload.py
DESCRIPTION: Builds the JSON object for mindnet_notes.
FEATURES:
- Multi-hash (body/full) for flexible change detection.
- Fix v2.4.4: Integration of the central registry (WP-14) for consistent defaults.
VERSION: 2.4.4
STATUS: Active
"""
from __future__ import annotations
@ -14,6 +14,9 @@ import json
import pathlib
import hashlib

# Import the central registry logic
from app.core.registry import load_type_registry

# ---------------------------------------------------------------------------
# Helper
# ---------------------------------------------------------------------------
@ -42,12 +45,13 @@ def _compute_hash(content: str) -> str:
    return hashlib.sha256(content.encode("utf-8")).hexdigest()

def _get_hash_source_content(n: Dict[str, Any], mode: str) -> str:
    """Generates the hash input string based on the body or metadata."""
    body = str(n.get("body") or "")
    if mode == "body": return body
    if mode == "full":
        fm = n.get("frontmatter") or {}
        meta_parts = []
        # Sorted list for deterministic hashes
        for k in sorted(["title", "type", "status", "tags", "chunking_profile", "chunk_profile", "retriever_weight"]):
            val = fm.get(k)
            if val is not None: meta_parts.append(f"{k}:{val}")
@ -55,13 +59,13 @@ def _get_hash_source_content(n: Dict[str, Any], mode: str) -> str:
    return body

def _cfg_for_type(note_type: str, reg: dict) -> dict:
    """Extracts type-specific config from the registry."""
    if not isinstance(reg, dict): return {}
    types = reg.get("types") if isinstance(reg.get("types"), dict) else reg
    return types.get(note_type, {}) if isinstance(types, dict) else {}

def _cfg_defaults(reg: dict) -> dict:
    """Extracts global default values from the registry."""
    if not isinstance(reg, dict): return {}
    for key in ("defaults", "default", "global"):
        v = reg.get(key)
@ -73,9 +77,14 @@ def _cfg_defaults(reg: dict) -> dict:
# ---------------------------------------------------------------------------
def make_note_payload(note: Any, *args, **kwargs) -> Dict[str, Any]:
    """
    Builds the note payload including multi-hash and audit validation.
    WP-14: Now uses the central registry for all fallbacks.
    """
    n = _as_dict(note)

    # Uses the registry passed in, or loads it globally
    reg = kwargs.get("types_cfg") or load_type_registry()
    hash_source = kwargs.get("hash_source", "parsed")
    hash_normalize = kwargs.get("hash_normalize", "canonical")
@ -84,21 +93,26 @@ def make_note_payload(note: Any, *args, **kwargs) -> Dict[str, Any]:
    cfg_type = _cfg_for_type(note_type, reg)
    cfg_def = _cfg_defaults(reg)
    ingest_cfg = reg.get("ingestion_settings", {})

    # --- retriever_weight audit ---
    # Priority: frontmatter -> type config -> global config -> env var
    default_rw = float(os.environ.get("MINDNET_DEFAULT_RETRIEVER_WEIGHT", 1.0))
    retriever_weight = fm.get("retriever_weight")
    if retriever_weight is None:
        retriever_weight = cfg_type.get("retriever_weight", cfg_def.get("retriever_weight", default_rw))
    try:
        retriever_weight = float(retriever_weight)
    except:
        retriever_weight = default_rw

    # --- chunk_profile audit ---
    # Now primarily uses the ingestion_settings from the registry
    chunk_profile = fm.get("chunking_profile") or fm.get("chunk_profile")
    if chunk_profile is None:
        chunk_profile = cfg_type.get("chunking_profile") or cfg_type.get("chunk_profile")
    if chunk_profile is None:
        chunk_profile = ingest_cfg.get("default_chunk_profile", cfg_def.get("chunking_profile", "sliding_standard"))

    # --- edge_defaults ---
    edge_defaults = fm.get("edge_defaults")
@ -124,17 +138,20 @@ def make_note_payload(note: Any, *args, **kwargs) -> Dict[str, Any]:
    }

    # --- MULTI-HASH ---
    # Generates hashes for change detection
    for mode in ["body", "full"]:
        content = _get_hash_source_content(n, mode)
        payload["hashes"][f"{mode}:{hash_source}:{hash_normalize}"] = _compute_hash(content)

    # Metadata enrichment
    tags = fm.get("tags") or fm.get("keywords") or n.get("tags")
    if tags: payload["tags"] = _ensure_list(tags)
    if fm.get("aliases"): payload["aliases"] = _ensure_list(fm.get("aliases"))
    for k in ("created", "modified", "date"):
        v = fm.get(k) or n.get(k)
        if v: payload[k] = str(v)
    if n.get("body"): payload["fulltext"] = str(n["body"])

    # Final JSON validation audit
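
The resulting hash keys follow the pattern <mode>:<hash_source>:<hash_normalize>; with the defaults shown above a payload would carry keys like these (hash values are placeholders, and the exact input of the "full" variant is whatever _get_hash_source_content assembles):

payload["hashes"] = {
    "body:parsed:canonical": "3f7a...",   # sha256 over the note body only
    "full:parsed:canonical": "91c2...",   # sha256 over the body plus the sorted frontmatter fields
}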

View File

@ -1,11 +1,11 @@
""" """
FILE: app/core/ingestion/ingestion_processor.py FILE: app/core/ingestion/ingestion_processor.py
DESCRIPTION: Der zentrale IngestionService (Orchestrator). DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-14: Vollständig modularisiert. WP-14: Modularisierung der Datenbank-Ebene (app.core.database).
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
AUDIT v2.13.7: Synchronisierung des Context-Scanners mit der Registry (WP-14). AUDIT v2.13.10: Umstellung auf app.core.database Infrastruktur.
VERSION: 2.13.7 VERSION: 2.13.10
STATUS: Active STATUS: Active
""" """
import logging import logging
@ -19,8 +19,10 @@ from app.core.parser import (
    validate_required_frontmatter, NoteContext
)
from app.core.chunking import assemble_chunks

# MODULARIZATION: new import paths for the database layer
from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch

# Services
from app.services.embeddings_client import EmbeddingsClient
@ -44,12 +46,13 @@ logger = logging.getLogger(__name__)
class IngestionService:
    def __init__(self, collection_prefix: str = None):
        """Initializes the service and uses the new database infrastructure."""
        from app.config import get_settings
        self.settings = get_settings()
        self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
        self.cfg = QdrantConfig.from_env()
        # Synchronize the configuration with the instance prefix
        self.cfg.prefix = self.prefix
        self.client = get_client(self.cfg)
        self.dim = self.settings.VECTOR_SIZE
@ -61,6 +64,7 @@ class IngestionService:
        self.batch_cache: Dict[str, NoteContext] = {}  # WP-15b LocalBatchCache
        try:
            # Call the modularized schema logic
            ensure_collections(self.client, self.prefix, self.dim)
            ensure_payload_indexes(self.client, self.prefix)
        except Exception as e:
@ -75,8 +79,7 @@ class IngestionService:
logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Context Cache...") logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Context Cache...")
for path in file_paths: for path in file_paths:
try: try:
# ANPASSUNG: Übergabe der Registry für dynamische Scan-Parameter (WP-14) # Übergabe der Registry für dynamische Scan-Tiefe
# Ermöglicht die Nutzung von summary_settings aus types.yaml
ctx = pre_scan_markdown(path, registry=self.registry) ctx = pre_scan_markdown(path, registry=self.registry)
if ctx: if ctx:
# Mehrfache Indizierung für robusten Look-up (ID, Titel, Dateiname) # Mehrfache Indizierung für robusten Look-up (ID, Titel, Dateiname)
@ -110,7 +113,7 @@ class IngestionService:
        except Exception as e:
            return {**result, "error": f"Validation failed: {str(e)}"}

        # Dynamic lifecycle filter from the registry (WP-14)
        ingest_cfg = self.registry.get("ingestion_settings", {})
        ignore_list = ingest_cfg.get("ignore_statuses", ["system", "template", "archive", "hidden"])
@ -180,7 +183,7 @@ class IngestionService:
context={"file": file_path, "note_id": note_id, "line": e.get("line", "system")} context={"file": file_path, "note_id": note_id, "line": e.get("line", "system")}
) )
# 4. DB Upsert # 4. DB Upsert via modularisierter Points-Logik
if purge_before and old_payload: if purge_before and old_payload:
purge_artifacts(self.client, self.prefix, note_id) purge_artifacts(self.client, self.prefix, note_id)

View File

@ -1,11 +1,15 @@
""" """
FILE: app/core/ingestion/ingestion_validation.py FILE: app/core/ingestion/ingestion_validation.py
DESCRIPTION: WP-15b semantische Validierung von Kanten gegen den LocalBatchCache. DESCRIPTION: WP-15b semantische Validierung von Kanten gegen den LocalBatchCache.
AUDIT v2.12.3: Integration der zentralen Text-Bereinigung (WP-14).
""" """
import logging import logging
from typing import Dict, Any from typing import Dict, Any
from app.core.parser import NoteContext from app.core.parser import NoteContext
# ENTSCHEIDENDER FIX: Import der neutralen Bereinigungs-Logik zur Vermeidung von Circular Imports
from app.core.registry import clean_llm_text
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def validate_edge_candidate( async def validate_edge_candidate(
@ -15,7 +19,10 @@ async def validate_edge_candidate(
    llm_service: Any,
    provider: str
) -> bool:
    """
    WP-15b: Validates a candidate semantically against the target in the cache.
    Uses clean_llm_text to strip control characters before evaluation.
    """
    target_id = edge.get("to")
    target_ctx = batch_cache.get(target_id)
@ -40,7 +47,13 @@ async def validate_edge_candidate(
            edge_kind=edge.get("kind", "related_to")
        )

        # Request the response from the service
        raw_response = await llm_service.generate_raw_response(prompt, priority="background")

        # WP-14 fix: additional cleanup to ensure the answer can be interpreted
        response = clean_llm_text(raw_response)

        # Semantic check of the result
        is_valid = "YES" in response.upper()

        if is_valid:
@ -50,4 +63,5 @@ async def validate_edge_candidate(
        return is_valid
    except Exception as e:
        logger.warning(f"⚠️ Validation error for {target_id}: {e}")
        # When in doubt (timeout/error) we allow the edge to avoid losing data
        return True

View File

@ -1,161 +1,22 @@
""" """
FILE: app/core/qdrant.py FILE: app/core/qdrant.py
DESCRIPTION: Qdrant-Client Factory und Schema-Management. Erstellt Collections und Payload-Indizes. DESCRIPTION: Proxy-Modul zur Aufrechterhaltung der Abwärtskompatibilität (WP-14).
VERSION: 2.2.0 Leitet alle Aufrufe an das neue database-Paket weiter.
STATUS: Active STATUS: Proxy (Legacy-Support)
DEPENDENCIES: qdrant_client, dataclasses, os
LAST_ANALYSIS: 2025-12-15
""" """
from __future__ import annotations from .database.qdrant import (
QdrantConfig,
import os get_client,
from dataclasses import dataclass ensure_collections,
from typing import Optional, Tuple, Dict, List ensure_payload_indexes,
collection_names
from qdrant_client import QdrantClient )
from qdrant_client.http import models as rest
# ---------------------------------------------------------------------------
# Konfiguration
# ---------------------------------------------------------------------------
@dataclass
class QdrantConfig:
host: Optional[str] = None
port: Optional[int] = None
url: Optional[str] = None
api_key: Optional[str] = None
prefix: str = "mindnet"
dim: int = 384
distance: str = "Cosine" # Cosine | Dot | Euclid
on_disk_payload: bool = True
@classmethod
def from_env(cls) -> "QdrantConfig":
# Entweder URL ODER Host/Port, API-Key optional
url = os.getenv("QDRANT_URL") or None
host = os.getenv("QDRANT_HOST") or None
port = os.getenv("QDRANT_PORT")
port = int(port) if port else None
api_key = os.getenv("QDRANT_API_KEY") or None
prefix = os.getenv("COLLECTION_PREFIX") or "mindnet"
dim = int(os.getenv("VECTOR_DIM") or 384)
distance = os.getenv("DISTANCE", "Cosine")
on_disk_payload = (os.getenv("ON_DISK_PAYLOAD", "true").lower() == "true")
return cls(
host=host, port=port, url=url, api_key=api_key,
prefix=prefix, dim=dim, distance=distance, on_disk_payload=on_disk_payload
)
def get_client(cfg: QdrantConfig) -> QdrantClient:
# QdrantClient akzeptiert entweder url=... oder host/port
if cfg.url:
return QdrantClient(url=cfg.url, api_key=cfg.api_key, timeout=60.0)
return QdrantClient(host=cfg.host or "127.0.0.1", port=cfg.port or 6333, api_key=cfg.api_key, timeout=60.0)
# ---------------------------------------------------------------------------
# Collections
# ---------------------------------------------------------------------------
def collection_names(prefix: str) -> Tuple[str, str, str]:
return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"
def _vector_params(dim: int, distance: str) -> rest.VectorParams:
# Distance: "Cosine" | "Dot" | "Euclid"
dist = getattr(rest.Distance, distance.capitalize(), rest.Distance.COSINE)
return rest.VectorParams(size=dim, distance=dist)
def ensure_collections(client: QdrantClient, prefix: str, dim: int) -> None:
"""Legt mindnet_notes, mindnet_chunks, mindnet_edges an (falls nicht vorhanden)."""
notes, chunks, edges = collection_names(prefix)
# notes
if not client.collection_exists(notes):
client.create_collection(
collection_name=notes,
vectors_config=_vector_params(dim, os.getenv("DISTANCE", "Cosine")),
on_disk_payload=True,
)
# chunks
if not client.collection_exists(chunks):
client.create_collection(
collection_name=chunks,
vectors_config=_vector_params(dim, os.getenv("DISTANCE", "Cosine")),
on_disk_payload=True,
)
# edges (Dummy-Vektor, Filter via Payload)
if not client.collection_exists(edges):
client.create_collection(
collection_name=edges,
vectors_config=_vector_params(1, "Dot"),
on_disk_payload=True,
)
# ---------------------------------------------------------------------------
# Payload-Indizes
# ---------------------------------------------------------------------------
def _ensure_index(client: QdrantClient, collection: str, field: str, schema: rest.PayloadSchemaType) -> None:
"""Idempotentes Anlegen eines Payload-Indexes für ein Feld."""
try:
client.create_payload_index(collection_name=collection, field_name=field, field_schema=schema, wait=True)
except Exception as e:
# Fehler ignorieren, falls Index bereits existiert oder Server "already indexed" meldet.
# Für Debugging ggf. Logging ergänzen.
_ = e
def ensure_payload_indexes(client: QdrantClient, prefix: str) -> None:
"""
Stellt sicher, dass alle benötigten Payload-Indizes existieren.
- notes: note_id(KEYWORD), type(KEYWORD), title(TEXT), updated(INTEGER), tags(KEYWORD)
- chunks: note_id(KEYWORD), chunk_id(KEYWORD), index(INTEGER), type(KEYWORD), tags(KEYWORD)
- edges: note_id(KEYWORD), kind(KEYWORD), scope(KEYWORD), source_id(KEYWORD), target_id(KEYWORD), chunk_id(KEYWORD)
"""
notes, chunks, edges = collection_names(prefix)
# NOTES
for field, schema in [
("note_id", rest.PayloadSchemaType.KEYWORD),
("type", rest.PayloadSchemaType.KEYWORD),
("title", rest.PayloadSchemaType.TEXT),
("updated", rest.PayloadSchemaType.INTEGER),
("tags", rest.PayloadSchemaType.KEYWORD),
]:
_ensure_index(client, notes, field, schema)
# CHUNKS
for field, schema in [
("note_id", rest.PayloadSchemaType.KEYWORD),
("chunk_id", rest.PayloadSchemaType.KEYWORD),
("index", rest.PayloadSchemaType.INTEGER),
("type", rest.PayloadSchemaType.KEYWORD),
("tags", rest.PayloadSchemaType.KEYWORD),
]:
_ensure_index(client, chunks, field, schema)
# EDGES
for field, schema in [
("note_id", rest.PayloadSchemaType.KEYWORD),
("kind", rest.PayloadSchemaType.KEYWORD),
("scope", rest.PayloadSchemaType.KEYWORD),
("source_id", rest.PayloadSchemaType.KEYWORD),
("target_id", rest.PayloadSchemaType.KEYWORD),
("chunk_id", rest.PayloadSchemaType.KEYWORD),
]:
_ensure_index(client, edges, field, schema)
# Re-Export für 100% Kompatibilität
__all__ = [ __all__ = [
"QdrantConfig", "QdrantConfig",
"get_client", "get_client",
"ensure_collections", "ensure_collections",
"ensure_payload_indexes", "ensure_payload_indexes",
"collection_names", "collection_names",
] ]

View File

@ -1,292 +1,24 @@
""" """
FILE: app/core/qdrant_points.py FILE: app/core/qdrant_points.py
DESCRIPTION: Object-Mapper für Qdrant. Konvertiert JSON-Payloads (Notes, Chunks, Edges) in PointStructs und generiert deterministische UUIDs. DESCRIPTION: Proxy-Modul zur Aufrechterhaltung der Abwärtskompatibilität (WP-14).
VERSION: 1.5.0 Leitet Point-Operationen an das neue database-Paket weiter.
STATUS: Active STATUS: Proxy (Legacy-Support)
DEPENDENCIES: qdrant_client, uuid, os
LAST_ANALYSIS: 2025-12-15
""" """
from __future__ import annotations from .database.qdrant_points import (
import os points_for_note,
import uuid points_for_chunks,
from typing import List, Tuple, Iterable, Optional, Dict, Any points_for_edges,
upsert_batch,
get_edges_for_sources,
search_chunks_by_vector
)
from qdrant_client.http import models as rest # Re-Export für 100% Kompatibilität
from qdrant_client import QdrantClient __all__ = [
"points_for_note",
# --------------------- ID helpers --------------------- "points_for_chunks",
"points_for_edges",
def _to_uuid(stable_key: str) -> str: "upsert_batch",
return str(uuid.uuid5(uuid.NAMESPACE_URL, stable_key)) "get_edges_for_sources",
"search_chunks_by_vector"
def _names(prefix: str) -> Tuple[str, str, str]: ]
return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"
# --------------------- Points builders ---------------------
def points_for_note(prefix: str, note_payload: dict, note_vec: List[float] | None, dim: int) -> Tuple[str, List[rest.PointStruct]]:
notes_col, _, _ = _names(prefix)
vector = note_vec if note_vec is not None else [0.0] * int(dim)
raw_note_id = note_payload.get("note_id") or note_payload.get("id") or "missing-note-id"
point_id = _to_uuid(raw_note_id)
pt = rest.PointStruct(id=point_id, vector=vector, payload=note_payload)
return notes_col, [pt]
def points_for_chunks(prefix: str, chunk_payloads: List[dict], vectors: List[List[float]]) -> Tuple[str, List[rest.PointStruct]]:
_, chunks_col, _ = _names(prefix)
points: List[rest.PointStruct] = []
for i, (pl, vec) in enumerate(zip(chunk_payloads, vectors), start=1):
chunk_id = pl.get("chunk_id") or pl.get("id")
if not chunk_id:
note_id = pl.get("note_id") or pl.get("parent_note_id") or "missing-note"
chunk_id = f"{note_id}#{i}"
pl["chunk_id"] = chunk_id
point_id = _to_uuid(chunk_id)
points.append(rest.PointStruct(id=point_id, vector=vec, payload=pl))
return chunks_col, points
def _normalize_edge_payload(pl: dict) -> dict:
kind = pl.get("kind") or pl.get("edge_type") or "edge"
source_id = pl.get("source_id") or pl.get("src_id") or "unknown-src"
target_id = pl.get("target_id") or pl.get("dst_id") or "unknown-tgt"
seq = pl.get("seq") or pl.get("order") or pl.get("index")
pl.setdefault("kind", kind)
pl.setdefault("source_id", source_id)
pl.setdefault("target_id", target_id)
if seq is not None and "seq" not in pl:
pl["seq"] = seq
return pl
def points_for_edges(prefix: str, edge_payloads: List[dict]) -> Tuple[str, List[rest.PointStruct]]:
_, _, edges_col = _names(prefix)
points: List[rest.PointStruct] = []
for raw in edge_payloads:
pl = _normalize_edge_payload(raw)
edge_id = pl.get("edge_id")
if not edge_id:
kind = pl.get("kind", "edge")
s = pl.get("source_id", "unknown-src")
t = pl.get("target_id", "unknown-tgt")
seq = pl.get("seq") or ""
edge_id = f"{kind}:{s}->{t}#{seq}"
pl["edge_id"] = edge_id
point_id = _to_uuid(edge_id)
points.append(rest.PointStruct(id=point_id, vector=[0.0], payload=pl))
return edges_col, points
# --------------------- Vector schema & overrides ---------------------
def _preferred_name(candidates: List[str]) -> str:
for k in ("text", "default", "embedding", "content"):
if k in candidates:
return k
return sorted(candidates)[0]
def _env_override_for_collection(collection: str) -> Optional[str]:
"""
Returns:
- "__single__" to force single-vector
- concrete name (str) to force named-vector with that name
- None to auto-detect
"""
base = os.getenv("MINDNET_VECTOR_NAME")
if collection.endswith("_notes"):
base = os.getenv("NOTES_VECTOR_NAME", base)
elif collection.endswith("_chunks"):
base = os.getenv("CHUNKS_VECTOR_NAME", base)
elif collection.endswith("_edges"):
base = os.getenv("EDGES_VECTOR_NAME", base)
if not base:
return None
val = base.strip()
if val.lower() in ("__single__", "single"):
return "__single__"
return val # concrete name
def _get_vector_schema(client: QdrantClient, collection_name: str) -> dict:
"""
Return {"kind": "single", "size": int} or {"kind": "named", "names": [...], "primary": str}.
"""
try:
info = client.get_collection(collection_name=collection_name)
vecs = getattr(info, "vectors", None)
# Single-vector config
if hasattr(vecs, "size") and isinstance(vecs.size, int):
return {"kind": "single", "size": vecs.size}
# Named-vectors config (dict-like in .config)
cfg = getattr(vecs, "config", None)
if isinstance(cfg, dict) and cfg:
names = list(cfg.keys())
if names:
return {"kind": "named", "names": names, "primary": _preferred_name(names)}
except Exception:
pass
return {"kind": "single", "size": None}
def _as_named(points: List[rest.PointStruct], name: str) -> List[rest.PointStruct]:
out: List[rest.PointStruct] = []
for pt in points:
vec = getattr(pt, "vector", None)
if isinstance(vec, dict):
if name in vec:
out.append(pt)
else:
# take any existing entry; if empty dict fallback to [0.0]
fallback_vec = None
try:
fallback_vec = list(next(iter(vec.values())))
except Exception:
fallback_vec = [0.0]
out.append(rest.PointStruct(id=pt.id, vector={name: fallback_vec}, payload=pt.payload))
elif vec is not None:
out.append(rest.PointStruct(id=pt.id, vector={name: vec}, payload=pt.payload))
else:
out.append(pt)
return out
# --------------------- Qdrant ops ---------------------
def upsert_batch(client: QdrantClient, collection: str, points: List[rest.PointStruct]) -> None:
if not points:
return
# 1) ENV overrides come first
override = _env_override_for_collection(collection)
if override == "__single__":
client.upsert(collection_name=collection, points=points, wait=True)
return
elif isinstance(override, str):
client.upsert(collection_name=collection, points=_as_named(points, override), wait=True)
return
# 2) Auto-detect schema
schema = _get_vector_schema(client, collection)
if schema.get("kind") == "named":
name = schema.get("primary") or _preferred_name(schema.get("names") or [])
client.upsert(collection_name=collection, points=_as_named(points, name), wait=True)
return
# 3) Fallback single-vector
client.upsert(collection_name=collection, points=points, wait=True)
# --- Optional search helpers ---
def _filter_any(field: str, values: Iterable[str]) -> rest.Filter:
return rest.Filter(should=[rest.FieldCondition(key=field, match=rest.MatchValue(value=v)) for v in values])
def _merge_filters(*filters: Optional[rest.Filter]) -> Optional[rest.Filter]:
fs = [f for f in filters if f is not None]
if not fs:
return None
if len(fs) == 1:
return fs[0]
must = []
for f in fs:
if getattr(f, "must", None):
must.extend(f.must)
if getattr(f, "should", None):
must.append(rest.Filter(should=f.should))
return rest.Filter(must=must)
def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter]:
if not filters:
return None
parts = []
for k, v in filters.items():
if isinstance(v, (list, tuple, set)):
parts.append(_filter_any(k, [str(x) for x in v]))
else:
parts.append(rest.Filter(must=[rest.FieldCondition(key=k, match=rest.MatchValue(value=v))]))
return _merge_filters(*parts)
def search_chunks_by_vector(client: QdrantClient, prefix: str, vector: List[float], top: int = 10, filters: Optional[Dict[str, Any]] = None) -> List[Tuple[str, float, dict]]:
_, chunks_col, _ = _names(prefix)
flt = _filter_from_dict(filters)
res = client.search(collection_name=chunks_col, query_vector=vector, limit=top, with_payload=True, with_vectors=False, query_filter=flt)
out: List[Tuple[str, float, dict]] = []
for r in res:
out.append((str(r.id), float(r.score), dict(r.payload or {})))
return out
# --- Edge retrieval helper ---
def get_edges_for_sources(
client: QdrantClient,
prefix: str,
source_ids: Iterable[str],
edge_types: Optional[Iterable[str]] = None,
limit: int = 2048,
) -> List[Dict[str, Any]]:
"""Retrieve edge payloads from the <prefix>_edges collection.
Args:
client: QdrantClient instance.
prefix: Mindnet collection prefix (e.g. "mindnet").
source_ids: Iterable of source_id values (typically chunk_ids or note_ids).
edge_types: Optional iterable of edge kinds (e.g. ["references", "depends_on"]). If None,
all kinds are returned.
limit: Maximum number of edge payloads to return.
Returns:
A list of edge payload dicts, e.g.:
{
"note_id": "...",
"chunk_id": "...",
"kind": "references" | "depends_on" | ...,
"scope": "chunk",
"source_id": "...",
"target_id": "...",
"rule_id": "...",
"confidence": 0.7,
...
}
"""
source_ids = list(source_ids)
if not source_ids or limit <= 0:
return []
# Resolve collection name
_, _, edges_col = _names(prefix)
# Build filter: source_id IN source_ids
src_filter = _filter_any("source_id", [str(s) for s in source_ids])
# Optional: kind IN edge_types
kind_filter = None
if edge_types:
kind_filter = _filter_any("kind", [str(k) for k in edge_types])
flt = _merge_filters(src_filter, kind_filter)
out: List[Dict[str, Any]] = []
next_page = None
remaining = int(limit)
# Use paginated scroll API; we don't need vectors, only payloads.
while remaining > 0:
batch_limit = min(256, remaining)
res, next_page = client.scroll(
collection_name=edges_col,
scroll_filter=flt,
limit=batch_limit,
with_payload=True,
with_vectors=False,
offset=next_page,
)
if not res:
break
for r in res:
out.append(dict(r.payload or {}))
remaining -= 1
if remaining <= 0:
break
if next_page is None or remaining <= 0:
break
return out