Enhance chunking strategies and graph utilities for section-type transitions

- Implemented WP-26 v1.1: Section-Type-Wechsel erzwingt immer einen neuen Chunk, um konsistente Chunking-Verhalten bei unterschiedlichen section_types zu gewährleisten.
- Introduced automatic Intra-Note-Edges zwischen Sektionen mit unterschiedlichen Typen, um semantische Beziehungen zu erfassen.
- Updated graph utilities to support automatic edge type derivation based on section transitions.
- Added unit tests for section-type changes and automatic edge generation to ensure functionality and reliability.
This commit is contained in:
Lars 2026-01-25 17:36:57 +01:00
parent cc258008dc
commit af3cc0a254
4 changed files with 505 additions and 50 deletions

View File

@ -7,6 +7,8 @@ DESCRIPTION: Strategien für atomares Sektions-Chunking v3.9.9.
- Fix: Synchronisierung der Parameter mit dem Orchestrator (context_prefix). - Fix: Synchronisierung der Parameter mit dem Orchestrator (context_prefix).
WP-24c v4.2.5: Strict-Mode ohne Carry-Over - Bei strict_heading_split wird nach jeder Sektion geflasht. WP-24c v4.2.5: Strict-Mode ohne Carry-Over - Bei strict_heading_split wird nach jeder Sektion geflasht.
WP-26 v1.0: section_type und block_id werden an Chunks weitergegeben. WP-26 v1.0: section_type und block_id werden an Chunks weitergegeben.
WP-26 v1.1: Section-Type-Wechsel erzwingt IMMER einen neuen Chunk (unabhängig vom Profil).
Gleiche section_types folgen dem normalen Chunking-Verhalten.
""" """
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
from .chunking_models import RawBlock, Chunk from .chunking_models import RawBlock, Chunk
@ -53,11 +55,16 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id:
# --- SCHRITT 1: Gruppierung in atomare Sektions-Einheiten --- # --- SCHRITT 1: Gruppierung in atomare Sektions-Einheiten ---
# WP-26 v1.0: Erweitert um section_type und block_id Tracking # WP-26 v1.0: Erweitert um section_type und block_id Tracking
# WP-26 v1.1: Section-Type-Wechsel erzwingt IMMER eine neue Sektion (unabhängig vom Profil)
sections: List[Dict[str, Any]] = [] sections: List[Dict[str, Any]] = []
curr_blocks = [] curr_blocks = []
for b in blocks: current_section_type = None # WP-26 v1.1: Tracking des aktuellen section_type
if b.kind == "heading" and b.level <= split_level:
if curr_blocks: def _flush_section():
"""Hilfsfunktion zum Abschließen einer Sektion."""
nonlocal curr_blocks, current_section_type
if not curr_blocks:
return
# WP-26 v1.0: Finde den effektiven section_type und block_id für diese Sektion # WP-26 v1.0: Finde den effektiven section_type und block_id für diese Sektion
# Priorisiere den ersten Block mit section_type, sonst den Heading-Block # Priorisiere den ersten Block mit section_type, sonst den Heading-Block
effective_section_type = None effective_section_type = None
@ -75,26 +82,41 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id:
"section_type": effective_section_type, "section_type": effective_section_type,
"block_id": effective_block_id "block_id": effective_block_id
}) })
curr_blocks = [b] curr_blocks = []
else: current_section_type = effective_section_type
curr_blocks.append(b)
if curr_blocks:
# WP-26 v1.0: Gleiche Logik für den letzten Block
effective_section_type = None
effective_block_id = None
for cb in curr_blocks:
if cb.section_type and effective_section_type is None:
effective_section_type = cb.section_type
if cb.block_id and effective_block_id is None:
effective_block_id = cb.block_id
sections.append({ for b in blocks:
"text": "\n\n".join([x.text for x in curr_blocks]), # WP-26 v1.1: Prüfe auf Section-Type-Wechsel BEVOR wir den Block hinzufügen
"meta": curr_blocks[0], # Wenn sich der section_type ändert, muss die aktuelle Sektion abgeschlossen werden
"is_empty": len(curr_blocks) == 1 and curr_blocks[0].kind == "heading", block_section_type = b.section_type
"section_type": effective_section_type,
"block_id": effective_block_id # Section-Type-Wechsel erkannt?
}) # (Wechsel ist: alter Typ != neuer Typ UND mindestens einer ist nicht None)
is_section_type_change = (
curr_blocks and # Es gibt bereits Blöcke
block_section_type is not None and # Neuer Block hat expliziten section_type
current_section_type != block_section_type # Typ hat sich geändert
)
if b.kind == "heading" and b.level <= split_level:
# Heading-basierter Split (Standard-Verhalten)
_flush_section()
curr_blocks = [b]
current_section_type = block_section_type # Update tracking
elif is_section_type_change:
# WP-26 v1.1: Section-Type-Wechsel erzwingt neuen Chunk
_flush_section()
curr_blocks = [b]
current_section_type = block_section_type
else:
# Normales Hinzufügen zum aktuellen Block
curr_blocks.append(b)
# Update section_type wenn Block einen hat und wir noch keinen haben
if block_section_type and not current_section_type:
current_section_type = block_section_type
# Letzte Sektion abschließen
_flush_section()
# --- SCHRITT 2: Verarbeitung der Queue --- # --- SCHRITT 2: Verarbeitung der Queue ---
queue = list(sections) queue = list(sections)
@ -217,14 +239,17 @@ def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note
""" """
Standard-Sliding-Window für flache Texte ohne Sektionsfokus. Standard-Sliding-Window für flache Texte ohne Sektionsfokus.
WP-26 v1.0: Erweitert um section_type und block_id Weitergabe. WP-26 v1.0: Erweitert um section_type und block_id Weitergabe.
WP-26 v1.1: Section-Type-Wechsel erzwingt IMMER einen neuen Chunk.
""" """
target = config.get("target", 400); max_tokens = config.get("max", 600) target = config.get("target", 400); max_tokens = config.get("max", 600)
chunks: List[Chunk] = []; buf: List[RawBlock] = [] chunks: List[Chunk] = []; buf: List[RawBlock] = []
current_section_type = None # WP-26 v1.1: Tracking des aktuellen section_type
for b in blocks: def _flush_buffer():
b_tokens = estimate_tokens(b.text) """Hilfsfunktion zum Flushen des Buffers."""
curr_tokens = sum(estimate_tokens(x.text) for x in buf) if buf else 0 nonlocal buf, current_section_type
if curr_tokens + b_tokens > max_tokens and buf: if not buf:
return
txt = "\n\n".join([x.text for x in buf]); idx = len(chunks) txt = "\n\n".join([x.text for x in buf]); idx = len(chunks)
win = _create_win(context_prefix, buf[0].section_title, txt) win = _create_win(context_prefix, buf[0].section_title, txt)
# WP-26 v1.0: Finde effektiven section_type und block_id # WP-26 v1.0: Finde effektiven section_type und block_id
@ -232,26 +257,36 @@ def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note
effective_block_id = next((b.block_id for b in buf if b.block_id), None) effective_block_id = next((b.block_id for b in buf if b.block_id), None)
chunks.append(Chunk( chunks.append(Chunk(
id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx,
text=txt, window=win, token_count=curr_tokens, text=txt, window=win, token_count=sum(estimate_tokens(x.text) for x in buf),
section_title=buf[0].section_title, section_path=buf[0].section_path, section_title=buf[0].section_title, section_path=buf[0].section_path,
neighbors_prev=None, neighbors_next=None, neighbors_prev=None, neighbors_next=None,
section_type=effective_section_type, block_id=effective_block_id section_type=effective_section_type, block_id=effective_block_id
)) ))
buf = [] buf = []
buf.append(b) current_section_type = effective_section_type
if buf: for b in blocks:
txt = "\n\n".join([x.text for x in buf]); idx = len(chunks) b_tokens = estimate_tokens(b.text)
win = _create_win(context_prefix, buf[0].section_title, txt) curr_tokens = sum(estimate_tokens(x.text) for x in buf) if buf else 0
# WP-26 v1.0: Finde effektiven section_type und block_id block_section_type = b.section_type
effective_section_type = next((b.section_type for b in buf if b.section_type), None)
effective_block_id = next((b.block_id for b in buf if b.block_id), None) # WP-26 v1.1: Prüfe auf Section-Type-Wechsel
chunks.append(Chunk( is_section_type_change = (
id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, buf and # Es gibt bereits Blöcke im Buffer
text=txt, window=win, token_count=estimate_tokens(txt), block_section_type is not None and # Neuer Block hat expliziten section_type
section_title=buf[0].section_title, section_path=buf[0].section_path, current_section_type != block_section_type # Typ hat sich geändert
neighbors_prev=None, neighbors_next=None, )
section_type=effective_section_type, block_id=effective_block_id
)) # Flush wenn: Token-Limit überschritten ODER Section-Type-Wechsel
if (curr_tokens + b_tokens > max_tokens and buf) or is_section_type_change:
_flush_buffer()
buf.append(b)
# Update section_type wenn Block einen hat und wir noch keinen haben
if block_section_type and not current_section_type:
current_section_type = block_section_type
# Letzten Buffer flushen
_flush_buffer()
return chunks return chunks

