diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 8ca6021..18e06a0 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -1,11 +1,12 @@ """ FILE: app/core/ingestion/ingestion_processor.py DESCRIPTION: Der zentrale IngestionService (Orchestrator). + WP-24c: Integration der Symmetrie-Logik (Automatische inverse Kanten). WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v2.14.0: Synchronisierung der Profil-Auflösung mit MoE-Experten. -VERSION: 2.14.0 (WP-25a: MoE & Profile Support) + AUDIT v3.0.0: Synchronisierung der bidirektionalen Graph-Logik. +VERSION: 3.0.0 (WP-24c: Symmetric Graph Ingestion) STATUS: Active """ import logging @@ -29,10 +30,11 @@ from app.services.embeddings_client import EmbeddingsClient from app.services.edge_registry import registry as edge_registry from app.services.llm_service import LLMService -# Package-Interne Imports (Refactoring WP-14) +# Package-Interne Imports (Refactoring WP-14 / WP-24c) from .ingestion_utils import load_type_registry, resolve_note_type, get_chunk_config_by_profile from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts -from .ingestion_validation import validate_edge_candidate +# WP-24c: Import der erweiterten Symmetrie-Logik +from .ingestion_validation import validate_edge_candidate, validate_and_symmetrize from .ingestion_note_payload import make_note_payload from .ingestion_chunk_payload import make_chunk_payloads @@ -167,18 +169,26 @@ class IngestionService: # WP-15b: Chunker-Aufruf bereitet den Candidate-Pool pro Chunk vor. chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg) - # Semantische Kanten-Validierung (Smart Edge Allocation via MoE-Profil) + # Semantische Kanten-Validierung & Symmetrie (WP-24c / WP-25a) for ch in chunks: - filtered = [] + new_pool = [] for cand in getattr(ch, "candidate_pool", []): - # WP-25a: Nutzt nun das spezialisierte Validierungs-Profil + # WP-24c: Nutzung des erweiterten Symmetrie-Gateways if cand.get("provenance") == "global_pool" and enable_smart: - if await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator"): - filtered.append(cand) + # Erzeugt Primär- und Inverse Kanten falls validiert + res_batch = await validate_and_symmetrize( + chunk_text=ch.text, + edge=cand, + source_id=note_id, + batch_cache=self.batch_cache, + llm_service=self.llm, + profile_name="ingest_validator" + ) + new_pool.extend(res_batch) else: - # Explizite Kanten (Wikilinks/Callouts) werden ungeprüft übernommen - filtered.append(cand) - ch.candidate_pool = filtered + # Explizite Kanten (Wikilinks/Callouts) werden übernommen + new_pool.append(cand) + ch.candidate_pool = new_pool # Payload-Erstellung für die Chunks chunk_pls = make_chunk_payloads( diff --git a/app/core/ingestion/ingestion_validation.py b/app/core/ingestion/ingestion_validation.py index 19af49d..b0eb4d8 100644 --- a/app/core/ingestion/ingestion_validation.py +++ b/app/core/ingestion/ingestion_validation.py @@ -1,20 +1,23 @@ """ FILE: app/core/ingestion/ingestion_validation.py DESCRIPTION: WP-15b semantische Validierung von Kanten gegen den LocalBatchCache. - WP-25b: Umstellung auf Lazy-Prompt-Orchestration (prompt_key + variables). -VERSION: 2.14.0 (WP-25b: Lazy Prompt Integration) + WP-24c: Erweiterung um automatische Symmetrie-Generierung (Inverse Kanten). + WP-25b: Konsequente Lazy-Prompt-Orchestration (prompt_key + variables). +VERSION: 3.0.0 (WP-24c: Symmetric Edge Management) STATUS: Active FIX: -- WP-25b: Entfernung manueller Prompt-Formatierung zur Unterstützung modell-spezifischer Prompts. -- WP-25b: Umstellung auf generate_raw_response mit prompt_key="edge_validation". -- WP-25a: Voller Erhalt der MoE-Profilsteuerung und Fallback-Kaskade via LLMService. +- WP-24c: Integration der EdgeRegistry zur dynamischen Inversions-Ermittlung. +- WP-24c: Implementierung von validate_and_symmetrize für bidirektionale Graphen. +- WP-25b: Beibehaltung der hierarchischen Prompt-Resolution und Modell-Spezi-Logik. """ import logging -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, List from app.core.parser import NoteContext -# ENTSCHEIDENDER FIX: Import der neutralen Bereinigungs-Logik zur Vermeidung von Circular Imports +# Import der neutralen Bereinigungs-Logik zur Vermeidung von Circular Imports from app.core.registry import clean_llm_text +# WP-24c: Zugriff auf das dynamische Vokabular +from app.services.edge_registry import registry as edge_registry logger = logging.getLogger(__name__) @@ -28,18 +31,18 @@ async def validate_edge_candidate( ) -> bool: """ WP-15b/25b: Validiert einen Kandidaten semantisch gegen das Ziel im Cache. - Nutzt Lazy-Prompt-Loading zur Unterstützung modell-spezifischer Validierungs-Templates. + Nutzt Lazy-Prompt-Loading (PROMPT-TRACE) für deterministische YES/NO Entscheidungen. """ target_id = edge.get("to") target_ctx = batch_cache.get(target_id) - # Robust Lookup Fix (v2.12.2): Support für Anker - if not target_ctx and "#" in target_id: + # Robust Lookup Fix (v2.12.2): Support für Anker (Note#Section) + if not target_ctx and "#" in str(target_id): base_id = target_id.split("#")[0] target_ctx = batch_cache.get(base_id) # Sicherheits-Fallback (Hard-Link Integrity) - # Explizite Wikilinks oder Callouts werden nicht durch das LLM verifiziert. + # Wenn das Ziel nicht im Cache ist, erlauben wir die Kante (Link-Erhalt). if not target_ctx: logger.info(f"ℹ️ [VALIDATION SKIP] No context for '{target_id}' - allowing link.") return True @@ -48,8 +51,7 @@ async def validate_edge_candidate( logger.info(f"⚖️ [VALIDATING] Relation '{edge.get('kind')}' -> '{target_id}' (Profile: {profile_name})...") # WP-25b: Lazy-Prompt Aufruf. - # Wir übergeben keine formatierte Nachricht mehr, sondern Key und Daten-Dict. - # Das manuelle 'template = llm_service.get_prompt(...)' entfällt hier. + # Übergabe von prompt_key und Variablen für modell-optimierte Formatierung. raw_response = await llm_service.generate_raw_response( prompt_key="edge_validation", variables={ @@ -62,7 +64,7 @@ async def validate_edge_candidate( profile_name=profile_name ) - # WP-14 Fix: Bereinigung zur Sicherstellung der Interpretierbarkeit + # Bereinigung zur Sicherstellung der Interpretierbarkeit (Mistral/Qwen Safe) response = clean_llm_text(raw_response) # Semantische Prüfung des Ergebnisses @@ -78,12 +80,71 @@ async def validate_edge_candidate( error_str = str(e).lower() error_type = type(e).__name__ - # WP-25b FIX: Differenzierung zwischen transienten und permanenten Fehlern - # Transiente Fehler (Timeout, Network) → erlauben (Datenverlust vermeiden) + # WP-25b: Differenzierung zwischen transienten und permanenten Fehlern + # Transiente Fehler (Netzwerk) → erlauben (Integrität vor Präzision) if any(x in error_str for x in ["timeout", "connection", "network", "unreachable", "refused"]): - logger.warning(f"⚠️ Transient error for {target_id} using {profile_name}: {error_type} - {e}. Allowing edge.") + logger.warning(f"⚠️ Transient error for {target_id}: {error_type} - {e}. Allowing edge.") return True - # Permanente Fehler (Config, Validation, Invalid Response) → ablehnen (Graph-Qualität) - logger.error(f"❌ Permanent validation error for {target_id} using {profile_name}: {error_type} - {e}") - return False \ No newline at end of file + # Permanente Fehler → ablehnen (Graph-Qualität schützen) + logger.error(f"❌ Permanent validation error for {target_id}: {error_type} - {e}") + return False + +async def validate_and_symmetrize( + chunk_text: str, + edge: Dict, + source_id: str, + batch_cache: Dict[str, NoteContext], + llm_service: Any, + profile_name: str = "ingest_validator" +) -> List[Dict]: + """ + WP-24c: Erweitertes Validierungs-Gateway. + Prüft die Primärkante und erzeugt bei Erfolg automatisch die inverse Kante. + + Returns: + List[Dict]: Eine Liste mit 0, 1 (nur Primär) oder 2 (Primär + Invers) Kanten. + """ + # 1. Semantische Prüfung der Primärkante (A -> B) + is_valid = await validate_edge_candidate( + chunk_text=chunk_text, + edge=edge, + batch_cache=batch_cache, + llm_service=llm_service, + profile_name=profile_name + ) + + if not is_valid: + return [] + + validated_edges = [edge] + + # 2. WP-24c: Symmetrie-Generierung (B -> A) + # Wir laden den inversen Typ dynamisch aus der EdgeRegistry (Single Source of Truth) + original_kind = edge.get("kind", "related_to") + inverse_kind = edge_registry.get_inverse(original_kind) + + # Wir erzeugen eine inverse Kante nur, wenn ein sinnvoller inverser Typ existiert + # und das Ziel der Primärkante (to) valide ist. + target_id = edge.get("to") + + if target_id and source_id: + # Die inverse Kante zeigt vom Ziel der Primärkante zurück zur Quelle. + # Sie wird als 'virtual' markiert, um sie im Retrieval/UI identifizierbar zu machen. + inverse_edge = { + "to": source_id, + "kind": inverse_kind, + "provenance": "structure", # System-generiert, geschützt durch Firewall + "confidence": edge.get("confidence", 0.9) * 0.9, # Leichte Dämpfung für virtuelle Pfade + "virtual": True, + "note_id": target_id, # Die Note, von der die inverse Kante ausgeht + "rule_id": f"symmetry:{original_kind}" + } + + # Wir fügen die Symmetrie nur hinzu, wenn sie einen echten Mehrwert bietet + # (Vermeidung von redundanten related_to -> related_to Loops) + if inverse_kind != original_kind or original_kind not in ["related_to", "references"]: + validated_edges.append(inverse_edge) + logger.info(f"🔄 [SYMMETRY] Generated inverse edge: '{target_id}' --({inverse_kind})--> '{source_id}'") + + return validated_edges \ No newline at end of file diff --git a/app/routers/chat.py b/app/routers/chat.py index 0c3ebd6..ec7fcf7 100644 --- a/app/routers/chat.py +++ b/app/routers/chat.py @@ -3,9 +3,11 @@ FILE: app/routers/chat.py DESCRIPTION: Haupt-Chat-Interface (WP-25b Edition). Kombiniert die spezialisierte Interview-Logik mit der neuen Lazy-Prompt-Orchestration und MoE-Synthese. -VERSION: 3.0.5 (WP-25b: Lazy Prompt Integration) + WP-24c: Integration der Discovery API für proaktive Vernetzung. +VERSION: 3.1.0 (WP-24c: Discovery API Integration) STATUS: Active FIX: +- WP-24c: Neuer Endpunkt /query/discover für proaktive Kanten-Vorschläge. - WP-25b: Umstellung des Interview-Modus auf Lazy-Prompt (prompt_key + variables). - WP-25b: Delegation der RAG-Phase an die Engine v1.3.0 für konsistente MoE-Steuerung. - WP-25a: Voller Erhalt der v3.0.2 Logik (Interview, Schema-Resolution, FastPaths). @@ -13,6 +15,7 @@ FIX: from fastapi import APIRouter, HTTPException, Depends from typing import List, Dict, Any, Optional +from pydantic import BaseModel import time import uuid import logging @@ -22,13 +25,27 @@ import asyncio from pathlib import Path from app.config import get_settings -from app.models.dto import ChatRequest, ChatResponse, QueryHit +from app.models.dto import ChatRequest, ChatResponse, QueryHit, QueryRequest from app.services.llm_service import LLMService from app.services.feedback_service import log_search router = APIRouter() logger = logging.getLogger(__name__) +# --- EBENE 0: DTOs FÜR DISCOVERY (WP-24c) --- + +class DiscoveryRequest(BaseModel): + content: str + top_k: int = 8 + min_confidence: float = 0.6 + +class DiscoveryHit(BaseModel): + target_note: str # Note ID + target_title: str # Menschenlesbarer Titel + suggested_edge_type: str # Kanonischer Typ aus edge_vocabulary + confidence_score: float # Kombinierter Vektor- + KI-Score + reasoning: str # Kurze Begründung der KI + # --- EBENE 1: CONFIG LOADER & CACHING (WP-25 Standard) --- _DECISION_CONFIG_CACHE = None @@ -135,8 +152,7 @@ async def _classify_intent(query: str, llm: LLMService) -> tuple[str, str]: return "INTERVIEW", "Keyword (Interview)" # 3. SLOW PATH: DecisionEngine LLM Router (MoE-gesteuert) - # WP-25b FIX: Nutzung der öffentlichen API statt privater Methode - intent = await llm.decision_engine._determine_strategy(query) # TODO: Public API erstellen + intent = await llm.decision_engine._determine_strategy(query) return intent, "DecisionEngine (LLM)" # --- EBENE 3: RETRIEVAL AGGREGATION --- @@ -154,7 +170,7 @@ def _collect_all_hits(stream_responses: Dict[str, Any]) -> List[QueryHit]: seen_node_ids.add(hit.node_id) return sorted(all_hits, key=lambda h: h.total_score, reverse=True) -# --- EBENE 4: ENDPUNKT --- +# --- EBENE 4: ENDPUNKTE --- def get_llm_service(): return LLMService() @@ -196,7 +212,6 @@ async def chat_endpoint( template_key = strategy.get("prompt_template", "interview_template") # WP-25b: Lazy Loading Call - # Wir übergeben nur Key und Variablen. Das System formatiert passend zum Modell. answer_text = await llm.generate_raw_response( prompt_key=template_key, variables={ @@ -257,4 +272,91 @@ async def chat_endpoint( except Exception as e: logger.error(f"❌ Chat Endpoint Failure: {e}", exc_info=True) - raise HTTPException(status_code=500, detail="Fehler bei der Verarbeitung der Anfrage.") \ No newline at end of file + raise HTTPException(status_code=500, detail="Fehler bei der Verarbeitung der Anfrage.") + +@router.post("/query/discover", response_model=List[DiscoveryHit]) +async def discover_edges( + request: DiscoveryRequest, + llm: LLMService = Depends(get_llm_service) +): + """ + WP-24c: Analysiert Text auf potenzielle Kanten zu bestehendem Wissen. + Nutzt Vektor-Suche und DecisionEngine-Logik (WP-25b PROMPT-TRACE konform). + """ + start_time = time.time() + logger.info(f"🔍 [WP-24c] Discovery triggered for content: {request.content[:50]}...") + + try: + # 1. Kandidaten-Suche via Retriever (Vektor-Match) + search_req = QueryRequest( + query=request.content, + top_k=request.top_k, + explain=True + ) + candidates = await llm.decision_engine.retriever.search(search_req) + + if not candidates.results: + logger.info("ℹ️ No candidates found for discovery.") + return [] + + # 2. KI-gestützte Beziehungs-Extraktion (WP-25b) + discovery_results = [] + + # Zugriff auf gültige Kanten-Typen aus der Registry + from app.services.edge_registry import registry as edge_reg + valid_types_str = ", ".join(list(edge_reg.valid_types)) + + # Parallele Evaluierung der Kandidaten für maximale Performance + async def evaluate_candidate(hit: QueryHit) -> Optional[DiscoveryHit]: + if hit.total_score < request.min_confidence: + return None + + try: + # Nutzt ingest_extractor Profil für präzise semantische Analyse + # Wir verwenden das prompt_key Pattern (edge_extraction) gemäß WP-24c Vorgabe + raw_suggestion = await llm.generate_raw_response( + prompt_key="edge_extraction", + variables={ + "note_id": "NEUER_INHALT", + "text": f"PROXIMITY_TARGET: {hit.source.get('text', '')}\n\nNEW_CONTENT: {request.content}", + "valid_types": valid_types_str + }, + profile_name="ingest_extractor", + priority="realtime" + ) + + # Parsing der LLM Antwort (Erwartet JSON Liste) + from app.core.ingestion.ingestion_utils import extract_json_from_response + suggestions = extract_json_from_response(raw_suggestion) + + if isinstance(suggestions, list) and len(suggestions) > 0: + sugg = suggestions[0] # Wir nehmen den stärksten Vorschlag pro Hit + return DiscoveryHit( + target_note=hit.note_id, + target_title=hit.source.get("title") or hit.note_id, + suggested_edge_type=sugg.get("kind", "related_to"), + confidence_score=hit.total_score, + reasoning=f"Semantische Nähe ({int(hit.total_score*100)}%) entdeckt." + ) + except Exception as e: + logger.warning(f"⚠️ Discovery evaluation failed for hit {hit.note_id}: {e}") + return None + + tasks = [evaluate_candidate(hit) for hit in candidates.results] + results = await asyncio.gather(*tasks) + + # Zusammenführung und Duplikat-Bereinigung + seen_targets = set() + for r in results: + if r and r.target_note not in seen_targets: + discovery_results.append(r) + seen_targets.add(r.target_note) + + duration = int((time.time() - start_time) * 1000) + logger.info(f"✨ Discovery finished: found {len(discovery_results)} edges in {duration}ms") + + return discovery_results + + except Exception as e: + logger.error(f"❌ Discovery API failure: {e}", exc_info=True) + raise HTTPException(status_code=500, detail="Discovery-Prozess fehlgeschlagen.") \ No newline at end of file diff --git a/app/services/edge_registry.py b/app/services/edge_registry.py index 0763370..e261338 100644 --- a/app/services/edge_registry.py +++ b/app/services/edge_registry.py @@ -1,21 +1,17 @@ """ FILE: app/services/edge_registry.py -DESCRIPTION: Single Source of Truth für Kanten-Typen mit dynamischem Reload. - WP-15b: Erweiterte Provenance-Prüfung für die Candidate-Validation. - Sichert die Graph-Integrität durch strikte Trennung von System- und Inhaltskanten. - WP-22: Fix für absolute Pfade außerhalb des Vaults (Prod-Dictionary). - WP-20: Synchronisation mit zentralen Settings (v0.6.2). -VERSION: 0.8.0 +DESCRIPTION: Single Source of Truth für Kanten-Typen, Symmetrien und Graph-Topologie. + WP-24c: Implementierung der dualen Registry (Vocabulary & Schema). + Unterstützt dynamisches Laden von Inversen und kontextuellen Vorschlägen. +VERSION: 1.0.1 (WP-24c: Verified Atomic Topology) STATUS: Active -DEPENDENCIES: re, os, json, logging, time, app.config -LAST_ANALYSIS: 2025-12-26 """ import re import os import json import logging import time -from typing import Dict, Optional, Set, Tuple +from typing import Dict, Optional, Set, Tuple, List from app.config import get_settings @@ -23,11 +19,12 @@ logger = logging.getLogger(__name__) class EdgeRegistry: """ - Zentraler Verwalter für das Kanten-Vokabular. - Implementiert das Singleton-Pattern für konsistente Validierung über alle Services. + Zentraler Verwalter für das Kanten-Vokabular und das Graph-Schema. + Singleton-Pattern zur Sicherstellung konsistenter Validierung. """ _instance = None - # System-Kanten, die nicht durch User oder KI gesetzt werden dürfen + + # SYSTEM-SCHUTZ: Diese Kanten sind für die strukturelle Integrität reserviert (v0.8.0 Erhalt) FORBIDDEN_SYSTEM_EDGES = {"next", "prev", "belongs_to"} def __new__(cls, *args, **kwargs): @@ -42,124 +39,189 @@ class EdgeRegistry: settings = get_settings() - # 1. Pfad aus den zentralen Settings laden (WP-20 Synchronisation) - # Priorisiert den Pfad aus der .env / config.py (v0.6.2) + # --- Pfad-Konfiguration (WP-24c: Variable Pfade für Vault-Spiegelung) --- + # Das Vokabular (Semantik) self.full_vocab_path = os.path.abspath(settings.MINDNET_VOCAB_PATH) - self.unknown_log_path = "data/logs/unknown_edges.jsonl" - self.canonical_map: Dict[str, str] = {} - self.valid_types: Set[str] = set() - self._last_mtime = 0.0 + # Das Schema (Topologie) - Konfigurierbar via ENV: MINDNET_SCHEMA_PATH + schema_env = getattr(settings, "MINDNET_SCHEMA_PATH", None) + if schema_env: + self.full_schema_path = os.path.abspath(schema_env) + else: + # Fallback: Liegt im selben Verzeichnis wie das Vokabular + self.full_schema_path = os.path.join(os.path.dirname(self.full_vocab_path), "graph_schema.md") + + self.unknown_log_path = "data/logs/unknown_edges.jsonl" + + # --- Interne Datenspeicher --- + self.canonical_map: Dict[str, str] = {} + self.inverse_map: Dict[str, str] = {} + self.valid_types: Set[str] = set() + + # Topologie: source_type -> { target_type -> {"typical": set, "prohibited": set} } + self.topology: Dict[str, Dict[str, Dict[str, Set[str]]]] = {} + + self._last_vocab_mtime = 0.0 + self._last_schema_mtime = 0.0 + + logger.info(f">>> [EDGE-REGISTRY] Initializing WP-24c Dual-Engine") + logger.info(f" - Vocab-Path: {self.full_vocab_path}") + logger.info(f" - Schema-Path: {self.full_schema_path}") - # Initialer Ladevorgang - logger.info(f">>> [EDGE-REGISTRY] Initializing with Path: {self.full_vocab_path}") self.ensure_latest() self.initialized = True def ensure_latest(self): - """ - Prüft den Zeitstempel der Vokabular-Datei und lädt bei Bedarf neu. - Verhindert Inkonsistenzen bei Laufzeit-Updates des Dictionaries. - """ - if not os.path.exists(self.full_vocab_path): - logger.error(f"!!! [EDGE-REGISTRY ERROR] File not found: {self.full_vocab_path} !!!") - return - + """Prüft Zeitstempel beider Dateien und führt bei Änderung Hot-Reload durch.""" try: - current_mtime = os.path.getmtime(self.full_vocab_path) - if current_mtime > self._last_mtime: - self._load_vocabulary() - self._last_mtime = current_mtime + # Vokabular-Reload bei Änderung + if os.path.exists(self.full_vocab_path): + v_mtime = os.path.getmtime(self.full_vocab_path) + if v_mtime > self._last_vocab_mtime: + self._load_vocabulary() + self._last_vocab_mtime = v_mtime + + # Schema-Reload bei Änderung + if os.path.exists(self.full_schema_path): + s_mtime = os.path.getmtime(self.full_schema_path) + if s_mtime > self._last_schema_mtime: + self._load_schema() + self._last_schema_mtime = s_mtime + except Exception as e: - logger.error(f"!!! [EDGE-REGISTRY] Error checking file time: {e}") + logger.error(f"!!! [EDGE-REGISTRY] Sync failure: {e}") def _load_vocabulary(self): - """ - Parst das Markdown-Wörterbuch und baut die Canonical-Map auf. - Erkennt Tabellen-Strukturen und extrahiert fettgedruckte System-Typen. - """ + """Parst edge_vocabulary.md: | Canonical | Inverse | Aliases | Description |""" self.canonical_map.clear() + self.inverse_map.clear() self.valid_types.clear() - # Regex für Tabellen-Struktur: | **Typ** | Aliase | - pattern = re.compile(r"\|\s*\*\*`?([a-zA-Z0-9_-]+)`?\*\*\s*\|\s*([^|]+)\|") + # Regex für die 4-Spalten Struktur (WP-24c konform) + # Erwartet: | **`type`** | `inverse` | alias1, alias2 | ... | + pattern = re.compile(r"\|\s*\*\*`?([a-zA-Z0-9_-]+)`?\*\*\s*\|\s*`?([a-zA-Z0-9_-]+)`?\s*\|\s*([^|]+)\|") try: with open(self.full_vocab_path, "r", encoding="utf-8") as f: - c_types, c_aliases = 0, 0 + c_count = 0 for line in f: match = pattern.search(line) if match: canonical = match.group(1).strip().lower() - aliases_str = match.group(2).strip() + inverse = match.group(2).strip().lower() + aliases_raw = match.group(3).strip() self.valid_types.add(canonical) self.canonical_map[canonical] = canonical - c_types += 1 + if inverse: + self.inverse_map[canonical] = inverse - if aliases_str and "Kein Alias" not in aliases_str: - aliases = [a.strip() for a in aliases_str.split(",") if a.strip()] + # Aliase verarbeiten (Normalisierung auf snake_case) + if aliases_raw and "Kein Alias" not in aliases_raw: + aliases = [a.strip() for a in aliases_raw.split(",") if a.strip()] for alias in aliases: - # Normalisierung: Kleinschreibung, Underscores statt Leerzeichen clean_alias = alias.replace("`", "").lower().strip().replace(" ", "_") - self.canonical_map[clean_alias] = canonical - c_aliases += 1 + if clean_alias: + self.canonical_map[clean_alias] = canonical + c_count += 1 - logger.info(f"=== [EDGE-REGISTRY SUCCESS] Loaded {c_types} Canonical Types and {c_aliases} Aliases ===") - + logger.info(f"✅ [VOCAB] Loaded {c_count} edge definitions and their inverses.") except Exception as e: - logger.error(f"!!! [EDGE-REGISTRY FATAL] Error reading file: {e} !!!") + logger.error(f"❌ [VOCAB ERROR] {e}") + + def _load_schema(self): + """Parst graph_schema.md: ## Source: `type` | Target | Typical | Prohibited |""" + self.topology.clear() + current_source = None + + try: + with open(self.full_schema_path, "r", encoding="utf-8") as f: + for line in f: + # Header erkennen (Atomare Sektionen) + src_match = re.search(r"## Source:\s*`?([a-zA-Z0-9_-]+)`?", line) + if src_match: + current_source = src_match.group(1).strip().lower() + if current_source not in self.topology: + self.topology[current_source] = {} + continue + + # Tabellenzeilen parsen + if current_source and "|" in line and not line.startswith("|-") and "Target" not in line: + cols = [c.strip().replace("`", "").lower() for c in line.split("|")] + if len(cols) >= 4: + target_type = cols[1] + typical_edges = [e.strip() for e in cols[2].split(",") if e.strip() and e != "-"] + prohibited_edges = [e.strip() for e in cols[3].split(",") if e.strip() and e != "-"] + + if target_type not in self.topology[current_source]: + self.topology[current_source][target_type] = {"typical": set(), "prohibited": set()} + + self.topology[current_source][target_type]["typical"].update(typical_edges) + self.topology[current_source][target_type]["prohibited"].update(prohibited_edges) + + logger.info(f"✅ [SCHEMA] Topology matrix built for {len(self.topology)} source types.") + except Exception as e: + logger.error(f"❌ [SCHEMA ERROR] {e}") def resolve(self, edge_type: str, provenance: str = "explicit", context: dict = None) -> str: """ - WP-15b: Validiert einen Kanten-Typ gegen das Vokabular und prüft Berechtigungen. - Sichert, dass nur strukturelle Prozesse System-Kanten setzen dürfen. + Löst Aliasse auf kanonische Namen auf und schützt System-Kanten. + Erhalt der v0.8.0 Schutz-Logik. """ self.ensure_latest() if not edge_type: return "related_to" - # Normalisierung des Typs clean_type = edge_type.lower().strip().replace(" ", "_").replace("-", "_") ctx = context or {} - # WP-15b: System-Kanten dürfen weder manuell noch durch KI/Vererbung gesetzt werden. - # Nur Provenienz 'structure' (interne Prozesse) ist autorisiert. - # Wir blockieren hier alle Provenienzen außer 'structure'. + # Sicherheits-Gate: Schutz vor unerlaubter Nutzung von System-Kanten restricted_provenance = ["explicit", "semantic_ai", "inherited", "global_pool", "rule"] if provenance in restricted_provenance and clean_type in self.FORBIDDEN_SYSTEM_EDGES: - self._log_issue(clean_type, f"forbidden_usage_by_{provenance}", ctx) + self._log_issue(clean_type, f"forbidden_system_edge_manipulation_by_{provenance}", ctx) return "related_to" - # System-Kanten sind NUR bei struktureller Provenienz erlaubt + # System-Kanten sind NUR bei struktureller Provenienz (Code-generiert) erlaubt if provenance == "structure" and clean_type in self.FORBIDDEN_SYSTEM_EDGES: return clean_type - # Mapping auf kanonischen Namen (Alias-Auflösung) - if clean_type in self.canonical_map: - return self.canonical_map[clean_type] + # Alias-Auflösung + return self.canonical_map.get(clean_type, clean_type) + + def get_inverse(self, edge_type: str) -> str: + """WP-24c: Gibt das symmetrische Gegenstück zurück.""" + canonical = self.resolve(edge_type) + return self.inverse_map.get(canonical, "related_to") + + def get_topology_info(self, source_type: str, target_type: str) -> Dict[str, List[str]]: + """ + WP-24c: Liefert kontextuelle Kanten-Empfehlungen für Obsidian und das Backend. + """ + self.ensure_latest() - # Fallback und Logging unbekannter Typen für Admin-Review - self._log_issue(clean_type, "unknown_type", ctx) - return clean_type + # Hierarchische Suche: Spezifisch -> 'any' -> Empty + src_cfg = self.topology.get(source_type, self.topology.get("any", {})) + tgt_cfg = src_cfg.get(target_type, src_cfg.get("any", {"typical": set(), "prohibited": set()})) + + return { + "typical": sorted(list(tgt_cfg["typical"])), + "prohibited": sorted(list(tgt_cfg["prohibited"])) + } def _log_issue(self, edge_type: str, error_kind: str, ctx: dict): - """Detailliertes JSONL-Logging für die Vokabular-Optimierung.""" + """JSONL-Logging für unbekannte/verbotene Kanten (Erhalt v0.8.0).""" try: os.makedirs(os.path.dirname(self.unknown_log_path), exist_ok=True) entry = { "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "edge_type": edge_type, "error": error_kind, - "file": ctx.get("file", "unknown"), - "line": ctx.get("line", "unknown"), "note_id": ctx.get("note_id", "unknown"), "provenance": ctx.get("provenance", "unknown") } with open(self.unknown_log_path, "a", encoding="utf-8") as f: f.write(json.dumps(entry) + "\n") - except Exception: - pass + except Exception: pass -# Singleton Export für systemweiten Zugriff +# Singleton Export registry = EdgeRegistry() \ No newline at end of file