Update graph_utils.py and ingestion_processor.py to versions 1.2.0 and 3.1.9 respectively: Transition to deterministic UUIDs for edge ID generation to ensure Qdrant compatibility and prevent HTTP 400 errors. Enhance ID validation and streamline edge processing logic to improve robustness and prevent collisions with known system types. Adjust versioning and documentation accordingly.

This commit is contained in:
Lars 2026-01-09 21:46:47 +01:00
parent 72cf71fa87
commit 7ed82ad82e
2 changed files with 40 additions and 93 deletions

View File

@@ -2,12 +2,13 @@
FILE: app/core/graph/graph_utils.py FILE: app/core/graph/graph_utils.py
DESCRIPTION: Basale Werkzeuge, ID-Generierung und Provenance-Konfiguration für den Graphen. DESCRIPTION: Basale Werkzeuge, ID-Generierung und Provenance-Konfiguration für den Graphen.
WP-24c: Integration der EdgeRegistry für dynamische Topologie-Defaults. WP-24c: Integration der EdgeRegistry für dynamische Topologie-Defaults.
AUDIT: Erweitert um parse_link_target für sauberes Section-Splitting. FIX v1.2.0: Umstellung auf deterministische UUIDs (Qdrant Kompatibilität).
VERSION: 1.1.0 (WP-24c: Dynamic Topology Implementation) VERSION: 1.2.0
STATUS: Active STATUS: Active
""" """
import os import os
import hashlib import hashlib
import uuid
import logging import logging
from typing import Iterable, List, Optional, Set, Any, Tuple from typing import Iterable, List, Optional, Set, Any, Tuple
@@ -52,10 +53,8 @@ def _dedupe_seq(seq: Iterable[str]) -> List[str]:
def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None, variant: Optional[str] = None) -> str: def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] = None, variant: Optional[str] = None) -> str:
""" """
Erzeugt eine deterministische 12-Byte ID mittels BLAKE2s. Erzeugt eine deterministische UUID v5-konforme ID für Qdrant.
Behebt den 'HTTP 400 Bad Request', indem ein valides UUID-Format geliefert wird.
WP-Fix: 'variant' (z.B. Section) fließt in den Hash ein, um mehrere Kanten
zum gleichen Target-Node (aber unterschiedlichen Abschnitten) zu unterscheiden.
""" """
base = f"{kind}:{s}->{t}#{scope}" base = f"{kind}:{s}->{t}#{scope}"
if rule_id: if rule_id:
@@ -63,7 +62,9 @@ def _mk_edge_id(kind: str, s: str, t: str, scope: str, rule_id: Optional[str] =
if variant: if variant:
base += f"|{variant}" base += f"|{variant}"
return hashlib.blake2s(base.encode("utf-8"), digest_size=12).hexdigest() # Wir erzeugen einen 16-Byte Hash (128 Bit) für die UUID-Konvertierung
hash_bytes = hashlib.blake2s(base.encode("utf-8"), digest_size=16).digest()
return str(uuid.UUID(bytes=hash_bytes))
def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict: def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict:
"""Konstruiert ein Kanten-Payload für Qdrant.""" """Konstruiert ein Kanten-Payload für Qdrant."""
@@ -108,23 +109,18 @@ def get_edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]:
WP-24c: Ermittelt Standard-Kanten (Typical Edges) für einen Notiz-Typ. WP-24c: Ermittelt Standard-Kanten (Typical Edges) für einen Notiz-Typ.
Nutzt die EdgeRegistry (graph_schema.md) als primäre Quelle. Nutzt die EdgeRegistry (graph_schema.md) als primäre Quelle.
""" """
# 1. Dynamische Abfrage über die neue Topologie-Engine (WP-24c)
# Behebt das Audit-Problem 1a/1b: Suche in graph_schema.md statt types.yaml
if note_type: if note_type:
topology = edge_registry.get_topology_info(note_type, "any") topology = edge_registry.get_topology_info(note_type, "any")
typical = topology.get("typical", []) typical = topology.get("typical", [])
if typical: if typical:
return typical return typical
# 2. Legacy-Fallback: Suche in der geladenen Registry (types.yaml)
# Sichert 100% Rückwärtskompatibilität, falls Reste in types.yaml verblieben sind.
types_map = reg.get("types", reg) if isinstance(reg, dict) else {} types_map = reg.get("types", reg) if isinstance(reg, dict) else {}
if note_type and isinstance(types_map, dict): if note_type and isinstance(types_map, dict):
t = types_map.get(note_type) t = types_map.get(note_type)
if isinstance(t, dict) and isinstance(t.get("edge_defaults"), list): if isinstance(t, dict) and isinstance(t.get("edge_defaults"), list):
return [str(x) for x in t["edge_defaults"] if isinstance(x, str)] return [str(x) for x in t["edge_defaults"] if isinstance(x, str)]
# 3. Globaler Default-Fallback aus der Registry
for key in ("defaults", "default", "global"): for key in ("defaults", "default", "global"):
v = reg.get(key) v = reg.get(key)
if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list): if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list):

