smarter chunker initial

This commit is contained in:
Lars 2025-12-11 22:39:35 +01:00
parent c741cc7d1b
commit a1cd0741c9
3 changed files with 274 additions and 183 deletions

View File

@ -1,13 +0,0 @@
# Per-type chunk sizing table: token targets, hard maximum, and overlap range.
TYPE_SIZES = {
    "thought":    {"target": (150, 250), "max": 300, "overlap": (30, 40)},
    "experience": {"target": (250, 350), "max": 450, "overlap": (40, 60)},
    "journal":    {"target": (200, 300), "max": 400, "overlap": (30, 50)},
    "task":       {"target": (120, 200), "max": 250, "overlap": (20, 30)},
    "project":    {"target": (300, 450), "max": 600, "overlap": (50, 70)},
    "concept":    {"target": (250, 400), "max": 550, "overlap": (40, 60)},
    "source":     {"target": (200, 350), "max": 500, "overlap": (30, 50)},
}

# Used for any note type not listed above.
DEFAULT = {"target": (250, 350), "max": 500, "overlap": (40, 60)}


def get_sizes(note_type: str):
    """Return the sizing profile for *note_type* (case-insensitive), or DEFAULT."""
    key = str(note_type).lower()
    return TYPE_SIZES.get(key, DEFAULT)

View File

@ -1,41 +1,119 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
from typing import List, Dict, Optional, Tuple, Any
import re
import math
import yaml
from pathlib import Path
from markdown_it import MarkdownIt
from markdown_it.token import Token
from .chunk_config import get_sizes
# ==========================================
# 1. CONFIGURATION LOADER (formerly chunk_config.py)
# ==========================================
# Resolve the path to types.yaml (2 levels up from app/core/ per the original
# comment — verify if this module moves).
BASE_DIR = Path(__file__).resolve().parent.parent.parent
CONFIG_PATH = BASE_DIR / "types.yaml"
# Fallback profile used when types.yaml is missing or a profile name is unknown.
DEFAULT_PROFILE = {
    "strategy": "sliding_window",  # chunking strategy selector (see assemble_chunks)
    "target": 400,                 # soft target size in estimated tokens
    "max": 600,                    # hard limit before sentence-level re-splitting
    "overlap": (50, 80)            # (min, max) token overlap for sub-chunking
}
# Lazily populated module-level cache for the parsed YAML config.
_CONFIG_CACHE = None
def _load_yaml_config() -> Dict[str, Any]:
    """Load types.yaml once and cache the parsed result.

    Returns an empty dict when the file is missing, empty, or unparsable,
    so callers can always use ``.get`` safely.
    """
    global _CONFIG_CACHE
    if _CONFIG_CACHE is not None:
        return _CONFIG_CACHE
    if not CONFIG_PATH.exists():
        # NOTE(review): deliberately not cached, so the file is re-checked
        # (and the warning repeated) on every call — confirm this is intended.
        print(f"WARNUNG: types.yaml nicht gefunden unter {CONFIG_PATH}. Nutze Defaults.")
        return {}
    try:
        with open(CONFIG_PATH, "r", encoding="utf-8") as f:
            # Bug fix: safe_load returns None for an empty file; normalize to {}
            # so the annotated Dict return type holds and callers' .get() works.
            data = yaml.safe_load(f) or {}
        _CONFIG_CACHE = data
        return data
    except Exception as e:
        print(f"FEHLER beim Laden von types.yaml: {e}")
        return {}
def get_chunk_config(note_type: str) -> Dict[str, Any]:
    """Resolve note type -> chunking profile -> concrete configuration.

    Falls back to the configured default profile, then to DEFAULT_PROFILE,
    when the type or profile name is unknown.
    """
    # Guard against a loader returning a falsy value (missing/empty YAML).
    full_config = _load_yaml_config() or {}
    # "or {}" also covers sections that exist in the YAML but are null.
    profiles = full_config.get("chunking_profiles") or {}
    # str() tolerates non-string note types (e.g. None), matching the old
    # chunk_config.get_sizes behavior.
    type_def = (full_config.get("types") or {}).get(str(note_type).lower(), {})
    profile_name = type_def.get("chunking_profile")
    if not profile_name:
        profile_name = (full_config.get("defaults") or {}).get("chunking_profile", "sliding_standard")
    # Copy so callers can mutate the result without poisoning the cache.
    config = profiles.get(profile_name, DEFAULT_PROFILE).copy()
    # YAML lists become Python lists; normalize overlap to a tuple.
    if "overlap" in config and isinstance(config["overlap"], list):
        config["overlap"] = tuple(config["overlap"])
    return config
