diff --git a/app/core/database/qdrant_points.py b/app/core/database/qdrant_points.py index 3db3b6f..c943f36 100644 --- a/app/core/database/qdrant_points.py +++ b/app/core/database/qdrant_points.py @@ -1,9 +1,10 @@ """ FILE: app/core/database/qdrant_points.py -DESCRIPTION: Object-Mapper für Qdrant. Konvertiert JSON-Payloads (Notes, Chunks, Edges) in PointStructs und generiert deterministische UUIDs. -VERSION: 1.5.2 (WP-Fix: Atomic Consistency & Canonical Edge IDs) +DESCRIPTION: Object-Mapper für Qdrant. Konvertiert JSON-Payloads (Notes, Chunks, Edges) + in PointStructs und generiert deterministische UUIDs. +VERSION: 1.5.3 (WP-Fix: Centralized Identity Enforcement) STATUS: Active -DEPENDENCIES: qdrant_client, uuid, os +DEPENDENCIES: qdrant_client, uuid, os, app.core.graph.graph_utils LAST_ANALYSIS: 2026-01-10 """ from __future__ import annotations @@ -14,6 +15,9 @@ from typing import List, Tuple, Iterable, Optional, Dict, Any from qdrant_client.http import models as rest from qdrant_client import QdrantClient +# WP-24c: Import der zentralen Identitäts-Logik zur Vermeidung von ID-Drift +from app.core.graph.graph_utils import _mk_edge_id + # --------------------- ID helpers --------------------- def _to_uuid(stable_key: str) -> str: @@ -26,19 +30,29 @@ def _to_uuid(stable_key: str) -> str: return str(uuid.uuid5(uuid.NAMESPACE_URL, str(stable_key))) def _names(prefix: str) -> Tuple[str, str, str]: + """Interne Auflösung der Collection-Namen basierend auf dem Präfix.""" return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges" # --------------------- Points builders --------------------- def points_for_note(prefix: str, note_payload: dict, note_vec: List[float] | None, dim: int) -> Tuple[str, List[rest.PointStruct]]: + """Konvertiert Note-Metadaten in Qdrant Points.""" notes_col, _, _ = _names(prefix) + # Nutzt Null-Vektor als Fallback, falls kein Embedding vorhanden ist vector = note_vec if note_vec is not None else [0.0] * int(dim) + raw_note_id = note_payload.get("note_id") or note_payload.get("id") or "missing-note-id" point_id = _to_uuid(raw_note_id) - pt = rest.PointStruct(id=point_id, vector=vector, payload=note_payload) + + pt = rest.PointStruct( + id=point_id, + vector=vector, + payload=note_payload + ) return notes_col, [pt] def points_for_chunks(prefix: str, chunk_payloads: List[dict], vectors: List[List[float]]) -> Tuple[str, List[rest.PointStruct]]: + """Konvertiert Chunks und deren Vektoren in Qdrant Points.""" _, chunks_col, _ = _names(prefix) points: List[rest.PointStruct] = [] for i, (pl, vec) in enumerate(zip(chunk_payloads, vectors), start=1): @@ -47,8 +61,13 @@ def points_for_chunks(prefix: str, chunk_payloads: List[dict], vectors: List[Lis note_id = pl.get("note_id") or pl.get("parent_note_id") or "missing-note" chunk_id = f"{note_id}#{i}" pl["chunk_id"] = chunk_id + point_id = _to_uuid(chunk_id) - points.append(rest.PointStruct(id=point_id, vector=vec, payload=pl)) + points.append(rest.PointStruct( + id=point_id, + vector=vec, + payload=pl + )) return chunks_col, points def _normalize_edge_payload(pl: dict) -> dict: @@ -76,30 +95,54 @@ def _normalize_edge_payload(pl: dict) -> dict: def points_for_edges(prefix: str, edge_payloads: List[dict]) -> Tuple[str, List[rest.PointStruct]]: """ Konvertiert Kanten-Payloads in PointStructs. - WP-24c: Nutzt strikte ID-Kanonisierung für die Symmetrie-Integrität. + WP-24c Audit v1.5.3: Nutzt die zentrale _mk_edge_id Funktion aus graph_utils. + Dies eliminiert den ID-Drift zwischen manuellen und virtuellen Kanten. """ _, _, edges_col = _names(prefix) points: List[rest.PointStruct] = [] + for raw in edge_payloads: pl = _normalize_edge_payload(raw) - # WP-24c: Deterministische ID-Generierung zur Kollisionsvermeidung + # Extraktion der Identitäts-Parameter kind = pl.get("kind", "edge") s = pl.get("source_id", "unknown-src") t = pl.get("target_id", "unknown-tgt") scope = pl.get("scope", "note") - # Stabiler Schlüssel für UUIDv5 - edge_id = f"edge:{kind}:{s}:{t}:{scope}" - pl["edge_id"] = edge_id + # Optionale Differenzierung (falls von graph_derive_edges gesetzt) + rule_id = pl.get("rule_id") + variant = pl.get("variant") - point_id = _to_uuid(edge_id) - points.append(rest.PointStruct(id=point_id, vector=[0.0], payload=pl)) + try: + # Aufruf der Single-Source-of-Truth für IDs + point_id = _mk_edge_id( + kind=kind, + s=s, + t=t, + scope=scope, + rule_id=rule_id, + variant=variant + ) + + # Synchronisierung des Payloads mit der berechneten ID + pl["edge_id"] = point_id + + points.append(rest.PointStruct( + id=point_id, + vector=[0.0], + payload=pl + )) + except ValueError as e: + # Fehlerhaft definierte Kanten werden übersprungen, um Pydantic-Crashes zu vermeiden + continue + return edges_col, points # --------------------- Vector schema & overrides --------------------- def _preferred_name(candidates: List[str]) -> str: + """Ermittelt den primären Vektor-Namen aus einer Liste von Kandidaten.""" for k in ("text", "default", "embedding", "content"): if k in candidates: return k @@ -107,10 +150,11 @@ def _preferred_name(candidates: List[str]) -> str: def _env_override_for_collection(collection: str) -> Optional[str]: """ + Prüft auf Umgebungsvariablen-Overrides für Vektor-Namen. Returns: - - "__single__" to force single-vector - - concrete name (str) to force named-vector with that name - - None to auto-detect + - "__single__" für erzwungenen Single-Vector Modus + - Name (str) für spezifischen Named-Vector + - None für automatische Erkennung """ base = os.getenv("MINDNET_VECTOR_NAME") if collection.endswith("_notes"): @@ -125,19 +169,17 @@ def _env_override_for_collection(collection: str) -> Optional[str]: val = base.strip() if val.lower() in ("__single__", "single"): return "__single__" - return val # concrete name + return val def _get_vector_schema(client: QdrantClient, collection_name: str) -> dict: - """ - Return {"kind": "single", "size": int} or {"kind": "named", "names": [...], "primary": str}. - """ + """Ermittelt das Vektor-Schema einer existierenden Collection via API.""" try: info = client.get_collection(collection_name=collection_name) vecs = getattr(info, "vectors", None) - # Single-vector config + # Prüfung auf Single-Vector Konfiguration if hasattr(vecs, "size") and isinstance(vecs.size, int): return {"kind": "single", "size": vecs.size} - # Named-vectors config (dict-like in .config) + # Prüfung auf Named-Vectors Konfiguration cfg = getattr(vecs, "config", None) if isinstance(cfg, dict) and cfg: names = list(cfg.keys()) @@ -148,6 +190,7 @@ def _get_vector_schema(client: QdrantClient, collection_name: str) -> dict: return {"kind": "single", "size": None} def _as_named(points: List[rest.PointStruct], name: str) -> List[rest.PointStruct]: + """Transformiert PointStructs in das Named-Vector Format.""" out: List[rest.PointStruct] = [] for pt in points: vec = getattr(pt, "vector", None) @@ -155,7 +198,6 @@ def _as_named(points: List[rest.PointStruct], name: str) -> List[rest.PointStruc if name in vec: out.append(pt) else: - # take any existing entry; if empty dict fallback to [0.0] fallback_vec = None try: fallback_vec = list(next(iter(vec.values()))) @@ -172,13 +214,14 @@ def _as_named(points: List[rest.PointStruct], name: str) -> List[rest.PointStruc def upsert_batch(client: QdrantClient, collection: str, points: List[rest.PointStruct], wait: bool = True) -> None: """ - Schreibt Points in eine Collection. - WP-Fix: Unterstützt den 'wait' Parameter (Default True für Kompatibilität zu v1.5.1). + Schreibt Points hocheffizient in eine Collection. + Unterstützt automatische Schema-Erkennung und Named-Vector Transformation. + WP-Fix: 'wait=True' ist Default für Datenkonsistenz zwischen den Ingest-Phasen. """ if not points: return - # 1) ENV overrides come first + # 1) ENV overrides prüfen override = _env_override_for_collection(collection) if override == "__single__": client.upsert(collection_name=collection, points=points, wait=wait) @@ -187,22 +230,24 @@ def upsert_batch(client: QdrantClient, collection: str, points: List[rest.PointS client.upsert(collection_name=collection, points=_as_named(points, override), wait=wait) return - # 2) Auto-detect schema + # 2) Automatische Schema-Erkennung (Live-Check) schema = _get_vector_schema(client, collection) if schema.get("kind") == "named": name = schema.get("primary") or _preferred_name(schema.get("names") or []) client.upsert(collection_name=collection, points=_as_named(points, name), wait=wait) return - # 3) Fallback single-vector + # 3) Fallback: Single-Vector Upsert client.upsert(collection_name=collection, points=points, wait=wait) # --- Optional search helpers --- def _filter_any(field: str, values: Iterable[str]) -> rest.Filter: + """Hilfsfunktion für händische Filter-Konstruktion (Logical OR).""" return rest.Filter(should=[rest.FieldCondition(key=field, match=rest.MatchValue(value=v)) for v in values]) def _merge_filters(*filters: Optional[rest.Filter]) -> Optional[rest.Filter]: + """Führt mehrere Filter-Objekte zu einem konsolidierten Filter zusammen.""" fs = [f for f in filters if f is not None] if not fs: return None @@ -217,6 +262,7 @@ def _merge_filters(*filters: Optional[rest.Filter]) -> Optional[rest.Filter]: return rest.Filter(must=must) def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter]: + """Konvertiert ein Python-Dict in ein Qdrant-Filter Objekt.""" if not filters: return None parts = [] @@ -228,9 +274,17 @@ def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter return _merge_filters(*parts) def search_chunks_by_vector(client: QdrantClient, prefix: str, vector: List[float], top: int = 10, filters: Optional[Dict[str, Any]] = None) -> List[Tuple[str, float, dict]]: + """Sucht semantisch ähnliche Chunks in der Vektordatenbank.""" _, chunks_col, _ = _names(prefix) flt = _filter_from_dict(filters) - res = client.search(collection_name=chunks_col, query_vector=vector, limit=top, with_payload=True, with_vectors=False, query_filter=flt) + res = client.search( + collection_name=chunks_col, + query_vector=vector, + limit=top, + with_payload=True, + with_vectors=False, + query_filter=flt + ) out: List[Tuple[str, float, dict]] = [] for r in res: out.append((str(r.id), float(r.score), dict(r.payload or {}))) @@ -246,18 +300,18 @@ def get_edges_for_sources( edge_types: Optional[Iterable[str]] = None, limit: int = 2048, ) -> List[Dict[str, Any]]: - """Retrieve edge payloads from the _edges collection.""" + """Ruft alle Kanten ab, die von einer Menge von Quell-Notizen ausgehen.""" source_ids = list(source_ids) if not source_ids or limit <= 0: return [] - # Resolve collection name + # Namen der Edges-Collection auflösen _, _, edges_col = _names(prefix) - # Build filter: source_id IN source_ids + # Filter-Bau: source_id IN source_ids src_filter = _filter_any("source_id", [str(s) for s in source_ids]) - # Optional: kind IN edge_types + # Optionaler Filter auf den Kanten-Typ kind_filter = None if edge_types: kind_filter = _filter_any("kind", [str(k) for k in edge_types]) @@ -268,7 +322,7 @@ def get_edges_for_sources( next_page = None remaining = int(limit) - # Use paginated scroll API + # Paginated Scroll API (NUR Payload, keine Vektoren) while remaining > 0: batch_limit = min(256, remaining) res, next_page = client.scroll( diff --git a/app/core/graph/graph_utils.py b/app/core/graph/graph_utils.py index a05982b..cb0d371 100644 --- a/app/core/graph/graph_utils.py +++ b/app/core/graph/graph_utils.py @@ -1,12 +1,11 @@ """ FILE: app/core/graph/graph_utils.py DESCRIPTION: Basale Werkzeuge, ID-Generierung und Provenance-Konfiguration für den Graphen. - AUDIT v1.6.1: - - Wiederherstellung der Funktion '_edge' (Fix für ImportError). - - Rückkehr zu UUIDv5 für Qdrant-Kompatibilität (Fix für Pydantic-Crash). - - Beibehaltung der Section-Logik (variant) in der ID-Generierung. - - Integration der .env Pfad-Auflösung. -VERSION: 1.6.1 (WP-24c: Circular Dependency & Identity Fix) + AUDIT v1.6.2: + - Festlegung des globalen Standards für Kanten-IDs (WP-24c). + - Fix für ImportError (_edge Funktion wiederhergestellt). + - Integration der .env Pfad-Auflösung für Schema und Vokabular. +VERSION: 1.6.2 (WP-24c: Global Identity Standard) STATUS: Active """ import os @@ -19,7 +18,7 @@ try: except ImportError: yaml = None -# WP-15b: Prioritäten-Ranking für die De-Duplizierung +# WP-15b: Prioritäten-Ranking für die De-Duplizierung von Kanten unterschiedlicher Herkunft PROVENANCE_PRIORITY = { "explicit:wikilink": 1.00, "inline:rel": 0.95, @@ -29,7 +28,7 @@ PROVENANCE_PRIORITY = { "structure:order": 0.95, # next/prev "explicit:note_scope": 1.00, "derived:backlink": 0.90, - "edge_defaults": 0.70 # Heuristik (types.yaml) + "edge_defaults": 0.70 # Heuristik basierend auf types.yaml } # --------------------------------------------------------------------------- @@ -49,58 +48,29 @@ def get_schema_path() -> str: # --------------------------------------------------------------------------- def _get(d: dict, *keys, default=None): - """Sicherer Zugriff auf verschachtelte Keys.""" + """Sicherer Zugriff auf tief verschachtelte Dictionary-Keys.""" for k in keys: if isinstance(d, dict) and k in d and d[k] is not None: return d[k] return default def _dedupe_seq(seq: Iterable[str]) -> List[str]: - """Dedupliziert Strings unter Beibehaltung der Reihenfolge.""" + """Dedupliziert eine Sequenz von Strings unter Beibehaltung der Reihenfolge.""" seen: Set[str] = set() out: List[str] = [] for s in seq: if s not in seen: - seen.add(s); out.append(s) + seen.add(s) + out.append(s) return out -def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None, variant: Optional[str] = None) -> str: - """ - Erzeugt eine deterministische UUIDv5. - - WP-Fix: Wir nutzen UUIDv5 statt BLAKE2s-Hex, um 100% kompatibel zu den - Pydantic-Erwartungen von Qdrant (Step 1) zu bleiben. - """ - # Basis-String für den deterministischen Hash - base = f"edge:{kind}:{s}->{t}#{scope}" - if rule_id: - base += f"|{rule_id}" - if variant: - base += f"|{variant}" # Ermöglicht eindeutige IDs für verschiedene Abschnitte - - # Nutzt den URL-Namespace für deterministische UUIDs - return str(uuid.uuid5(uuid.NAMESPACE_URL, base)) - -def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict: - """ - Konstruiert ein Kanten-Payload für Qdrant. - Wiederhergestellt v1.6.1 (Erforderlich für graph_derive_edges.py). - """ - pl = { - "kind": kind, - "relation": kind, - "scope": scope, - "source_id": source_id, - "target_id": target_id, - "note_id": note_id, - } - if extra: pl.update(extra) - return pl - def parse_link_target(raw: str, current_note_id: Optional[str] = None) -> Tuple[str, Optional[str]]: """ - Trennt [[Target#Section]] in Target und Section. - Behandelt Self-Links ('#Section'), indem current_note_id eingesetzt wird. + Trennt einen Obsidian-Link [[Target#Section]] in seine Bestandteile Target und Section. + Behandelt Self-Links (z.B. [[#Ziele]]), indem die aktuelle note_id eingesetzt wird. + + Returns: + Tuple (target_id, target_section) """ if not raw: return "", None @@ -109,35 +79,93 @@ def parse_link_target(raw: str, current_note_id: Optional[str] = None) -> Tuple[ target = parts[0].strip() section = parts[1].strip() if len(parts) > 1 else None + # Spezialfall: Self-Link innerhalb derselben Datei if not target and section and current_note_id: target = current_note_id return target, section +def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None, variant: Optional[str] = None) -> str: + """ + WP-24c: DER GLOBALE STANDARD für Kanten-IDs. + Erzeugt eine deterministische UUIDv5. Dies stellt sicher, dass manuelle Links + und systemgenerierte Symmetrien dieselbe Point-ID in Qdrant erhalten. + + Args: + kind: Typ der Relation (z.B. 'mastered_by') + s: Kanonische ID der Quell-Note + t: Kanonische ID der Ziel-Note + scope: Granularität (Standard: 'note') + rule_id: Optionale ID der Regel (aus graph_derive_edges) + variant: Optionale Variante für multiple Links zum selben Ziel + """ + if not all([kind, s, t]): + raise ValueError(f"Incomplete data for edge ID: kind={kind}, src={s}, tgt={t}") + + # STRENGER STANDARD: Nutzt Doppelpunkte als Trenner. + # Jede manuelle Änderung an diesem String-Format führt zu doppelten Kanten in der DB! + base = f"edge:{kind}:{s}:{t}:{scope}" + + if rule_id: + base += f":{rule_id}" + if variant: + base += f":{variant}" + + # Nutzt den URL-Namespace für deterministische Reproduzierbarkeit + return str(uuid.uuid5(uuid.NAMESPACE_URL, base)) + +def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict: + """ + Konstruiert ein standardisiertes Kanten-Payload für Qdrant. + Wird von graph_derive_edges.py benötigt. + """ + pl = { + "kind": kind, + "relation": kind, + "scope": scope, + "source_id": source_id, + "target_id": target_id, + "note_id": note_id, + "virtual": False # Standardmäßig explizit, solange nicht anders in Phase 2 gesetzt + } + if extra: + pl.update(extra) + return pl + # --------------------------------------------------------------------------- # Registry Operations # --------------------------------------------------------------------------- def load_types_registry() -> dict: - """Lädt die YAML-Registry.""" + """ + Lädt die zentrale YAML-Registry (types.yaml). + Pfad wird über die Umgebungsvariable MINDNET_TYPES_FILE gesteuert. + """ p = os.getenv("MINDNET_TYPES_FILE", "./config/types.yaml") - if not os.path.isfile(p) or yaml is None: + if not os.path.isfile(p) or yaml is None: return {} try: - with open(p, "r", encoding="utf-8") as f: - return yaml.safe_load(f) or {} - except Exception: + with open(p, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + return data if data is not None else {} + except Exception: return {} def get_edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]: - """Ermittelt Standard-Kanten für einen Typ.""" + """ + Ermittelt die konfigurierten Standard-Kanten für einen Note-Typ. + Greift bei Bedarf auf die globalen Defaults in der Registry zurück. + """ types_map = reg.get("types", reg) if isinstance(reg, dict) else {} if note_type and isinstance(types_map, dict): - t = types_map.get(note_type) - if isinstance(t, dict) and isinstance(t.get("edge_defaults"), list): - return [str(x) for x in t["edge_defaults"] if isinstance(x, str)] + t_cfg = types_map.get(note_type) + if isinstance(t_cfg, dict) and isinstance(t_cfg.get("edge_defaults"), list): + return [str(x) for x in t_cfg["edge_defaults"]] + + # Fallback auf globale Defaults for key in ("defaults", "default", "global"): v = reg.get(key) if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list): return [str(x) for x in v["edge_defaults"] if isinstance(x, str)] + return [] \ No newline at end of file diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 5681612..aa2423d 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -4,10 +4,10 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator). WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v3.4.1: Strikte 2-Phasen-Strategie (Authority-First). - Lösung des Ghost-ID Problems via Cache-Resolution. - Fix für Pydantic 'None'-ID Crash. -VERSION: 3.4.1 (WP-24c: Robust Global Orchestration) + AUDIT v3.4.2: Strikte 2-Phasen-Strategie (Authority-First). + Lösung des Ghost-ID Problems & Pydantic-Crash Fix. + Zentralisierte ID-Generierung zur Vermeidung von Duplikaten. +VERSION: 3.4.2 (WP-24c: Unified ID Orchestration) STATUS: Active """ import logging @@ -22,8 +22,8 @@ from app.core.parser import ( validate_required_frontmatter, NoteContext ) from app.core.chunking import assemble_chunks -# WP-24c: Import für die deterministische ID-Vorabberechnung aus graph_utils -from app.core.graph.graph_utils import _mk_edge_id +# WP-24c: Import der zentralen Identitäts-Logik und Pfad-Getter +from app.core.graph.graph_utils import _mk_edge_id, get_vocab_path, get_schema_path # Datenbank-Ebene (Modularisierte database-Infrastruktur) from app.core.database.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes @@ -56,10 +56,16 @@ class IngestionService: from app.config import get_settings self.settings = get_settings() - # --- LOGGING CLEANUP (Header-Noise unterdrücken, Business erhalten) --- + # --- LOGGING CLEANUP --- for lib in ["httpx", "httpcore", "qdrant_client", "urllib3", "openai"]: logging.getLogger(lib).setLevel(logging.WARNING) + # WP-24c: Explizite Initialisierung der Registry mit .env Pfaden + edge_registry.initialize( + vocab_path=get_vocab_path(), + schema_path=get_schema_path() + ) + self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX self.cfg = QdrantConfig.from_env() self.cfg.prefix = self.prefix @@ -73,7 +79,6 @@ class IngestionService: embed_cfg = self.llm.profiles.get("embedding_expert", {}) self.dim = embed_cfg.get("dimensions") or self.settings.VECTOR_SIZE - # Festlegen des Change-Detection Modus self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE # WP-15b: Kontext-Gedächtnis für ID-Auflösung (Globaler Cache) @@ -83,7 +88,6 @@ class IngestionService: self.symmetry_buffer: List[Dict[str, Any]] = [] try: - # Aufruf der modularisierten Schema-Logik ensure_collections(self.client, self.prefix, self.dim) ensure_payload_indexes(self.client, self.prefix) except Exception as e: @@ -113,7 +117,6 @@ class IngestionService: if ctx: self.batch_cache[ctx.note_id] = ctx self.batch_cache[ctx.title] = ctx - # Auch Dateinamen ohne Endung auflösbar machen self.batch_cache[os.path.splitext(os.path.basename(path))[0]] = ctx except Exception as e: logger.warning(f" ⚠️ Pre-scan fehlgeschlagen für {path}: {e}") @@ -142,7 +145,6 @@ class IngestionService: Sorgt dafür, dass virtuelle Kanten niemals Nutzer-Autorität überschreiben. """ if not self.symmetry_buffer: - logger.info("⏭️ Symmetrie-Puffer leer. Keine Aktion erforderlich.") return {"status": "skipped", "reason": "buffer_empty"} logger.info(f"🔄 PHASE 2: Validiere {len(self.symmetry_buffer)} Symmetrien gegen Live-DB...") @@ -151,7 +153,7 @@ class IngestionService: src, tgt, kind = v_edge.get("note_id"), v_edge.get("target_id"), v_edge.get("kind") if not src or not tgt: continue - # Deterministische ID berechnen (WP-24c Standard) + # WP-Fix v3.4.2: NUTZUNG DER ZENTRALEN FUNKTION STATT MANUELLEM STRING try: v_id = _mk_edge_id(kind, src, tgt, "note") except ValueError: @@ -162,16 +164,14 @@ class IngestionService: final_virtuals.append(v_edge) logger.info(f" 🔄 [SYMMETRY] Add inverse: {src} --({kind})--> {tgt}") else: - logger.debug(f" 🛡️ Schutz: Manuelle Kante verhindert Symmetrie {v_id}") + logger.info(f" 🛡️ [PROTECTED] Manuelle Kante gefunden. Symmetrie für {kind} unterdrückt.") if final_virtuals: - logger.info(f"📤 Schreibe {len(final_virtuals)} geschützte Symmetrie-Kanten in Qdrant.") col, pts = points_for_edges(self.prefix, final_virtuals) - # Nutzt upsert_batch mit wait=True für atomare Konsistenz upsert_batch(self.client, col, pts, wait=True) count = len(final_virtuals) - self.symmetry_buffer.clear() # Puffer nach Commit leeren + self.symmetry_buffer.clear() return {"status": "success", "added": count} async def process_file(self, file_path: str, vault_root: str, **kwargs) -> Dict[str, Any]: @@ -201,7 +201,6 @@ class IngestionService: note_id = note_pl.get("note_id") if not note_id: - logger.warning(f" ⚠️ Keine ID für {file_path}. Überspringe.") return {**result, "status": "error", "error": "missing_id"} logger.info(f"📄 Bearbeite: '{note_id}'") @@ -229,10 +228,7 @@ class IngestionService: if not self._is_valid_id(t_id): continue if cand.get("provenance") == "global_pool" and enable_smart: - # LLM Logging - logger.info(f" ⚖️ [VALIDATING] Relation to '{t_id}' via Experts...") is_valid = await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm) - logger.info(f" 🧠 [SMART EDGE] {t_id} -> {'✅ OK' if is_valid else '❌ SKIP'}") if is_valid: new_pool.append(cand) else: new_pool.append(cand) @@ -283,11 +279,10 @@ class IngestionService: if explicit_edges: col_e, pts_e = points_for_edges(self.prefix, explicit_edges) - # WICHTIG: wait=True garantiert, dass die Kanten indiziert sind, bevor Phase 2 prüft upsert_batch(self.client, col_e, pts_e, wait=True) logger.info(f" ✨ Phase 1 fertig: {len(explicit_edges)} explizite Kanten für '{note_id}'.") - return {"status": "success", "note_id": note_id, "edges_count": len(explicit_edges)} + return {"status": "success", "note_id": note_id} except Exception as e: logger.error(f"❌ Fehler bei {file_path}: {e}", exc_info=True) diff --git a/scripts/import_markdown.py b/scripts/import_markdown.py index 15c0b6f..107372b 100644 --- a/scripts/import_markdown.py +++ b/scripts/import_markdown.py @@ -2,9 +2,9 @@ # -*- coding: utf-8 -*- """ FILE: scripts/import_markdown.py -VERSION: 2.6.0 (2026-01-10) +VERSION: 2.6.1 (2026-01-10) STATUS: Active (Core) -COMPATIBILITY: IngestionProcessor v3.4.1+ +COMPATIBILITY: IngestionProcessor v3.4.2+, graph_utils v1.6.2+ Zweck: ------- @@ -13,57 +13,53 @@ Qdrant Vektor-Datenbank. Das Script ist darauf optimiert, die strukturelle Integ Wissensgraphen zu wahren und die manuelle Nutzer-Autorität vor automatisierten System-Eingriffen zu schützen. -Hintergrund der 2-Phasen-Strategie (Authority-First): ------------------------------------------------------- -Um das Problem der "Ghost-IDs" und der asynchronen Überschreibungen zu lösen, implementiert -dieses Script eine strikte Trennung der Schreibvorgänge: +Hintergrund der 2-Phasen-Schreibstrategie (Authority-First): +------------------------------------------------------------ +Um das Problem der "Ghost-IDs" (Links auf Titel statt IDs) und der asynchronen Überschreibungen +(Symmetrien löschen manuelle Kanten) zu lösen, implementiert dieses Script eine strikte +Trennung der Arbeitsabläufe: -1. PHASE 1: Authority Processing (Batch-Modus) - - Alle Dateien werden gescannt und verarbeitet. - - Notizen, Chunks und explizite (vom Nutzer gesetzte) Kanten werden sofort geschrieben. - - Durch die Verwendung von 'wait=True' in der Datenbank-Layer wird sichergestellt, - dass diese Informationen physisch indiziert sind, bevor der nächste Schritt erfolgt. +1. PASS 1: Global Context Discovery (Pre-Scan) + - Scannt den gesamten Vault, um ein Mapping von Titeln/Dateinamen zu Note-IDs aufzubauen. + - Dieser Cache wird dem IngestionService übergeben, damit Wikilinks wie [[Klaus]] + während der Verarbeitung sofort in die korrekte Zeitstempel-ID (z.B. 202601031726-klaus) + aufgelöst werden können. + - Dies verhindert die Erzeugung falscher UUIDs durch unaufgelöste Bezeichnungen. + +2. PHASE 1: Authority Processing (Schreib-Durchlauf) + - Alle validen Dateien werden in Batches verarbeitet. + - Notizen, Chunks und explizite (vom Nutzer manuell gesetzte) Kanten werden sofort geschrieben. + - Durch die Verwendung von 'wait=True' in der Datenbank-Layer (qdrant_points) wird + sichergestellt, dass diese Informationen physisch indiziert sind, bevor Phase 2 startet. - Symmetrische Gegenkanten werden während dieser Phase lediglich im Speicher gepuffert. -2. PHASE 2: Global Symmetry Commitment (Finaler Schritt) +3. PHASE 2: Global Symmetry Commitment (Integritäts-Sicherung) - Erst nach Abschluss aller Batches wird die Methode commit_vault_symmetries() aufgerufen. - Diese prüft die gepufferten Symmetrie-Vorschläge gegen die bereits existierende Nutzer-Autorität in der Datenbank. - - Existiert bereits eine manuelle Kante für dieselbe Verbindung, wird die automatische - Symmetrie unterdrückt. + - Dank der in graph_utils v1.6.2 zentralisierten ID-Logik (_mk_edge_id) erkennt das + System Kollisionen hunderprozentig: Existiert bereits eine manuelle Kante für dieselbe + Verbindung, wird die automatische Symmetrie unterdrückt. Detaillierte Funktionsweise: ---------------------------- -1. PASS 1: Global Pre-Scan - - Scannt rekursiv alle Markdown-Dateien im Vault. - - Schließt System-Ordner wie .trash, .obsidian, .sync sowie Vorlagen konsequent aus. - - Extrahiert Note-Kontext (ID, Titel, Dateiname) ohne DB-Schreibzugriff. - - Füllt den LocalBatchCache im IngestionService, der als Single-Source-of-Truth für - die spätere Link-Auflösung (Kanonisierung) dient. - - Dies stellt sicher, dass Wikilinks wie [[Klaus]] korrekt zu Zeitstempel-IDs wie - 202601031726-klaus aufgelöst werden, BEVOR eine UUID für die Kante berechnet wird. - -2. PASS 2: Semantic Processing - - Verarbeitet Dateien in konfigurierten Batches (Standard: 20 Dateien). - - Implementiert Cloud-Resilienz durch Semaphoren (max. 5 parallele Zugriffe). - - Nutzt die Mixture of Experts (MoE) Architektur zur semantischen Validierung von Links. - - Führt eine Hash-basierte Change Detection durch, um unnötige Schreibvorgänge zu vermeiden. - - Schreibt die Ergebnisse (Notes, Chunks, Explicit Edges) konsistent nach Qdrant. +- Ordner-Filter: Schließt System-Ordner wie .trash, .obsidian, .sync sowie Vorlagen konsequent aus. +- Cloud-Resilienz: Implementiert Semaphoren zur Begrenzung paralleler API-Zugriffe (max. 5). +- Mixture of Experts (MoE): Nutzt LLM-Validierung zur intelligenten Zuweisung von Kanten. +- Change Detection: Vergleicht Hashes, um redundante Schreibvorgänge zu vermeiden. Ergebnis-Interpretation: ------------------------ -- Log-Ausgabe: Liefert detaillierte Informationen über den Fortschritt, LLM-Entscheidungen - und die finale Symmetrie-Validierung. -- Statistiken: Gibt am Ende eine Zusammenfassung über verarbeitete, übersprungene und - fehlerhafte Dateien aus. -- Dry-Run: Ohne den Parameter --apply werden keine physischen Änderungen an der Datenbank - vorgenommen, der gesamte Workflow (inkl. LLM-Anfragen) wird jedoch simuliert. +- Log-Ausgabe: Zeigt detailliert den Fortschritt, LLM-Entscheidungen (✅ OK / ❌ SKIP) + und den Status der Symmetrie-Injektion. +- Statistiken: Gibt am Ende eine Zusammenfassung über Erfolg, Übersprungene (Hash identisch) + und Fehler (z.B. fehlendes Frontmatter). Verwendung: ----------- -- Regelmäßiger Import nach Änderungen im Vault. -- Initialer Aufbau eines neuen Wissensgraphen. -- Erzwingung einer Re-Indizierung mittels --force. +- Initialer Aufbau: python3 -m scripts.import_markdown --vault /pfad/zum/vault --apply +- Update-Lauf: Das Script erkennt Änderungen automatisch via Change Detection. +- Erzwingung: Mit --force wird die Hash-Prüfung ignoriert und alles neu indiziert. """ import asyncio @@ -75,7 +71,7 @@ from pathlib import Path from typing import List, Dict, Any from dotenv import load_dotenv -# Root Logger Setup:INFO-Level für volle Transparenz der fachlichen Prozesse +# Root Logger Setup: INFO-Level für volle Transparenz der fachlichen Prozesse logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s' @@ -101,6 +97,7 @@ async def main_async(args): return # 1. Initialisierung des zentralen Ingestion-Services + # Nutzt IngestionProcessor v3.4.2 (initialisiert Registry mit .env Pfaden) logger.info(f"Initializing IngestionService (Prefix: {args.prefix})") service = IngestionService(collection_prefix=args.prefix) @@ -125,12 +122,14 @@ async def main_async(args): # ========================================================================= # PASS 1: Global Pre-Scan # Ziel: Aufbau eines vollständigen Mappings von Bezeichnungen zu stabilen IDs. + # WICHTIG: Dies ist die Voraussetzung für die korrekte ID-Generierung in Phase 1. # ========================================================================= logger.info(f"🔍 [Pass 1] Global Pre-Scan: Building context cache for {len(files)} files...") for f_path in files: try: # Extrahiert Frontmatter und Metadaten ohne DB-Last - ctx = pre_scan_markdown(str(f_path)) + # Nutzt service.registry zur Typ-Auflösung + ctx = pre_scan_markdown(str(f_path), registry=service.registry) if ctx: # Mehrfache Indizierung für maximale Trefferrate bei Wikilinks service.batch_cache[ctx.note_id] = ctx @@ -152,8 +151,8 @@ async def main_async(args): """Kapselt den Prozess-Aufruf mit Ressourcen-Limitierung.""" async with sem: try: - # Verwendet process_file (v3.4.1), das explizite Kanten sofort schreibt - # und Symmetrien für Phase 2 im Service-Puffer sammelt. + # Verwendet process_file (v3.4.2), das explizite Kanten sofort schreibt. + # Symmetrien werden im Service-Puffer gesammelt und NICHT sofort geschrieben. return await service.process_file( file_path=str(f_path), vault_root=str(vault_path), @@ -195,16 +194,18 @@ async def main_async(args): # ========================================================================= # PHASE 2: Global Symmetry Commitment # Ziel: Finale Integrität. Triggert erst, wenn Phase 1 komplett indiziert ist. + # Verwendet die identische ID-Logik aus graph_utils v1.6.2. # ========================================================================= if args.apply: logger.info(f"🔄 [Phase 2] Starting global symmetry injection for the entire vault...") try: - # Diese Methode prüft den Puffer gegen die nun vollständige Datenbank + # Diese Methode prüft den Puffer gegen die nun vollständige Datenbank. + # Verhindert Duplikate bei der 'Steinzeitaxt' durch Authority-Lookup. sym_res = await service.commit_vault_symmetries() if sym_res.get("status") == "success": logger.info(f"✅ Phase 2 abgeschlossen. Hinzugefügt: {sym_res.get('added', 0)} geschützte Symmetrien.") else: - logger.info(f"⏭️ Phase 2 übersprungen: {sym_res.get('reason', 'Keine Daten')}") + logger.info(f"⏭️ Phase 2 übersprungen: {sym_res.get('reason', 'Keine Daten oder bereits vorhanden')}") except Exception as e: logger.error(f"❌ Fehler in Phase 2: {e}") else: @@ -219,9 +220,11 @@ def main(): # Standard-Präfix aus Umgebungsvariable oder Fallback default_prefix = os.getenv("COLLECTION_PREFIX", "mindnet") + # Optionaler Vault-Root aus .env + default_vault = os.getenv("MINDNET_VAULT_ROOT", "./vault") parser = argparse.ArgumentParser(description="Mindnet Ingester: Two-Phase Markdown Import") - parser.add_argument("--vault", default="./vault", help="Pfad zum Obsidian Vault") + parser.add_argument("--vault", default=default_vault, help="Pfad zum Obsidian Vault") parser.add_argument("--prefix", default=default_prefix, help="Qdrant Collection Präfix") parser.add_argument("--force", action="store_true", help="Erzwingt Neu-Indizierung aller Dateien") parser.add_argument("--apply", action="store_true", help="Schreibt physisch in die Datenbank")