diff --git a/app/core/graph/graph_utils.py b/app/core/graph/graph_utils.py index d0bd6a8..54acd36 100644 --- a/app/core/graph/graph_utils.py +++ b/app/core/graph/graph_utils.py @@ -2,12 +2,13 @@ FILE: app/core/graph/graph_utils.py DESCRIPTION: Basale Werkzeuge, ID-Generierung und Provenance-Konfiguration für den Graphen. WP-24c: Integration der EdgeRegistry für dynamische Topologie-Defaults. - AUDIT: Erweitert um parse_link_target für sauberes Section-Splitting. -VERSION: 1.1.0 (WP-24c: Dynamic Topology Implementation) + FIX v1.2.0: Umstellung auf deterministische UUIDs (Qdrant Kompatibilität). +VERSION: 1.2.0 STATUS: Active """ import os import hashlib +import uuid import logging from typing import Iterable, List, Optional, Set, Any, Tuple @@ -52,10 +53,8 @@ def _dedupe_seq(seq: Iterable[str]) -> List[str]: def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None, variant: Optional[str] = None) -> str: """ - Erzeugt eine deterministische 12-Byte ID mittels BLAKE2s. - - WP-Fix: 'variant' (z.B. Section) fließt in den Hash ein, um mehrere Kanten - zum gleichen Target-Node (aber unterschiedlichen Abschnitten) zu unterscheiden. + Erzeugt eine deterministische UUID v5-konforme ID für Qdrant. + Behebt den 'HTTP 400 Bad Request', indem ein valides UUID-Format geliefert wird. """ base = f"{kind}:{s}->{t}#{scope}" if rule_id: @@ -63,7 +62,9 @@ def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = if variant: base += f"|{variant}" - return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest() + # Wir erzeugen einen 16-Byte Hash (128 Bit) für die UUID-Konvertierung + hash_bytes = hashlib.blake2s(base.encode("utf-8"), digest_size=16).digest() + return str(uuid.UUID(bytes=hash_bytes)) def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict: """Konstruiert ein Kanten-Payload für Qdrant.""" @@ -108,23 +109,18 @@ def get_edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]: WP-24c: Ermittelt Standard-Kanten (Typical Edges) für einen Notiz-Typ. Nutzt die EdgeRegistry (graph_schema.md) als primäre Quelle. """ - # 1. Dynamische Abfrage über die neue Topologie-Engine (WP-24c) - # Behebt das Audit-Problem 1a/1b: Suche in graph_schema.md statt types.yaml if note_type: topology = edge_registry.get_topology_info(note_type, "any") typical = topology.get("typical", []) if typical: return typical - # 2. Legacy-Fallback: Suche in der geladenen Registry (types.yaml) - # Sichert 100% Rückwärtskompatibilität, falls Reste in types.yaml verblieben sind. types_map = reg.get("types", reg) if isinstance(reg, dict) else {} if note_type and isinstance(types_map, dict): t = types_map.get(note_type) if isinstance(t, dict) and isinstance(t.get("edge_defaults"), list): return [str(x) for x in t["edge_defaults"] if isinstance(x, str)] - # 3. Globaler Default-Fallback aus der Registry for key in ("defaults", "default", "global"): v = reg.get(key) if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list): diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 2b0812a..5e07213 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -5,9 +5,8 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator). WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v3.1.8: Fix für HTTP 400 (Bad Request) durch ID-Validierung - und Schutz vor System-Typ Kollisionen. -VERSION: 3.1.8 (WP-24c: Robust Symmetry & ID Validation) + AUDIT v3.1.9: Fix für TypeError (Sync-Check), ID-Validierung und UUID-Support. +VERSION: 3.1.9 (WP-24c: Robust Symmetry & Sync Fix) STATUS: Active """ import logging @@ -22,7 +21,7 @@ from app.core.parser import ( validate_required_frontmatter, NoteContext ) from app.core.chunking import assemble_chunks -# WP-24c: Import für die deterministische ID-Vorabberechnung +# WP-24c: Import für die deterministische ID-Vorabberechnung (nun UUID-basiert) from app.core.graph.graph_utils import _mk_edge_id # MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene @@ -86,25 +85,21 @@ class IngestionService: def _is_valid_note_id(self, text: str) -> bool: """ - Prüft, ob ein String eine plausible Note-ID oder ein gültiger Titel ist. - Verhindert Symmetrie-Kanten zu Typ-Strings wie 'insight', 'event' oder 'source'. + WP-24c: Prüft, ob ein String eine plausible Note-ID oder ein gültiger Titel ist. + Verhindert Symmetrie-Kanten zu Meta-Begriffen wie 'insight' oder 'event'. """ if not text or len(text.strip()) < 3: return False - # 1. Bekannte System-Typen oder Meta-Daten Begriffe ausschließen - # Diese landen oft durch fehlerhafte Frontmatter-Einträge in der Referenz-Liste blacklisted = { "insight", "event", "source", "task", "project", - "person", "concept", "value", "principle", "trip", - "lesson", "decision", "requirement", "related_to" + "person", "concept", "value", "principle", "lesson", + "decision", "requirement", "related_to", "referenced_by" } - clean_text = text.lower().strip() - if clean_text in blacklisted: + if text.lower().strip() in blacklisted: return False - # 2. Ausschluss von zu langen Textfragmenten (wahrscheinlich kein Titel/ID) - if len(text) > 120: + if len(text) > 100: return False return True @@ -112,19 +107,14 @@ class IngestionService: async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]: """ WP-15b: Implementiert den Two-Pass Ingestion Workflow. - Pass 1: Pre-Scan füllt den Context-Cache. - Pass 2: Verarbeitung nutzt den Cache für die semantische Prüfung. """ - # Reset der Authority-Registry für den neuen Batch self.processed_explicit_ids.clear() logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Context Cache...") for path in file_paths: try: - # Übergabe der Registry für dynamische Scan-Tiefe ctx = pre_scan_markdown(path, registry=self.registry) if ctx: - # Mehrfache Indizierung für robusten Look-up (ID, Titel, Dateiname) self.batch_cache[ctx.note_id] = ctx self.batch_cache[ctx.title] = ctx fname = os.path.splitext(os.path.basename(path))[0] @@ -155,7 +145,6 @@ class IngestionService: except Exception as e: return {**result, "error": f"Validation failed: {str(e)}"} - # Dynamischer Lifecycle-Filter ingest_cfg = self.registry.get("ingestion_settings", {}) ignore_list = ingest_cfg.get("ignore_statuses", ["system", "template", "archive", "hidden"]) @@ -172,10 +161,7 @@ class IngestionService: ) note_id = note_pl["note_id"] - # Abgleich mit der Datenbank old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id) - - # Prüfung gegen den konfigurierten Hash-Modus check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}" old_hash = (old_payload or {}).get("hashes", {}).get(check_key) new_hash = note_pl.get("hashes", {}).get(check_key) @@ -199,32 +185,21 @@ class IngestionService: chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg) - # Semantische Kanten-Validierung for ch in chunks: new_pool = [] for cand in getattr(ch, "candidate_pool", []): if cand.get("provenance") == "global_pool" and enable_smart: is_valid = await validate_edge_candidate( - ch.text, - cand, - self.batch_cache, - self.llm, - profile_name="ingest_validator" + ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator" ) - if is_valid: - new_pool.append(cand) + if is_valid: new_pool.append(cand) else: new_pool.append(cand) ch.candidate_pool = new_pool - chunk_pls = make_chunk_payloads( - fm, note_pl["path"], chunks, file_path=file_path, - types_cfg=self.registry - ) - + chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry) vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] - # Aggregation aller Kanten raw_edges = build_edges_for_note( note_id, chunk_pls, note_level_references=note_pl.get("references", []), @@ -234,69 +209,50 @@ class IngestionService: # --- WP-24c: Symmetrie-Injektion (Authority Implementation) --- final_edges = [] - # PHASE 1: Alle expliziten Kanten vorverarbeiten und registrieren + # PHASE 1: Alle expliziten Kanten registrieren for e in raw_edges: target_raw = e.get("target_id") + if not self._is_valid_note_id(target_raw): continue - # Robustheits-Check: Ist das Ziel eine valide Note-ID? - if not self._is_valid_note_id(target_raw): - continue + resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit")) + e.update({ + "kind": resolved_kind, "origin_note_id": note_id, + "virtual": False, "confidence": 1.0 + }) - resolved_kind = edge_registry.resolve( - e.get("kind", "related_to"), - provenance=e.get("provenance", "explicit"), - context={"file": file_path, "note_id": note_id} - ) - e["kind"] = resolved_kind - e["origin_note_id"] = note_id - e["virtual"] = False - e["confidence"] = e.get("confidence", 1.0) # Volle Gewichtung - - # Registrierung der ID im Laufzeit-Schutz (Authority) edge_id = _mk_edge_id(resolved_kind, note_id, target_raw, e.get("scope", "note")) self.processed_explicit_ids.add(edge_id) - final_edges.append(e) - # PHASE 2: Symmetrische Kanten (Invers) mit Authority-Schutz erzeugen + # PHASE 2: Symmetrische Kanten (Invers) explicit_only = [x for x in final_edges if not x.get("virtual")] - for e in explicit_only: kind = e["kind"] - inverse_kind = edge_registry.get_inverse(kind) + inv_kind = edge_registry.get_inverse(kind) target_raw = e.get("target_id") - - # ID-Resolution target_ctx = self.batch_cache.get(target_raw) - target_canonical_id = target_ctx.note_id if target_ctx else target_raw + target_id = target_ctx.note_id if target_ctx else target_raw - # Validierung für Symmetrie-Erzeugung (Kein Self-Loop, valide ID) - if (inverse_kind and target_canonical_id and target_canonical_id != note_id and self._is_valid_note_id(target_canonical_id)): + if (inv_kind and target_id and target_id != note_id and self._is_valid_note_id(target_id)): + potential_id = _mk_edge_id(inv_kind, target_id, note_id, e.get("scope", "note")) - potential_id = _mk_edge_id(inverse_kind, target_canonical_id, note_id, e.get("scope", "note")) - - # AUTHORITY-CHECK: Batch-Gedächtnis oder Datenbank is_in_batch = potential_id in self.processed_explicit_ids + # FIX v3.1.9: Kein 'await' verwenden, da die DB-Funktion synchron ist! is_in_db = False if not is_in_batch: - # Real-Time DB Check verhindert 400 Bad Request durch vorherige ID-Validierung - is_in_db = await is_explicit_edge_present(self.client, self.prefix, potential_id) + is_in_db = is_explicit_edge_present(self.client, self.prefix, potential_id) if not is_in_batch and not is_in_db: - if (inverse_kind != kind or kind not in ["related_to", "references"]): + if (inv_kind != kind or kind not in ["related_to", "references"]): inv_edge = e.copy() inv_edge.update({ - "note_id": target_canonical_id, - "target_id": note_id, - "kind": inverse_kind, - "virtual": True, - "provenance": "structure", - "confidence": 1.0, # Gewichtung bleibt gleich laut Nutzerwunsch + "note_id": target_id, "target_id": note_id, "kind": inv_kind, + "virtual": True, "provenance": "structure", "confidence": 1.0, "origin_note_id": note_id }) final_edges.append(inv_edge) - logger.info(f"🔄 [SYMMETRY] Built inverse: {target_canonical_id} --({inverse_kind})--> {note_id}") + logger.info(f"🔄 [SYMMETRY] Built inverse: {target_id} --({inv_kind})--> {note_id}") edges = final_edges @@ -305,7 +261,6 @@ class IngestionService: if purge_before and old_payload: purge_artifacts(self.client, self.prefix, note_id) - # Speichern n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) upsert_batch(self.client, n_name, n_pts) @@ -318,12 +273,8 @@ class IngestionService: upsert_batch(self.client, f"{self.prefix}_edges", e_pts) return { - "path": file_path, - "status": "success", - "changed": True, - "note_id": note_id, - "chunks_count": len(chunk_pls), - "edges_count": len(edges) + "path": file_path, "status": "success", "changed": True, "note_id": note_id, + "chunks_count": len(chunk_pls), "edges_count": len(edges) } except Exception as e: logger.error(f"Processing failed: {e}", exc_info=True)