mindnet/app/core/graph/graph_utils.py
Lars 509efc9393 Implement WP-26 v1.3 (Phase 3): Enhance graph schema validation and edge handling
- Introduced a new function `load_graph_schema_full` to parse and cache both typical and prohibited edge types from the graph schema.
- Updated `load_graph_schema` to utilize the full schema for improved edge type extraction.
- Added `get_topology_info` to retrieve typical and prohibited edges for source/target pairs.
- Implemented `validate_intra_note_edge` and `validate_edge_against_schema` for schema validation of intra-note edges.
- Enhanced logging for schema validation outcomes and edge handling.
- Updated documentation to reflect new validation features and testing procedures.
2026-01-26 10:18:31 +01:00

500 lines
19 KiB
Python

"""
FILE: app/core/graph/graph_utils.py
DESCRIPTION: Basale Werkzeuge, ID-Generierung und Provenance-Konfiguration für den Graphen.
AUDIT v4.0.0:
- GOLD-STANDARD v4.0.0: Strikte 4-Parameter-ID für Kanten (kind, source, target, scope).
- Eliminiert ID-Inkonsistenz zwischen Phase 1 (Autorität) und Phase 2 (Symmetrie).
- rule_id und variant werden ignoriert in der ID-Generierung (nur im Payload gespeichert).
- Fix für das "Steinzeitaxt"-Problem durch konsistente ID-Generierung.
VERSION: 4.0.0 (WP-24c: Gold-Standard Identity)
STATUS: Active
"""
import os
import uuid
import hashlib
from typing import Dict, Iterable, List, Optional, Set, Any, Tuple
try:
import yaml
except ImportError:
yaml = None
# WP-26 v1.0: Provenance-Literale auf valide EdgeDTO-Werte reduziert
# Legacy-Prioritäten für interne Verarbeitung (werden zu source_hint gemappt)
PROVENANCE_PRIORITY = {
# Explizite Kanten (provenance: "explicit")
"explicit:wikilink": 1.00,
"inline:rel": 0.95,
"callout:edge": 0.90,
"explicit:callout": 0.90,
"explicit:note_scope": 1.00,
"explicit:note_zone": 1.00,
# Regel-basierte Kanten (provenance: "rule")
"derived:backlink": 0.90,
"edge_defaults": 0.70,
"schema_default": 0.85,
"inferred:section_transition": 0.85, # WP-26 v1.1: Automatische Section-Übergänge
# Struktur-Kanten (provenance: "structure")
"structure:belongs_to": 1.00,
"structure:order": 0.95,
# KI-generierte Kanten (provenance: "smart")
"semantic_ai": 0.90,
"global_pool": 0.80,
}
# WP-26 v1.0: Mapping von internen Provenance-Werten zu EdgeDTO-konformen Literalen
PROVENANCE_TO_DTO = {
# explicit
"explicit:wikilink": ("explicit", "wikilink"),
"explicit:callout": ("explicit", "callout"),
"explicit:note_scope": ("explicit", "note_scope"),
"explicit:note_zone": ("explicit", "note_zone"),
"inline:rel": ("explicit", "inline_rel"),
"callout:edge": ("explicit", "callout"),
"explicit": ("explicit", None),
# rule
"derived:backlink": ("rule", "backlink"),
"edge_defaults": ("rule", "edge_defaults"),
"schema_default": ("rule", "schema_default"),
"inferred:schema": ("rule", "schema_default"),
"inferred:section_transition": ("rule", "schema_default"), # WP-26 v1.1
"rule": ("rule", None),
# structure
"structure:belongs_to": ("structure", "belongs_to"),
"structure:order": ("structure", "order"),
"structure": ("structure", None),
# smart
"semantic_ai": ("smart", None),
"global_pool": ("smart", "global_pool"),
"smart": ("smart", None),
}
def normalize_provenance(internal_provenance: str) -> Tuple[str, Optional[str]]:
"""
WP-26 v1.0: Normalisiert interne Provenance-Werte zu EdgeDTO-konformen Literalen.
Args:
internal_provenance: Interner Provenance-String (z.B. "explicit:callout")
Returns:
Tuple (provenance, source_hint) mit validen EdgeDTO-Werten
"""
if internal_provenance in PROVENANCE_TO_DTO:
return PROVENANCE_TO_DTO[internal_provenance]
# Fallback: Versuche Präfix-Matching
if internal_provenance.startswith("explicit"):
return ("explicit", internal_provenance.split(":")[-1] if ":" in internal_provenance else None)
if internal_provenance.startswith("structure"):
return ("structure", internal_provenance.split(":")[-1] if ":" in internal_provenance else None)
if internal_provenance.startswith("rule") or internal_provenance.startswith("derived"):
return ("rule", internal_provenance.split(":")[-1] if ":" in internal_provenance else None)
# Default: explicit ohne source_hint
return ("explicit", None)
# ---------------------------------------------------------------------------
# Pfad-Auflösung (Integration der .env Umgebungsvariablen)
# ---------------------------------------------------------------------------
def get_vocab_path() -> str:
"""Liefert den Pfad zum Edge-Vokabular aus der .env oder den Default."""
return os.getenv("MINDNET_VOCAB_PATH", "/mindnet/vault/mindnet/_system/dictionary/edge_vocabulary.md")
def get_schema_path() -> str:
"""Liefert den Pfad zum Graph-Schema aus der .env oder den Default."""
return os.getenv("MINDNET_SCHEMA_PATH", "/mindnet/vault/mindnet/_system/dictionary/graph_schema.md")
# ---------------------------------------------------------------------------
# ID & String Helper
# ---------------------------------------------------------------------------
def _get(d: dict, *keys, default=None):
"""Sicherer Zugriff auf tief verschachtelte Dictionary-Keys."""
for k in keys:
if isinstance(d, dict) and k in d and d[k] is not None:
return d[k]
return default
def _dedupe_seq(seq: Iterable[str]) -> List[str]:
"""Dedupliziert eine Sequenz von Strings unter Beibehaltung der Reihenfolge."""
seen: Set[str] = set()
out: List[str] = []
for s in seq:
if s not in seen:
seen.add(s)
out.append(s)
return out
def parse_link_target(raw: str, current_note_id: Optional[str] = None) -> Tuple[str, Optional[str]]:
"""
Trennt einen Obsidian-Link [[Target#Section]] in seine Bestandteile Target und Section.
Behandelt Self-Links (z.B. [[#Ziele]]), indem die aktuelle note_id eingesetzt wird.
WP-26 v1.1: Extrahiert Block-ID aus Section-Strings.
- Wenn Section "^block-id" enthält, wird nur der Block-ID-Teil extrahiert
- Beispiel: "📖 Diagnose: Glioblastom ^kontext" -> section = "kontext"
- Beispiel: "^learning" -> section = "learning"
- Beispiel: " ^sit" (nur Block-ID) -> section = "sit"
Returns:
Tuple (target_id, target_section)
"""
if not raw:
return "", None
parts = raw.split("#", 1)
target = parts[0].strip()
section = parts[1].strip() if len(parts) > 1 else None
# WP-26 v1.1: Block-ID-Extraktion aus Section
# Wenn die Section ein "^" enthält, extrahiere nur den Block-ID-Teil
if section and "^" in section:
# Finde den ^block-id Teil
import re
block_id_match = re.search(r'\^([a-zA-Z0-9_-]+)', section)
if block_id_match:
# Ersetze die gesamte Section durch nur die Block-ID
section = block_id_match.group(1)
# Spezialfall: Self-Link innerhalb derselben Datei
if not target and section and current_note_id:
target = current_note_id
return target, section
def _mk_edge_id(kind: str, s: str, t: str, scope: str, target_section: Optional[str] = None) -> str:
"""
WP-24c v4.0.0: DER GLOBALE STANDARD für Kanten-IDs.
Erzeugt eine deterministische UUIDv5. Dies stellt sicher, dass manuelle Links
und systemgenerierte Symmetrien dieselbe Point-ID in Qdrant erhalten.
GOLD-STANDARD v4.0.0: Die ID basiert STRICT auf vier Parametern:
f"edge:{kind}:{source}:{target}:{scope}"
Die Parameter rule_id und variant werden IGNORIERT und fließen NICHT in die ID ein.
Sie können weiterhin im Payload gespeichert werden, haben aber keinen Einfluss auf die Identität.
Args:
kind: Typ der Relation (z.B. 'mastered_by')
s: Kanonische ID der Quell-Note
t: Kanonische ID der Ziel-Note
scope: Granularität (Standard: 'note')
rule_id: Optionale ID der Regel (aus graph_derive_edges) - IGNORIERT in ID-Generierung
variant: Optionale Variante für multiple Links zum selben Ziel - IGNORIERT in ID-Generierung
"""
if not all([kind, s, t]):
raise ValueError(f"Incomplete data for edge ID: kind={kind}, src={s}, tgt={t}")
# Der String enthält nun alle distinkten semantischen Merkmale
base = f"edge:{kind}:{s}:{t}:{scope}"
# Wenn ein Link auf eine spezifische Sektion zeigt, ist es eine andere Relation
if target_section:
base += f":{target_section}"
return str(uuid.uuid5(uuid.NAMESPACE_URL, base))
def _edge(kind: str, scope: str, source_id: str, target_id: str, note_id: str, extra: Optional[dict] = None) -> dict:
"""
Konstruiert ein standardisiertes Kanten-Payload für Qdrant.
WP-26 v1.0: Erweitert um is_internal Flag und Provenance-Normalisierung.
Args:
kind: Kantentyp (z.B. "derives", "caused_by")
scope: Granularität ("chunk" oder "note")
source_id: ID der Quelle (Chunk oder Note)
target_id: ID des Ziels (Chunk oder Note)
note_id: ID der Note (für Kontext)
extra: Zusätzliche Payload-Felder
"""
pl = {
"kind": kind,
"relation": kind,
"scope": scope,
"source_id": source_id,
"target_id": target_id,
"note_id": note_id,
"virtual": False # Standardmäßig explizit, solange nicht anders in Phase 2 gesetzt
}
# WP-26 v1.0: is_internal Flag berechnen
# Intra-Note-Edge: Source und Target gehören zur gleichen Note
source_note = source_id.split("#")[0] if "#" in source_id else source_id
target_note = target_id.split("#")[0] if "#" in target_id else target_id
pl["is_internal"] = (source_note == target_note) or (source_note == note_id and target_note == note_id)
if extra:
pl.update(extra)
# WP-26 v1.0: Provenance normalisieren, falls vorhanden
if "provenance" in extra:
internal_prov = extra["provenance"]
dto_prov, source_hint = normalize_provenance(internal_prov)
pl["provenance"] = dto_prov
if source_hint:
pl["source_hint"] = source_hint
return pl
# ---------------------------------------------------------------------------
# Registry Operations
# ---------------------------------------------------------------------------
def load_types_registry() -> dict:
"""
Lädt die zentrale YAML-Registry (types.yaml).
Pfad wird über die Umgebungsvariable MINDNET_TYPES_FILE gesteuert.
"""
p = os.getenv("MINDNET_TYPES_FILE", "./config/types.yaml")
if not os.path.isfile(p) or yaml is None:
return {}
try:
with open(p, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
return data if data is not None else {}
except Exception:
return {}
def get_edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]:
"""
Ermittelt die konfigurierten Standard-Kanten für einen Note-Typ.
Greift bei Bedarf auf die globalen Defaults in der Registry zurück.
"""
types_map = reg.get("types", reg) if isinstance(reg, dict) else {}
if note_type and isinstance(types_map, dict):
t_cfg = types_map.get(note_type)
if isinstance(t_cfg, dict) and isinstance(t_cfg.get("edge_defaults"), list):
return [str(x) for x in t_cfg["edge_defaults"]]
# Fallback auf globale Defaults
for key in ("defaults", "default", "global"):
v = reg.get(key)
if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list):
return [str(x) for x in v["edge_defaults"] if isinstance(x, str)]
return []
# ---------------------------------------------------------------------------
# WP-26 v1.1: Graph-Schema Parser für automatische Edge-Typ-Ableitung
# ---------------------------------------------------------------------------
# Cache für geladenes Schema (vermeidet mehrfaches Parsen)
_GRAPH_SCHEMA_CACHE: Optional[Dict[str, Dict[str, List[str]]]] = None
# WP-26 v1.3: Erweitertes Schema mit prohibited edges
_GRAPH_SCHEMA_FULL_CACHE: Optional[Dict[str, Dict[str, Dict[str, List[str]]]]] = None
def load_graph_schema() -> Dict[str, Dict[str, List[str]]]:
"""
WP-26 v1.1: Parst das graph_schema.md und extrahiert Typical Edge-Types.
Das Schema hat folgendes Format:
## Source: `experience`
| Target-Note-type | Typical Edge-Types | Prohibited Edge-Types |
| :--- | :--- | :--- |
| `event` | `caused_by` | `consists_of` |
Returns:
Dict[source_type, Dict[target_type, List[typical_edges]]]
Beispiel: {"experience": {"event": ["caused_by"], "insight": ["resulted_in"]}}
"""
global _GRAPH_SCHEMA_CACHE
if _GRAPH_SCHEMA_CACHE is not None:
return _GRAPH_SCHEMA_CACHE
# Nutze das erweiterte Schema und extrahiere nur typical
full_schema = load_graph_schema_full()
schema: Dict[str, Dict[str, List[str]]] = {}
for source_type, targets in full_schema.items():
schema[source_type] = {}
for target_type, edge_info in targets.items():
schema[source_type][target_type] = edge_info.get("typical", [])
_GRAPH_SCHEMA_CACHE = schema
return schema
def load_graph_schema_full() -> Dict[str, Dict[str, Dict[str, List[str]]]]:
"""
WP-26 v1.3: Parst das graph_schema.md und extrahiert sowohl Typical als auch Prohibited Edge-Types.
Returns:
Dict[source_type, Dict[target_type, {"typical": [...], "prohibited": [...]}]]
Beispiel: {"experience": {"event": {"typical": ["caused_by"], "prohibited": ["consists_of"]}}}
"""
global _GRAPH_SCHEMA_FULL_CACHE
if _GRAPH_SCHEMA_FULL_CACHE is not None:
return _GRAPH_SCHEMA_FULL_CACHE
import re
import logging
logger = logging.getLogger(__name__)
schema_path = get_schema_path()
# Versuche verschiedene Pfade
paths_to_try = [
schema_path,
os.path.join(os.getcwd(), "config", "graph_schema.md"),
os.path.join(os.path.dirname(__file__), "..", "..", "..", "config", "graph_schema.md"),
]
# Falls MINDNET_OBSIDIAN_DICTIONARY gesetzt ist, nutze diesen Pfad
obsidian_dict = os.getenv("MINDNET_OBSIDIAN_DICTIONARY")
if obsidian_dict:
paths_to_try.insert(0, os.path.join(obsidian_dict, "graph_schema.md"))
content = None
for path in paths_to_try:
if os.path.isfile(path):
try:
with open(path, "r", encoding="utf-8") as f:
content = f.read()
logger.debug(f"Graph-Schema geladen von: {path}")
break
except Exception as e:
logger.warning(f"Fehler beim Laden von {path}: {e}")
if not content:
logger.warning("Graph-Schema nicht gefunden. Fallback auf leeres Schema.")
_GRAPH_SCHEMA_FULL_CACHE = {}
return _GRAPH_SCHEMA_FULL_CACHE
schema: Dict[str, Dict[str, Dict[str, List[str]]]] = {}
current_source = None
# Regex für Source-Header: ## Source: `experience`
source_pattern = re.compile(r'^##\s+Source:\s*`(\w+)`', re.IGNORECASE)
# Regex für Tabellen-Zeile: | `event` | `caused_by` | `consists_of` |
# oder | `event` | `caused_by`, `resulted_in` | - |
table_row_pattern = re.compile(
r'^\|\s*`(\w+)`\s*\|\s*([^|]+)\s*\|\s*([^|]*)\s*\|'
)
edge_pattern = re.compile(r'`(\w+)`')
for line in content.split('\n'):
stripped = line.strip()
# Prüfe auf Source-Header
source_match = source_pattern.match(stripped)
if source_match:
current_source = source_match.group(1).lower()
if current_source not in schema:
schema[current_source] = {}
continue
# Prüfe auf Tabellen-Zeile (nur wenn wir einen Source haben)
if current_source:
row_match = table_row_pattern.match(stripped)
if row_match:
target_type = row_match.group(1).lower()
typical_edges_raw = row_match.group(2).strip()
prohibited_edges_raw = row_match.group(3).strip()
# Parse die Edge-Types
typical_edges = edge_pattern.findall(typical_edges_raw)
prohibited_edges = edge_pattern.findall(prohibited_edges_raw)
schema[current_source][target_type] = {
"typical": typical_edges,
"prohibited": prohibited_edges
}
logger.info(f"Graph-Schema (full) geladen: {len(schema)} Source-Types")
_GRAPH_SCHEMA_FULL_CACHE = schema
return schema
def get_topology_info(source_type: str, target_type: str) -> Dict[str, List[str]]:
"""
WP-26 v1.3: Ermittelt Typical und Prohibited Edge-Types für ein Typ-Paar.
Args:
source_type: Typ der Quell-Sektion (z.B. "experience")
target_type: Typ der Ziel-Sektion (z.B. "insight")
Returns:
Dict mit "typical" und "prohibited" Listen
Beispiel: {"typical": ["resulted_in"], "prohibited": ["solves"]}
"""
schema = load_graph_schema_full()
source_lower = source_type.lower() if source_type else "default"
target_lower = target_type.lower() if target_type else "any"
result = {"typical": [], "prohibited": []}
# 1. Exakter Match
if source_lower in schema and target_lower in schema[source_lower]:
return schema[source_lower][target_lower]
# 2. Fallback auf "any" Target
if source_lower in schema and "any" in schema[source_lower]:
return schema[source_lower]["any"]
# 3. Fallback auf "default" Source
if "default" in schema:
if target_lower in schema["default"]:
return schema["default"][target_lower]
if "any" in schema["default"]:
return schema["default"]["any"]
# 4. Absoluter Fallback: alles erlaubt
return {"typical": ["related_to", "references"], "prohibited": []}
def get_typical_edge_for(source_type: str, target_type: str) -> Optional[str]:
"""
WP-26 v1.1: Ermittelt den ersten "Typical Edge-Type" für ein Typ-Paar.
Args:
source_type: Typ der Quell-Sektion (z.B. "experience")
target_type: Typ der Ziel-Sektion (z.B. "insight")
Returns:
Der erste Typical Edge-Type (z.B. "resulted_in") oder None
"""
schema = load_graph_schema()
source_lower = source_type.lower() if source_type else "default"
target_lower = target_type.lower() if target_type else "any"
# 1. Exakter Match
if source_lower in schema:
source_rules = schema[source_lower]
if target_lower in source_rules:
edges = source_rules[target_lower]
if edges:
return edges[0]
# 2. Fallback auf "any" Target
if "any" in source_rules:
edges = source_rules["any"]
if edges:
return edges[0]
# 3. Fallback auf "default" Source
if "default" in schema:
default_rules = schema["default"]
if target_lower in default_rules:
edges = default_rules[target_lower]
if edges:
return edges[0]
if "any" in default_rules:
edges = default_rules["any"]
if edges:
return edges[0]
# 4. Absoluter Fallback
return "related_to"
def clear_graph_schema_cache():
"""
WP-26 v1.1: Löscht den Cache für das Graph-Schema.
Nützlich für Tests oder wenn das Schema neu geladen werden soll.
WP-26 v1.3: Löscht auch den erweiterten Schema-Cache.
"""
global _GRAPH_SCHEMA_CACHE, _GRAPH_SCHEMA_FULL_CACHE
_GRAPH_SCHEMA_CACHE = None
_GRAPH_SCHEMA_FULL_CACHE = None