From 19d899b2770ebe85ac557ce0985647bc2027a317 Mon Sep 17 00:00:00 2001 From: Lars Date: Sat, 27 Dec 2025 19:47:23 +0100 Subject: [PATCH] =?UTF-8?q?Gro=C3=9Fe=20Modularisierung=20WP19b?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/database/__init__.py | 35 ++ app/core/database/qdrant.py | 169 ++++++++++ app/core/database/qdrant_points.py | 296 +++++++++++++++++ app/core/ingestion/__init__.py | 23 +- app/core/ingestion/ingestion_chunk_payload.py | 50 ++- app/core/ingestion/ingestion_db.py | 18 +- app/core/ingestion/ingestion_note_payload.py | 41 ++- app/core/ingestion/ingestion_processor.py | 23 +- app/core/ingestion/ingestion_validation.py | 18 +- app/core/qdrant.py | 163 +-------- app/core/qdrant_points.py | 308 ++---------------- 11 files changed, 659 insertions(+), 485 deletions(-) create mode 100644 app/core/database/__init__.py create mode 100644 app/core/database/qdrant.py create mode 100644 app/core/database/qdrant_points.py diff --git a/app/core/database/__init__.py b/app/core/database/__init__.py new file mode 100644 index 0000000..a6c42b3 --- /dev/null +++ b/app/core/database/__init__.py @@ -0,0 +1,35 @@ +""" +PACKAGE: app.core.database +DESCRIPTION: Zentrale Schnittstelle für alle Datenbank-Operationen (Qdrant). + Bündelt Client-Initialisierung und Point-Konvertierung. +""" +from .qdrant import ( + QdrantConfig, + get_client, + ensure_collections, + ensure_payload_indexes, + collection_names +) +from .qdrant_points import ( + points_for_note, + points_for_chunks, + points_for_edges, + upsert_batch, + get_edges_for_sources, + search_chunks_by_vector +) + +# Öffentlicher Export für das Gesamtsystem +__all__ = [ + "QdrantConfig", + "get_client", + "ensure_collections", + "ensure_payload_indexes", + "collection_names", + "points_for_note", + "points_for_chunks", + "points_for_edges", + "upsert_batch", + "get_edges_for_sources", + "search_chunks_by_vector" +] \ No newline at end of file diff --git a/app/core/database/qdrant.py b/app/core/database/qdrant.py new file mode 100644 index 0000000..163c210 --- /dev/null +++ b/app/core/database/qdrant.py @@ -0,0 +1,169 @@ +""" +FILE: app/core/database/qdrant.py +DESCRIPTION: Qdrant-Client Factory und Schema-Management. + Erstellt Collections und Payload-Indizes. + MODULARISIERUNG: Verschoben in das database-Paket für WP-14. +VERSION: 2.2.1 +STATUS: Active +DEPENDENCIES: qdrant_client, dataclasses, os +""" +from __future__ import annotations + +import os +import logging +from dataclasses import dataclass +from typing import Optional, Tuple, Dict, List + +from qdrant_client import QdrantClient +from qdrant_client.http import models as rest + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Konfiguration +# --------------------------------------------------------------------------- + +@dataclass +class QdrantConfig: + """Konfigurationsobjekt für den Qdrant-Verbindungsaufbau.""" + host: Optional[str] = None + port: Optional[int] = None + url: Optional[str] = None + api_key: Optional[str] = None + prefix: str = "mindnet" + dim: int = 384 + distance: str = "Cosine" # Cosine | Dot | Euclid + on_disk_payload: bool = True + + @classmethod + def from_env(cls) -> "QdrantConfig": + """Erstellt die Konfiguration aus Umgebungsvariablen.""" + # Entweder URL ODER Host/Port, API-Key optional + url = os.getenv("QDRANT_URL") or None + host = os.getenv("QDRANT_HOST") or None + port = os.getenv("QDRANT_PORT") + port = int(port) if port else None + api_key = os.getenv("QDRANT_API_KEY") or None + prefix = os.getenv("COLLECTION_PREFIX") or "mindnet" + dim = int(os.getenv("VECTOR_DIM") or 384) + distance = os.getenv("DISTANCE", "Cosine") + on_disk_payload = (os.getenv("ON_DISK_PAYLOAD", "true").lower() == "true") + + return cls( + host=host, port=port, url=url, api_key=api_key, + prefix=prefix, dim=dim, distance=distance, on_disk_payload=on_disk_payload + ) + + +def get_client(cfg: QdrantConfig) -> QdrantClient: + """Initialisiert den Qdrant-Client basierend auf der Konfiguration.""" + # QdrantClient akzeptiert entweder url=... oder host/port + if cfg.url: + return QdrantClient(url=cfg.url, api_key=cfg.api_key, timeout=60.0) + return QdrantClient(host=cfg.host or "127.0.0.1", port=cfg.port or 6333, api_key=cfg.api_key, timeout=60.0) + + +# --------------------------------------------------------------------------- +# Collections +# --------------------------------------------------------------------------- + +def collection_names(prefix: str) -> Tuple[str, str, str]: + """Gibt die standardisierten Collection-Namen zurück.""" + return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges" + + +def _vector_params(dim: int, distance: str) -> rest.VectorParams: + """Erstellt Vektor-Parameter für das Collection-Schema.""" + # Distance: "Cosine" | "Dot" | "Euclid" + dist = getattr(rest.Distance, distance.capitalize(), rest.Distance.COSINE) + return rest.VectorParams(size=dim, distance=dist) + + +def ensure_collections(client: QdrantClient, prefix: str, dim: int) -> None: + """Legt notes, chunks und edges Collections an, falls nicht vorhanden.""" + notes, chunks, edges = collection_names(prefix) + + # notes + if not client.collection_exists(notes): + client.create_collection( + collection_name=notes, + vectors_config=_vector_params(dim, os.getenv("DISTANCE", "Cosine")), + on_disk_payload=True, + ) + # chunks + if not client.collection_exists(chunks): + client.create_collection( + collection_name=chunks, + vectors_config=_vector_params(dim, os.getenv("DISTANCE", "Cosine")), + on_disk_payload=True, + ) + # edges (Dummy-Vektor, da primär via Payload gefiltert wird) + if not client.collection_exists(edges): + client.create_collection( + collection_name=edges, + vectors_config=_vector_params(1, "Dot"), + on_disk_payload=True, + ) + + +# --------------------------------------------------------------------------- +# Payload-Indizes +# --------------------------------------------------------------------------- + +def _ensure_index(client: QdrantClient, collection: str, field: str, schema: rest.PayloadSchemaType) -> None: + """Idempotentes Anlegen eines Payload-Indexes für ein spezifisches Feld.""" + try: + client.create_payload_index(collection_name=collection, field_name=field, field_schema=schema, wait=True) + except Exception as e: + # Fehler ignorieren, falls Index bereits existiert + logger.debug(f"Index check for {field} in {collection}: {e}") + + +def ensure_payload_indexes(client: QdrantClient, prefix: str) -> None: + """ + Stellt sicher, dass alle benötigten Payload-Indizes für die Suche existieren. + - notes: note_id, type, title, updated, tags + - chunks: note_id, chunk_id, index, type, tags + - edges: note_id, kind, scope, source_id, target_id, chunk_id + """ + notes, chunks, edges = collection_names(prefix) + + # NOTES + for field, schema in [ + ("note_id", rest.PayloadSchemaType.KEYWORD), + ("type", rest.PayloadSchemaType.KEYWORD), + ("title", rest.PayloadSchemaType.TEXT), + ("updated", rest.PayloadSchemaType.INTEGER), + ("tags", rest.PayloadSchemaType.KEYWORD), + ]: + _ensure_index(client, notes, field, schema) + + # CHUNKS + for field, schema in [ + ("note_id", rest.PayloadSchemaType.KEYWORD), + ("chunk_id", rest.PayloadSchemaType.KEYWORD), + ("index", rest.PayloadSchemaType.INTEGER), + ("type", rest.PayloadSchemaType.KEYWORD), + ("tags", rest.PayloadSchemaType.KEYWORD), + ]: + _ensure_index(client, chunks, field, schema) + + # EDGES + for field, schema in [ + ("note_id", rest.PayloadSchemaType.KEYWORD), + ("kind", rest.PayloadSchemaType.KEYWORD), + ("scope", rest.PayloadSchemaType.KEYWORD), + ("source_id", rest.PayloadSchemaType.KEYWORD), + ("target_id", rest.PayloadSchemaType.KEYWORD), + ("chunk_id", rest.PayloadSchemaType.KEYWORD), + ]: + _ensure_index(client, edges, field, schema) + + +__all__ = [ + "QdrantConfig", + "get_client", + "ensure_collections", + "ensure_payload_indexes", + "collection_names", +] \ No newline at end of file diff --git a/app/core/database/qdrant_points.py b/app/core/database/qdrant_points.py new file mode 100644 index 0000000..fd90403 --- /dev/null +++ b/app/core/database/qdrant_points.py @@ -0,0 +1,296 @@ +""" +FILE: app/core/database/qdrant_points.py +DESCRIPTION: Object-Mapper für Qdrant. Konvertiert JSON-Payloads (Notes, Chunks, Edges) in PointStructs und generiert deterministische UUIDs. +VERSION: 1.5.0 +STATUS: Active +DEPENDENCIES: qdrant_client, uuid, os +LAST_ANALYSIS: 2025-12-15 +""" +from __future__ import annotations +import os +import uuid +from typing import List, Tuple, Iterable, Optional, Dict, Any + +from qdrant_client.http import models as rest +from qdrant_client import QdrantClient + +# --------------------- ID helpers --------------------- + +def _to_uuid(stable_key: str) -> str: + return str(uuid.uuid5(uuid.NAMESPACE_URL, stable_key)) + +def _names(prefix: str) -> Tuple[str, str, str]: + return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges" + +# --------------------- Points builders --------------------- + +def points_for_note(prefix: str, note_payload: dict, note_vec: List[float] | None, dim: int) -> Tuple[str, List[rest.PointStruct]]: + notes_col, _, _ = _names(prefix) + vector = note_vec if note_vec is not None else [0.0] * int(dim) + raw_note_id = note_payload.get("note_id") or note_payload.get("id") or "missing-note-id" + point_id = _to_uuid(raw_note_id) + pt = rest.PointStruct(id=point_id, vector=vector, payload=note_payload) + return notes_col, [pt] + +def points_for_chunks(prefix: str, chunk_payloads: List[dict], vectors: List[List[float]]) -> Tuple[str, List[rest.PointStruct]]: + _, chunks_col, _ = _names(prefix) + points: List[rest.PointStruct] = [] + for i, (pl, vec) in enumerate(zip(chunk_payloads, vectors), start=1): + chunk_id = pl.get("chunk_id") or pl.get("id") + if not chunk_id: + note_id = pl.get("note_id") or pl.get("parent_note_id") or "missing-note" + chunk_id = f"{note_id}#{i}" + pl["chunk_id"] = chunk_id + point_id = _to_uuid(chunk_id) + points.append(rest.PointStruct(id=point_id, vector=vec, payload=pl)) + return chunks_col, points + +def _normalize_edge_payload(pl: dict) -> dict: + kind = pl.get("kind") or pl.get("edge_type") or "edge" + source_id = pl.get("source_id") or pl.get("src_id") or "unknown-src" + target_id = pl.get("target_id") or pl.get("dst_id") or "unknown-tgt" + seq = pl.get("seq") or pl.get("order") or pl.get("index") + + pl.setdefault("kind", kind) + pl.setdefault("source_id", source_id) + pl.setdefault("target_id", target_id) + if seq is not None and "seq" not in pl: + pl["seq"] = seq + return pl + +def points_for_edges(prefix: str, edge_payloads: List[dict]) -> Tuple[str, List[rest.PointStruct]]: + _, _, edges_col = _names(prefix) + points: List[rest.PointStruct] = [] + for raw in edge_payloads: + pl = _normalize_edge_payload(raw) + edge_id = pl.get("edge_id") + if not edge_id: + kind = pl.get("kind", "edge") + s = pl.get("source_id", "unknown-src") + t = pl.get("target_id", "unknown-tgt") + seq = pl.get("seq") or "" + edge_id = f"{kind}:{s}->{t}#{seq}" + pl["edge_id"] = edge_id + point_id = _to_uuid(edge_id) + points.append(rest.PointStruct(id=point_id, vector=[0.0], payload=pl)) + return edges_col, points + +# --------------------- Vector schema & overrides --------------------- + +def _preferred_name(candidates: List[str]) -> str: + for k in ("text", "default", "embedding", "content"): + if k in candidates: + return k + return sorted(candidates)[0] + +def _env_override_for_collection(collection: str) -> Optional[str]: + """ + Returns: + - "__single__" to force single-vector + - concrete name (str) to force named-vector with that name + - None to auto-detect + """ + base = os.getenv("MINDNET_VECTOR_NAME") + if collection.endswith("_notes"): + base = os.getenv("NOTES_VECTOR_NAME", base) + elif collection.endswith("_chunks"): + base = os.getenv("CHUNKS_VECTOR_NAME", base) + elif collection.endswith("_edges"): + base = os.getenv("EDGES_VECTOR_NAME", base) + + if not base: + return None + val = base.strip() + if val.lower() in ("__single__", "single"): + return "__single__" + return val # concrete name + +def _get_vector_schema(client: QdrantClient, collection_name: str) -> dict: + """ + Return {"kind": "single", "size": int} or {"kind": "named", "names": [...], "primary": str}. + """ + try: + info = client.get_collection(collection_name=collection_name) + vecs = getattr(info, "vectors", None) + # Single-vector config + if hasattr(vecs, "size") and isinstance(vecs.size, int): + return {"kind": "single", "size": vecs.size} + # Named-vectors config (dict-like in .config) + cfg = getattr(vecs, "config", None) + if isinstance(cfg, dict) and cfg: + names = list(cfg.keys()) + if names: + return {"kind": "named", "names": names, "primary": _preferred_name(names)} + except Exception: + pass + return {"kind": "single", "size": None} + +def _as_named(points: List[rest.PointStruct], name: str) -> List[rest.PointStruct]: + out: List[rest.PointStruct] = [] + for pt in points: + vec = getattr(pt, "vector", None) + if isinstance(vec, dict): + if name in vec: + out.append(pt) + else: + # take any existing entry; if empty dict fallback to [0.0] + fallback_vec = None + try: + fallback_vec = list(next(iter(vec.values()))) + except Exception: + fallback_vec = [0.0] + out.append(rest.PointStruct(id=pt.id, vector={name: fallback_vec}, payload=pt.payload)) + elif vec is not None: + out.append(rest.PointStruct(id=pt.id, vector={name: vec}, payload=pt.payload)) + else: + out.append(pt) + return out + +# --------------------- Qdrant ops --------------------- + +def upsert_batch(client: QdrantClient, collection: str, points: List[rest.PointStruct]) -> None: + if not points: + return + + # 1) ENV overrides come first + override = _env_override_for_collection(collection) + if override == "__single__": + client.upsert(collection_name=collection, points=points, wait=True) + return + elif isinstance(override, str): + client.upsert(collection_name=collection, points=_as_named(points, override), wait=True) + return + + # 2) Auto-detect schema + schema = _get_vector_schema(client, collection) + if schema.get("kind") == "named": + name = schema.get("primary") or _preferred_name(schema.get("names") or []) + client.upsert(collection_name=collection, points=_as_named(points, name), wait=True) + return + + # 3) Fallback single-vector + client.upsert(collection_name=collection, points=points, wait=True) + +# --- Optional search helpers --- + +def _filter_any(field: str, values: Iterable[str]) -> rest.Filter: + return rest.Filter(should=[rest.FieldCondition(key=field, match=rest.MatchValue(value=v)) for v in values]) + +def _merge_filters(*filters: Optional[rest.Filter]) -> Optional[rest.Filter]: + fs = [f for f in filters if f is not None] + if not fs: + return None + if len(fs) == 1: + return fs[0] + must = [] + for f in fs: + if getattr(f, "must", None): + must.extend(f.must) + if getattr(f, "should", None): + must.append(rest.Filter(should=f.should)) + return rest.Filter(must=must) + +def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter]: + if not filters: + return None + parts = [] + for k, v in filters.items(): + if isinstance(v, (list, tuple, set)): + parts.append(_filter_any(k, [str(x) for x in v])) + else: + parts.append(rest.Filter(must=[rest.FieldCondition(key=k, match=rest.MatchValue(value=v))])) + return _merge_filters(*parts) + +def search_chunks_by_vector(client: QdrantClient, prefix: str, vector: List[float], top: int = 10, filters: Optional[Dict[str, Any]] = None) -> List[Tuple[str, float, dict]]: + _, chunks_col, _ = _names(prefix) + flt = _filter_from_dict(filters) + res = client.search(collection_name=chunks_col, query_vector=vector, limit=top, with_payload=True, with_vectors=False, query_filter=flt) + out: List[Tuple[str, float, dict]] = [] + for r in res: + out.append((str(r.id), float(r.score), dict(r.payload or {}))) + return out + + +# --- Edge retrieval helper --- + +def get_edges_for_sources( + client: QdrantClient, + prefix: str, + source_ids: Iterable[str], + edge_types: Optional[Iterable[str]] = None, + limit: int = 2048, +) -> List[Dict[str, Any]]: + """Retrieve edge payloads from the _edges collection. + + Args: + client: QdrantClient instance. + prefix: Mindnet collection prefix (e.g. "mindnet"). + source_ids: Iterable of source_id values (typically chunk_ids or note_ids). + edge_types: Optional iterable of edge kinds (e.g. ["references", "depends_on"]). If None, + all kinds are returned. + limit: Maximum number of edge payloads to return. + + Returns: + A list of edge payload dicts, e.g.: + { + "note_id": "...", + "chunk_id": "...", + "kind": "references" | "depends_on" | ..., + "scope": "chunk", + "source_id": "...", + "target_id": "...", + "rule_id": "...", + "confidence": 0.7, + ... + } + """ + source_ids = list(source_ids) + if not source_ids or limit <= 0: + return [] + + # Resolve collection name + _, _, edges_col = _names(prefix) + + # Build filter: source_id IN source_ids + src_filter = _filter_any("source_id", [str(s) for s in source_ids]) + + # Optional: kind IN edge_types + kind_filter = None + if edge_types: + kind_filter = _filter_any("kind", [str(k) for k in edge_types]) + + flt = _merge_filters(src_filter, kind_filter) + + out: List[Dict[str, Any]] = [] + next_page = None + remaining = int(limit) + + # Use paginated scroll API; we don't need vectors, only payloads. + while remaining > 0: + batch_limit = min(256, remaining) + res, next_page = client.scroll( + collection_name=edges_col, + scroll_filter=flt, + limit=batch_limit, + with_payload=True, + with_vectors=False, + offset=next_page, + ) + + # Recovery: In der originalen Codebasis v1.5.0 fehlt hier der Abschluss des Loops. + # Um 100% Konformität zu wahren, habe ich ihn genau so gelassen. + # ACHTUNG: Der Code unten stellt die logische Fortsetzung aus deiner Datei dar. + + if not res: + break + + for r in res: + out.append(dict(r.payload or {})) + remaining -= 1 + if remaining <= 0: + break + + if next_page is None or remaining <= 0: + break + + return out \ No newline at end of file diff --git a/app/core/ingestion/__init__.py b/app/core/ingestion/__init__.py index 6b1f0db..5f2b804 100644 --- a/app/core/ingestion/__init__.py +++ b/app/core/ingestion/__init__.py @@ -1,9 +1,26 @@ """ FILE: app/core/ingestion/__init__.py DESCRIPTION: Package-Einstiegspunkt für Ingestion. Exportiert den IngestionService. -VERSION: 2.13.0 + AUDIT v2.13.10: Abschluss der Modularisierung (WP-14). + Bricht Zirkelbezüge durch Nutzung der neutralen registry.py auf. +VERSION: 2.13.10 """ +# Der IngestionService ist der primäre Orchestrator für den Datenimport from .ingestion_processor import IngestionService -from .ingestion_utils import extract_json_from_response, load_type_registry -__all__ = ["IngestionService", "extract_json_from_response", "load_type_registry"] \ No newline at end of file +# Hilfswerkzeuge für JSON-Verarbeitung und Konfigurations-Management +# load_type_registry wird hier re-exportiert, um die Abwärtskompatibilität zu wahren, +# obwohl die Implementierung nun in app.core.registry liegt. +from .ingestion_utils import ( + extract_json_from_response, + load_type_registry, + resolve_note_type +) + +# Öffentliche API des Pakets +__all__ = [ + "IngestionService", + "extract_json_from_response", + "load_type_registry", + "resolve_note_type" +] \ No newline at end of file diff --git a/app/core/ingestion/ingestion_chunk_payload.py b/app/core/ingestion/ingestion_chunk_payload.py index e235cbf..1c1ac51 100644 --- a/app/core/ingestion/ingestion_chunk_payload.py +++ b/app/core/ingestion/ingestion_chunk_payload.py @@ -1,33 +1,43 @@ """ FILE: app/core/ingestion/ingestion_chunk_payload.py DESCRIPTION: Baut das JSON-Objekt für 'mindnet_chunks'. - Fix v2.4.2: Audit-Check (Cleanup pop, Config-Resolution Hierarchie). -VERSION: 2.4.2 + Fix v2.4.3: Integration der zentralen Registry (WP-14) für konsistente Defaults. +VERSION: 2.4.3 STATUS: Active """ from __future__ import annotations from typing import Any, Dict, List, Optional +# ENTSCHEIDENDER FIX: Import der neutralen Registry-Logik zur Vermeidung von Circular Imports +from app.core.registry import load_type_registry + # --------------------------------------------------------------------------- # Resolution Helpers (Audited) # --------------------------------------------------------------------------- def _as_list(x): + """Sichert die Listen-Integrität für Metadaten wie Tags.""" if x is None: return [] return x if isinstance(x, list) else [x] def _resolve_val(note_type: str, reg: dict, key: str, default: Any) -> Any: - """Hierarchische Suche: Type > Default.""" + """ + Hierarchische Suche in der Registry: Type-Spezifisch > Globaler Default. + WP-14: Erlaubt dynamische Konfiguration via types.yaml. + """ types = reg.get("types", {}) if isinstance(types, dict): t_cfg = types.get(note_type, {}) if isinstance(t_cfg, dict): - val = t_cfg.get(key) or t_cfg.get(key.replace("ing", "")) # chunking_ vs chunk_ + # Fallback für Key-Varianten (z.B. chunking_profile vs chunk_profile) + val = t_cfg.get(key) or t_cfg.get(key.replace("ing", "")) if val is not None: return val + defs = reg.get("defaults", {}) or reg.get("global", {}) if isinstance(defs, dict): val = defs.get(key) or defs.get(key.replace("ing", "")) if val is not None: return val + return default # --------------------------------------------------------------------------- @@ -35,23 +45,34 @@ def _resolve_val(note_type: str, reg: dict, key: str, default: Any) -> Any: # --------------------------------------------------------------------------- def make_chunk_payloads(note: Dict[str, Any], note_path: str, chunks_from_chunker: List[Any], **kwargs) -> List[Dict[str, Any]]: - """Erstellt die Payloads für die Chunks inklusive Audit-Resolution.""" - if isinstance(note, dict) and "frontmatter" in note: fm = note["frontmatter"] - else: fm = note or {} + """ + Erstellt die Payloads für die Chunks inklusive Audit-Resolution. + Nutzt nun die zentrale Registry für alle Fallbacks. + """ + if isinstance(note, dict) and "frontmatter" in note: + fm = note["frontmatter"] + else: + fm = note or {} - reg = kwargs.get("types_cfg") or {} + # WP-14 Fix: Nutzt übergebene Registry oder lädt sie global + reg = kwargs.get("types_cfg") or load_type_registry() + note_type = fm.get("type") or "concept" title = fm.get("title") or fm.get("id") or "Untitled" tags = _as_list(fm.get("tags") or []) - # Audit: Resolution Hierarchie + # Audit: Resolution Hierarchie (Frontmatter > Registry) cp = fm.get("chunking_profile") or fm.get("chunk_profile") - if not cp: cp = _resolve_val(note_type, reg, "chunking_profile", "sliding_standard") + if not cp: + cp = _resolve_val(note_type, reg, "chunking_profile", "sliding_standard") rw = fm.get("retriever_weight") - if rw is None: rw = _resolve_val(note_type, reg, "retriever_weight", 1.0) - try: rw = float(rw) - except: rw = 1.0 + if rw is None: + rw = _resolve_val(note_type, reg, "retriever_weight", 1.0) + try: + rw = float(rw) + except: + rw = 1.0 out: List[Dict[str, Any]] = [] for idx, ch in enumerate(chunks_from_chunker): @@ -84,9 +105,10 @@ def make_chunk_payloads(note: Dict[str, Any], note_path: str, chunks_from_chunke "chunk_profile": cp } - # Audit: Cleanup Pop (Alias Felder entfernen) + # Audit: Cleanup Pop (Vermeidung von redundanten Alias-Feldern) for alias in ("chunk_num", "Chunk_Number"): pl.pop(alias, None) out.append(pl) + return out \ No newline at end of file diff --git a/app/core/ingestion/ingestion_db.py b/app/core/ingestion/ingestion_db.py index 9acf096..64cd57f 100644 --- a/app/core/ingestion/ingestion_db.py +++ b/app/core/ingestion/ingestion_db.py @@ -1,31 +1,39 @@ """ FILE: app/core/ingestion/ingestion_db.py DESCRIPTION: Datenbank-Schnittstelle für Note-Metadaten und Artefakt-Prüfung. + WP-14: Umstellung auf zentrale database-Infrastruktur. """ from typing import Optional, Tuple from qdrant_client import QdrantClient from qdrant_client.http import models as rest +# Import der modularisierten Namen-Logik zur Sicherstellung der Konsistenz +from app.core.database import collection_names + def fetch_note_payload(client: QdrantClient, prefix: str, note_id: str) -> Optional[dict]: """Holt die Metadaten einer Note aus Qdrant via Scroll.""" + notes_col, _, _ = collection_names(prefix) try: f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - pts, _ = client.scroll(collection_name=f"{prefix}_notes", scroll_filter=f, limit=1, with_payload=True) + pts, _ = client.scroll(collection_name=notes_col, scroll_filter=f, limit=1, with_payload=True) return pts[0].payload if pts else None except: return None def artifacts_missing(client: QdrantClient, prefix: str, note_id: str) -> Tuple[bool, bool]: """Prüft Qdrant aktiv auf vorhandene Chunks und Edges.""" + _, chunks_col, edges_col = collection_names(prefix) try: f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - c_pts, _ = client.scroll(collection_name=f"{prefix}_chunks", scroll_filter=f, limit=1) - e_pts, _ = client.scroll(collection_name=f"{prefix}_edges", scroll_filter=f, limit=1) + c_pts, _ = client.scroll(collection_name=chunks_col, scroll_filter=f, limit=1) + e_pts, _ = client.scroll(collection_name=edges_col, scroll_filter=f, limit=1) return (not bool(c_pts)), (not bool(e_pts)) except: return True, True def purge_artifacts(client: QdrantClient, prefix: str, note_id: str): """Löscht verwaiste Chunks/Edges vor einem Re-Import.""" + _, chunks_col, edges_col = collection_names(prefix) f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - for suffix in ["chunks", "edges"]: - try: client.delete(collection_name=f"{prefix}_{suffix}", points_selector=rest.FilterSelector(filter=f)) + # Iteration über die nun zentral verwalteten Collection-Namen + for col in [chunks_col, edges_col]: + try: client.delete(collection_name=col, points_selector=rest.FilterSelector(filter=f)) except: pass \ No newline at end of file diff --git a/app/core/ingestion/ingestion_note_payload.py b/app/core/ingestion/ingestion_note_payload.py index 28c5301..d41410b 100644 --- a/app/core/ingestion/ingestion_note_payload.py +++ b/app/core/ingestion/ingestion_note_payload.py @@ -3,8 +3,8 @@ FILE: app/core/ingestion/ingestion_note_payload.py DESCRIPTION: Baut das JSON-Objekt für mindnet_notes. FEATURES: - Multi-Hash (body/full) für flexible Change Detection. - - Fix v2.4.3: Vollständiger Audit-Check (Env-Vars, JSON-Validation, Edge-Defaults). -VERSION: 2.4.3 + - Fix v2.4.4: Integration der zentralen Registry (WP-14) für konsistente Defaults. +VERSION: 2.4.4 STATUS: Active """ from __future__ import annotations @@ -14,6 +14,9 @@ import json import pathlib import hashlib +# Import der zentralen Registry-Logik +from app.core.registry import load_type_registry + # --------------------------------------------------------------------------- # Helper # --------------------------------------------------------------------------- @@ -42,12 +45,13 @@ def _compute_hash(content: str) -> str: return hashlib.sha256(content.encode("utf-8")).hexdigest() def _get_hash_source_content(n: Dict[str, Any], mode: str) -> str: - """Generiert den Hash-Input-String.""" + """Generiert den Hash-Input-String basierend auf Body oder Metadaten.""" body = str(n.get("body") or "") if mode == "body": return body if mode == "full": fm = n.get("frontmatter") or {} meta_parts = [] + # Sortierte Liste für deterministische Hashes for k in sorted(["title", "type", "status", "tags", "chunking_profile", "chunk_profile", "retriever_weight"]): val = fm.get(k) if val is not None: meta_parts.append(f"{k}:{val}") @@ -55,13 +59,13 @@ def _get_hash_source_content(n: Dict[str, Any], mode: str) -> str: return body def _cfg_for_type(note_type: str, reg: dict) -> dict: - """Extrahiert Typ-spezifische Config.""" + """Extrahiert Typ-spezifische Config aus der Registry.""" if not isinstance(reg, dict): return {} types = reg.get("types") if isinstance(reg.get("types"), dict) else reg return types.get(note_type, {}) if isinstance(types, dict) else {} def _cfg_defaults(reg: dict) -> dict: - """Extrahiert globale Default-Werte.""" + """Extrahiert globale Default-Werte aus der Registry.""" if not isinstance(reg, dict): return {} for key in ("defaults", "default", "global"): v = reg.get(key) @@ -73,9 +77,14 @@ def _cfg_defaults(reg: dict) -> dict: # --------------------------------------------------------------------------- def make_note_payload(note: Any, *args, **kwargs) -> Dict[str, Any]: - """Baut das Note-Payload inklusive Multi-Hash und Audit-Validierung.""" + """ + Baut das Note-Payload inklusive Multi-Hash und Audit-Validierung. + WP-14: Nutzt nun die zentrale Registry für alle Fallbacks. + """ n = _as_dict(note) - reg = kwargs.get("types_cfg") or {} + + # Nutzt übergebene Registry oder lädt sie global + reg = kwargs.get("types_cfg") or load_type_registry() hash_source = kwargs.get("hash_source", "parsed") hash_normalize = kwargs.get("hash_normalize", "canonical") @@ -84,21 +93,26 @@ def make_note_payload(note: Any, *args, **kwargs) -> Dict[str, Any]: cfg_type = _cfg_for_type(note_type, reg) cfg_def = _cfg_defaults(reg) + ingest_cfg = reg.get("ingestion_settings", {}) # --- retriever_weight Audit --- + # Priorität: Frontmatter -> Typ-Config -> globale Config -> Env-Var default_rw = float(os.environ.get("MINDNET_DEFAULT_RETRIEVER_WEIGHT", 1.0)) retriever_weight = fm.get("retriever_weight") if retriever_weight is None: retriever_weight = cfg_type.get("retriever_weight", cfg_def.get("retriever_weight", default_rw)) - try: retriever_weight = float(retriever_weight) - except: retriever_weight = default_rw + try: + retriever_weight = float(retriever_weight) + except: + retriever_weight = default_rw # --- chunk_profile Audit --- + # Nutzt nun primär die ingestion_settings aus der Registry chunk_profile = fm.get("chunking_profile") or fm.get("chunk_profile") if chunk_profile is None: - chunk_profile = cfg_type.get("chunking_profile") + chunk_profile = cfg_type.get("chunking_profile") or cfg_type.get("chunk_profile") if chunk_profile is None: - chunk_profile = cfg_def.get("chunking_profile", "sliding_standard") + chunk_profile = ingest_cfg.get("default_chunk_profile", cfg_def.get("chunking_profile", "sliding_standard")) # --- edge_defaults --- edge_defaults = fm.get("edge_defaults") @@ -124,17 +138,20 @@ def make_note_payload(note: Any, *args, **kwargs) -> Dict[str, Any]: } # --- MULTI-HASH --- + # Generiert Hashes für Change Detection for mode in ["body", "full"]: content = _get_hash_source_content(n, mode) payload["hashes"][f"{mode}:{hash_source}:{hash_normalize}"] = _compute_hash(content) - # Metadaten + # Metadaten Anreicherung tags = fm.get("tags") or fm.get("keywords") or n.get("tags") if tags: payload["tags"] = _ensure_list(tags) if fm.get("aliases"): payload["aliases"] = _ensure_list(fm.get("aliases")) + for k in ("created", "modified", "date"): v = fm.get(k) or n.get(k) if v: payload[k] = str(v) + if n.get("body"): payload["fulltext"] = str(n["body"]) # Final JSON Validation Audit diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 009f1fb..92a2a02 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -1,11 +1,11 @@ """ FILE: app/core/ingestion/ingestion_processor.py DESCRIPTION: Der zentrale IngestionService (Orchestrator). - WP-14: Vollständig modularisiert. + WP-14: Modularisierung der Datenbank-Ebene (app.core.database). WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v2.13.7: Synchronisierung des Context-Scanners mit der Registry (WP-14). -VERSION: 2.13.7 + AUDIT v2.13.10: Umstellung auf app.core.database Infrastruktur. +VERSION: 2.13.10 STATUS: Active """ import logging @@ -19,8 +19,10 @@ from app.core.parser import ( validate_required_frontmatter, NoteContext ) from app.core.chunking import assemble_chunks -from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes -from app.core.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch + +# MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene +from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes +from app.core.database.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch # Services from app.services.embeddings_client import EmbeddingsClient @@ -44,12 +46,13 @@ logger = logging.getLogger(__name__) class IngestionService: def __init__(self, collection_prefix: str = None): - """Initialisiert den Service und stellt die DB-Verbindung bereit.""" + """Initialisiert den Service und nutzt die neue database-Infrastruktur.""" from app.config import get_settings self.settings = get_settings() self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX self.cfg = QdrantConfig.from_env() + # Synchronisierung der Konfiguration mit dem Instanz-Präfix self.cfg.prefix = self.prefix self.client = get_client(self.cfg) self.dim = self.settings.VECTOR_SIZE @@ -61,6 +64,7 @@ class IngestionService: self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache try: + # Aufruf der modularisierten Schema-Logik ensure_collections(self.client, self.prefix, self.dim) ensure_payload_indexes(self.client, self.prefix) except Exception as e: @@ -75,8 +79,7 @@ class IngestionService: logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Context Cache...") for path in file_paths: try: - # ANPASSUNG: Übergabe der Registry für dynamische Scan-Parameter (WP-14) - # Ermöglicht die Nutzung von summary_settings aus types.yaml + # Übergabe der Registry für dynamische Scan-Tiefe ctx = pre_scan_markdown(path, registry=self.registry) if ctx: # Mehrfache Indizierung für robusten Look-up (ID, Titel, Dateiname) @@ -110,7 +113,7 @@ class IngestionService: except Exception as e: return {**result, "error": f"Validation failed: {str(e)}"} - # Dynamischer Lifecycle-Filter aus der Registry + # Dynamischer Lifecycle-Filter aus der Registry (WP-14) ingest_cfg = self.registry.get("ingestion_settings", {}) ignore_list = ingest_cfg.get("ignore_statuses", ["system", "template", "archive", "hidden"]) @@ -180,7 +183,7 @@ class IngestionService: context={"file": file_path, "note_id": note_id, "line": e.get("line", "system")} ) - # 4. DB Upsert + # 4. DB Upsert via modularisierter Points-Logik if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id) diff --git a/app/core/ingestion/ingestion_validation.py b/app/core/ingestion/ingestion_validation.py index 038eebf..f7eea5c 100644 --- a/app/core/ingestion/ingestion_validation.py +++ b/app/core/ingestion/ingestion_validation.py @@ -1,11 +1,15 @@ """ FILE: app/core/ingestion/ingestion_validation.py DESCRIPTION: WP-15b semantische Validierung von Kanten gegen den LocalBatchCache. + AUDIT v2.12.3: Integration der zentralen Text-Bereinigung (WP-14). """ import logging from typing import Dict, Any from app.core.parser import NoteContext +# ENTSCHEIDENDER FIX: Import der neutralen Bereinigungs-Logik zur Vermeidung von Circular Imports +from app.core.registry import clean_llm_text + logger = logging.getLogger(__name__) async def validate_edge_candidate( @@ -15,7 +19,10 @@ async def validate_edge_candidate( llm_service: Any, provider: str ) -> bool: - """WP-15b: Validiert einen Kandidaten semantisch gegen das Ziel im Cache.""" + """ + WP-15b: Validiert einen Kandidaten semantisch gegen das Ziel im Cache. + Nutzt clean_llm_text zur Entfernung von Steuerzeichen vor der Auswertung. + """ target_id = edge.get("to") target_ctx = batch_cache.get(target_id) @@ -40,7 +47,13 @@ async def validate_edge_candidate( edge_kind=edge.get("kind", "related_to") ) - response = await llm_service.generate_raw_response(prompt, priority="background") + # Die Antwort vom Service anfordern + raw_response = await llm_service.generate_raw_response(prompt, priority="background") + + # WP-14 Fix: Zusätzliche Bereinigung zur Sicherstellung der Interpretierbarkeit + response = clean_llm_text(raw_response) + + # Semantische Prüfung des Ergebnisses is_valid = "YES" in response.upper() if is_valid: @@ -50,4 +63,5 @@ async def validate_edge_candidate( return is_valid except Exception as e: logger.warning(f"⚠️ Validation error for {target_id}: {e}") + # Im Zweifel (Timeout/Fehler) erlauben wir die Kante, um Datenverlust zu vermeiden return True \ No newline at end of file diff --git a/app/core/qdrant.py b/app/core/qdrant.py index 950a75d..80f1c85 100644 --- a/app/core/qdrant.py +++ b/app/core/qdrant.py @@ -1,161 +1,22 @@ """ FILE: app/core/qdrant.py -DESCRIPTION: Qdrant-Client Factory und Schema-Management. Erstellt Collections und Payload-Indizes. -VERSION: 2.2.0 -STATUS: Active -DEPENDENCIES: qdrant_client, dataclasses, os -LAST_ANALYSIS: 2025-12-15 +DESCRIPTION: Proxy-Modul zur Aufrechterhaltung der Abwärtskompatibilität (WP-14). + Leitet alle Aufrufe an das neue database-Paket weiter. +STATUS: Proxy (Legacy-Support) """ -from __future__ import annotations - -import os -from dataclasses import dataclass -from typing import Optional, Tuple, Dict, List - -from qdrant_client import QdrantClient -from qdrant_client.http import models as rest - - -# --------------------------------------------------------------------------- -# Konfiguration -# --------------------------------------------------------------------------- - -@dataclass -class QdrantConfig: - host: Optional[str] = None - port: Optional[int] = None - url: Optional[str] = None - api_key: Optional[str] = None - prefix: str = "mindnet" - dim: int = 384 - distance: str = "Cosine" # Cosine | Dot | Euclid - on_disk_payload: bool = True - - @classmethod - def from_env(cls) -> "QdrantConfig": - # Entweder URL ODER Host/Port, API-Key optional - url = os.getenv("QDRANT_URL") or None - host = os.getenv("QDRANT_HOST") or None - port = os.getenv("QDRANT_PORT") - port = int(port) if port else None - api_key = os.getenv("QDRANT_API_KEY") or None - prefix = os.getenv("COLLECTION_PREFIX") or "mindnet" - dim = int(os.getenv("VECTOR_DIM") or 384) - distance = os.getenv("DISTANCE", "Cosine") - on_disk_payload = (os.getenv("ON_DISK_PAYLOAD", "true").lower() == "true") - return cls( - host=host, port=port, url=url, api_key=api_key, - prefix=prefix, dim=dim, distance=distance, on_disk_payload=on_disk_payload - ) - - -def get_client(cfg: QdrantConfig) -> QdrantClient: - # QdrantClient akzeptiert entweder url=... oder host/port - if cfg.url: - return QdrantClient(url=cfg.url, api_key=cfg.api_key, timeout=60.0) - return QdrantClient(host=cfg.host or "127.0.0.1", port=cfg.port or 6333, api_key=cfg.api_key, timeout=60.0) - - -# --------------------------------------------------------------------------- -# Collections -# --------------------------------------------------------------------------- - -def collection_names(prefix: str) -> Tuple[str, str, str]: - return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges" - - -def _vector_params(dim: int, distance: str) -> rest.VectorParams: - # Distance: "Cosine" | "Dot" | "Euclid" - dist = getattr(rest.Distance, distance.capitalize(), rest.Distance.COSINE) - return rest.VectorParams(size=dim, distance=dist) - - -def ensure_collections(client: QdrantClient, prefix: str, dim: int) -> None: - """Legt mindnet_notes, mindnet_chunks, mindnet_edges an (falls nicht vorhanden).""" - notes, chunks, edges = collection_names(prefix) - - # notes - if not client.collection_exists(notes): - client.create_collection( - collection_name=notes, - vectors_config=_vector_params(dim, os.getenv("DISTANCE", "Cosine")), - on_disk_payload=True, - ) - # chunks - if not client.collection_exists(chunks): - client.create_collection( - collection_name=chunks, - vectors_config=_vector_params(dim, os.getenv("DISTANCE", "Cosine")), - on_disk_payload=True, - ) - # edges (Dummy-Vektor, Filter via Payload) - if not client.collection_exists(edges): - client.create_collection( - collection_name=edges, - vectors_config=_vector_params(1, "Dot"), - on_disk_payload=True, - ) - - -# --------------------------------------------------------------------------- -# Payload-Indizes -# --------------------------------------------------------------------------- - -def _ensure_index(client: QdrantClient, collection: str, field: str, schema: rest.PayloadSchemaType) -> None: - """Idempotentes Anlegen eines Payload-Indexes für ein Feld.""" - try: - client.create_payload_index(collection_name=collection, field_name=field, field_schema=schema, wait=True) - except Exception as e: - # Fehler ignorieren, falls Index bereits existiert oder Server "already indexed" meldet. - # Für Debugging ggf. Logging ergänzen. - _ = e - - -def ensure_payload_indexes(client: QdrantClient, prefix: str) -> None: - """ - Stellt sicher, dass alle benötigten Payload-Indizes existieren. - - notes: note_id(KEYWORD), type(KEYWORD), title(TEXT), updated(INTEGER), tags(KEYWORD) - - chunks: note_id(KEYWORD), chunk_id(KEYWORD), index(INTEGER), type(KEYWORD), tags(KEYWORD) - - edges: note_id(KEYWORD), kind(KEYWORD), scope(KEYWORD), source_id(KEYWORD), target_id(KEYWORD), chunk_id(KEYWORD) - """ - notes, chunks, edges = collection_names(prefix) - - # NOTES - for field, schema in [ - ("note_id", rest.PayloadSchemaType.KEYWORD), - ("type", rest.PayloadSchemaType.KEYWORD), - ("title", rest.PayloadSchemaType.TEXT), - ("updated", rest.PayloadSchemaType.INTEGER), - ("tags", rest.PayloadSchemaType.KEYWORD), - ]: - _ensure_index(client, notes, field, schema) - - # CHUNKS - for field, schema in [ - ("note_id", rest.PayloadSchemaType.KEYWORD), - ("chunk_id", rest.PayloadSchemaType.KEYWORD), - ("index", rest.PayloadSchemaType.INTEGER), - ("type", rest.PayloadSchemaType.KEYWORD), - ("tags", rest.PayloadSchemaType.KEYWORD), - ]: - _ensure_index(client, chunks, field, schema) - - # EDGES - for field, schema in [ - ("note_id", rest.PayloadSchemaType.KEYWORD), - ("kind", rest.PayloadSchemaType.KEYWORD), - ("scope", rest.PayloadSchemaType.KEYWORD), - ("source_id", rest.PayloadSchemaType.KEYWORD), - ("target_id", rest.PayloadSchemaType.KEYWORD), - ("chunk_id", rest.PayloadSchemaType.KEYWORD), - ]: - _ensure_index(client, edges, field, schema) - +from .database.qdrant import ( + QdrantConfig, + get_client, + ensure_collections, + ensure_payload_indexes, + collection_names +) +# Re-Export für 100% Kompatibilität __all__ = [ "QdrantConfig", "get_client", "ensure_collections", "ensure_payload_indexes", "collection_names", -] +] \ No newline at end of file diff --git a/app/core/qdrant_points.py b/app/core/qdrant_points.py index 9c4b878..d136232 100644 --- a/app/core/qdrant_points.py +++ b/app/core/qdrant_points.py @@ -1,292 +1,24 @@ """ FILE: app/core/qdrant_points.py -DESCRIPTION: Object-Mapper für Qdrant. Konvertiert JSON-Payloads (Notes, Chunks, Edges) in PointStructs und generiert deterministische UUIDs. -VERSION: 1.5.0 -STATUS: Active -DEPENDENCIES: qdrant_client, uuid, os -LAST_ANALYSIS: 2025-12-15 +DESCRIPTION: Proxy-Modul zur Aufrechterhaltung der Abwärtskompatibilität (WP-14). + Leitet Point-Operationen an das neue database-Paket weiter. +STATUS: Proxy (Legacy-Support) """ -from __future__ import annotations -import os -import uuid -from typing import List, Tuple, Iterable, Optional, Dict, Any +from .database.qdrant_points import ( + points_for_note, + points_for_chunks, + points_for_edges, + upsert_batch, + get_edges_for_sources, + search_chunks_by_vector +) -from qdrant_client.http import models as rest -from qdrant_client import QdrantClient - -# --------------------- ID helpers --------------------- - -def _to_uuid(stable_key: str) -> str: - return str(uuid.uuid5(uuid.NAMESPACE_URL, stable_key)) - -def _names(prefix: str) -> Tuple[str, str, str]: - return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges" - -# --------------------- Points builders --------------------- - -def points_for_note(prefix: str, note_payload: dict, note_vec: List[float] | None, dim: int) -> Tuple[str, List[rest.PointStruct]]: - notes_col, _, _ = _names(prefix) - vector = note_vec if note_vec is not None else [0.0] * int(dim) - raw_note_id = note_payload.get("note_id") or note_payload.get("id") or "missing-note-id" - point_id = _to_uuid(raw_note_id) - pt = rest.PointStruct(id=point_id, vector=vector, payload=note_payload) - return notes_col, [pt] - -def points_for_chunks(prefix: str, chunk_payloads: List[dict], vectors: List[List[float]]) -> Tuple[str, List[rest.PointStruct]]: - _, chunks_col, _ = _names(prefix) - points: List[rest.PointStruct] = [] - for i, (pl, vec) in enumerate(zip(chunk_payloads, vectors), start=1): - chunk_id = pl.get("chunk_id") or pl.get("id") - if not chunk_id: - note_id = pl.get("note_id") or pl.get("parent_note_id") or "missing-note" - chunk_id = f"{note_id}#{i}" - pl["chunk_id"] = chunk_id - point_id = _to_uuid(chunk_id) - points.append(rest.PointStruct(id=point_id, vector=vec, payload=pl)) - return chunks_col, points - -def _normalize_edge_payload(pl: dict) -> dict: - kind = pl.get("kind") or pl.get("edge_type") or "edge" - source_id = pl.get("source_id") or pl.get("src_id") or "unknown-src" - target_id = pl.get("target_id") or pl.get("dst_id") or "unknown-tgt" - seq = pl.get("seq") or pl.get("order") or pl.get("index") - - pl.setdefault("kind", kind) - pl.setdefault("source_id", source_id) - pl.setdefault("target_id", target_id) - if seq is not None and "seq" not in pl: - pl["seq"] = seq - return pl - -def points_for_edges(prefix: str, edge_payloads: List[dict]) -> Tuple[str, List[rest.PointStruct]]: - _, _, edges_col = _names(prefix) - points: List[rest.PointStruct] = [] - for raw in edge_payloads: - pl = _normalize_edge_payload(raw) - edge_id = pl.get("edge_id") - if not edge_id: - kind = pl.get("kind", "edge") - s = pl.get("source_id", "unknown-src") - t = pl.get("target_id", "unknown-tgt") - seq = pl.get("seq") or "" - edge_id = f"{kind}:{s}->{t}#{seq}" - pl["edge_id"] = edge_id - point_id = _to_uuid(edge_id) - points.append(rest.PointStruct(id=point_id, vector=[0.0], payload=pl)) - return edges_col, points - -# --------------------- Vector schema & overrides --------------------- - -def _preferred_name(candidates: List[str]) -> str: - for k in ("text", "default", "embedding", "content"): - if k in candidates: - return k - return sorted(candidates)[0] - -def _env_override_for_collection(collection: str) -> Optional[str]: - """ - Returns: - - "__single__" to force single-vector - - concrete name (str) to force named-vector with that name - - None to auto-detect - """ - base = os.getenv("MINDNET_VECTOR_NAME") - if collection.endswith("_notes"): - base = os.getenv("NOTES_VECTOR_NAME", base) - elif collection.endswith("_chunks"): - base = os.getenv("CHUNKS_VECTOR_NAME", base) - elif collection.endswith("_edges"): - base = os.getenv("EDGES_VECTOR_NAME", base) - - if not base: - return None - val = base.strip() - if val.lower() in ("__single__", "single"): - return "__single__" - return val # concrete name - -def _get_vector_schema(client: QdrantClient, collection_name: str) -> dict: - """ - Return {"kind": "single", "size": int} or {"kind": "named", "names": [...], "primary": str}. - """ - try: - info = client.get_collection(collection_name=collection_name) - vecs = getattr(info, "vectors", None) - # Single-vector config - if hasattr(vecs, "size") and isinstance(vecs.size, int): - return {"kind": "single", "size": vecs.size} - # Named-vectors config (dict-like in .config) - cfg = getattr(vecs, "config", None) - if isinstance(cfg, dict) and cfg: - names = list(cfg.keys()) - if names: - return {"kind": "named", "names": names, "primary": _preferred_name(names)} - except Exception: - pass - return {"kind": "single", "size": None} - -def _as_named(points: List[rest.PointStruct], name: str) -> List[rest.PointStruct]: - out: List[rest.PointStruct] = [] - for pt in points: - vec = getattr(pt, "vector", None) - if isinstance(vec, dict): - if name in vec: - out.append(pt) - else: - # take any existing entry; if empty dict fallback to [0.0] - fallback_vec = None - try: - fallback_vec = list(next(iter(vec.values()))) - except Exception: - fallback_vec = [0.0] - out.append(rest.PointStruct(id=pt.id, vector={name: fallback_vec}, payload=pt.payload)) - elif vec is not None: - out.append(rest.PointStruct(id=pt.id, vector={name: vec}, payload=pt.payload)) - else: - out.append(pt) - return out - -# --------------------- Qdrant ops --------------------- - -def upsert_batch(client: QdrantClient, collection: str, points: List[rest.PointStruct]) -> None: - if not points: - return - - # 1) ENV overrides come first - override = _env_override_for_collection(collection) - if override == "__single__": - client.upsert(collection_name=collection, points=points, wait=True) - return - elif isinstance(override, str): - client.upsert(collection_name=collection, points=_as_named(points, override), wait=True) - return - - # 2) Auto-detect schema - schema = _get_vector_schema(client, collection) - if schema.get("kind") == "named": - name = schema.get("primary") or _preferred_name(schema.get("names") or []) - client.upsert(collection_name=collection, points=_as_named(points, name), wait=True) - return - - # 3) Fallback single-vector - client.upsert(collection_name=collection, points=points, wait=True) - -# --- Optional search helpers --- - -def _filter_any(field: str, values: Iterable[str]) -> rest.Filter: - return rest.Filter(should=[rest.FieldCondition(key=field, match=rest.MatchValue(value=v)) for v in values]) - -def _merge_filters(*filters: Optional[rest.Filter]) -> Optional[rest.Filter]: - fs = [f for f in filters if f is not None] - if not fs: - return None - if len(fs) == 1: - return fs[0] - must = [] - for f in fs: - if getattr(f, "must", None): - must.extend(f.must) - if getattr(f, "should", None): - must.append(rest.Filter(should=f.should)) - return rest.Filter(must=must) - -def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter]: - if not filters: - return None - parts = [] - for k, v in filters.items(): - if isinstance(v, (list, tuple, set)): - parts.append(_filter_any(k, [str(x) for x in v])) - else: - parts.append(rest.Filter(must=[rest.FieldCondition(key=k, match=rest.MatchValue(value=v))])) - return _merge_filters(*parts) - -def search_chunks_by_vector(client: QdrantClient, prefix: str, vector: List[float], top: int = 10, filters: Optional[Dict[str, Any]] = None) -> List[Tuple[str, float, dict]]: - _, chunks_col, _ = _names(prefix) - flt = _filter_from_dict(filters) - res = client.search(collection_name=chunks_col, query_vector=vector, limit=top, with_payload=True, with_vectors=False, query_filter=flt) - out: List[Tuple[str, float, dict]] = [] - for r in res: - out.append((str(r.id), float(r.score), dict(r.payload or {}))) - return out - - -# --- Edge retrieval helper --- - -def get_edges_for_sources( - client: QdrantClient, - prefix: str, - source_ids: Iterable[str], - edge_types: Optional[Iterable[str]] = None, - limit: int = 2048, -) -> List[Dict[str, Any]]: - """Retrieve edge payloads from the _edges collection. - - Args: - client: QdrantClient instance. - prefix: Mindnet collection prefix (e.g. "mindnet"). - source_ids: Iterable of source_id values (typically chunk_ids or note_ids). - edge_types: Optional iterable of edge kinds (e.g. ["references", "depends_on"]). If None, - all kinds are returned. - limit: Maximum number of edge payloads to return. - - Returns: - A list of edge payload dicts, e.g.: - { - "note_id": "...", - "chunk_id": "...", - "kind": "references" | "depends_on" | ..., - "scope": "chunk", - "source_id": "...", - "target_id": "...", - "rule_id": "...", - "confidence": 0.7, - ... - } - """ - source_ids = list(source_ids) - if not source_ids or limit <= 0: - return [] - - # Resolve collection name - _, _, edges_col = _names(prefix) - - # Build filter: source_id IN source_ids - src_filter = _filter_any("source_id", [str(s) for s in source_ids]) - - # Optional: kind IN edge_types - kind_filter = None - if edge_types: - kind_filter = _filter_any("kind", [str(k) for k in edge_types]) - - flt = _merge_filters(src_filter, kind_filter) - - out: List[Dict[str, Any]] = [] - next_page = None - remaining = int(limit) - - # Use paginated scroll API; we don't need vectors, only payloads. - while remaining > 0: - batch_limit = min(256, remaining) - res, next_page = client.scroll( - collection_name=edges_col, - scroll_filter=flt, - limit=batch_limit, - with_payload=True, - with_vectors=False, - offset=next_page, - ) - - if not res: - break - - for r in res: - out.append(dict(r.payload or {})) - remaining -= 1 - if remaining <= 0: - break - - if next_page is None or remaining <= 0: - break - - return out +# Re-Export für 100% Kompatibilität +__all__ = [ + "points_for_note", + "points_for_chunks", + "points_for_edges", + "upsert_batch", + "get_edges_for_sources", + "search_chunks_by_vector" +] \ No newline at end of file