mindnet/app/core/chunker.py
Lars 2d43e0596c
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 4s
Test der neuen mehrzeiligen Edge-Parser
2025-12-22 06:08:48 +01:00

474 lines
17 KiB
Python

"""
FILE: app/core/chunker.py
DESCRIPTION: Zerlegt Texte in Chunks (Sliding Window oder nach Headings).
Orchestriert die Smart-Edge-Allocation via SemanticAnalyzer.
FIX V3: Support für mehrzeilige Callouts und Section-Propagation.
VERSION: 3.1.0 (Full Compatibility Merge)
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple, Any, Set
import re
import math
import yaml
from pathlib import Path
import asyncio
import logging
# Services
from app.services.semantic_analyzer import get_semantic_analyzer
# Core imports
# build_edges_for_note is imported only to stay compatible with its signature
# or to be available for the fallback.
try:
    from app.core.derive_edges import build_edges_for_note
except ImportError:
    # Stand-in for tests, used when derive_edges cannot be imported.
    def build_edges_for_note(
        note_id,
        chunks,
        note_level_references=None,
        include_note_scope_refs=False,
    ):
        """Fallback stub: report no edges, whatever the input."""
        return []

logger = logging.getLogger(__name__)
# ==========================================
# 1. HELPER & CONFIG
# ==========================================
# Directory three levels above this module file.
BASE_DIR = Path(__file__).resolve().parent.parent.parent
CONFIG_PATH = BASE_DIR.joinpath("config", "types.yaml")

# Fallback profile used when types.yaml offers no matching chunking profile.
DEFAULT_PROFILE = {
    "strategy": "sliding_window",
    "target": 400,
    "max": 600,
    "overlap": (50, 80),
}

# Parsed types.yaml, filled lazily by _load_yaml_config().
_CONFIG_CACHE = None
def _load_yaml_config() -> Dict[str, Any]:
    """
    Load ``types.yaml`` and memoize the parsed mapping.

    Returns:
        The parsed top-level mapping. An empty dict is returned when the file
        is missing, cannot be read or parsed, is empty, or does not contain a
        mapping at top level. Only a successfully parsed mapping is cached,
        so a missing or broken file is retried on the next call.
    """
    global _CONFIG_CACHE
    if _CONFIG_CACHE is not None:
        return _CONFIG_CACHE
    if not CONFIG_PATH.exists():
        return {}
    try:
        with open(CONFIG_PATH, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except Exception:
        # Best effort: chunking must keep working with defaults, but the
        # failure should at least be visible in the logs.
        logger.warning("Chunker: could not load %s", CONFIG_PATH, exc_info=True)
        return {}
    if not isinstance(data, dict):
        # An empty file parses to None and a list/scalar document is not a
        # usable config; callers rely on .get(), so never hand those out.
        return {}
    _CONFIG_CACHE = data
    return data
def get_chunk_config(note_type: str) -> Dict[str, Any]:
    """
    Resolve the chunking profile for a note type from types.yaml.

    The profile name is looked up in ``types[<note_type>].chunking_profile``,
    then in ``defaults.chunking_profile``, and finally falls back to
    ``"sliding_standard"``. If ``chunking_profiles`` has no entry under that
    name, ``DEFAULT_PROFILE`` is used.

    Args:
        note_type: Type of the note; matched case-insensitively. ``None`` or
            an empty value simply selects the default profile.

    Returns:
        A shallow copy of the selected profile. A list-valued ``"overlap"``
        is converted to a tuple, because YAML delivers sequences as lists.
    """
    full_config = _load_yaml_config() or {}
    # A YAML key written without a value parses to None; normalise each
    # section so the lookups below cannot fail on it.
    profiles = full_config.get("chunking_profiles") or {}
    type_defs = full_config.get("types") or {}
    defaults = full_config.get("defaults") or {}

    type_def = type_defs.get(str(note_type or "").lower()) or {}

    # Which profile does this type use? (e.g. 'sliding_smart_edges')
    profile_name = type_def.get("chunking_profile")
    if not profile_name:
        profile_name = defaults.get("chunking_profile", "sliding_standard")

    profile = profiles.get(profile_name, DEFAULT_PROFILE)
    config = dict(profile if profile is not None else DEFAULT_PROFILE)

    if isinstance(config.get("overlap"), list):
        config["overlap"] = tuple(config["overlap"])
    return config
def extract_frontmatter_from_text(md_text: str) -> Tuple[Dict[str, Any], str]:
    """
    Split a Markdown document into its YAML frontmatter and its body.

    Returns:
        A ``(frontmatter, body)`` pair. If the text does not start with a
        ``---`` delimited block, the result is ``({}, md_text)`` with the
        text untouched. Otherwise the block is cut off and the remainder is
        returned stripped; frontmatter that fails to parse or is not a
        mapping is reported as ``{}``.
    """
    header = re.match(r'^\s*---\s*\n(.*?)\n---', md_text, re.DOTALL)
    if header is None:
        return {}, md_text

    try:
        parsed = yaml.safe_load(header.group(1))
    except yaml.YAMLError:
        parsed = {}
    meta = parsed if isinstance(parsed, dict) else {}

    # The pattern is anchored at the start of the text, so removing the
    # match is the same as keeping everything after it.
    body = md_text[header.end():]
    return meta, body.strip()
# ==========================================
# 2. DATA CLASSES & TEXT TOOLS
# ==========================================
# Sentence boundary: whitespace that follows '.', '!' or '?' and precedes an
# uppercase letter (including Ä/Ö/Ü), a digit, a low opening quote or '('.
_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])')
# Any run of whitespace.
_WS = re.compile(r'\s+')
def estimate_tokens(text: str) -> int:
    """Estimate the token count as a quarter of the stripped length, rounded up, at least 1."""
    char_count = len(text.strip())
    quarters = -(-char_count // 4)  # ceiling division by 4
    return quarters if quarters > 1 else 1
def split_sentences(text: str) -> list[str]:
    """
    Split text into sentences.

    Whitespace runs are collapsed to single spaces first; the text is then
    cut at the boundaries described by ``_SENT_SPLIT``. Empty pieces are
    discarded, and blank input yields an empty list.
    """
    normalized = _WS.sub(' ', text.strip())
    if normalized == "":
        return []
    pieces = map(str.strip, _SENT_SPLIT.split(normalized))
    return list(filter(None, pieces))
@dataclass
class RawBlock:
    """One parsed unit of a note, as produced by parse_blocks."""
    # "heading" or "paragraph"
    kind: str
    # Raw text of the block (for headings: the full heading line).
    text: str
    # Heading level (2 for '## ' headings); None for paragraphs.
    level: Optional[int]
    # "/" before the first H2, otherwise "/<H2 title>".
    section_path: str
    # Title of the enclosing H2 section, None before the first H2.
    section_title: Optional[str]
@dataclass
class Chunk:
    """A chunk of a note together with its embedding window and metadata."""
    # "<note_id>#cNN", where NN is the zero-padded chunk index.
    id: str
    note_id: str
    index: int
    # Chunk body.
    text: str
    # Body, optionally preceded by a context prefix.
    window: str
    token_count: int
    section_title: Optional[str]
    section_path: str
    # Ids of the neighbouring chunks, None at either end of the note.
    neighbors_prev: Optional[str]
    neighbors_next: Optional[str]
    # Edges in "kind:target" form.
    suggested_edges: Optional[List[str]] = None
# ==========================================
# 3. PARSING & STRATEGIES
# ==========================================
def _fence_marker(stripped_line: str) -> Optional[str]:
    """Return '```' or '~~~' if the stripped line is a code-fence line, else None."""
    marker = stripped_line[:3]
    if marker == "~~~":
        return marker
    # A backtick fence line carries no further backticks after the fence
    # itself; otherwise it is inline code such as ```x``` and not a fence.
    if marker == "```" and "`" not in stripped_line.lstrip("`"):
        return marker
    return None


def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
    """
    Split a Markdown note into logical blocks (paragraphs and H2 headings).

    Frontmatter is removed first. H1 lines ('# ...') are dropped from the
    body, '## ' lines start a new section and are emitted as heading blocks,
    and blank lines end the current paragraph. Deeper headings stay part of
    the paragraph text.

    Lines inside fenced code blocks (``` or ~~~) are never interpreted as
    headings. Previously a '# comment' line inside a fence was silently
    dropped and a '## ' line inside a fence opened a bogus section.

    Args:
        md_text: Full Markdown text of the note, frontmatter included.

    Returns:
        ``(blocks, h1_title)``. The title is taken from the first line of
        the body that matches '# <title>' and defaults to "Dokument".
    """
    _, body = extract_frontmatter_from_text(md_text)

    h1_title = "Dokument"
    h1_match = re.search(r'^#\s+(.*)', body, re.MULTILINE)
    if h1_match:
        h1_title = h1_match.group(1).strip()

    blocks: List[RawBlock] = []
    section_path = "/"
    current_h2: Optional[str] = None
    buffer: List[str] = []
    open_fence: Optional[str] = None  # marker of the fence we are inside of

    def flush_paragraph() -> None:
        """Emit the buffered lines as one paragraph of the current section."""
        content = "\n".join(buffer).strip()
        if content:
            blocks.append(RawBlock("paragraph", content, None, section_path, current_h2))
        buffer.clear()

    for line in body.split('\n'):
        stripped = line.strip()

        # Track fence state; the fence lines themselves count as code.
        inside_code = open_fence is not None
        marker = _fence_marker(stripped)
        if marker is not None:
            if open_fence is None:
                open_fence = marker
                inside_code = True
            elif marker == open_fence:
                open_fence = None

        if not stripped:
            flush_paragraph()
        elif inside_code:
            buffer.append(line)
        elif stripped.startswith('# '):
            # The H1 only supplies the document title.
            continue
        elif stripped.startswith('## '):
            # Close the running paragraph under the old section first.
            flush_paragraph()
            current_h2 = stripped[3:].strip()
            section_path = f"/{current_h2}"
            blocks.append(RawBlock("heading", stripped, 2, section_path, current_h2))
        else:
            buffer.append(line)

    flush_paragraph()
    return blocks, h1_title
def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "", context_prefix: str = "") -> List[Chunk]:
    """
    Standard strategy: merge blocks and cut once roughly 'target' tokens are reached.

    Heading blocks are skipped; paragraph blocks are collected until adding
    the next one would reach ``target`` tokens. A collected buffer that
    exceeds ``max`` tokens is split sentence by sentence, carrying trailing
    sentences over as overlap into the next chunk.

    Args:
        blocks: Parsed blocks of the note.
        config: Chunking profile; reads ``target`` (default 400), ``max``
            (default 600) and ``overlap`` (default ``(50, 80)``). A tuple
            or list overlap is reduced to half the sum of its values;
            anything else is used as given.
        note_id: Id of the note, used to build chunk ids.
        doc_title: Accepted for signature compatibility; not used here.
        context_prefix: Optional text put in front of every chunk window.

    Returns:
        The chunks in document order. Neighbour links are left as None.
    """
    target = config.get("target", 400)
    max_tokens = config.get("max", 600)
    overlap_val = config.get("overlap", (50, 80))
    # A config passed in directly may still carry the YAML list form; without
    # this the overlap comparison below would compare an int with a list.
    if isinstance(overlap_val, (tuple, list)):
        overlap = sum(overlap_val) // 2
    else:
        overlap = overlap_val

    chunks: List[Chunk] = []
    buf: List[RawBlock] = []

    def _create_chunk(txt, sec, path):
        """Append a chunk for txt; its window gets the context prefix if one is set."""
        idx = len(chunks)
        # Context prefix (e.g. the H1) in front, for embedding quality.
        win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt
        chunks.append(Chunk(
            id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx,
            text=txt, window=win, token_count=estimate_tokens(txt),
            section_title=sec, section_path=path,
            neighbors_prev=None, neighbors_next=None,
            suggested_edges=[],
        ))

    def flush_buffer():
        """Turn the buffered blocks into one or more chunks and empty the buffer."""
        if not buf:
            return
        text_body = "\n\n".join(b.text for b in buf)
        # Section metadata is taken from the last buffered block.
        sec_title = buf[-1].section_title
        sec_path = buf[-1].section_path

        if estimate_tokens(text_body) <= max_tokens:
            _create_chunk(text_body, sec_title, sec_path)
        else:
            # Too large: split sentence by sentence.
            pending: List[str] = []
            pending_len = 0
            for sent in split_sentences(text_body):
                sent_len = estimate_tokens(sent)
                if pending and pending_len + sent_len > target:
                    _create_chunk(" ".join(pending), sec_title, sec_path)
                    # Carry trailing sentences over as overlap while they
                    # stay below the overlap budget.
                    carried: List[str] = []
                    carried_len = 0
                    for s in reversed(pending):
                        s_len = estimate_tokens(s)
                        if carried_len + s_len >= overlap:
                            break
                        carried.insert(0, s)
                        carried_len += s_len
                    pending = carried
                    pending_len = carried_len
                pending.append(sent)
                pending_len += sent_len
            # Remainder
            if pending:
                _create_chunk(" ".join(pending), sec_title, sec_path)
        buf.clear()

    for b in blocks:
        if b.kind == "heading":
            continue
        current_buf_text = "\n\n".join(x.text for x in buf)
        if estimate_tokens(current_buf_text) + estimate_tokens(b.text) >= target:
            flush_buffer()
        buf.append(b)
        # A block that alone reaches the target becomes its own chunk(s).
        if estimate_tokens(b.text) >= target:
            flush_buffer()
    flush_buffer()
    return chunks
def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]:
    """
    Strategy for structured notes (profiles, values).

    Delegates to the sliding-window strategy and passes the document title,
    formatted as an H1 line, as the context prefix for every chunk window.
    """
    title_prefix = f"# {doc_title}"
    return _strategy_sliding_window(
        blocks,
        config,
        note_id,
        doc_title,
        context_prefix=title_prefix,
    )
# ==========================================
# 4. ROBUST EDGE PARSING & PROPAGATION (NEU)
# ==========================================
# Inline edge: [[rel:kind|target]]
_INLINE_REL_RE = re.compile(r'\[\[rel:([^\|\]]+)\|?([^\]]*)\]\]')
# Callout header: > [!edge] kind   (an optional colon may follow the kind).
# '[' is excluded so that "> [!edge] [[Link]]" is not read as kind "[[Link]]".
_EDGE_CALLOUT_RE = re.compile(r'>\s*\[!edge\]\s*([^:\s\[]+)')
# Any wikilink: [[...]]
_WIKILINK_RE = re.compile(r'\[\[([^\]]+)\]\]')


def _parse_edges_robust(text: str) -> Set[str]:
    """
    Find edges in text, including multi-line edge callouts.

    Two notations are recognised:

    * inline ``[[rel:kind|target]]``
    * callouts, where the links may sit on the header line or on the
      following quoted lines::

          > [!edge] kind
          > [[Target]]

    A callout ends at the first line that does not start with '>'.
    ``[[rel:...]]`` links inside a callout are left to the inline rule.

    Returns:
        Set of strings in the form "kind:target".
    """
    found_edges: Set[str] = set()

    # A. Inline [[rel:kind|target]]
    for kind, target in _INLINE_REL_RE.findall(text):
        kind, target = kind.strip(), target.strip()
        if kind and target:
            found_edges.add(f"{kind}:{target}")

    def collect_links(edge_type: str, line: str) -> None:
        """Add every plain wikilink on the line as an edge of the given type."""
        for link in _WIKILINK_RE.findall(line):
            link = link.strip()
            # Prefix test, not substring test: a target like "Barrel:oak"
            # contains "rel:" but is not an inline rel link.
            if link and not link.startswith("rel:"):
                found_edges.add(f"{edge_type}:{link}")

    # B. Multi-line callouts
    current_edge_type = None
    for line in text.split('\n'):
        stripped = line.strip()
        callout = _EDGE_CALLOUT_RE.match(stripped)
        if callout:
            # Header line; links may already follow on the same line.
            current_edge_type = callout.group(1).strip()
            collect_links(current_edge_type, stripped)
        elif not stripped.startswith('>'):
            # Left the blockquote: the callout is over.
            current_edge_type = None
        elif current_edge_type:
            # Continuation line of an open edge callout.
            collect_links(current_edge_type, stripped)
    return found_edges
def _propagate_section_edges(chunks: List[Chunk]) -> List[Chunk]:
    """
    Share the edges found in a section with every chunk of that section.

    Edges are collected per ``section_path`` from the chunk texts and then
    appended, as ``[[rel:kind|target]]`` tokens, to the text and window of
    each chunk of the same section that does not already contain the token.
    Chunks without a section path or on the root path "/" neither
    contribute nor receive edges.

    Tokens are appended in sorted order so that the resulting chunk text is
    identical from run to run; plain set iteration order is not.

    Args:
        chunks: Chunks of one note; modified in place.

    Returns:
        The same list that was passed in.
    """
    # Step 1: collect per section (path -> set of "kind:target").
    section_map: Dict[str, Set[str]] = {}
    for ch in chunks:
        # The root level "/" is skipped as too global.
        if not ch.section_path or ch.section_path == "/":
            continue
        edges = _parse_edges_robust(ch.text)
        if edges:
            section_map.setdefault(ch.section_path, set()).update(edges)

    # Step 2: inject (broadcast).
    for ch in chunks:
        section_edges = section_map.get(ch.section_path)
        if not section_edges:
            continue
        injections = []
        for e_str in sorted(section_edges):
            kind, target = e_str.split(':', 1)
            token = f"[[rel:{kind}|{target}]]"
            # Skip edges the chunk already carries in inline form.
            if token not in ch.text:
                injections.append(token)
        if injections:
            # The edges are written into the text itself so that later edge
            # extraction from the chunk text sees them; the window gets the
            # same suffix for embedding context.
            block = "\n\n\n" + " ".join(injections)
            ch.text += block
            ch.window += block
    return chunks
# ==========================================
# 5. ORCHESTRATION (ASYNC)
# ==========================================
async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]:
    """
    Main entry point: parse a note, split it into chunks and allocate edges.

    Args:
        note_id: Id of the note; used as prefix of the chunk ids.
        md_text: Full Markdown text including frontmatter.
        note_type: Note type used to look up the chunking profile.
        config: Optional chunking profile; when None it is resolved from
            the note type.

    Returns:
        The chunks in document order with neighbour ids filled in, or an
        empty list if the note produced no chunks.
    """
    # 1. Load config
    if config is None:
        config = get_chunk_config(note_type)

    fm, _ = extract_frontmatter_from_text(md_text)
    # A frontmatter line "status:" without a value parses to None and other
    # values may not be strings; normalise before lowercasing.
    note_status = str(fm.get("status") or "").lower()

    primary_strategy = config.get("strategy", "sliding_window")
    enable_smart_edges = config.get("enable_smart_edge_allocation", False)

    # Drafts skip the LLM step to save cost and time.
    if enable_smart_edges and note_status in ("draft", "initial_gen"):
        logger.info("Chunker: Skipping Smart Edges for draft '%s'.", note_id)
        enable_smart_edges = False

    # 2. Parsing & splitting
    blocks, doc_title = parse_blocks(md_text)
    if primary_strategy == "by_heading":
        strategy = _strategy_by_heading
    else:
        strategy = _strategy_sliding_window
    chunks = await asyncio.to_thread(strategy, blocks, config, note_id, doc_title)

    if not chunks:
        return []

    # 3. Section propagation runs before smart edge allocation, so edges
    # from callouts are present in the chunk texts.
    chunks = _propagate_section_edges(chunks)

    # 4. Smart edges (LLM)
    if enable_smart_edges:
        chunks = await _run_smart_edge_allocation(chunks, md_text, note_id, note_type)

    # 5. Link neighbours
    last = len(chunks) - 1
    for i, ch in enumerate(chunks):
        ch.neighbors_prev = chunks[i - 1].id if i > 0 else None
        ch.neighbors_next = chunks[i + 1].id if i < last else None
    return chunks
def _extract_all_edges_from_md(md_text: str, note_id: str, note_type: str) -> List[str]:
    """
    Collect all edges of a note as the candidate pool for the LLM step.

    Candidates come from two sources: ``build_edges_for_note`` run on the
    whole note as one pseudo chunk (structural kinds ``belongs_to``,
    ``next``, ``prev`` and ``backlink`` are left out), and the robust parser
    that also understands multi-line edge callouts.

    Returns:
        Sorted list of unique "kind:target" strings. Sorting keeps the
        candidate order stable between runs.
    """
    structural_kinds = ("belongs_to", "next", "prev", "backlink")

    # A. Via derive_edges (standard)
    dummy_chunk = {
        "chunk_id": f"{note_id}#full",
        "text": md_text,
        "content": md_text,
        "window": md_text,
        "type": note_type,
    }
    raw_edges = build_edges_for_note(
        note_id,
        [dummy_chunk],
        note_level_references=None,
        include_note_scope_refs=False,
    )

    all_candidates = set()
    for e in raw_edges:
        kind = e.get("kind")
        target = e.get("target_id")
        # An edge without kind or target cannot form a usable "kind:target".
        if not kind or not target:
            continue
        if kind not in structural_kinds:
            all_candidates.add(f"{kind}:{target}")

    # B. Via the robust parser, which catches the multi-line callouts
    all_candidates.update(_parse_edges_robust(md_text))
    return sorted(all_candidates)
async def _run_smart_edge_allocation(chunks: List[Chunk], full_text: str, note_id: str, note_type: str) -> List[Chunk]:
    """
    LLM step: let the semantic analyzer assign candidate edges to chunks.

    Every chunk is analysed concurrently. Confirmed edges are stored in
    ``suggested_edges`` and appended to the chunk text and window as
    ``[[rel:kind|target]]`` tokens. Candidates that no chunk received are
    appended to every chunk as a safety fallback, so no edge gets lost.

    Edge strings are split at the first ':' only, so targets that contain a
    colon are written out in full.

    Args:
        chunks: Chunks of the note; modified in place.
        full_text: Full Markdown text of the note (source of the candidates).
        note_id: Id of the note.
        note_type: Type of the note, passed on to the analyzer.

    Returns:
        The same list that was passed in.
    """
    analyzer = get_semantic_analyzer()
    candidate_list = _extract_all_edges_from_md(full_text, note_id, note_type)
    if not candidate_list:
        return chunks

    def render_tokens(edges) -> str:
        """Format "kind:target" strings as space-separated inline rel tokens."""
        tokens = []
        for edge in edges:
            kind, sep, target = edge.partition(':')
            if sep:
                tokens.append(f"[[rel:{kind}|{target}]]")
        return " ".join(tokens)

    tasks = [
        analyzer.assign_edges_to_chunk(chunk.text, candidate_list, note_type)
        for chunk in chunks
    ]
    results_per_chunk = await asyncio.gather(*tasks)

    assigned_edges_global = set()
    for chunk, confirmed_edges in zip(chunks, results_per_chunk):
        # Tolerate an analyzer that reports "nothing" as None.
        confirmed_edges = list(confirmed_edges or [])
        chunk.suggested_edges = confirmed_edges
        assigned_edges_global.update(confirmed_edges)
        if confirmed_edges:
            # Smart edges are written into the text as well.
            injection_str = "\n" + render_tokens(confirmed_edges)
            chunk.text += injection_str
            chunk.window += injection_str

    # Fallback for edges the LLM assigned nowhere; sorted for stable output.
    unassigned = sorted(set(candidate_list) - assigned_edges_global)
    if unassigned:
        fallback_str = "\n" + render_tokens(unassigned)
        for chunk in chunks:
            chunk.text += fallback_str
            chunk.window += fallback_str
            if chunk.suggested_edges is None:
                chunk.suggested_edges = []
            chunk.suggested_edges.extend(unassigned)
    return chunks