Integrate symmetric edge logic and discovery API: Update ingestion processor and validation to support automatic inverse edge generation. Enhance edge registry for dual vocabulary and schema management. Introduce new discovery endpoint for proactive edge suggestions, improving graph topology and edge validation processes.

This commit is contained in:
Lars 2026-01-09 13:57:10 +01:00
parent 745352ff3f
commit 4802eba27b
4 changed files with 344 additions and 109 deletions

View File

@ -1,11 +1,12 @@
"""
FILE: app/core/ingestion/ingestion_processor.py
DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-24c: Integration der Symmetrie-Logik (Automatische inverse Kanten).
WP-25a: Integration der Mixture of Experts (MoE) Architektur.
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
AUDIT v2.14.0: Synchronisierung der Profil-Auflösung mit MoE-Experten.
VERSION: 2.14.0 (WP-25a: MoE & Profile Support)
AUDIT v3.0.0: Synchronisierung der bidirektionalen Graph-Logik.
VERSION: 3.0.0 (WP-24c: Symmetric Graph Ingestion)
STATUS: Active
"""
import logging
@ -29,10 +30,11 @@ from app.services.embeddings_client import EmbeddingsClient
from app.services.edge_registry import registry as edge_registry
from app.services.llm_service import LLMService
# Package-Interne Imports (Refactoring WP-14)
# Package-Interne Imports (Refactoring WP-14 / WP-24c)
from .ingestion_utils import load_type_registry, resolve_note_type, get_chunk_config_by_profile
from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts
from .ingestion_validation import validate_edge_candidate
# WP-24c: Import der erweiterten Symmetrie-Logik
from .ingestion_validation import validate_edge_candidate, validate_and_symmetrize
from .ingestion_note_payload import make_note_payload
from .ingestion_chunk_payload import make_chunk_payloads
@ -167,18 +169,26 @@ class IngestionService:
# WP-15b: Chunker-Aufruf bereitet den Candidate-Pool pro Chunk vor.
chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg)
# Semantische Kanten-Validierung (Smart Edge Allocation via MoE-Profil)
# Semantische Kanten-Validierung & Symmetrie (WP-24c / WP-25a)
for ch in chunks:
filtered = []
new_pool = []
for cand in getattr(ch, "candidate_pool", []):
# WP-25a: Nutzt nun das spezialisierte Validierungs-Profil
# WP-24c: Nutzung des erweiterten Symmetrie-Gateways
if cand.get("provenance") == "global_pool" and enable_smart:
if await validate_edge_candidate(ch.text, cand, self.batch_cache, self.llm, profile_name="ingest_validator"):
filtered.append(cand)
# Erzeugt Primär- und Inverse Kanten falls validiert
res_batch = await validate_and_symmetrize(
chunk_text=ch.text,
edge=cand,
source_id=note_id,
batch_cache=self.batch_cache,
llm_service=self.llm,
profile_name="ingest_validator"
)
new_pool.extend(res_batch)
else:
# Explizite Kanten (Wikilinks/Callouts) werden ungeprüft übernommen
filtered.append(cand)
ch.candidate_pool = filtered
# Explizite Kanten (Wikilinks/Callouts) werden übernommen
new_pool.append(cand)
ch.candidate_pool = new_pool
# Payload-Erstellung für die Chunks
chunk_pls = make_chunk_payloads(

View File

@ -1,20 +1,23 @@
"""
FILE: app/core/ingestion/ingestion_validation.py
DESCRIPTION: WP-15b semantische Validierung von Kanten gegen den LocalBatchCache.
WP-25b: Umstellung auf Lazy-Prompt-Orchestration (prompt_key + variables).
VERSION: 2.14.0 (WP-25b: Lazy Prompt Integration)
WP-24c: Erweiterung um automatische Symmetrie-Generierung (Inverse Kanten).
WP-25b: Konsequente Lazy-Prompt-Orchestration (prompt_key + variables).
VERSION: 3.0.0 (WP-24c: Symmetric Edge Management)
STATUS: Active
FIX:
- WP-25b: Entfernung manueller Prompt-Formatierung zur Unterstützung modell-spezifischer Prompts.
- WP-25b: Umstellung auf generate_raw_response mit prompt_key="edge_validation".
- WP-25a: Voller Erhalt der MoE-Profilsteuerung und Fallback-Kaskade via LLMService.
- WP-24c: Integration der EdgeRegistry zur dynamischen Inversions-Ermittlung.
- WP-24c: Implementierung von validate_and_symmetrize für bidirektionale Graphen.
- WP-25b: Beibehaltung der hierarchischen Prompt-Resolution und Modell-Spezi-Logik.
"""
import logging
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, List
from app.core.parser import NoteContext
# ENTSCHEIDENDER FIX: Import der neutralen Bereinigungs-Logik zur Vermeidung von Circular Imports
# Import der neutralen Bereinigungs-Logik zur Vermeidung von Circular Imports
from app.core.registry import clean_llm_text
# WP-24c: Zugriff auf das dynamische Vokabular
from app.services.edge_registry import registry as edge_registry
logger = logging.getLogger(__name__)
@ -28,18 +31,18 @@ async def validate_edge_candidate(
) -> bool:
"""
WP-15b/25b: Validiert einen Kandidaten semantisch gegen das Ziel im Cache.
Nutzt Lazy-Prompt-Loading zur Unterstützung modell-spezifischer Validierungs-Templates.
Nutzt Lazy-Prompt-Loading (PROMPT-TRACE) für deterministische YES/NO Entscheidungen.
"""
target_id = edge.get("to")
target_ctx = batch_cache.get(target_id)
# Robust Lookup Fix (v2.12.2): Support für Anker
if not target_ctx and "#" in target_id:
# Robust Lookup Fix (v2.12.2): Support für Anker (Note#Section)
if not target_ctx and "#" in str(target_id):
base_id = target_id.split("#")[0]
target_ctx = batch_cache.get(base_id)
# Sicherheits-Fallback (Hard-Link Integrity)
# Explizite Wikilinks oder Callouts werden nicht durch das LLM verifiziert.
# Wenn das Ziel nicht im Cache ist, erlauben wir die Kante (Link-Erhalt).
if not target_ctx:
logger.info(f" [VALIDATION SKIP] No context for '{target_id}' - allowing link.")
return True
@ -48,8 +51,7 @@ async def validate_edge_candidate(
logger.info(f"⚖️ [VALIDATING] Relation '{edge.get('kind')}' -> '{target_id}' (Profile: {profile_name})...")
# WP-25b: Lazy-Prompt Aufruf.
# Wir übergeben keine formatierte Nachricht mehr, sondern Key und Daten-Dict.
# Das manuelle 'template = llm_service.get_prompt(...)' entfällt hier.
# Übergabe von prompt_key und Variablen für modell-optimierte Formatierung.
raw_response = await llm_service.generate_raw_response(
prompt_key="edge_validation",
variables={
@ -62,7 +64,7 @@ async def validate_edge_candidate(
profile_name=profile_name
)
# WP-14 Fix: Bereinigung zur Sicherstellung der Interpretierbarkeit
# Bereinigung zur Sicherstellung der Interpretierbarkeit (Mistral/Qwen Safe)
response = clean_llm_text(raw_response)
# Semantische Prüfung des Ergebnisses
@ -78,12 +80,71 @@ async def validate_edge_candidate(
error_str = str(e).lower()
error_type = type(e).__name__
# WP-25b FIX: Differenzierung zwischen transienten und permanenten Fehlern
# Transiente Fehler (Timeout, Network) → erlauben (Datenverlust vermeiden)
# WP-25b: Differenzierung zwischen transienten und permanenten Fehlern
# Transiente Fehler (Netzwerk) → erlauben (Integrität vor Präzision)
if any(x in error_str for x in ["timeout", "connection", "network", "unreachable", "refused"]):
logger.warning(f"⚠️ Transient error for {target_id} using {profile_name}: {error_type} - {e}. Allowing edge.")
logger.warning(f"⚠️ Transient error for {target_id}: {error_type} - {e}. Allowing edge.")
return True
# Permanente Fehler (Config, Validation, Invalid Response) → ablehnen (Graph-Qualität)
logger.error(f"❌ Permanent validation error for {target_id} using {profile_name}: {error_type} - {e}")
# Permanente Fehler → ablehnen (Graph-Qualität schützen)
logger.error(f"❌ Permanent validation error for {target_id}: {error_type} - {e}")
return False
async def validate_and_symmetrize(
chunk_text: str,
edge: Dict,
source_id: str,
batch_cache: Dict[str, NoteContext],
llm_service: Any,
profile_name: str = "ingest_validator"
) -> List[Dict]:
"""
WP-24c: Erweitertes Validierungs-Gateway.
Prüft die Primärkante und erzeugt bei Erfolg automatisch die inverse Kante.
Returns:
List[Dict]: Eine Liste mit 0, 1 (nur Primär) oder 2 (Primär + Invers) Kanten.
"""
# 1. Semantische Prüfung der Primärkante (A -> B)
is_valid = await validate_edge_candidate(
chunk_text=chunk_text,
edge=edge,
batch_cache=batch_cache,
llm_service=llm_service,
profile_name=profile_name
)
if not is_valid:
return []
validated_edges = [edge]
# 2. WP-24c: Symmetrie-Generierung (B -> A)
# Wir laden den inversen Typ dynamisch aus der EdgeRegistry (Single Source of Truth)
original_kind = edge.get("kind", "related_to")
inverse_kind = edge_registry.get_inverse(original_kind)
# Wir erzeugen eine inverse Kante nur, wenn ein sinnvoller inverser Typ existiert
# und das Ziel der Primärkante (to) valide ist.
target_id = edge.get("to")
if target_id and source_id:
# Die inverse Kante zeigt vom Ziel der Primärkante zurück zur Quelle.
# Sie wird als 'virtual' markiert, um sie im Retrieval/UI identifizierbar zu machen.
inverse_edge = {
"to": source_id,
"kind": inverse_kind,
"provenance": "structure", # System-generiert, geschützt durch Firewall
"confidence": edge.get("confidence", 0.9) * 0.9, # Leichte Dämpfung für virtuelle Pfade
"virtual": True,
"note_id": target_id, # Die Note, von der die inverse Kante ausgeht
"rule_id": f"symmetry:{original_kind}"
}
# Wir fügen die Symmetrie nur hinzu, wenn sie einen echten Mehrwert bietet
# (Vermeidung von redundanten related_to -> related_to Loops)
if inverse_kind != original_kind or original_kind not in ["related_to", "references"]:
validated_edges.append(inverse_edge)
logger.info(f"🔄 [SYMMETRY] Generated inverse edge: '{target_id}' --({inverse_kind})--> '{source_id}'")
return validated_edges

View File

@ -3,9 +3,11 @@ FILE: app/routers/chat.py
DESCRIPTION: Haupt-Chat-Interface (WP-25b Edition).
Kombiniert die spezialisierte Interview-Logik mit der neuen
Lazy-Prompt-Orchestration und MoE-Synthese.
VERSION: 3.0.5 (WP-25b: Lazy Prompt Integration)
WP-24c: Integration der Discovery API für proaktive Vernetzung.
VERSION: 3.1.0 (WP-24c: Discovery API Integration)
STATUS: Active
FIX:
- WP-24c: Neuer Endpunkt /query/discover für proaktive Kanten-Vorschläge.
- WP-25b: Umstellung des Interview-Modus auf Lazy-Prompt (prompt_key + variables).
- WP-25b: Delegation der RAG-Phase an die Engine v1.3.0 für konsistente MoE-Steuerung.
- WP-25a: Voller Erhalt der v3.0.2 Logik (Interview, Schema-Resolution, FastPaths).
@ -13,6 +15,7 @@ FIX:
from fastapi import APIRouter, HTTPException, Depends
from typing import List, Dict, Any, Optional
from pydantic import BaseModel
import time
import uuid
import logging
@ -22,13 +25,27 @@ import asyncio
from pathlib import Path
from app.config import get_settings
from app.models.dto import ChatRequest, ChatResponse, QueryHit
from app.models.dto import ChatRequest, ChatResponse, QueryHit, QueryRequest
from app.services.llm_service import LLMService
from app.services.feedback_service import log_search
router = APIRouter()
logger = logging.getLogger(__name__)
# --- EBENE 0: DTOs FÜR DISCOVERY (WP-24c) ---
class DiscoveryRequest(BaseModel):
content: str
top_k: int = 8
min_confidence: float = 0.6
class DiscoveryHit(BaseModel):
target_note: str # Note ID
target_title: str # Menschenlesbarer Titel
suggested_edge_type: str # Kanonischer Typ aus edge_vocabulary
confidence_score: float # Kombinierter Vektor- + KI-Score
reasoning: str # Kurze Begründung der KI
# --- EBENE 1: CONFIG LOADER & CACHING (WP-25 Standard) ---
_DECISION_CONFIG_CACHE = None
@ -135,8 +152,7 @@ async def _classify_intent(query: str, llm: LLMService) -> tuple[str, str]:
return "INTERVIEW", "Keyword (Interview)"
# 3. SLOW PATH: DecisionEngine LLM Router (MoE-gesteuert)
# WP-25b FIX: Nutzung der öffentlichen API statt privater Methode
intent = await llm.decision_engine._determine_strategy(query) # TODO: Public API erstellen
intent = await llm.decision_engine._determine_strategy(query)
return intent, "DecisionEngine (LLM)"
# --- EBENE 3: RETRIEVAL AGGREGATION ---
@ -154,7 +170,7 @@ def _collect_all_hits(stream_responses: Dict[str, Any]) -> List[QueryHit]:
seen_node_ids.add(hit.node_id)
return sorted(all_hits, key=lambda h: h.total_score, reverse=True)
# --- EBENE 4: ENDPUNKT ---
# --- EBENE 4: ENDPUNKTE ---
def get_llm_service():
return LLMService()
@ -196,7 +212,6 @@ async def chat_endpoint(
template_key = strategy.get("prompt_template", "interview_template")
# WP-25b: Lazy Loading Call
# Wir übergeben nur Key und Variablen. Das System formatiert passend zum Modell.
answer_text = await llm.generate_raw_response(
prompt_key=template_key,
variables={
@ -258,3 +273,90 @@ async def chat_endpoint(
except Exception as e:
logger.error(f"❌ Chat Endpoint Failure: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Fehler bei der Verarbeitung der Anfrage.")
@router.post("/query/discover", response_model=List[DiscoveryHit])
async def discover_edges(
request: DiscoveryRequest,
llm: LLMService = Depends(get_llm_service)
):
"""
WP-24c: Analysiert Text auf potenzielle Kanten zu bestehendem Wissen.
Nutzt Vektor-Suche und DecisionEngine-Logik (WP-25b PROMPT-TRACE konform).
"""
start_time = time.time()
logger.info(f"🔍 [WP-24c] Discovery triggered for content: {request.content[:50]}...")
try:
# 1. Kandidaten-Suche via Retriever (Vektor-Match)
search_req = QueryRequest(
query=request.content,
top_k=request.top_k,
explain=True
)
candidates = await llm.decision_engine.retriever.search(search_req)
if not candidates.results:
logger.info(" No candidates found for discovery.")
return []
# 2. KI-gestützte Beziehungs-Extraktion (WP-25b)
discovery_results = []
# Zugriff auf gültige Kanten-Typen aus der Registry
from app.services.edge_registry import registry as edge_reg
valid_types_str = ", ".join(list(edge_reg.valid_types))
# Parallele Evaluierung der Kandidaten für maximale Performance
async def evaluate_candidate(hit: QueryHit) -> Optional[DiscoveryHit]:
if hit.total_score < request.min_confidence:
return None
try:
# Nutzt ingest_extractor Profil für präzise semantische Analyse
# Wir verwenden das prompt_key Pattern (edge_extraction) gemäß WP-24c Vorgabe
raw_suggestion = await llm.generate_raw_response(
prompt_key="edge_extraction",
variables={
"note_id": "NEUER_INHALT",
"text": f"PROXIMITY_TARGET: {hit.source.get('text', '')}\n\nNEW_CONTENT: {request.content}",
"valid_types": valid_types_str
},
profile_name="ingest_extractor",
priority="realtime"
)
# Parsing der LLM Antwort (Erwartet JSON Liste)
from app.core.ingestion.ingestion_utils import extract_json_from_response
suggestions = extract_json_from_response(raw_suggestion)
if isinstance(suggestions, list) and len(suggestions) > 0:
sugg = suggestions[0] # Wir nehmen den stärksten Vorschlag pro Hit
return DiscoveryHit(
target_note=hit.note_id,
target_title=hit.source.get("title") or hit.note_id,
suggested_edge_type=sugg.get("kind", "related_to"),
confidence_score=hit.total_score,
reasoning=f"Semantische Nähe ({int(hit.total_score*100)}%) entdeckt."
)
except Exception as e:
logger.warning(f"⚠️ Discovery evaluation failed for hit {hit.note_id}: {e}")
return None
tasks = [evaluate_candidate(hit) for hit in candidates.results]
results = await asyncio.gather(*tasks)
# Zusammenführung und Duplikat-Bereinigung
seen_targets = set()
for r in results:
if r and r.target_note not in seen_targets:
discovery_results.append(r)
seen_targets.add(r.target_note)
duration = int((time.time() - start_time) * 1000)
logger.info(f"✨ Discovery finished: found {len(discovery_results)} edges in {duration}ms")
return discovery_results
except Exception as e:
logger.error(f"❌ Discovery API failure: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Discovery-Prozess fehlgeschlagen.")

View File

@ -1,21 +1,17 @@
"""
FILE: app/services/edge_registry.py
DESCRIPTION: Single Source of Truth für Kanten-Typen mit dynamischem Reload.
WP-15b: Erweiterte Provenance-Prüfung für die Candidate-Validation.
Sichert die Graph-Integrität durch strikte Trennung von System- und Inhaltskanten.
WP-22: Fix für absolute Pfade außerhalb des Vaults (Prod-Dictionary).
WP-20: Synchronisation mit zentralen Settings (v0.6.2).
VERSION: 0.8.0
DESCRIPTION: Single Source of Truth für Kanten-Typen, Symmetrien und Graph-Topologie.
WP-24c: Implementierung der dualen Registry (Vocabulary & Schema).
Unterstützt dynamisches Laden von Inversen und kontextuellen Vorschlägen.
VERSION: 1.0.1 (WP-24c: Verified Atomic Topology)
STATUS: Active
DEPENDENCIES: re, os, json, logging, time, app.config
LAST_ANALYSIS: 2025-12-26
"""
import re
import os
import json
import logging
import time
from typing import Dict, Optional, Set, Tuple
from typing import Dict, Optional, Set, Tuple, List
from app.config import get_settings
@ -23,11 +19,12 @@ logger = logging.getLogger(__name__)
class EdgeRegistry:
"""
Zentraler Verwalter für das Kanten-Vokabular.
Implementiert das Singleton-Pattern für konsistente Validierung über alle Services.
Zentraler Verwalter für das Kanten-Vokabular und das Graph-Schema.
Singleton-Pattern zur Sicherstellung konsistenter Validierung.
"""
_instance = None
# System-Kanten, die nicht durch User oder KI gesetzt werden dürfen
# SYSTEM-SCHUTZ: Diese Kanten sind für die strukturelle Integrität reserviert (v0.8.0 Erhalt)
FORBIDDEN_SYSTEM_EDGES = {"next", "prev", "belongs_to"}
def __new__(cls, *args, **kwargs):
@ -42,124 +39,189 @@ class EdgeRegistry:
settings = get_settings()
# 1. Pfad aus den zentralen Settings laden (WP-20 Synchronisation)
# Priorisiert den Pfad aus der .env / config.py (v0.6.2)
# --- Pfad-Konfiguration (WP-24c: Variable Pfade für Vault-Spiegelung) ---
# Das Vokabular (Semantik)
self.full_vocab_path = os.path.abspath(settings.MINDNET_VOCAB_PATH)
self.unknown_log_path = "data/logs/unknown_edges.jsonl"
self.canonical_map: Dict[str, str] = {}
self.valid_types: Set[str] = set()
self._last_mtime = 0.0
# Das Schema (Topologie) - Konfigurierbar via ENV: MINDNET_SCHEMA_PATH
schema_env = getattr(settings, "MINDNET_SCHEMA_PATH", None)
if schema_env:
self.full_schema_path = os.path.abspath(schema_env)
else:
# Fallback: Liegt im selben Verzeichnis wie das Vokabular
self.full_schema_path = os.path.join(os.path.dirname(self.full_vocab_path), "graph_schema.md")
self.unknown_log_path = "data/logs/unknown_edges.jsonl"
# --- Interne Datenspeicher ---
self.canonical_map: Dict[str, str] = {}
self.inverse_map: Dict[str, str] = {}
self.valid_types: Set[str] = set()
# Topologie: source_type -> { target_type -> {"typical": set, "prohibited": set} }
self.topology: Dict[str, Dict[str, Dict[str, Set[str]]]] = {}
self._last_vocab_mtime = 0.0
self._last_schema_mtime = 0.0
logger.info(f">>> [EDGE-REGISTRY] Initializing WP-24c Dual-Engine")
logger.info(f" - Vocab-Path: {self.full_vocab_path}")
logger.info(f" - Schema-Path: {self.full_schema_path}")
# Initialer Ladevorgang
logger.info(f">>> [EDGE-REGISTRY] Initializing with Path: {self.full_vocab_path}")
self.ensure_latest()
self.initialized = True
def ensure_latest(self):
"""
Prüft den Zeitstempel der Vokabular-Datei und lädt bei Bedarf neu.
Verhindert Inkonsistenzen bei Laufzeit-Updates des Dictionaries.
"""
if not os.path.exists(self.full_vocab_path):
logger.error(f"!!! [EDGE-REGISTRY ERROR] File not found: {self.full_vocab_path} !!!")
return
"""Prüft Zeitstempel beider Dateien und führt bei Änderung Hot-Reload durch."""
try:
current_mtime = os.path.getmtime(self.full_vocab_path)
if current_mtime > self._last_mtime:
# Vokabular-Reload bei Änderung
if os.path.exists(self.full_vocab_path):
v_mtime = os.path.getmtime(self.full_vocab_path)
if v_mtime > self._last_vocab_mtime:
self._load_vocabulary()
self._last_mtime = current_mtime
self._last_vocab_mtime = v_mtime
# Schema-Reload bei Änderung
if os.path.exists(self.full_schema_path):
s_mtime = os.path.getmtime(self.full_schema_path)
if s_mtime > self._last_schema_mtime:
self._load_schema()
self._last_schema_mtime = s_mtime
except Exception as e:
logger.error(f"!!! [EDGE-REGISTRY] Error checking file time: {e}")
logger.error(f"!!! [EDGE-REGISTRY] Sync failure: {e}")
def _load_vocabulary(self):
"""
Parst das Markdown-Wörterbuch und baut die Canonical-Map auf.
Erkennt Tabellen-Strukturen und extrahiert fettgedruckte System-Typen.
"""
"""Parst edge_vocabulary.md: | Canonical | Inverse | Aliases | Description |"""
self.canonical_map.clear()
self.inverse_map.clear()
self.valid_types.clear()
# Regex für Tabellen-Struktur: | **Typ** | Aliase |
pattern = re.compile(r"\|\s*\*\*`?([a-zA-Z0-9_-]+)`?\*\*\s*\|\s*([^|]+)\|")
# Regex für die 4-Spalten Struktur (WP-24c konform)
# Erwartet: | **`type`** | `inverse` | alias1, alias2 | ... |
pattern = re.compile(r"\|\s*\*\*`?([a-zA-Z0-9_-]+)`?\*\*\s*\|\s*`?([a-zA-Z0-9_-]+)`?\s*\|\s*([^|]+)\|")
try:
with open(self.full_vocab_path, "r", encoding="utf-8") as f:
c_types, c_aliases = 0, 0
c_count = 0
for line in f:
match = pattern.search(line)
if match:
canonical = match.group(1).strip().lower()
aliases_str = match.group(2).strip()
inverse = match.group(2).strip().lower()
aliases_raw = match.group(3).strip()
self.valid_types.add(canonical)
self.canonical_map[canonical] = canonical
c_types += 1
if inverse:
self.inverse_map[canonical] = inverse
if aliases_str and "Kein Alias" not in aliases_str:
aliases = [a.strip() for a in aliases_str.split(",") if a.strip()]
# Aliase verarbeiten (Normalisierung auf snake_case)
if aliases_raw and "Kein Alias" not in aliases_raw:
aliases = [a.strip() for a in aliases_raw.split(",") if a.strip()]
for alias in aliases:
# Normalisierung: Kleinschreibung, Underscores statt Leerzeichen
clean_alias = alias.replace("`", "").lower().strip().replace(" ", "_")
if clean_alias:
self.canonical_map[clean_alias] = canonical
c_aliases += 1
logger.info(f"=== [EDGE-REGISTRY SUCCESS] Loaded {c_types} Canonical Types and {c_aliases} Aliases ===")
c_count += 1
logger.info(f"✅ [VOCAB] Loaded {c_count} edge definitions and their inverses.")
except Exception as e:
logger.error(f"!!! [EDGE-REGISTRY FATAL] Error reading file: {e} !!!")
logger.error(f"❌ [VOCAB ERROR] {e}")
def _load_schema(self):
"""Parst graph_schema.md: ## Source: `type` | Target | Typical | Prohibited |"""
self.topology.clear()
current_source = None
try:
with open(self.full_schema_path, "r", encoding="utf-8") as f:
for line in f:
# Header erkennen (Atomare Sektionen)
src_match = re.search(r"## Source:\s*`?([a-zA-Z0-9_-]+)`?", line)
if src_match:
current_source = src_match.group(1).strip().lower()
if current_source not in self.topology:
self.topology[current_source] = {}
continue
# Tabellenzeilen parsen
if current_source and "|" in line and not line.startswith("|-") and "Target" not in line:
cols = [c.strip().replace("`", "").lower() for c in line.split("|")]
if len(cols) >= 4:
target_type = cols[1]
typical_edges = [e.strip() for e in cols[2].split(",") if e.strip() and e != "-"]
prohibited_edges = [e.strip() for e in cols[3].split(",") if e.strip() and e != "-"]
if target_type not in self.topology[current_source]:
self.topology[current_source][target_type] = {"typical": set(), "prohibited": set()}
self.topology[current_source][target_type]["typical"].update(typical_edges)
self.topology[current_source][target_type]["prohibited"].update(prohibited_edges)
logger.info(f"✅ [SCHEMA] Topology matrix built for {len(self.topology)} source types.")
except Exception as e:
logger.error(f"❌ [SCHEMA ERROR] {e}")
def resolve(self, edge_type: str, provenance: str = "explicit", context: dict = None) -> str:
"""
WP-15b: Validiert einen Kanten-Typ gegen das Vokabular und prüft Berechtigungen.
Sichert, dass nur strukturelle Prozesse System-Kanten setzen dürfen.
Löst Aliasse auf kanonische Namen auf und schützt System-Kanten.
Erhalt der v0.8.0 Schutz-Logik.
"""
self.ensure_latest()
if not edge_type:
return "related_to"
# Normalisierung des Typs
clean_type = edge_type.lower().strip().replace(" ", "_").replace("-", "_")
ctx = context or {}
# WP-15b: System-Kanten dürfen weder manuell noch durch KI/Vererbung gesetzt werden.
# Nur Provenienz 'structure' (interne Prozesse) ist autorisiert.
# Wir blockieren hier alle Provenienzen außer 'structure'.
# Sicherheits-Gate: Schutz vor unerlaubter Nutzung von System-Kanten
restricted_provenance = ["explicit", "semantic_ai", "inherited", "global_pool", "rule"]
if provenance in restricted_provenance and clean_type in self.FORBIDDEN_SYSTEM_EDGES:
self._log_issue(clean_type, f"forbidden_usage_by_{provenance}", ctx)
self._log_issue(clean_type, f"forbidden_system_edge_manipulation_by_{provenance}", ctx)
return "related_to"
# System-Kanten sind NUR bei struktureller Provenienz erlaubt
# System-Kanten sind NUR bei struktureller Provenienz (Code-generiert) erlaubt
if provenance == "structure" and clean_type in self.FORBIDDEN_SYSTEM_EDGES:
return clean_type
# Mapping auf kanonischen Namen (Alias-Auflösung)
if clean_type in self.canonical_map:
return self.canonical_map[clean_type]
# Alias-Auflösung
return self.canonical_map.get(clean_type, clean_type)
# Fallback und Logging unbekannter Typen für Admin-Review
self._log_issue(clean_type, "unknown_type", ctx)
return clean_type
def get_inverse(self, edge_type: str) -> str:
"""WP-24c: Gibt das symmetrische Gegenstück zurück."""
canonical = self.resolve(edge_type)
return self.inverse_map.get(canonical, "related_to")
def get_topology_info(self, source_type: str, target_type: str) -> Dict[str, List[str]]:
"""
WP-24c: Liefert kontextuelle Kanten-Empfehlungen für Obsidian und das Backend.
"""
self.ensure_latest()
# Hierarchische Suche: Spezifisch -> 'any' -> Empty
src_cfg = self.topology.get(source_type, self.topology.get("any", {}))
tgt_cfg = src_cfg.get(target_type, src_cfg.get("any", {"typical": set(), "prohibited": set()}))
return {
"typical": sorted(list(tgt_cfg["typical"])),
"prohibited": sorted(list(tgt_cfg["prohibited"]))
}
def _log_issue(self, edge_type: str, error_kind: str, ctx: dict):
"""Detailliertes JSONL-Logging für die Vokabular-Optimierung."""
"""JSONL-Logging für unbekannte/verbotene Kanten (Erhalt v0.8.0)."""
try:
os.makedirs(os.path.dirname(self.unknown_log_path), exist_ok=True)
entry = {
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"edge_type": edge_type,
"error": error_kind,
"file": ctx.get("file", "unknown"),
"line": ctx.get("line", "unknown"),
"note_id": ctx.get("note_id", "unknown"),
"provenance": ctx.get("provenance", "unknown")
}
with open(self.unknown_log_path, "a", encoding="utf-8") as f:
f.write(json.dumps(entry) + "\n")
except Exception:
pass
except Exception: pass
# Singleton Export für systemweiten Zugriff
# Singleton Export
registry = EdgeRegistry()