mindnet/app/core/chunker.py
2025-12-26 21:52:08 +01:00

447 lines
16 KiB
Python

"""
FILE: app/core/chunker.py
DESCRIPTION: Zerlegt Texte in Chunks (Sliding Window oder nach Headings).
WP-15b: Implementiert Edge-Inheritance und Candidate-Pool Vorbereitung.
Zentralisiert die Kanten-Vorbereitung für die spätere binäre Validierung.
Bietet volle Unterstützung für Hybrid-Chunking (Strict/Soft/Safety-Net).
VERSION: 3.2.0
STATUS: Active
DEPENDENCIES: re, math, yaml, pathlib, asyncio, logging
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple, Any, Set
import re
import math
import yaml
from pathlib import Path
import asyncio
import logging
# Services
# In WP-15b the AI validation moves into ingestion.py.
# The import is kept for backwards compatibility in case legacy scripts need it.
try:
    from app.services.semantic_analyzer import get_semantic_analyzer
except ImportError:
    # No-op stub when the service layer is unavailable.
    def get_semantic_analyzer(): return None
# Core imports
try:
    from app.core.derive_edges import build_edges_for_note
except ImportError:
    # Fallback for standalone operation or tests: derives no edges.
    def build_edges_for_note(note_id, chunks, note_level_references=None, include_note_scope_refs=False): return []
logger = logging.getLogger(__name__)
# ==========================================
# 1. HELPER & CONFIG
# ==========================================
# Project root: three levels up from app/core/chunker.py.
BASE_DIR = Path(__file__).resolve().parent.parent.parent
CONFIG_PATH = BASE_DIR / "config" / "types.yaml"
# Fallback default in case types.yaml is missing or has no matching profile.
DEFAULT_PROFILE = {"strategy": "sliding_window", "target": 400, "max": 600, "overlap": (50, 80)}
# Lazily-filled module-level cache for the parsed YAML config.
_CONFIG_CACHE = None
def _load_yaml_config() -> Dict[str, Any]:
    """Load and cache config/types.yaml.

    Returns:
        The parsed mapping, or {} when the file is missing, empty,
        unparsable, or does not contain a mapping at the top level.
        A successful read is cached in _CONFIG_CACHE; a missing file is
        re-checked on every call (it may appear later).
    """
    global _CONFIG_CACHE
    if _CONFIG_CACHE is not None:
        return _CONFIG_CACHE
    if not CONFIG_PATH.exists():
        return {}
    try:
        with open(CONFIG_PATH, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
        # Bugfix: safe_load returns None for an empty file (and may return a
        # non-dict); callers immediately call .get() on the result, which
        # would raise AttributeError. Normalize to a dict.
        if not isinstance(data, dict):
            data = {}
        _CONFIG_CACHE = data
        return data
    except Exception:
        # Best-effort: an unreadable/broken config behaves like no config.
        return {}
def get_chunk_config(note_type: str) -> Dict[str, Any]:
    """Resolve the chunking profile for a note type from types.yaml.

    Lookup order: the type's own 'chunking_profile', then the global
    default profile name ('sliding_standard'), then DEFAULT_PROFILE.
    Keeps compatibility with the WP-15 profiles.
    """
    cfg = _load_yaml_config()
    type_entry = cfg.get("types", {}).get(note_type.lower(), {})
    # Which profile does this type use (e.g. 'sliding_smart_edges')?
    profile_name = (
        type_entry.get("chunking_profile")
        or cfg.get("defaults", {}).get("chunking_profile", "sliding_standard")
    )
    profile = dict(cfg.get("chunking_profiles", {}).get(profile_name, DEFAULT_PROFILE))
    # YAML usually yields lists; the overlap is expected as a tuple.
    overlap = profile.get("overlap")
    if isinstance(overlap, list):
        profile["overlap"] = tuple(overlap)
    return profile
def extract_frontmatter_from_text(md_text: str) -> Tuple[Dict[str, Any], str]:
    """Split a leading YAML frontmatter block from the markdown body.

    Returns:
        (frontmatter, body): frontmatter is {} when absent, unparsable,
        or not a mapping; body is the remaining text, stripped. Without
        frontmatter the original text is returned unchanged.
    """
    fm_match = re.match(r'^\s*---\s*\n(.*?)\n---', md_text, re.DOTALL)
    if not fm_match:
        return {}, md_text
    try:
        frontmatter = yaml.safe_load(fm_match.group(1))
        if not isinstance(frontmatter, dict):
            frontmatter = {}
    except yaml.YAMLError:
        frontmatter = {}
    # Slice at the end of the match instead of re-running the same regex via
    # re.sub (the original scanned the text twice); behavior is identical
    # because the pattern is anchored at the start of the string.
    return frontmatter, md_text[fm_match.end():].strip()
# ==========================================
# 2. DATA CLASSES & TEXT TOOLS
# ==========================================
# Sentence boundary: terminal punctuation, whitespace, then an upper-case
# letter (incl. German umlauts), digit, low quote or opening parenthesis.
_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])')
_WS = re.compile(r'\s+')
def estimate_tokens(text: str) -> int:
    """Rough token estimate (~4 characters per token, at least 1)."""
    return max(1, math.ceil(len(text.strip()) / 4))
def split_sentences(text: str) -> list[str]:
    """Split text into sentences, honoring punctuation boundaries."""
    collapsed = _WS.sub(' ', text.strip())
    if not collapsed:
        return []
    return [piece.strip() for piece in _SENT_SPLIT.split(collapsed) if piece.strip()]
@dataclass
class RawBlock:
    # One logical markdown block as produced by parse_blocks().
    kind: str  # "paragraph" or "heading"
    text: str  # raw block text (heading blocks keep their "## " prefix)
    level: Optional[int]  # heading level (parse_blocks emits 2); None for paragraphs
    section_path: str  # path of the enclosing H2 section, e.g. "/Intro" ("/" = none)
    section_title: Optional[str]  # title of the enclosing H2 section, if any
@dataclass
class Chunk:
    # One retrievable unit of a note plus metadata for edge validation.
    id: str  # "<note_id>#cNN" (sequential, zero-padded)
    note_id: str  # owning note
    index: int  # 0-based position of the chunk within the note
    text: str  # chunk body as stored
    window: str  # text used downstream (body, optionally with context prefix)
    token_count: int  # estimated token count of `text` (see estimate_tokens)
    section_title: Optional[str]  # H2 section title this chunk belongs to
    section_path: str  # H2 section path, e.g. "/Intro"
    neighbors_prev: Optional[str]  # id of the previous chunk, if any
    neighbors_next: Optional[str]  # id of the next chunk, if any
    # WP-15b: candidate edges for semantic validation; dicts with keys
    # "kind", "to" and "provenance" (inherited / explicit / global_pool).
    candidate_pool: List[Dict[str, Any]] = field(default_factory=list)
    suggested_edges: Optional[List[str]] = None
# ==========================================
# 3. PARSING & STRATEGIES
# ==========================================
def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
    """Split markdown (frontmatter removed) into paragraph/heading blocks.

    Important for the 'by_heading' strategy and for edge inheritance.

    Returns:
        (blocks, document_title): the title comes from the first H1
        ("# ..."), defaulting to "Dokument". Only H2 headings open a new
        section; H1 lines are dropped and deeper headings ("###"...)
        remain part of the surrounding paragraph text.
    """
    blocks: List[RawBlock] = []
    section_path = "/"
    current_h2: Optional[str] = None
    buffer: List[str] = []

    # Frontmatter dict is not needed here; only the body is parsed.
    _, text_without_fm = extract_frontmatter_from_text(md_text)

    h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE)
    h1_title = h1_match.group(1).strip() if h1_match else "Dokument"

    def _flush_paragraph() -> None:
        # Emit the buffered lines as one paragraph block (if non-empty).
        nonlocal buffer
        content = "\n".join(buffer).strip()
        if content:
            blocks.append(RawBlock("paragraph", content, None, section_path, current_h2))
        buffer = []

    for line in text_without_fm.split('\n'):
        stripped = line.strip()
        if stripped.startswith('# '):
            # H1 is the document title, not chunk content.
            continue
        if stripped.startswith('## '):
            # Close the previous section's paragraph before switching context.
            _flush_paragraph()
            current_h2 = stripped[3:].strip()
            section_path = f"/{current_h2}"
            blocks.append(RawBlock("heading", stripped, 2, section_path, current_h2))
        elif not stripped:
            # Blank line terminates the current paragraph.
            _flush_paragraph()
        else:
            buffer.append(line)
    _flush_paragraph()
    return blocks, h1_title
def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "", context_prefix: str = "") -> List[Chunk]:
    """
    Standard strategy from WP-15.
    Accumulates blocks and cuts at roughly 'target' tokens; an oversized
    buffer is re-split on sentence boundaries with a sentence overlap.
    """
    target = config.get("target", 400)
    max_tokens = config.get("max", 600)
    overlap_val = config.get("overlap", (50, 80))
    # The overlap may be a (min, max) tuple -> use its midpoint as token budget.
    overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val
    chunks = []
    buf = []
    def _create_chunk(txt, win, sec, path):
        # Append a Chunk with a sequential per-note id (e.g. "<note>#c03").
        idx = len(chunks)
        chunks.append(Chunk(
            id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx,
            text=txt, window=win, token_count=estimate_tokens(txt),
            section_title=sec, section_path=path, neighbors_prev=None, neighbors_next=None,
            candidate_pool=[]
        ))
    def flush_buffer():
        # Turn the buffered blocks into one or more chunks.
        nonlocal buf
        if not buf: return
        text_body = "\n\n".join([b.text for b in buf])
        # Section metadata is taken from the last buffered block.
        sec_title = buf[-1].section_title if buf else None
        sec_path = buf[-1].section_path if buf else "/"
        # 'window' carries the optional context prefix in front of the body.
        win_body = f"{context_prefix}\n{text_body}".strip() if context_prefix else text_body
        if estimate_tokens(text_body) <= max_tokens:
            _create_chunk(text_body, win_body, sec_title, sec_path)
        else:
            # Oversized buffer: re-split on sentence boundaries around 'target'.
            sentences = split_sentences(text_body)
            current_chunk_sents = []
            current_len = 0
            for sent in sentences:
                sent_len = estimate_tokens(sent)
                if current_len + sent_len > target and current_chunk_sents:
                    c_txt = " ".join(current_chunk_sents)
                    c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt
                    _create_chunk(c_txt, c_win, sec_title, sec_path)
                    # Carry trailing sentences into the next chunk until the
                    # overlap token budget is exhausted (kept in order).
                    overlap_sents = []
                    ov_len = 0
                    for s in reversed(current_chunk_sents):
                        if ov_len + estimate_tokens(s) < overlap:
                            overlap_sents.insert(0, s)
                            ov_len += estimate_tokens(s)
                        else: break
                    current_chunk_sents = list(overlap_sents)
                    current_chunk_sents.append(sent)
                    current_len = ov_len + sent_len
                else:
                    current_chunk_sents.append(sent)
                    current_len += sent_len
            if current_chunk_sents:
                c_txt = " ".join(current_chunk_sents)
                c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt
                _create_chunk(c_txt, c_win, sec_title, sec_path)
        buf = []
    for b in blocks:
        # Headings provide context only; their text is not chunk content.
        if b.kind == "heading": continue
        current_buf_text = "\n\n".join([x.text for x in buf])
        if estimate_tokens(current_buf_text) + estimate_tokens(b.text) >= target:
            flush_buffer()
        buf.append(b)
        if estimate_tokens(b.text) >= target:
            # A single oversized block is flushed (and sentence-split) at once.
            flush_buffer()
    flush_buffer()
    return chunks
def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]:
    """
    Hybrid strategy v2.9 (strict / soft / safety net).

    Splits always on headings above `split_level`, on headings at
    `split_level` when `strict_heading_split` is set or the buffer has
    reached `target` tokens, and force-flushes when a block would push
    the buffer past `max` tokens (safety net).
    """
    strict = config.get("strict_heading_split", False)
    target = config.get("target", 400)
    max_tokens = config.get("max", 600)
    split_level = config.get("split_level", 2)
    chunks: List[Chunk] = []
    current_buf: List[str] = []
    current_tokens = 0
    # Section metadata of the blocks currently sitting in the buffer.
    # Bugfix: the original labelled a heading-triggered flush with the NEW
    # heading's section_title/path, although the buffered text belongs to the
    # previous section (parse_blocks stamps a heading with the section it
    # opens). We now track the buffered blocks' own section instead.
    buf_sec_title: Optional[str] = None
    buf_sec_path: str = "/"
    def _flush(sec_title, sec_path):
        # Emit the buffered text as one chunk (no-op when empty).
        nonlocal current_buf, current_tokens
        if not current_buf: return
        txt = "\n\n".join(current_buf)
        win = f"# {doc_title}\n## {sec_title}\n{txt}".strip() if sec_title else txt
        idx = len(chunks)
        chunks.append(Chunk(
            id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx,
            text=txt, window=win, token_count=estimate_tokens(txt),
            section_title=sec_title, section_path=sec_path,
            neighbors_prev=None, neighbors_next=None,
            candidate_pool=[]
        ))
        current_buf = []
        current_tokens = 0
    for b in blocks:
        if b.kind == "heading":
            # Hierarchy check: split on headings above the split level.
            if b.level < split_level:
                _flush(buf_sec_title, buf_sec_path)
            elif b.level == split_level:
                if strict or current_tokens >= target:
                    _flush(buf_sec_title, buf_sec_path)
            continue
        block_tokens = estimate_tokens(b.text)
        if current_tokens + block_tokens > max_tokens and current_buf:
            # Safety net: the buffer would exceed 'max'; flush what we have.
            _flush(buf_sec_title, buf_sec_path)
        current_buf.append(b.text)
        current_tokens += block_tokens
        buf_sec_title, buf_sec_path = b.section_title, b.section_path
    _flush(buf_sec_title, buf_sec_path)
    return chunks
# ==========================================
# 4. ROBUST EDGE PARSING & PROPAGATION
# ==========================================
def _parse_edges_robust(text: str) -> Set[str]:
"""
Findet Kanten im Text (Wikilinks, Inlines, Callouts).
Fix V3: Support für mehrzeilige Callouts.
"""
found_edges = set()
# A. Inline [[rel:type|target]]
inlines = re.findall(r'\[\[rel:([^\|\]]+)\|?([^\]]*)\]\]', text)
for kind, target in inlines:
k = kind.strip().lower()
t = target.strip()
if k and t: found_edges.add(f"{k}:{t}")
# B. Multiline Callouts Parsing (WP-15 Fix)
lines = text.split('\n')
current_edge_type = None
for line in lines:
stripped = line.strip()
callout_match = re.match(r'>\s*\[!edge\]\s*([^:\s]+)', stripped)
if callout_match:
current_edge_type = callout_match.group(1).strip().lower()
links = re.findall(r'\[\[([^\]]+)\]\]', stripped)
for l in links:
if "rel:" not in l: found_edges.add(f"{current_edge_type}:{l}")
continue
if current_edge_type and stripped.startswith('>'):
links = re.findall(r'\[\[([^\]]+)\]\]', stripped)
for l in links:
if "rel:" not in l: found_edges.add(f"{current_edge_type}:{l}")
elif not stripped.startswith('>'):
current_edge_type = None
return found_edges
def _propagate_section_edges(chunks: List[Chunk], blocks: List[RawBlock]) -> List[Chunk]:
    """
    WP-15b: edge inheritance.
    Edges declared on section headings are handed down to every chunk of
    that section via its candidate_pool (provenance "inherited").
    """
    # 1. Gather edges declared directly on heading blocks, keyed by section path.
    per_section: Dict[str, Set[str]] = {}
    for block in blocks:
        if block.kind != "heading":
            continue
        found = _parse_edges_robust(block.text)
        if found:
            per_section.setdefault(block.section_path, set()).update(found)
    # 2. Inject the inherited candidates into each matching chunk's pool.
    for chunk in chunks:
        for edge in per_section.get(chunk.section_path, ()):
            kind, target = edge.split(':', 1)
            chunk.candidate_pool.append({"kind": kind, "to": target, "provenance": "inherited"})
    return chunks
# ==========================================
# 5. ORCHESTRATION (WP-15b)
# ==========================================
async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]:
    """
    Main entry point for chunk generation.
    Builds the candidate pool for the semantic validation (WP-15b).

    Pipeline: parse blocks, run the configured strategy in a worker
    thread, fill candidate_pool from (A) section-inherited edges,
    (B) edges explicit in the chunk text, (C) a global "unassigned
    edges" section, then de-duplicate and chain neighbors.
    """
    if config is None:
        config = get_chunk_config(note_type)
    # The frontmatter dict is not needed here; only the body is scanned
    # for the global pool section below.
    _, body_text = extract_frontmatter_from_text(md_text)
    primary_strategy = config.get("strategy", "sliding_window")
    # 1. Parsing & splitting (CPU-bound -> worker thread keeps the loop free).
    blocks, doc_title = parse_blocks(md_text)
    if primary_strategy == "by_heading":
        chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title)
    else:
        chunks = await asyncio.to_thread(_strategy_sliding_window, blocks, config, note_id, doc_title)
    if not chunks:
        return []
    # 2. WP-15b candidate pool preparation.
    # A. Edge inheritance (section propagation).
    chunks = _propagate_section_edges(chunks, blocks)
    # B. Explicit edges contained directly in the chunk text.
    for ch in chunks:
        for e_str in _parse_edges_robust(ch.text):
            kind, target = e_str.split(':', 1)
            ch.candidate_pool.append({"kind": kind, "to": target, "provenance": "explicit"})
    # C. Global "unassigned pool" detection (safety net): looks for a
    #    section like "Unzugeordnete Kanten" / "Edge Pool" in the body.
    unassigned_pool: Set[str] = set()
    pool_match = re.search(r'###?\s*(?:Unzugeordnete Kanten|Edge Pool|Candidates)\s*\n(.*?)(?:\n#|$)', body_text, re.DOTALL | re.IGNORECASE)
    if pool_match:
        unassigned_pool = _parse_edges_robust(pool_match.group(1))
    for ch in chunks:
        for e_str in unassigned_pool:
            kind, target = e_str.split(':', 1)
            ch.candidate_pool.append({"kind": kind, "to": target, "provenance": "global_pool"})
    # D. De-duplicate each pool by (kind, target), keeping the first
    #    occurrence (inherited entries were appended first, then explicit,
    #    then global_pool).
    for ch in chunks:
        seen: Set[Tuple[str, str]] = set()
        unique_pool = []
        for cand in ch.candidate_pool:
            key = (cand["kind"], cand["to"])
            if key not in seen:
                seen.add(key)
                unique_pool.append(cand)
        ch.candidate_pool = unique_pool
    # 3. Neighbor chaining (structural prev/next references).
    for i, ch in enumerate(chunks):
        ch.neighbors_prev = chunks[i - 1].id if i > 0 else None
        ch.neighbors_next = chunks[i + 1].id if i < len(chunks) - 1 else None
    return chunks