mindnet/app/core/chunker.py

305 lines
10 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple, Any
import re
import math
import yaml
from pathlib import Path
from markdown_it import MarkdownIt
from markdown_it.token import Token
# ==========================================
# 1. CONFIGURATION LOADER (Updated for config/ dir)
# ==========================================
# Pfad-Logik:
# Wir gehen 3 Ebenen hoch: app/core/chunker.py -> app/core -> app -> root
BASE_DIR = Path(__file__).resolve().parent.parent.parent
# KORREKTUR: types.yaml liegt im Unterordner "config"
CONFIG_PATH = BASE_DIR / "config" / "types.yaml"
# Fallback Values
DEFAULT_PROFILE = {
"strategy": "sliding_window",
"target": 400,
"max": 600,
"overlap": (50, 80)
}
_CONFIG_CACHE = None
def _load_yaml_config() -> Dict[str, Any]:
"""Lädt die config/types.yaml und cached das Ergebnis."""
global _CONFIG_CACHE
if _CONFIG_CACHE is not None:
return _CONFIG_CACHE
if not CONFIG_PATH.exists():
# Debugging-Hilfe: Zeigt an, wo gesucht wurde
print(f"WARNUNG: types.yaml nicht gefunden unter: {CONFIG_PATH}")
print(f" (Basis-Verzeichnis war: {BASE_DIR})")
return {}
try:
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
_CONFIG_CACHE = data
return data
except Exception as e:
print(f"FEHLER beim Laden von {CONFIG_PATH}: {e}")
return {}
def get_chunk_config(note_type: str) -> Dict[str, Any]:
"""
Löst Typ -> Profil -> Konfiguration auf.
"""
full_config = _load_yaml_config()
# 1. Profile holen
profiles = full_config.get("chunking_profiles", {})
# 2. Typ-Definition holen
type_def = full_config.get("types", {}).get(note_type.lower(), {})
# 3. Profil-Namen ermitteln (Fallback auf defaults)
profile_name = type_def.get("chunking_profile")
if not profile_name:
profile_name = full_config.get("defaults", {}).get("chunking_profile", "sliding_standard")
# 4. Config bauen
config = profiles.get(profile_name, DEFAULT_PROFILE).copy()
# Sicherstellen, dass Overlap ein Tuple ist
if "overlap" in config and isinstance(config["overlap"], list):
config["overlap"] = tuple(config["overlap"])
return config
# Legacy Support für alten Code
def get_sizes(note_type: str):
cfg = get_chunk_config(note_type)
return {
"target": (cfg["target"], cfg["target"]),
"max": cfg["max"],
"overlap": cfg["overlap"]
}
# ==========================================
# 2. CHUNKING LOGIC & PARSER
# ==========================================
# --- Hilfen ---
_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])')
_WS = re.compile(r'\s+')
def estimate_tokens(text: str) -> int:
# 1 Token ≈ 4 chars
t = len(text.strip())
return max(1, math.ceil(t / 4))
def split_sentences(text: str) -> list[str]:
text = _WS.sub(' ', text.strip())
if not text: return []
parts = _SENT_SPLIT.split(text)
return [p.strip() for p in parts if p.strip()]
@dataclass
class RawBlock:
kind: str
text: str
level: Optional[int]
section_path: str
section_title: Optional[str]
@dataclass
class Chunk:
id: str
note_id: str
index: int
text: str # Reintext für Anzeige
window: str # Text + Context für Embeddings
token_count: int
section_title: Optional[str]
section_path: str
neighbors_prev: Optional[str]
neighbors_next: Optional[str]
char_start: int
char_end: int
# --- Markdown Parser ---
def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
"""Parst MD und gibt Blöcke UND den H1 Titel zurück."""
md = MarkdownIt("commonmark").enable("table")
tokens: List[Token] = md.parse(md_text)
blocks: List[RawBlock] = []
h1_title = "Dokument"
h2, h3 = None, None
section_path = "/"
def get_inline_content(idx, tokens):
txt = ""
while idx < len(tokens) and tokens[idx].type != "heading_close":
if tokens[idx].type == "inline":
txt += tokens[idx].content
idx += 1
return txt.strip()
i = 0
while i < len(tokens):
t = tokens[i]
if t.type == "heading_open":
lvl = int(t.tag[1])
i += 1
title_txt = get_inline_content(i, tokens)
if lvl == 1:
h1_title = title_txt
elif lvl == 2:
h2, h3 = title_txt, None
section_path = f"/{h2}"
elif lvl == 3:
h3 = title_txt
section_path = f"/{h2}/{h3}" if h2 else f"/{h3}"
blocks.append(RawBlock("heading", title_txt, lvl, section_path, title_txt))
while i < len(tokens) and tokens[i].type != "heading_close": i += 1
elif t.type in ("paragraph_open", "bullet_list_open", "ordered_list_open",
"fence", "code_block", "blockquote_open", "table_open", "hr"):
kind = t.type.replace("_open", "")
content = ""
if t.type in ("fence", "code_block"):
content = t.content or ""
else:
i += 1
start_level = t.level
while i < len(tokens):
tk = tokens[i]
if tk.type.replace("_close", "") == kind and tk.level == start_level and tk.type.endswith("_close"):
break
if tk.type == "inline": content += tk.content
elif tk.type in ("fence", "code_block"): content += "\n" + tk.content
elif tk.type in ("softbreak", "hardbreak"): content += "\n"
i += 1
if content.strip():
current_sec_title = h3 if h3 else (h2 if h2 else None)
blocks.append(RawBlock(kind, content.strip(), None, section_path, current_sec_title))
i += 1
return blocks, h1_title
# --- Strategien ---
def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]:
target = config.get("target", 400)
max_tokens = config.get("max", 600)
overlap_val = config.get("overlap", (50, 80))
overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val
chunks: List[Chunk] = []
buf: List[RawBlock] = []
def flush_buffer():
nonlocal buf
if not buf: return
text_body = "\n\n".join([b.text for b in buf])
sec_title = buf[-1].section_title
sec_path = buf[-1].section_path
window_body = f"{context_prefix}\n{text_body}".strip() if context_prefix else text_body
if estimate_tokens(text_body) > max_tokens:
sentences = split_sentences(text_body)
current_sents = []
cur_toks = 0
for s in sentences:
st = estimate_tokens(s)
if cur_toks + st > target and current_sents:
txt = "\n".join(current_sents)
win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt
_add_chunk(txt, win, sec_title, sec_path)
ov_txt = " ".join(current_sents)[-overlap*4:]
current_sents = [ov_txt, s] if ov_txt else [s]
cur_toks = estimate_tokens(" ".join(current_sents))
else:
current_sents.append(s)
cur_toks += st
if current_sents:
txt = "\n".join(current_sents)
win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt
_add_chunk(txt, win, sec_title, sec_path)
else:
_add_chunk(text_body, window_body, sec_title, sec_path)
buf = []
def _add_chunk(txt, win, sec, path):
idx = len(chunks)
chunks.append(Chunk(
id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx,
text=txt, window=win, token_count=estimate_tokens(txt),
section_title=sec, section_path=path,
neighbors_prev=None, neighbors_next=None, char_start=0, char_end=0
))
for b in blocks:
if estimate_tokens("\n\n".join([x.text for x in buf] + [b.text])) >= target:
flush_buffer()
buf.append(b)
flush_buffer()
return chunks
def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str) -> List[Chunk]:
chunks: List[Chunk] = []
sections: Dict[str, List[RawBlock]] = {}
ordered = []
for b in blocks:
if b.kind == "heading": continue
if b.section_path not in sections:
sections[b.section_path] = []
ordered.append(b.section_path)
sections[b.section_path].append(b)
for path in ordered:
s_blocks = sections[path]
breadcrumbs = path.strip("/").replace("/", " > ")
context_header = f"# {doc_title}\n## {breadcrumbs}"
full_text = "\n\n".join([b.text for b in s_blocks])
if estimate_tokens(full_text) <= config.get("max", 600):
chunks.append(Chunk(
id=f"{note_id}#c{len(chunks):02d}", note_id=note_id, index=len(chunks),
text=full_text, window=f"{context_header}\n{full_text}",
token_count=estimate_tokens(full_text),
section_title=s_blocks[0].section_title, section_path=path,
neighbors_prev=None, neighbors_next=None, char_start=0, char_end=0
))
else:
sub = _strategy_sliding_window(s_blocks, config, note_id, context_prefix=context_header)
base = len(chunks)
for i, sc in enumerate(sub):
sc.index = base + i
sc.id = f"{note_id}#c{sc.index:02d}"
chunks.append(sc)
return chunks
# --- Main Entry Point ---
def assemble_chunks(note_id: str, md_text: str, note_type: str) -> List[Chunk]:
config = get_chunk_config(note_type)
strategy = config.get("strategy", "sliding_window")
blocks, doc_title = parse_blocks(md_text)
if strategy == "by_heading":
chunks = _strategy_by_heading(blocks, config, note_id, doc_title)
else:
chunks = _strategy_sliding_window(blocks, config, note_id)
for i, ch in enumerate(chunks):
ch.neighbors_prev = chunks[i-1].id if i > 0 else None
ch.neighbors_next = chunks[i+1].id if i < len(chunks)-1 else None
return chunks