View File

@ -32,7 +32,8 @@ import logging
from typing import List, Optional, Dict, Tuple, Set from typing import List, Optional, Dict, Tuple, Set
from .graph_utils import ( from .graph_utils import (
_get, _edge, _mk_edge_id, _dedupe_seq, parse_link_target, _get, _edge, _mk_edge_id, _dedupe_seq, parse_link_target,
PROVENANCE_PRIORITY, load_types_registry, get_edge_defaults_for PROVENANCE_PRIORITY, load_types_registry, get_edge_defaults_for,
get_typical_edge_for # WP-26 v1.1: Für automatische Intra-Note-Edges
) )
from .graph_extractors import ( from .graph_extractors import (
extract_typed_relations, extract_callout_relations, extract_wikilinks extract_typed_relations, extract_callout_relations, extract_wikilinks
@ -646,6 +647,52 @@ def build_edges_for_note(
"provenance": "structure", "rule_id": "structure:order", "confidence": PROVENANCE_PRIORITY["structure:order"] "provenance": "structure", "rule_id": "structure:order", "confidence": PROVENANCE_PRIORITY["structure:order"]
})) }))
# 1b) WP-26 v1.1: Automatische Intra-Note-Edges zwischen Sektionen mit unterschiedlichen Typen
# Wenn sich der section_type zwischen aufeinanderfolgenden Chunks ändert,
# wird eine semantische Kante basierend auf graph_schema.md erstellt.
for idx, ch in enumerate(chunks):
if idx >= len(chunks) - 1:
continue # Kein nächster Chunk
cid = _get(ch, "chunk_id", "id")
next_ch = chunks[idx + 1]
next_id = _get(next_ch, "chunk_id", "id")
if not cid or not next_id:
continue
# Hole die effective_types der Chunks
# WP-26 v1.1: section_type oder note_type (effective_type)
current_section_type = ch.get("section_type")
next_section_type = next_ch.get("section_type")
current_type = current_section_type or ch.get("type") or note_type
next_type = next_section_type or next_ch.get("type") or note_type
# Prüfe, ob es einen Section-Type-Wechsel gibt
# Nur wenn beide einen expliziten section_type haben oder sich die effective_types unterscheiden
is_section_change = (
(current_section_type is not None or next_section_type is not None) and
current_type != next_type
)
if is_section_change:
# Ermittle den passenden Edge-Typ aus graph_schema.md
edge_kind = get_typical_edge_for(current_type, next_type)
logger.debug(f"WP-26 Intra-Note-Edge: {current_type} -> {next_type} = {edge_kind}")
# Erstelle die automatische Edge (Forward-Richtung)
edges.append(_edge(edge_kind, "chunk", cid, next_id, note_id, {
"chunk_id": cid,
"edge_id": _mk_edge_id(edge_kind, cid, next_id, "chunk"),
"provenance": "rule",
"rule_id": "inferred:section_transition",
"source_hint": "schema_default",
"confidence": PROVENANCE_PRIORITY.get("schema_default", 0.85),
"is_internal": True, # Explizit als Intra-Note-Edge markieren
"section_transition": f"{current_type}->{next_type}" # Debug-Info
}))
# 2) Inhaltliche Kanten (Explicit & Candidate Pool) # 2) Inhaltliche Kanten (Explicit & Candidate Pool)
reg = load_types_registry() reg = load_types_registry()
defaults = get_edge_defaults_for(note_type, reg) defaults = get_edge_defaults_for(note_type, reg)