View File

@@ -5,9 +5,8 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-25a: Integration der Mixture of Experts (MoE) Architektur.
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
AUDIT v3.1.8: Fix für HTTP 400 (Bad Request) durch ID-Validierung AUDIT v3.1.9: Fix für TypeError (Sync-Check), ID-Validierung und UUID-Support.
und Schutz vor System-Typ Kollisionen. VERSION: 3.1.9 (WP-24c: Robust Symmetry & Sync Fix)
VERSION: 3.1.8 (WP-24c: Robust Symmetry & ID Validation)
STATUS: Active STATUS: Active
""" """
import logging import logging
@@ -22,7 +21,7 @@ from app.core.parser import (
validate_required_frontmatter, NoteContext validate_required_frontmatter, NoteContext
) )
from app.core.chunking import assemble_chunks from app.core.chunking import assemble_chunks
# WP-24c: Import für die deterministische ID-Vorabberechnung # WP-24c: Import für die deterministische ID-Vorabberechnung (nun UUID-basiert)
from app.core.graph.graph_utils import _mk_edge_id from app.core.graph.graph_utils import _mk_edge_id
# MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene # MODULARISIERUNG: Neue Import-Pfade für die Datenbank-Ebene
@@ -86,25 +85,21 @@ class IngestionService:
def _is_valid_note_id(self, text: str) -> bool: def _is_valid_note_id(self, text: str) -> bool:
""" """
Prüft, ob ein String eine plausible Note-ID oder ein gültiger Titel ist. WP-24c: Prüft, ob ein String eine plausible Note-ID oder ein gültiger Titel ist.
Verhindert Symmetrie-Kanten zu Typ-Strings wie 'insight', 'event' oder 'source'. Verhindert Symmetrie-Kanten zu Meta-Begriffen wie 'insight' oder 'event'.
""" """
if not text or len(text.strip()) < 3: if not text or len(text.strip()) < 3:
return False return False
# 1. Bekannte System-Typen oder Meta-Daten Begriffe ausschließen
# Diese landen oft durch fehlerhafte Frontmatter-Einträge in der Referenz-Liste
blacklisted = { blacklisted = {
"insight", "event", "source", "task", "project", "insight", "event", "source", "task", "project",
"person", "concept", "value", "principle", "trip", "person", "concept", "value", "principle", "lesson",
"lesson", "decision", "requirement", "related_to" "decision", "requirement", "related_to", "referenced_by"
} }
clean_text = text.lower().strip() if text.lower().strip() in blacklisted:
if clean_text in blacklisted:
return False return False
# 2. Ausschluss von zu langen Textfragmenten (wahrscheinlich kein Titel/ID) if len(text) > 100:
if len(text) > 120:
return False return False
return True return True
@@ -112,19 +107,14 @@ class IngestionService:
async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]: async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]:
""" """
WP-15b: Implementiert den Two-Pass Ingestion Workflow. WP-15b: Implementiert den Two-Pass Ingestion Workflow.
Pass 1: Pre-Scan füllt den Context-Cache.
Pass 2: Verarbeitung nutzt den Cache für die semantische Prüfung.
""" """
# Reset der Authority-Registry für den neuen Batch
self.processed_explicit_ids.clear() self.processed_explicit_ids.clear()
logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Context Cache...") logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Context Cache...")
for path in file_paths: for path in file_paths:
try: try:
# Übergabe der Registry für dynamische Scan-Tiefe
ctx = pre_scan_markdown(path, registry=self.registry) ctx = pre_scan_markdown(path, registry=self.registry)
if ctx: if ctx:
# Mehrfache Indizierung für robusten Look-up (ID, Titel, Dateiname)
self.batch_cache[ctx.note_id] = ctx self.batch_cache[ctx.note_id] = ctx
self.batch_cache[ctx.title] = ctx self.batch_cache[ctx.title] = ctx
fname = os.path.splitext(os.path.basename(path))[0] fname = os.path.splitext(os.path.basename(path))[0]
@@ -155,7 +145,6 @@ class IngestionService:
except Exception as e: except Exception as e:
return {**result, "error": f"Validation failed: {str(e)}"} return {**result, "error": f"Validation failed: {str(e)}"}
# Dynamischer Lifecycle-Filter
ingest_cfg = self.registry.get("ingestion_settings", {}) ingest_cfg = self.registry.get("ingestion_settings", {})
ignore_list = ingest_cfg.get("ignore_statuses", ["system", "template", "archive", "hidden"]) ignore_list = ingest_cfg.get("ignore_statuses", ["system", "template", "archive", "hidden"])
@@ -172,10 +161,7 @@ class IngestionService:
) )
note_id = note_pl["note_id"] note_id = note_pl["note_id"]
# Abgleich mit der Datenbank
old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id) old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id)
# Prüfung gegen den konfigurierten Hash-Modus
check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}" check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}"
old_hash = (old_payload or {}).get("hashes", {}).get(check_key) old_hash = (old_payload or {}).get("hashes", {}).get(check_key)
new_hash = note_pl.get("hashes", {}).get(check_key) new_hash = note_pl.get("hashes", {}).get(check_key)
@@ -199,32 +185,21 @@ class IngestionService:
chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg) chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg)
# Semantische Kanten-Validierung
for ch in chunks: for ch in chunks:
new_pool = [] new_pool = []
for cand in getattr(ch, "candidate_pool", []): for cand in getattr(ch, "candidate_pool", []):
if cand.get("provenance") == "global_pool" and enable_smart: if cand.get("provenance") == "global_pool" and enable_smart:
is_valid = await validate_edge_candidate( is_valid = await validate_edge_candidate(
ch.text, ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator"
cand,
self.batch_cache,
self.llm,
profile_name="ingest_validator"
) )
if is_valid: if is_valid: new_pool.append(cand)
new_pool.append(cand)
else: else:
new_pool.append(cand) new_pool.append(cand)
ch.candidate_pool = new_pool ch.candidate_pool = new_pool
chunk_pls = make_chunk_payloads( chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, file_path=file_path, types_cfg=self.registry)
fm, note_pl["path"], chunks, file_path=file_path,
types_cfg=self.registry
)
vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []
# Aggregation aller Kanten
raw_edges = build_edges_for_note( raw_edges = build_edges_for_note(
note_id, chunk_pls, note_id, chunk_pls,
note_level_references=note_pl.get("references", []), note_level_references=note_pl.get("references", []),
@@ -234,69 +209,50 @@ class IngestionService:
# --- WP-24c: Symmetrie-Injektion (Authority Implementation) --- # --- WP-24c: Symmetrie-Injektion (Authority Implementation) ---
final_edges = [] final_edges = []
# PHASE 1: Alle expliziten Kanten vorverarbeiten und registrieren # PHASE 1: Alle expliziten Kanten registrieren
for e in raw_edges: for e in raw_edges:
target_raw = e.get("target_id") target_raw = e.get("target_id")
if not self._is_valid_note_id(target_raw): continue
# Robustheits-Check: Ist das Ziel eine valide Note-ID? resolved_kind = edge_registry.resolve(e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"))
if not self._is_valid_note_id(target_raw): e.update({
continue "kind": resolved_kind, "origin_note_id": note_id,
"virtual": False, "confidence": 1.0
})
resolved_kind = edge_registry.resolve(
e.get("kind", "related_to"),
provenance=e.get("provenance", "explicit"),
context={"file": file_path, "note_id": note_id}
)
e["kind"] = resolved_kind
e["origin_note_id"] = note_id
e["virtual"] = False
e["confidence"] = e.get("confidence", 1.0) # Volle Gewichtung
# Registrierung der ID im Laufzeit-Schutz (Authority)
edge_id = _mk_edge_id(resolved_kind, note_id, target_raw, e.get("scope", "note")) edge_id = _mk_edge_id(resolved_kind, note_id, target_raw, e.get("scope", "note"))
self.processed_explicit_ids.add(edge_id) self.processed_explicit_ids.add(edge_id)
final_edges.append(e) final_edges.append(e)
# PHASE 2: Symmetrische Kanten (Invers) mit Authority-Schutz erzeugen # PHASE 2: Symmetrische Kanten (Invers)
explicit_only = [x for x in final_edges if not x.get("virtual")] explicit_only = [x for x in final_edges if not x.get("virtual")]
for e in explicit_only: for e in explicit_only:
kind = e["kind"] kind = e["kind"]
inverse_kind = edge_registry.get_inverse(kind) inv_kind = edge_registry.get_inverse(kind)
target_raw = e.get("target_id") target_raw = e.get("target_id")
# ID-Resolution
target_ctx = self.batch_cache.get(target_raw) target_ctx = self.batch_cache.get(target_raw)
target_canonical_id = target_ctx.note_id if target_ctx else target_raw target_id = target_ctx.note_id if target_ctx else target_raw
# Validierung für Symmetrie-Erzeugung (Kein Self-Loop, valide ID) if (inv_kind and target_id and target_id != note_id and self._is_valid_note_id(target_id)):
if (inverse_kind and target_canonical_id and target_canonical_id != note_id and self._is_valid_note_id(target_canonical_id)): potential_id = _mk_edge_id(inv_kind, target_id, note_id, e.get("scope", "note"))
potential_id = _mk_edge_id(inverse_kind, target_canonical_id, note_id, e.get("scope", "note"))
# AUTHORITY-CHECK: Batch-Gedächtnis oder Datenbank
is_in_batch = potential_id in self.processed_explicit_ids is_in_batch = potential_id in self.processed_explicit_ids
# FIX v3.1.9: Kein 'await' verwenden, da die DB-Funktion synchron ist!
is_in_db = False is_in_db = False
if not is_in_batch: if not is_in_batch:
# Real-Time DB Check verhindert 400 Bad Request durch vorherige ID-Validierung is_in_db = is_explicit_edge_present(self.client, self.prefix, potential_id)
is_in_db = await is_explicit_edge_present(self.client, self.prefix, potential_id)
if not is_in_batch and not is_in_db: if not is_in_batch and not is_in_db:
if (inverse_kind != kind or kind not in ["related_to", "references"]): if (inv_kind != kind or kind not in ["related_to", "references"]):
inv_edge = e.copy() inv_edge = e.copy()
inv_edge.update({ inv_edge.update({
"note_id": target_canonical_id, "note_id": target_id, "target_id": note_id, "kind": inv_kind,
"target_id": note_id, "virtual": True, "provenance": "structure", "confidence": 1.0,
"kind": inverse_kind,
"virtual": True,
"provenance": "structure",
"confidence": 1.0, # Gewichtung bleibt gleich laut Nutzerwunsch
"origin_note_id": note_id "origin_note_id": note_id
}) })
final_edges.append(inv_edge) final_edges.append(inv_edge)
logger.info(f"🔄 [SYMMETRY] Built inverse: {target_canonical_id} --({inverse_kind})--> {note_id}") logger.info(f"🔄 [SYMMETRY] Built inverse: {target_id} --({inv_kind})--> {note_id}")
edges = final_edges edges = final_edges
@@ -305,7 +261,6 @@ class IngestionService:
if purge_before and old_payload: if purge_before and old_payload:
purge_artifacts(self.client, self.prefix, note_id) purge_artifacts(self.client, self.prefix, note_id)
# Speichern
n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
upsert_batch(self.client, n_name, n_pts) upsert_batch(self.client, n_name, n_pts)
@@ -318,12 +273,8 @@ class IngestionService:
upsert_batch(self.client, f"{self.prefix}_edges", e_pts) upsert_batch(self.client, f"{self.prefix}_edges", e_pts)
return { return {
"path": file_path, "path": file_path, "status": "success", "changed": True, "note_id": note_id,
"status": "success", "chunks_count": len(chunk_pls), "edges_count": len(edges)
"changed": True,
"note_id": note_id,
"chunks_count": len(chunk_pls),
"edges_count": len(edges)
} }
except Exception as e: except Exception as e:
logger.error(f"Processing failed: {e}", exc_info=True) logger.error(f"Processing failed: {e}", exc_info=True)