# Legacy adapter for callers that still expect the old sizes dict shape.
def get_sizes(note_type: str):
    """Map the new profile config onto the legacy {target, max, overlap} dict."""
    cfg = get_chunk_config(note_type)
    tgt = cfg["target"]
    return {"target": (tgt, tgt), "max": cfg["max"], "overlap": cfg["overlap"]}
# ==========================================
# 2. CHUNKING LOGIC & PARSER
# ==========================================
# --- Helpers ---
_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])')
_WS = re.compile(r'\s+')


def estimate_tokens(text: str) -> int:
    """Cheap, fast token estimate: roughly 4 characters per token, minimum 1."""
    stripped_len = len(text.strip())
    return max(1, math.ceil(stripped_len / 4))


def split_sentences(text: str) -> list[str]:
    """Collapse whitespace and split into sentences at punctuation boundaries."""
    normalized = _WS.sub(' ', text.strip())
    if not normalized:
        return []
    return [part.strip() for part in _SENT_SPLIT.split(normalized) if part.strip()]
@dataclass
class RawBlock:
    """One structural block parsed from the markdown source.

    Fix: the diff-merged version declared kind/level/section_path twice;
    this is the deduplicated field list of the new version.
    """
    kind: str                     # "heading" | "paragraph" | "list" | "code" | "table" | "hr" | "blockquote"
    text: str                     # block text content
    level: Optional[int]          # heading level (1, 2, 3, ...) or None for non-headings
    section_path: str             # e.g. "/H2 Title/H3 Subtitle"
    section_title: Optional[str]  # nearest enclosing H2/H3 title, if any
@dataclass
class Chunk:
    """A retrievable chunk of a note.

    NOTE(review): the neighbors_prev/neighbors_next fields were elided by the
    diff view; they are reconstructed here from the Chunk(...) constructor
    calls in the strategy functions — confirm against the full file.
    """
    id: str                        # "<note_id>#cNN"
    note_id: str
    index: int                     # position within the note's chunk sequence
    text: str                      # plain text for display
    window: str                    # text plus context prefix, used for embeddings
    token_count: int
    section_title: Optional[str]
    section_path: str
    neighbors_prev: Optional[str]  # id of the previous chunk, set by assemble_chunks
    neighbors_next: Optional[str]  # id of the next chunk, set by assemble_chunks
    char_start: int
    char_end: int
# --- Markdown Parser ---
def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
    """Parse markdown into RawBlocks and return them plus the H1 title.

    H2/H3 headings update the current section path; other block-level tokens
    (paragraphs, lists, code, blockquotes, tables) are collected with their
    inline text. Fix: the diff-rendered source interleaved the old and new
    implementations (old push() helper, old depth-counting close matcher);
    this is the reconstructed new implementation.
    """
    md = MarkdownIt("commonmark").enable("table")
    tokens: List[Token] = md.parse(md_text)
    blocks: List[RawBlock] = []
    h1_title = "Dokument"  # fallback document title when no H1 exists
    h2, h3 = None, None
    section_path = "/"

    def get_inline_content(idx, tokens):
        # Collect inline text from idx up to the matching heading_close.
        txt = ""
        while idx < len(tokens) and tokens[idx].type != "heading_close":
            if tokens[idx].type == "inline":
                txt += tokens[idx].content
            idx += 1
        return txt.strip()

    i = 0
    while i < len(tokens):
        t = tokens[i]
        if t.type == "heading_open":
            lvl = int(t.tag[1])
            title_txt = get_inline_content(i, tokens)
            # Update the section path depending on heading depth.
            if lvl == 1:
                h1_title = title_txt
            elif lvl == 2:
                h2, h3 = title_txt, None
                section_path = f"/{h2}"
            elif lvl == 3:
                h3 = title_txt
                section_path = f"/{h2}/{h3}" if h2 else f"/{h3}"
            blocks.append(RawBlock("heading", title_txt, lvl, section_path, title_txt))
            # Skip forward to the heading_close token.
            while i < len(tokens) and tokens[i].type != "heading_close":
                i += 1
        elif t.type in ("paragraph_open", "bullet_list_open", "ordered_list_open",
                        "fence", "code_block", "blockquote_open", "table_open", "hr"):
            kind = t.type.replace("_open", "")
            content = ""
            if t.type in ("fence", "code_block"):
                # Code blocks carry their content on the same token.
                content = t.content or ""
            else:
                # Collect inline content until the matching close token at
                # the same nesting level (handles nested lists/quotes).
                start_level = t.level
                while i < len(tokens):
                    tk = tokens[i]
                    if tk.type.replace("_close", "") == kind and tk.level == start_level and tk.type.endswith("_close"):
                        break
                    if tk.type == "inline":
                        content += tk.content
                    elif tk.type in ("fence", "code_block"):
                        content += "\n" + tk.content
                    elif tk.type in ("softbreak", "hardbreak"):
                        content += "\n"
                    i += 1
            if content.strip():
                current_sec_title = h3 if h3 else (h2 if h2 else None)
                blocks.append(RawBlock(kind, content.strip(), None, section_path, current_sec_title))
        i += 1
    return blocks, h1_title