View File

@ -33,6 +33,7 @@ PROVENANCE_PRIORITY = {
"derived:backlink": 0.90, "derived:backlink": 0.90,
"edge_defaults": 0.70, "edge_defaults": 0.70,
"schema_default": 0.85, "schema_default": 0.85,
"inferred:section_transition": 0.85, # WP-26 v1.1: Automatische Section-Übergänge
# Struktur-Kanten (provenance: "structure") # Struktur-Kanten (provenance: "structure")
"structure:belongs_to": 1.00, "structure:belongs_to": 1.00,
"structure:order": 0.95, "structure:order": 0.95,
@ -56,6 +57,7 @@ PROVENANCE_TO_DTO = {
"edge_defaults": ("rule", "edge_defaults"), "edge_defaults": ("rule", "edge_defaults"),
"schema_default": ("rule", "schema_default"), "schema_default": ("rule", "schema_default"),
"inferred:schema": ("rule", "schema_default"), "inferred:schema": ("rule", "schema_default"),
"inferred:section_transition": ("rule", "schema_default"), # WP-26 v1.1
"rule": ("rule", None), "rule": ("rule", None),
# structure # structure
"structure:belongs_to": ("structure", "belongs_to"), "structure:belongs_to": ("structure", "belongs_to"),
@ -256,3 +258,156 @@ def get_edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]:
return [str(x) for x in v["edge_defaults"] if isinstance(x, str)] return [str(x) for x in v["edge_defaults"] if isinstance(x, str)]
return [] return []
# ---------------------------------------------------------------------------
# WP-26 v1.1: Graph-Schema Parser für automatische Edge-Typ-Ableitung
# ---------------------------------------------------------------------------
# Cache für geladenes Schema (vermeidet mehrfaches Parsen)
_GRAPH_SCHEMA_CACHE: Optional[Dict[str, Dict[str, List[str]]]] = None
def load_graph_schema() -> Dict[str, Dict[str, List[str]]]:
"""
WP-26 v1.1: Parst das graph_schema.md und extrahiert Typical Edge-Types.
Das Schema hat folgendes Format:
## Source: `experience`
| Target-Note-type | Typical Edge-Types | Prohibited Edge-Types |
| :--- | :--- | :--- |
| `event` | `caused_by` | `consists_of` |
Returns:
Dict[source_type, Dict[target_type, List[typical_edges]]]
Beispiel: {"experience": {"event": ["caused_by"], "insight": ["resulted_in"]}}
"""
global _GRAPH_SCHEMA_CACHE
if _GRAPH_SCHEMA_CACHE is not None:
return _GRAPH_SCHEMA_CACHE
import re
import logging
logger = logging.getLogger(__name__)
schema_path = get_schema_path()
# Versuche verschiedene Pfade
paths_to_try = [
schema_path,
os.path.join(os.getcwd(), "config", "graph_schema.md"),
os.path.join(os.path.dirname(__file__), "..", "..", "..", "config", "graph_schema.md"),
]
# Falls MINDNET_OBSIDIAN_DICTIONARY gesetzt ist, nutze diesen Pfad
obsidian_dict = os.getenv("MINDNET_OBSIDIAN_DICTIONARY")
if obsidian_dict:
paths_to_try.insert(0, os.path.join(obsidian_dict, "graph_schema.md"))
content = None
for path in paths_to_try:
if os.path.isfile(path):
try:
with open(path, "r", encoding="utf-8") as f:
content = f.read()
logger.debug(f"Graph-Schema geladen von: {path}")
break
except Exception as e:
logger.warning(f"Fehler beim Laden von {path}: {e}")
if not content:
logger.warning("Graph-Schema nicht gefunden. Fallback auf leeres Schema.")
_GRAPH_SCHEMA_CACHE = {}
return _GRAPH_SCHEMA_CACHE
schema: Dict[str, Dict[str, List[str]]] = {}
current_source = None
# Regex für Source-Header: ## Source: `experience`
source_pattern = re.compile(r'^##\s+Source:\s*`(\w+)`', re.IGNORECASE)
# Regex für Tabellen-Zeile: | `event` | `caused_by` | `consists_of` |
# oder | `event` | `caused_by`, `resulted_in` | - |
table_row_pattern = re.compile(
r'^\|\s*`(\w+)`\s*\|\s*([^|]+)\s*\|\s*([^|]*)\s*\|'
)
for line in content.split('\n'):
stripped = line.strip()
# Prüfe auf Source-Header
source_match = source_pattern.match(stripped)
if source_match:
current_source = source_match.group(1).lower()
if current_source not in schema:
schema[current_source] = {}
continue
# Prüfe auf Tabellen-Zeile (nur wenn wir einen Source haben)
if current_source:
row_match = table_row_pattern.match(stripped)
if row_match:
target_type = row_match.group(1).lower()
typical_edges_raw = row_match.group(2).strip()
# Parse die Edge-Types (können mit Backticks und Kommas getrennt sein)
# Format: `caused_by`, `resulted_in` oder `caused_by`
edge_pattern = re.compile(r'`(\w+)`')
typical_edges = edge_pattern.findall(typical_edges_raw)
if typical_edges:
schema[current_source][target_type] = typical_edges
logger.info(f"Graph-Schema geladen: {len(schema)} Source-Types")
_GRAPH_SCHEMA_CACHE = schema
return schema
def get_typical_edge_for(source_type: str, target_type: str) -> Optional[str]:
"""
WP-26 v1.1: Ermittelt den ersten "Typical Edge-Type" für ein Typ-Paar.
Args:
source_type: Typ der Quell-Sektion (z.B. "experience")
target_type: Typ der Ziel-Sektion (z.B. "insight")
Returns:
Der erste Typical Edge-Type (z.B. "resulted_in") oder None
"""
schema = load_graph_schema()
source_lower = source_type.lower() if source_type else "default"
target_lower = target_type.lower() if target_type else "any"
# 1. Exakter Match
if source_lower in schema:
source_rules = schema[source_lower]
if target_lower in source_rules:
edges = source_rules[target_lower]
if edges:
return edges[0]
# 2. Fallback auf "any" Target
if "any" in source_rules:
edges = source_rules["any"]
if edges:
return edges[0]
# 3. Fallback auf "default" Source
if "default" in schema:
default_rules = schema["default"]
if target_lower in default_rules:
edges = default_rules[target_lower]
if edges:
return edges[0]
if "any" in default_rules:
edges = default_rules["any"]
if edges:
return edges[0]
# 4. Absoluter Fallback
return "related_to"
def clear_graph_schema_cache():
"""
WP-26 v1.1: Löscht den Cache für das Graph-Schema.
Nützlich für Tests oder wenn das Schema neu geladen werden soll.
"""
global _GRAPH_SCHEMA_CACHE
_GRAPH_SCHEMA_CACHE = None

View File

@ -1,12 +1,17 @@
""" """
FILE: tests/test_wp26_section_types.py FILE: tests/test_wp26_section_types.py
DESCRIPTION: Unit-Tests für WP-26 Phase 1: Section-Types und Intra-Note-Edges DESCRIPTION: Unit-Tests für WP-26 Phase 1: Section-Types und Intra-Note-Edges
VERSION: 1.0.0 WP-26 v1.1: Erweitert um Tests für Section-Split und automatische Edges
VERSION: 1.1.0
""" """
import pytest import pytest
from app.core.chunking.chunking_parser import parse_blocks from app.core.chunking.chunking_parser import parse_blocks
from app.core.chunking.chunking_models import RawBlock, Chunk from app.core.chunking.chunking_models import RawBlock, Chunk
from app.core.graph.graph_utils import normalize_provenance, _edge from app.core.chunking.chunking_strategies import strategy_by_heading, strategy_sliding_window
from app.core.graph.graph_utils import (
normalize_provenance, _edge, get_typical_edge_for,
load_graph_schema, clear_graph_schema_cache
)
class TestSectionTypeRecognition: class TestSectionTypeRecognition:
@ -261,5 +266,218 @@ class TestNestedEdgeCallouts:
assert len(callouts) >= 1 assert len(callouts) >= 1
# =============================================================================
# WP-26 v1.1: Tests für Section-Type-Wechsel und automatische Chunk-Splits
# =============================================================================
class TestSectionTypeSplit:
"""UT-14: Section-Type-Wechsel erzwingt neuen Chunk"""
def test_section_type_change_forces_new_chunk_by_heading(self):
"""Section-Type-Wechsel erzwingt neuen Chunk (by_heading Strategie)"""
md = """
## Situation ^sit
> [!section] experience
Die Geschichte beginnt hier.
## Reflexion ^ref
> [!section] insight
Erkenntnisse aus der Situation.
"""
blocks, _ = parse_blocks(md)
# Konfiguration: strict=False (würde normalerweise zusammenfassen)
config = {
"target": 1000, # Hoher Wert, um sicherzustellen, dass nicht wegen Token-Limit gesplittet wird
"max": 2000,
"split_level": 2,
"strict_heading_split": False, # Normalerweise würde zusammengefasst
"enable_smart_edge_allocation": True
}
chunks = strategy_by_heading(blocks, config, "test-note")
# Es sollten mindestens 2 Chunks geben (wegen Section-Type-Wechsel)
assert len(chunks) >= 2
# Prüfe, dass die Chunks unterschiedliche section_types haben
section_types = [c.section_type for c in chunks if c.section_type]
assert "experience" in section_types
assert "insight" in section_types
def test_same_section_type_follows_normal_behavior(self):
"""Gleicher Section-Type folgt normalem Chunking-Verhalten"""
md = """
## Section A
> [!section] experience
Content A about experience.
## Section B
> [!section] experience
Content B also about experience.
"""
blocks, _ = parse_blocks(md)
# Konfiguration: strict=False (würde zusammenfassen bei gleichem Typ)
config = {
"target": 1000,
"max": 2000,
"split_level": 2,
"strict_heading_split": False,
"enable_smart_edge_allocation": True
}
chunks = strategy_by_heading(blocks, config, "test-note")
# Bei gleichem section_type und strict=False könnten Chunks zusammengefasst werden
# (abhängig von Token-Limits)
# Wichtig: Alle Chunks sollten section_type "experience" haben
for c in chunks:
if c.section_type:
assert c.section_type == "experience"
def test_sliding_window_respects_section_type_change(self):
"""sliding_window Strategie respektiert Section-Type-Wechsel"""
md = """
## Part 1
> [!section] experience
Short content.
## Part 2
> [!section] insight
Another short content.
"""
blocks, _ = parse_blocks(md)
config = {
"target": 1000, # Hoher Wert
"max": 2000
}
chunks = strategy_sliding_window(blocks, config, "test-note")
# Bei Section-Type-Wechsel sollte gesplittet werden
section_types = [c.section_type for c in chunks if c.section_type]
# Beide Typen sollten in separaten Chunks sein
if len(section_types) >= 2:
assert "experience" in section_types or "insight" in section_types
class TestGraphSchemaParser:
"""UT-15: Graph-Schema Parser Tests"""
def test_get_typical_edge_experience_to_insight(self):
"""Typischer Edge von experience zu insight ist 'resulted_in'"""
# Lade Schema (falls verfügbar)
edge = get_typical_edge_for("experience", "insight")
# Sollte entweder 'resulted_in' oder Fallback 'related_to' sein
assert edge in ["resulted_in", "related_to"]
def test_get_typical_edge_insight_to_decision(self):
"""Typischer Edge von insight zu decision"""
edge = get_typical_edge_for("insight", "decision")
# Basierend auf graph_schema.md: foundation_for oder guides
assert edge in ["foundation_for", "guides", "related_to"]
def test_get_typical_edge_fallback(self):
"""Fallback auf 'related_to' für unbekannte Typen"""
edge = get_typical_edge_for("unknown_type_1", "unknown_type_2")
# Fallback sollte immer related_to sein
assert edge == "related_to"
def test_get_typical_edge_any_target(self):
"""Fallback auf 'any' Target-Regel"""
edge = get_typical_edge_for("experience", "unknown_target")
# Sollte Fallback auf "any"-Regel oder "related_to"
assert edge is not None
class TestAutomaticIntraNoteEdges:
"""UT-16: Automatische Intra-Note-Edges zwischen Sektionen"""
def test_edge_payload_has_section_transition(self):
"""Edge zwischen Sektionen enthält section_transition Metadaten"""
edge = _edge(
kind="resulted_in",
scope="chunk",
source_id="note1#c00",
target_id="note1#c01",
note_id="note1",
extra={
"provenance": "rule",
"rule_id": "inferred:section_transition",
"section_transition": "experience->insight",
"is_internal": True
}
)
assert edge["is_internal"] is True
assert edge["section_transition"] == "experience->insight"
assert edge["provenance"] == "rule"
def test_inferred_section_transition_provenance(self):
"""Provenance 'inferred:section_transition' wird korrekt normalisiert"""
prov, hint = normalize_provenance("inferred:section_transition")
assert prov == "rule"
assert hint == "schema_default"
class TestRealWorldScenario:
"""UT-17: Real-World Szenario - Krebsdiagnose Note"""
def test_krebsdiagnose_note_structure(self):
"""Testet die erwartete Struktur der Krebsdiagnose-Note"""
md = """
## 📖 Diagnose: Glioblastom ^kontext
Nach der Operation gab es ein Diagnose-Gespräch.
## 🎭 Emotions-Check ^emotionen
Ich reagierte mit Zittern am Körper.
## 💡 Lektion ^learning
> [!section] insight
Ich habe versucht die nächsten Schritte zu durchdenken.
Meine positive Einstellung hat mir geholfen.
"""
blocks, _ = parse_blocks(md)
# Konfiguration wie structured_smart_edges
config = {
"target": 400,
"max": 600,
"split_level": 2,
"strict_heading_split": False,
"enable_smart_edge_allocation": True
}
chunks = strategy_by_heading(blocks, config, "krebsdiagnose")
# Wegen Section-Type-Wechsel (None -> insight) sollte die Lektion
# ein separater Chunk sein
insight_chunks = [c for c in chunks if c.section_type == "insight"]
# Mindestens ein Chunk mit section_type "insight"
assert len(insight_chunks) >= 1
# Der insight-Chunk sollte den Lektions-Inhalt enthalten
insight_text = insight_chunks[0].text
assert "durchdenken" in insight_text.lower() or "positive" in insight_text.lower()
if __name__ == "__main__": if __name__ == "__main__":
pytest.main([__file__, "-v"]) pytest.main([__file__, "-v"])