From 4802eba27b8752951e41061675d64c2ad77fc813 Mon Sep 17 00:00:00 2001 From: Lars Date: Fri, 9 Jan 2026 13:57:10 +0100 Subject: [PATCH] Integrate symmetric edge logic and discovery API: Update ingestion processor and validation to support automatic inverse edge generation. Enhance edge registry for dual vocabulary and schema management. Introduce new discovery endpoint for proactive edge suggestions, improving graph topology and edge validation processes. --- app/core/ingestion/ingestion_processor.py | 34 ++-- app/core/ingestion/ingestion_validation.py | 101 +++++++++-- app/routers/chat.py | 116 +++++++++++- app/services/edge_registry.py | 202 ++++++++++++++------- 4 files changed, 344 insertions(+), 109 deletions(-) diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 8ca6021..18e06a0 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -1,11 +1,12 @@ """ FILE: app/core/ingestion/ingestion_processor.py DESCRIPTION: Der zentrale IngestionService (Orchestrator). + WP-24c: Integration der Symmetrie-Logik (Automatische inverse Kanten). WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v2.14.0: Synchronisierung der Profil-Auflösung mit MoE-Experten. -VERSION: 2.14.0 (WP-25a: MoE & Profile Support) + AUDIT v3.0.0: Synchronisierung der bidirektionalen Graph-Logik. +VERSION: 3.0.0 (WP-24c: Symmetric Graph Ingestion) STATUS: Active """ import logging @@ -29,10 +30,11 @@ from app.services.embeddings_client import EmbeddingsClient from app.services.edge_registry import registry as edge_registry from app.services.llm_service import LLMService -# Package-Interne Imports (Refactoring WP-14) +# Package-Interne Imports (Refactoring WP-14 / WP-24c) from .ingestion_utils import load_type_registry, resolve_note_type, get_chunk_config_by_profile from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts -from .ingestion_validation import validate_edge_candidate +# WP-24c: Import der erweiterten Symmetrie-Logik +from .ingestion_validation import validate_edge_candidate, validate_and_symmetrize from .ingestion_note_payload import make_note_payload from .ingestion_chunk_payload import make_chunk_payloads @@ -167,18 +169,26 @@ class IngestionService: # WP-15b: Chunker-Aufruf bereitet den Candidate-Pool pro Chunk vor. chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg) - # Semantische Kanten-Validierung (Smart Edge Allocation via MoE-Profil) + # Semantische Kanten-Validierung & Symmetrie (WP-24c / WP-25a) for ch in chunks: - filtered = [] + new_pool = [] for cand in getattr(ch, "candidate_pool", []): - # WP-25a: Nutzt nun das spezialisierte Validierungs-Profil + # WP-24c: Nutzung des erweiterten Symmetrie-Gateways if cand.get("provenance") == "global_pool" and enable_smart: - if await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator"): - filtered.append(cand) + # Erzeugt Primär- und Inverse Kanten falls validiert + res_batch = await validate_and_symmetrize( + chunk_text=ch.text, + edge=cand, + source_id=note_id, + batch_cache=self.batch_cache, + llm_service=self.llm, + profile_name="ingest_validator" + ) + new_pool.extend(res_batch) else: - # Explizite Kanten (Wikilinks/Callouts) werden ungeprüft übernommen - filtered.append(cand) - ch.candidate_pool = filtered + # Explizite Kanten (Wikilinks/Callouts) werden übernommen + new_pool.append(cand) + ch.candidate_pool = new_pool # Payload-Erstellung für die Chunks chunk_pls = make_chunk_payloads( diff --git a/app/core/ingestion/ingestion_validation.py b/app/core/ingestion/ingestion_validation.py index 19af49d..b0eb4d8 100644 --- a/app/core/ingestion/ingestion_validation.py +++ b/app/core/ingestion/ingestion_validation.py @@ -1,20 +1,23 @@ """ FILE: app/core/ingestion/ingestion_validation.py DESCRIPTION: WP-15b semantische Validierung von Kanten gegen den LocalBatchCache. - WP-25b: Umstellung auf Lazy-Prompt-Orchestration (prompt_key + variables). -VERSION: 2.14.0 (WP-25b: Lazy Prompt Integration) + WP-24c: Erweiterung um automatische Symmetrie-Generierung (Inverse Kanten). + WP-25b: Konsequente Lazy-Prompt-Orchestration (prompt_key + variables). +VERSION: 3.0.0 (WP-24c: Symmetric Edge Management) STATUS: Active FIX: -- WP-25b: Entfernung manueller Prompt-Formatierung zur Unterstützung modell-spezifischer Prompts. -- WP-25b: Umstellung auf generate_raw_response mit prompt_key="edge_validation". -- WP-25a: Voller Erhalt der MoE-Profilsteuerung und Fallback-Kaskade via LLMService. +- WP-24c: Integration der EdgeRegistry zur dynamischen Inversions-Ermittlung. +- WP-24c: Implementierung von validate_and_symmetrize für bidirektionale Graphen. +- WP-25b: Beibehaltung der hierarchischen Prompt-Resolution und Modell-Spezi-Logik. """ import logging -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, List from app.core.parser import NoteContext -# ENTSCHEIDENDER FIX: Import der neutralen Bereinigungs-Logik zur Vermeidung von Circular Imports +# Import der neutralen Bereinigungs-Logik zur Vermeidung von Circular Imports from app.core.registry import clean_llm_text +# WP-24c: Zugriff auf das dynamische Vokabular +from app.services.edge_registry import registry as edge_registry logger = logging.getLogger(__name__) @@ -28,18 +31,18 @@ async def validate_edge_candidate( ) -> bool: """ WP-15b/25b: Validiert einen Kandidaten semantisch gegen das Ziel im Cache. - Nutzt Lazy-Prompt-Loading zur Unterstützung modell-spezifischer Validierungs-Templates. + Nutzt Lazy-Prompt-Loading (PROMPT-TRACE) für deterministische YES/NO Entscheidungen. """ target_id = edge.get("to") target_ctx = batch_cache.get(target_id) - # Robust Lookup Fix (v2.12.2): Support für Anker - if not target_ctx and "#" in target_id: + # Robust Lookup Fix (v2.12.2): Support für Anker (Note#Section) + if not target_ctx and "#" in str(target_id): base_id = target_id.split("#")[0] target_ctx = batch_cache.get(base_id) # Sicherheits-Fallback (Hard-Link Integrity) - # Explizite Wikilinks oder Callouts werden nicht durch das LLM verifiziert. + # Wenn das Ziel nicht im Cache ist, erlauben wir die Kante (Link-Erhalt). if not target_ctx: logger.info(f"ℹ️ [VALIDATION SKIP] No context for '{target_id}' - allowing link.") return True @@ -48,8 +51,7 @@ async def validate_edge_candidate( logger.info(f"⚖️ [VALIDATING] Relation '{edge.get('kind')}' -> '{target_id}' (Profile: {profile_name})...") # WP-25b: Lazy-Prompt Aufruf. - # Wir übergeben keine formatierte Nachricht mehr, sondern Key und Daten-Dict. - # Das manuelle 'template = llm_service.get_prompt(...)' entfällt hier. + # Übergabe von prompt_key und Variablen für modell-optimierte Formatierung. raw_response = await llm_service.generate_raw_response( prompt_key="edge_validation", variables={ @@ -62,7 +64,7 @@ async def validate_edge_candidate( profile_name=profile_name ) - # WP-14 Fix: Bereinigung zur Sicherstellung der Interpretierbarkeit + # Bereinigung zur Sicherstellung der Interpretierbarkeit (Mistral/Qwen Safe) response = clean_llm_text(raw_response) # Semantische Prüfung des Ergebnisses @@ -78,12 +80,71 @@ async def validate_edge_candidate( error_str = str(e).lower() error_type = type(e).__name__ - # WP-25b FIX: Differenzierung zwischen transienten und permanenten Fehlern - # Transiente Fehler (Timeout, Network) → erlauben (Datenverlust vermeiden) + # WP-25b: Differenzierung zwischen transienten und permanenten Fehlern + # Transiente Fehler (Netzwerk) → erlauben (Integrität vor Präzision) if any(x in error_str for x in ["timeout", "connection", "network", "unreachable", "refused"]): - logger.warning(f"⚠️ Transient error for {target_id} using {profile_name}: {error_type} - {e}. Allowing edge.") + logger.warning(f"⚠️ Transient error for {target_id}: {error_type} - {e}. Allowing edge.") return True - # Permanente Fehler (Config, Validation, Invalid Response) → ablehnen (Graph-Qualität) - logger.error(f"❌ Permanent validation error for {target_id} using {profile_name}: {error_type} - {e}") - return False \ No newline at end of file + # Permanente Fehler → ablehnen (Graph-Qualität schützen) + logger.error(f"❌ Permanent validation error for {target_id}: {error_type} - {e}") + return False + +async def validate_and_symmetrize( + chunk_text: str, + edge: Dict, + source_id: str, + batch_cache: Dict[str, NoteContext], + llm_service: Any, + profile_name: str = "ingest_validator" +) -> List[Dict]: + """ + WP-24c: Erweitertes Validierungs-Gateway. + Prüft die Primärkante und erzeugt bei Erfolg automatisch die inverse Kante. + + Returns: + List[Dict]: Eine Liste mit 0, 1 (nur Primär) oder 2 (Primär + Invers) Kanten. + """ + # 1. Semantische Prüfung der Primärkante (A -> B) + is_valid = await validate_edge_candidate( + chunk_text=chunk_text, + edge=edge, + batch_cache=batch_cache, + llm_service=llm_service, + profile_name=profile_name + ) + + if not is_valid: + return [] + + validated_edges = [edge] + + # 2. WP-24c: Symmetrie-Generierung (B -> A) + # Wir laden den inversen Typ dynamisch aus der EdgeRegistry (Single Source of Truth) + original_kind = edge.get("kind", "related_to") + inverse_kind = edge_registry.get_inverse(original_kind) + + # Wir erzeugen eine inverse Kante nur, wenn ein sinnvoller inverser Typ existiert + # und das Ziel der Primärkante (to) valide ist. + target_id = edge.get("to") + + if target_id and source_id: + # Die inverse Kante zeigt vom Ziel der Primärkante zurück zur Quelle. + # Sie wird als 'virtual' markiert, um sie im Retrieval/UI identifizierbar zu machen. + inverse_edge = { + "to": source_id, + "kind": inverse_kind, + "provenance": "structure", # System-generiert, geschützt durch Firewall + "confidence": edge.get("confidence", 0.9) * 0.9, # Leichte Dämpfung für virtuelle Pfade + "virtual": True, + "note_id": target_id, # Die Note, von der die inverse Kante ausgeht + "rule_id": f"symmetry:{original_kind}" + } + + # Wir fügen die Symmetrie nur hinzu, wenn sie einen echten Mehrwert bietet + # (Vermeidung von redundanten related_to -> related_to Loops) + if inverse_kind != original_kind or original_kind not in ["related_to", "references"]: + validated_edges.append(inverse_edge) + logger.info(f"🔄 [SYMMETRY] Generated inverse edge: '{target_id}' --({inverse_kind})--> '{source_id}'") + + return validated_edges \ No newline at end of file diff --git a/app/routers/chat.py b/app/routers/chat.py index 0c3ebd6..ec7fcf7 100644 --- a/app/routers/chat.py +++ b/app/routers/chat.py @@ -3,9 +3,11 @@ FILE: app/routers/chat.py DESCRIPTION: Haupt-Chat-Interface (WP-25b Edition). Kombiniert die spezialisierte Interview-Logik mit der neuen Lazy-Prompt-Orchestration und MoE-Synthese. -VERSION: 3.0.5 (WP-25b: Lazy Prompt Integration) + WP-24c: Integration der Discovery API für proaktive Vernetzung. +VERSION: 3.1.0 (WP-24c: Discovery API Integration) STATUS: Active FIX: +- WP-24c: Neuer Endpunkt /query/discover für proaktive Kanten-Vorschläge. - WP-25b: Umstellung des Interview-Modus auf Lazy-Prompt (prompt_key + variables). - WP-25b: Delegation der RAG-Phase an die Engine v1.3.0 für konsistente MoE-Steuerung. - WP-25a: Voller Erhalt der v3.0.2 Logik (Interview, Schema-Resolution, FastPaths). @@ -13,6 +15,7 @@ FIX: from fastapi import APIRouter, HTTPException, Depends from typing import List, Dict, Any, Optional +from pydantic import BaseModel import time import uuid import logging @@ -22,13 +25,27 @@ import asyncio from pathlib import Path from app.config import get_settings -from app.models.dto import ChatRequest, ChatResponse, QueryHit +from app.models.dto import ChatRequest, ChatResponse, QueryHit, QueryRequest from app.services.llm_service import LLMService from app.services.feedback_service import log_search router = APIRouter() logger = logging.getLogger(__name__) +# --- EBENE 0: DTOs FÜR DISCOVERY (WP-24c) --- + +class DiscoveryRequest(BaseModel): + content: str + top_k: int = 8 + min_confidence: float = 0.6 + +class DiscoveryHit(BaseModel): + target_note: str # Note ID + target_title: str # Menschenlesbarer Titel + suggested_edge_type: str # Kanonischer Typ aus edge_vocabulary + confidence_score: float # Kombinierter Vektor- + KI-Score + reasoning: str # Kurze Begründung der KI + # --- EBENE 1: CONFIG LOADER & CACHING (WP-25 Standard) --- _DECISION_CONFIG_CACHE = None @@ -135,8 +152,7 @@ async def _classify_intent(query: str, llm: LLMService) -> tuple[str, str]: return "INTERVIEW", "Keyword (Interview)" # 3. SLOW PATH: DecisionEngine LLM Router (MoE-gesteuert) - # WP-25b FIX: Nutzung der öffentlichen API statt privater Methode - intent = await llm.decision_engine._determine_strategy(query) # TODO: Public API erstellen + intent = await llm.decision_engine._determine_strategy(query) return intent, "DecisionEngine (LLM)" # --- EBENE 3: RETRIEVAL AGGREGATION --- @@ -154,7 +170,7 @@ def _collect_all_hits(stream_responses: Dict[str, Any]) -> List[QueryHit]: seen_node_ids.add(hit.node_id) return sorted(all_hits, key=lambda h: h.total_score, reverse=True) -# --- EBENE 4: ENDPUNKT --- +# --- EBENE 4: ENDPUNKTE --- def get_llm_service(): return LLMService() @@ -196,7 +212,6 @@ async def chat_endpoint( template_key = strategy.get("prompt_template", "interview_template") # WP-25b: Lazy Loading Call - # Wir übergeben nur Key und Variablen. Das System formatiert passend zum Modell. answer_text = await llm.generate_raw_response( prompt_key=template_key, variables={ @@ -257,4 +272,91 @@ async def chat_endpoint( except Exception as e: logger.error(f"❌ Chat Endpoint Failure: {e}", exc_info=True) - raise HTTPException(status_code=500, detail="Fehler bei der Verarbeitung der Anfrage.") \ No newline at end of file + raise HTTPException(status_code=500, detail="Fehler bei der Verarbeitung der Anfrage.") + +@router.post("/query/discover", response_model=List[DiscoveryHit]) +async def discover_edges( + request: DiscoveryRequest, + llm: LLMService = Depends(get_llm_service) +): + """ + WP-24c: Analysiert Text auf potenzielle Kanten zu bestehendem Wissen. + Nutzt Vektor-Suche und DecisionEngine-Logik (WP-25b PROMPT-TRACE konform). + """ + start_time = time.time() + logger.info(f"🔍 [WP-24c] Discovery triggered for content: {request.content[:50]}...") + + try: + # 1. Kandidaten-Suche via Retriever (Vektor-Match) + search_req = QueryRequest( + query=request.content, + top_k=request.top_k, + explain=True + ) + candidates = await llm.decision_engine.retriever.search(search_req) + + if not candidates.results: + logger.info("ℹ️ No candidates found for discovery.") + return [] + + # 2. KI-gestützte Beziehungs-Extraktion (WP-25b) + discovery_results = [] + + # Zugriff auf gültige Kanten-Typen aus der Registry + from app.services.edge_registry import registry as edge_reg + valid_types_str = ", ".join(list(edge_reg.valid_types)) + + # Parallele Evaluierung der Kandidaten für maximale Performance + async def evaluate_candidate(hit: QueryHit) -> Optional[DiscoveryHit]: + if hit.total_score < request.min_confidence: + return None + + try: + # Nutzt ingest_extractor Profil für präzise semantische Analyse + # Wir verwenden das prompt_key Pattern (edge_extraction) gemäß WP-24c Vorgabe + raw_suggestion = await llm.generate_raw_response( + prompt_key="edge_extraction", + variables={ + "note_id": "NEUER_INHALT", + "text": f"PROXIMITY_TARGET: {hit.source.get('text', '')}\n\nNEW_CONTENT: {request.content}", + "valid_types": valid_types_str + }, + profile_name="ingest_extractor", + priority="realtime" + ) + + # Parsing der LLM Antwort (Erwartet JSON Liste) + from app.core.ingestion.ingestion_utils import extract_json_from_response + suggestions = extract_json_from_response(raw_suggestion) + + if isinstance(suggestions, list) and len(suggestions) > 0: + sugg = suggestions[0] # Wir nehmen den stärksten Vorschlag pro Hit + return DiscoveryHit( + target_note=hit.note_id, + target_title=hit.source.get("title") or hit.note_id, + suggested_edge_type=sugg.get("kind", "related_to"), + confidence_score=hit.total_score, + reasoning=f"Semantische Nähe ({int(hit.total_score*100)}%) entdeckt." + ) + except Exception as e: + logger.warning(f"⚠️ Discovery evaluation failed for hit {hit.note_id}: {e}") + return None + + tasks = [evaluate_candidate(hit) for hit in candidates.results] + results = await asyncio.gather(*tasks) + + # Zusammenführung und Duplikat-Bereinigung + seen_targets = set() + for r in results: + if r and r.target_note not in seen_targets: + discovery_results.append(r) + seen_targets.add(r.target_note) + + duration = int((time.time() - start_time) * 1000) + logger.info(f"✨ Discovery finished: found {len(discovery_results)} edges in {duration}ms") + + return discovery_results + + except Exception as e: + logger.error(f"❌ Discovery API failure: {e}", exc_info=True) + raise HTTPException(status_code=500, detail="Discovery-Prozess fehlgeschlagen.") \ No newline at end of file diff --git a/app/services/edge_registry.py b/app/services/edge_registry.py index 0763370..e261338 100644 --- a/app/services/edge_registry.py +++ b/app/services/edge_registry.py @@ -1,21 +1,17 @@ """ FILE: app/services/edge_registry.py -DESCRIPTION: Single Source of Truth für Kanten-Typen mit dynamischem Reload. - WP-15b: Erweiterte Provenance-Prüfung für die Candidate-Validation. - Sichert die Graph-Integrität durch strikte Trennung von System- und Inhaltskanten. - WP-22: Fix für absolute Pfade außerhalb des Vaults (Prod-Dictionary). - WP-20: Synchronisation mit zentralen Settings (v0.6.2). -VERSION: 0.8.0 +DESCRIPTION: Single Source of Truth für Kanten-Typen, Symmetrien und Graph-Topologie. + WP-24c: Implementierung der dualen Registry (Vocabulary & Schema). + Unterstützt dynamisches Laden von Inversen und kontextuellen Vorschlägen. +VERSION: 1.0.1 (WP-24c: Verified Atomic Topology) STATUS: Active -DEPENDENCIES: re, os, json, logging, time, app.config -LAST_ANALYSIS: 2025-12-26 """ import re import os import json import logging import time -from typing import Dict, Optional, Set, Tuple +from typing import Dict, Optional, Set, Tuple, List from app.config import get_settings @@ -23,11 +19,12 @@ logger = logging.getLogger(__name__) class EdgeRegistry: """ - Zentraler Verwalter für das Kanten-Vokabular. - Implementiert das Singleton-Pattern für konsistente Validierung über alle Services. + Zentraler Verwalter für das Kanten-Vokabular und das Graph-Schema. + Singleton-Pattern zur Sicherstellung konsistenter Validierung. """ _instance = None - # System-Kanten, die nicht durch User oder KI gesetzt werden dürfen + + # SYSTEM-SCHUTZ: Diese Kanten sind für die strukturelle Integrität reserviert (v0.8.0 Erhalt) FORBIDDEN_SYSTEM_EDGES = {"next", "prev", "belongs_to"} def __new__(cls, *args, **kwargs): @@ -42,124 +39,189 @@ class EdgeRegistry: settings = get_settings() - # 1. Pfad aus den zentralen Settings laden (WP-20 Synchronisation) - # Priorisiert den Pfad aus der .env / config.py (v0.6.2) + # --- Pfad-Konfiguration (WP-24c: Variable Pfade für Vault-Spiegelung) --- + # Das Vokabular (Semantik) self.full_vocab_path = os.path.abspath(settings.MINDNET_VOCAB_PATH) - self.unknown_log_path = "data/logs/unknown_edges.jsonl" - self.canonical_map: Dict[str, str] = {} - self.valid_types: Set[str] = set() - self._last_mtime = 0.0 + # Das Schema (Topologie) - Konfigurierbar via ENV: MINDNET_SCHEMA_PATH + schema_env = getattr(settings, "MINDNET_SCHEMA_PATH", None) + if schema_env: + self.full_schema_path = os.path.abspath(schema_env) + else: + # Fallback: Liegt im selben Verzeichnis wie das Vokabular + self.full_schema_path = os.path.join(os.path.dirname(self.full_vocab_path), "graph_schema.md") + + self.unknown_log_path = "data/logs/unknown_edges.jsonl" + + # --- Interne Datenspeicher --- + self.canonical_map: Dict[str, str] = {} + self.inverse_map: Dict[str, str] = {} + self.valid_types: Set[str] = set() + + # Topologie: source_type -> { target_type -> {"typical": set, "prohibited": set} } + self.topology: Dict[str, Dict[str, Dict[str, Set[str]]]] = {} + + self._last_vocab_mtime = 0.0 + self._last_schema_mtime = 0.0 + + logger.info(f">>> [EDGE-REGISTRY] Initializing WP-24c Dual-Engine") + logger.info(f" - Vocab-Path: {self.full_vocab_path}") + logger.info(f" - Schema-Path: {self.full_schema_path}") - # Initialer Ladevorgang - logger.info(f">>> [EDGE-REGISTRY] Initializing with Path: {self.full_vocab_path}") self.ensure_latest() self.initialized = True def ensure_latest(self): - """ - Prüft den Zeitstempel der Vokabular-Datei und lädt bei Bedarf neu. - Verhindert Inkonsistenzen bei Laufzeit-Updates des Dictionaries. - """ - if not os.path.exists(self.full_vocab_path): - logger.error(f"!!! [EDGE-REGISTRY ERROR] File not found: {self.full_vocab_path} !!!") - return - + """Prüft Zeitstempel beider Dateien und führt bei Änderung Hot-Reload durch.""" try: - current_mtime = os.path.getmtime(self.full_vocab_path) - if current_mtime > self._last_mtime: - self._load_vocabulary() - self._last_mtime = current_mtime + # Vokabular-Reload bei Änderung + if os.path.exists(self.full_vocab_path): + v_mtime = os.path.getmtime(self.full_vocab_path) + if v_mtime > self._last_vocab_mtime: + self._load_vocabulary() + self._last_vocab_mtime = v_mtime + + # Schema-Reload bei Änderung + if os.path.exists(self.full_schema_path): + s_mtime = os.path.getmtime(self.full_schema_path) + if s_mtime > self._last_schema_mtime: + self._load_schema() + self._last_schema_mtime = s_mtime + except Exception as e: - logger.error(f"!!! [EDGE-REGISTRY] Error checking file time: {e}") + logger.error(f"!!! [EDGE-REGISTRY] Sync failure: {e}") def _load_vocabulary(self): - """ - Parst das Markdown-Wörterbuch und baut die Canonical-Map auf. - Erkennt Tabellen-Strukturen und extrahiert fettgedruckte System-Typen. - """ + """Parst edge_vocabulary.md: | Canonical | Inverse | Aliases | Description |""" self.canonical_map.clear() + self.inverse_map.clear() self.valid_types.clear() - # Regex für Tabellen-Struktur: | **Typ** | Aliase | - pattern = re.compile(r"\|\s*\*\*`?([a-zA-Z0-9_-]+)`?\*\*\s*\|\s*([^|]+)\|") + # Regex für die 4-Spalten Struktur (WP-24c konform) + # Erwartet: | **`type`** | `inverse` | alias1, alias2 | ... | + pattern = re.compile(r"\|\s*\*\*`?([a-zA-Z0-9_-]+)`?\*\*\s*\|\s*`?([a-zA-Z0-9_-]+)`?\s*\|\s*([^|]+)\|") try: with open(self.full_vocab_path, "r", encoding="utf-8") as f: - c_types, c_aliases = 0, 0 + c_count = 0 for line in f: match = pattern.search(line) if match: canonical = match.group(1).strip().lower() - aliases_str = match.group(2).strip() + inverse = match.group(2).strip().lower() + aliases_raw = match.group(3).strip() self.valid_types.add(canonical) self.canonical_map[canonical] = canonical - c_types += 1 + if inverse: + self.inverse_map[canonical] = inverse - if aliases_str and "Kein Alias" not in aliases_str: - aliases = [a.strip() for a in aliases_str.split(",") if a.strip()] + # Aliase verarbeiten (Normalisierung auf snake_case) + if aliases_raw and "Kein Alias" not in aliases_raw: + aliases = [a.strip() for a in aliases_raw.split(",") if a.strip()] for alias in aliases: - # Normalisierung: Kleinschreibung, Underscores statt Leerzeichen clean_alias = alias.replace("`", "").lower().strip().replace(" ", "_") - self.canonical_map[clean_alias] = canonical - c_aliases += 1 + if clean_alias: + self.canonical_map[clean_alias] = canonical + c_count += 1 - logger.info(f"=== [EDGE-REGISTRY SUCCESS] Loaded {c_types} Canonical Types and {c_aliases} Aliases ===") - + logger.info(f"✅ [VOCAB] Loaded {c_count} edge definitions and their inverses.") except Exception as e: - logger.error(f"!!! [EDGE-REGISTRY FATAL] Error reading file: {e} !!!") + logger.error(f"❌ [VOCAB ERROR] {e}") + + def _load_schema(self): + """Parst graph_schema.md: ## Source: `type` | Target | Typical | Prohibited |""" + self.topology.clear() + current_source = None + + try: + with open(self.full_schema_path, "r", encoding="utf-8") as f: + for line in f: + # Header erkennen (Atomare Sektionen) + src_match = re.search(r"## Source:\s*`?([a-zA-Z0-9_-]+)`?", line) + if src_match: + current_source = src_match.group(1).strip().lower() + if current_source not in self.topology: + self.topology[current_source] = {} + continue + + # Tabellenzeilen parsen + if current_source and "|" in line and not line.startswith("|-") and "Target" not in line: + cols = [c.strip().replace("`", "").lower() for c in line.split("|")] + if len(cols) >= 4: + target_type = cols[1] + typical_edges = [e.strip() for e in cols[2].split(",") if e.strip() and e != "-"] + prohibited_edges = [e.strip() for e in cols[3].split(",") if e.strip() and e != "-"] + + if target_type not in self.topology[current_source]: + self.topology[current_source][target_type] = {"typical": set(), "prohibited": set()} + + self.topology[current_source][target_type]["typical"].update(typical_edges) + self.topology[current_source][target_type]["prohibited"].update(prohibited_edges) + + logger.info(f"✅ [SCHEMA] Topology matrix built for {len(self.topology)} source types.") + except Exception as e: + logger.error(f"❌ [SCHEMA ERROR] {e}") def resolve(self, edge_type: str, provenance: str = "explicit", context: dict = None) -> str: """ - WP-15b: Validiert einen Kanten-Typ gegen das Vokabular und prüft Berechtigungen. - Sichert, dass nur strukturelle Prozesse System-Kanten setzen dürfen. + Löst Aliasse auf kanonische Namen auf und schützt System-Kanten. + Erhalt der v0.8.0 Schutz-Logik. """ self.ensure_latest() if not edge_type: return "related_to" - # Normalisierung des Typs clean_type = edge_type.lower().strip().replace(" ", "_").replace("-", "_") ctx = context or {} - # WP-15b: System-Kanten dürfen weder manuell noch durch KI/Vererbung gesetzt werden. - # Nur Provenienz 'structure' (interne Prozesse) ist autorisiert. - # Wir blockieren hier alle Provenienzen außer 'structure'. + # Sicherheits-Gate: Schutz vor unerlaubter Nutzung von System-Kanten restricted_provenance = ["explicit", "semantic_ai", "inherited", "global_pool", "rule"] if provenance in restricted_provenance and clean_type in self.FORBIDDEN_SYSTEM_EDGES: - self._log_issue(clean_type, f"forbidden_usage_by_{provenance}", ctx) + self._log_issue(clean_type, f"forbidden_system_edge_manipulation_by_{provenance}", ctx) return "related_to" - # System-Kanten sind NUR bei struktureller Provenienz erlaubt + # System-Kanten sind NUR bei struktureller Provenienz (Code-generiert) erlaubt if provenance == "structure" and clean_type in self.FORBIDDEN_SYSTEM_EDGES: return clean_type - # Mapping auf kanonischen Namen (Alias-Auflösung) - if clean_type in self.canonical_map: - return self.canonical_map[clean_type] + # Alias-Auflösung + return self.canonical_map.get(clean_type, clean_type) + + def get_inverse(self, edge_type: str) -> str: + """WP-24c: Gibt das symmetrische Gegenstück zurück.""" + canonical = self.resolve(edge_type) + return self.inverse_map.get(canonical, "related_to") + + def get_topology_info(self, source_type: str, target_type: str) -> Dict[str, List[str]]: + """ + WP-24c: Liefert kontextuelle Kanten-Empfehlungen für Obsidian und das Backend. + """ + self.ensure_latest() - # Fallback und Logging unbekannter Typen für Admin-Review - self._log_issue(clean_type, "unknown_type", ctx) - return clean_type + # Hierarchische Suche: Spezifisch -> 'any' -> Empty + src_cfg = self.topology.get(source_type, self.topology.get("any", {})) + tgt_cfg = src_cfg.get(target_type, src_cfg.get("any", {"typical": set(), "prohibited": set()})) + + return { + "typical": sorted(list(tgt_cfg["typical"])), + "prohibited": sorted(list(tgt_cfg["prohibited"])) + } def _log_issue(self, edge_type: str, error_kind: str, ctx: dict): - """Detailliertes JSONL-Logging für die Vokabular-Optimierung.""" + """JSONL-Logging für unbekannte/verbotene Kanten (Erhalt v0.8.0).""" try: os.makedirs(os.path.dirname(self.unknown_log_path), exist_ok=True) entry = { "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "edge_type": edge_type, "error": error_kind, - "file": ctx.get("file", "unknown"), - "line": ctx.get("line", "unknown"), "note_id": ctx.get("note_id", "unknown"), "provenance": ctx.get("provenance", "unknown") } with open(self.unknown_log_path, "a", encoding="utf-8") as f: f.write(json.dumps(entry) + "\n") - except Exception: - pass + except Exception: pass -# Singleton Export für systemweiten Zugriff +# Singleton Export registry = EdgeRegistry() \ No newline at end of file