# --- Strategies ---
def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]:
    """Pack blocks into roughly target-sized chunks (sliding window).

    Blocks accumulate in a buffer that is flushed at H2/H3 section boundaries;
    a flushed buffer that exceeds ``max`` tokens is softly re-split by
    sentence with a token-overlap carried between pieces.

    NOTE(review): reconstructed from a diff view that interleaved this with
    the old assemble_chunks body — confirm against VCS whether the loop also
    flushed whenever the buffer reached ``target`` after each append.
    """
    target = config.get("target", 400)
    max_tokens = config.get("max", 600)
    overlap_val = config.get("overlap", (50, 80))
    # overlap may be a (min, max) pair -> use the midpoint
    overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val
    chunks: List[Chunk] = []
    buf: List[RawBlock] = []

    def _add_chunk(txt, win, sec, path):
        # Append a Chunk; neighbors/char offsets are filled in elsewhere.
        idx = len(chunks)
        chunks.append(Chunk(
            id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx,
            text=txt, window=win, token_count=estimate_tokens(txt),
            section_title=sec, section_path=path,
            neighbors_prev=None, neighbors_next=None, char_start=0, char_end=0
        ))

    def flush_buffer():
        nonlocal buf
        if not buf:
            return
        text_body = "\n\n".join([b.text for b in buf])
        sec_title = buf[-1].section_title
        sec_path = buf[-1].section_path
        window_body = f"{context_prefix}\n{text_body}".strip() if context_prefix else text_body
        if estimate_tokens(text_body) > max_tokens:
            # Too large: soft re-split at sentence boundaries with overlap.
            sentences = split_sentences(text_body)
            current_sents = []
            cur_toks = 0
            for s in sentences:
                st = estimate_tokens(s)
                if cur_toks + st > target and current_sents:
                    txt = "\n".join(current_sents)
                    win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt
                    _add_chunk(txt, win, sec_title, sec_path)
                    # Carry the tail of the emitted piece as overlap
                    # (~4 chars per token heuristic).
                    ov_txt = " ".join(current_sents)[-overlap * 4:]
                    current_sents = [ov_txt, s] if ov_txt else [s]
                    cur_toks = estimate_tokens(" ".join(current_sents))
                else:
                    current_sents.append(s)
                    cur_toks += st
            if current_sents:
                txt = "\n".join(current_sents)
                win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt
                _add_chunk(txt, win, sec_title, sec_path)
        else:
            _add_chunk(text_body, window_body, sec_title, sec_path)
        buf = []

    for b in blocks:
        if b.kind == "heading" and b.level in (2, 3):
            # Section change: flush if the buffer (plus this heading's text)
            # already reaches the target size. The heading itself is not
            # emitted as a chunk; it only influences section context.
            if estimate_tokens("\n\n".join([x.text for x in buf] + [b.text])) >= target:
                flush_buffer()
            continue
        buf.append(b)
    flush_buffer()
    return chunks
def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str) -> List[Chunk]:
    """Emit one chunk per section path; oversized sections fall back to
    sliding-window sub-chunking with a breadcrumb context header.
    """
    chunks: List[Chunk] = []
    # Group non-heading blocks by their section path, preserving first-seen order.
    sections: Dict[str, List[RawBlock]] = {}
    ordered = []  # section paths in document order
    for b in blocks:
        if b.kind == "heading": continue
        if b.section_path not in sections:
            sections[b.section_path] = []
            ordered.append(b.section_path)
        sections[b.section_path].append(b)
    for path in ordered:
        s_blocks = sections[path]
        # "/A/B" -> "A > B" breadcrumb trail for the embedding context window.
        breadcrumbs = path.strip("/").replace("/", " > ")
        context_header = f"# {doc_title}\n## {breadcrumbs}"
        full_text = "\n\n".join([b.text for b in s_blocks])
        if estimate_tokens(full_text) <= config.get("max", 600):
            # Section fits into one chunk.
            chunks.append(Chunk(
                id=f"{note_id}#c{len(chunks):02d}", note_id=note_id, index=len(chunks),
                text=full_text, window=f"{context_header}\n{full_text}",
                token_count=estimate_tokens(full_text),
                section_title=s_blocks[0].section_title, section_path=path,
                neighbors_prev=None, neighbors_next=None, char_start=0, char_end=0
            ))
        else:
            # Too large: delegate to the sliding-window strategy, then
            # re-index the sub-chunks into the global chunk sequence.
            sub = _strategy_sliding_window(s_blocks, config, note_id, context_prefix=context_header)
            base = len(chunks)
            for i, sc in enumerate(sub):
                sc.index = base + i
                sc.id = f"{note_id}#c{sc.index:02d}"
                chunks.append(sc)
    return chunks
# --- Main Entry Point ---
def assemble_chunks(note_id: str, md_text: str, note_type: str) -> List[Chunk]:
    """Chunk a markdown note according to its type's configured strategy.

    Resolves the chunking profile for ``note_type``, parses the markdown,
    dispatches to the strategy implementation, and links each chunk to its
    neighbors. Fix: removed leftover fragments of the pre-refactor
    implementation and the duplicated trailing return that the merged diff
    left behind.
    """
    config = get_chunk_config(note_type)
    strategy = config.get("strategy", "sliding_window")
    blocks, doc_title = parse_blocks(md_text)
    if strategy == "by_heading":
        chunks = _strategy_by_heading(blocks, config, note_id, doc_title)
    else:
        # Any unknown strategy falls back to the sliding window.
        chunks = _strategy_sliding_window(blocks, config, note_id)
    # Link prev/next neighbor ids for context expansion at retrieval time.
    for i, ch in enumerate(chunks):
        ch.neighbors_prev = chunks[i - 1].id if i > 0 else None
        ch.neighbors_next = chunks[i + 1].id if i < len(chunks) - 1 else None
    return chunks

View File

@ -1,86 +1,116 @@
version: 1.2 # Smart chunking config

# --- CHUNKING DEFINITIONS ---
# Technical chunking strategies are defined centrally here.
# (Fix: the merged diff left duplicate chunk_profile/chunking_profile keys
# per type; this is the deduplicated new-version document.)
chunking_profiles:
  # Standard profiles for running text (sliding window)
  sliding_short:
    strategy: sliding_window
    target: 200
    max: 350
    overlap: [30, 50]
  sliding_standard:
    strategy: sliding_window
    target: 400
    max: 600
    overlap: [50, 80]
  sliding_large:
    strategy: sliding_window
    target: 500
    max: 800
    overlap: [60, 100]
  # Smart chunking for structured notes (hard splits at headings)
  structured_strict:
    strategy: by_heading
    split_level: 2
    max: 600          # fallback limit
    target: 400       # fallback target for sub-chunking
    overlap: [50, 80] # overlap for sub-chunking

defaults:
  retriever_weight: 1.0
  chunking_profile: sliding_standard # fallback profile
  edge_defaults: []

types:
  # --- KNOWLEDGE BUILDING BLOCKS ---
  concept:
    chunking_profile: sliding_standard
    retriever_weight: 0.60
    edge_defaults: ["references", "related_to"]
  source:
    chunking_profile: sliding_standard
    retriever_weight: 0.50
    edge_defaults: []
  glossary:
    chunking_profile: sliding_short
    retriever_weight: 0.40
    edge_defaults: ["related_to"]

  # --- IDENTITY & PERSONALITY ---
  profile:
    chunking_profile: structured_strict # H2 split matters for profiles
    retriever_weight: 0.70
    edge_defaults: ["references", "related_to"]
  value:
    chunking_profile: structured_strict
    retriever_weight: 1.00
    edge_defaults: ["related_to"]
  principle:
    chunking_profile: structured_strict
    retriever_weight: 0.95
    edge_defaults: ["derived_from", "references"]
  belief:
    chunking_profile: sliding_short
    retriever_weight: 0.90
    edge_defaults: ["related_to"]
  experience:
    chunking_profile: sliding_standard
    retriever_weight: 0.90
    edge_defaults: ["derived_from", "references"]

  # --- STRATEGY & DECISIONS ---
  goal:
    chunking_profile: sliding_standard
    retriever_weight: 0.95
    edge_defaults: ["depends_on", "related_to"]
  decision:
    chunking_profile: structured_strict # ADRs are often structured
    retriever_weight: 1.00
    edge_defaults: ["caused_by", "references"]
  risk:
    chunking_profile: sliding_short
    retriever_weight: 0.85
    edge_defaults: ["related_to", "blocks"]
  milestone:
    chunking_profile: sliding_short
    retriever_weight: 0.70
    edge_defaults: ["related_to", "part_of"]

  # --- OPERATIONAL ---
  project:
    chunking_profile: sliding_large # projects carry a lot of text
    retriever_weight: 0.97
    edge_defaults: ["references", "depends_on"]
  task:
    chunking_profile: sliding_short
    retriever_weight: 0.80
    edge_defaults: ["depends_on", "part_of"]
  journal:
    chunking_profile: sliding_standard
    retriever_weight: 0.80
    edge_defaults: ["references", "related_to"]