diff --git a/app/core/chunk_config.py b/app/core/chunk_config.py deleted file mode 100644 index 6780b5d..0000000 --- a/app/core/chunk_config.py +++ /dev/null @@ -1,13 +0,0 @@ -TYPE_SIZES = { - "thought": {"target": (150, 250), "max": 300, "overlap": (30, 40)}, - "experience":{"target": (250, 350), "max": 450, "overlap": (40, 60)}, - "journal": {"target": (200, 300), "max": 400, "overlap": (30, 50)}, - "task": {"target": (120, 200), "max": 250, "overlap": (20, 30)}, - "project": {"target": (300, 450), "max": 600, "overlap": (50, 70)}, - "concept": {"target": (250, 400), "max": 550, "overlap": (40, 60)}, - "source": {"target": (200, 350), "max": 500, "overlap": (30, 50)}, -} -DEFAULT = {"target": (250, 350), "max": 500, "overlap": (40, 60)} - -def get_sizes(note_type: str): - return TYPE_SIZES.get(str(note_type).lower(), DEFAULT) diff --git a/app/core/chunker.py b/app/core/chunker.py index 9375a9b..260ef08 100644 --- a/app/core/chunker.py +++ b/app/core/chunker.py @@ -1,41 +1,119 @@ from __future__ import annotations from dataclasses import dataclass -from typing import List, Dict, Optional, Tuple +from typing import List, Dict, Optional, Tuple, Any import re import math +import yaml +from pathlib import Path from markdown_it import MarkdownIt from markdown_it.token import Token -from .chunk_config import get_sizes + +# ========================================== +# 1. CONFIGURATION LOADER (Ehemals chunk_config.py) +# ========================================== + +# Pfad zur types.yaml bestimmen (2 Ebenen hoch von app/core/) +BASE_DIR = Path(__file__).resolve().parent.parent.parent +CONFIG_PATH = BASE_DIR / "types.yaml" + +# Fallback Values +DEFAULT_PROFILE = { + "strategy": "sliding_window", + "target": 400, + "max": 600, + "overlap": (50, 80) +} + +_CONFIG_CACHE = None + +def _load_yaml_config() -> Dict[str, Any]: + """Lädt die types.yaml und cached das Ergebnis.""" + global _CONFIG_CACHE + if _CONFIG_CACHE is not None: + return _CONFIG_CACHE + + if not CONFIG_PATH.exists(): + print(f"WARNUNG: types.yaml nicht gefunden unter {CONFIG_PATH}. Nutze Defaults.") + return {} + + try: + with open(CONFIG_PATH, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + _CONFIG_CACHE = data + return data + except Exception as e: + print(f"FEHLER beim Laden von types.yaml: {e}") + return {} + +def get_chunk_config(note_type: str) -> Dict[str, Any]: + """ + Löst Typ -> Profil -> Konfiguration auf. + """ + full_config = _load_yaml_config() + + # 1. Profile holen + profiles = full_config.get("chunking_profiles", {}) + + # 2. Typ-Definition holen + type_def = full_config.get("types", {}).get(note_type.lower(), {}) + + # 3. Profil-Namen ermitteln (Fallback auf defaults) + profile_name = type_def.get("chunking_profile") + if not profile_name: + profile_name = full_config.get("defaults", {}).get("chunking_profile", "sliding_standard") + + # 4. Config bauen + config = profiles.get(profile_name, DEFAULT_PROFILE).copy() + + # Sicherstellen, dass Overlap ein Tuple ist + if "overlap" in config and isinstance(config["overlap"], list): + config["overlap"] = tuple(config["overlap"]) + + return config + +# Legacy Support für alten Code +def get_sizes(note_type: str): + cfg = get_chunk_config(note_type) + return { + "target": (cfg["target"], cfg["target"]), + "max": cfg["max"], + "overlap": cfg["overlap"] + } + +# ========================================== +# 2. CHUNKING LOGIC & PARSER +# ========================================== # --- Hilfen --- _SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])') _WS = re.compile(r'\s+') def estimate_tokens(text: str) -> int: - # leichte Approximation: 1 Token ≈ 4 Zeichen; robust + schnell + # 1 Token ≈ 4 chars t = len(text.strip()) return max(1, math.ceil(t / 4)) def split_sentences(text: str) -> list[str]: text = _WS.sub(' ', text.strip()) - if not text: - return [] + if not text: return [] parts = _SENT_SPLIT.split(text) return [p.strip() for p in parts if p.strip()] @dataclass class RawBlock: - kind: str # "heading" | "paragraph" | "list" | "code" | "table" | "thematic_break" | "blockquote" + kind: str text: str - level: Optional[int] # heading level (2,3,...) or None - section_path: str # e.g., "/H2 Title/H3 Subtitle" + level: Optional[int] + section_path: str + section_title: Optional[str] @dataclass class Chunk: id: str note_id: str index: int - text: str + text: str # Reintext für Anzeige + window: str # Text + Context für Embeddings token_count: int section_title: Optional[str] section_path: str @@ -44,183 +122,179 @@ class Chunk: char_start: int char_end: int -# --- Markdown zu RawBlocks: H2/H3 als Sections, andere Blöcke gruppiert --- -def parse_blocks(md_text: str) -> List[RawBlock]: +# --- Markdown Parser --- +def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: + """Parst MD und gibt Blöcke UND den H1 Titel zurück.""" md = MarkdownIt("commonmark").enable("table") tokens: List[Token] = md.parse(md_text) blocks: List[RawBlock] = [] + h1_title = "Dokument" h2, h3 = None, None section_path = "/" - cur_text = [] - cur_kind = None - - def push(kind: str, txt: str, lvl: Optional[int]): - nonlocal section_path - txt = txt.strip() - if not txt: - return - title = None - if kind == "heading" and lvl: - title = txt - blocks.append(RawBlock(kind=kind, text=txt, level=lvl, section_path=section_path)) + + def get_inline_content(idx, tokens): + txt = "" + while idx < len(tokens) and tokens[idx].type != "heading_close": + if tokens[idx].type == "inline": + txt += tokens[idx].content + idx += 1 + return txt.strip() i = 0 while i < len(tokens): t = tokens[i] + if t.type == "heading_open": lvl = int(t.tag[1]) - # Sammle heading inline i += 1 - title_txt = "" - while i < len(tokens) and tokens[i].type != "heading_close": - if tokens[i].type == "inline": - title_txt += tokens[i].content - i += 1 - title_txt = title_txt.strip() - # Section-Pfad aktualisieren - if lvl == 2: + title_txt = get_inline_content(i, tokens) + + if lvl == 1: + h1_title = title_txt + elif lvl == 2: h2, h3 = title_txt, None section_path = f"/{h2}" elif lvl == 3: h3 = title_txt section_path = f"/{h2}/{h3}" if h2 else f"/{h3}" - push("heading", title_txt, lvl) + + blocks.append(RawBlock("heading", title_txt, lvl, section_path, title_txt)) + while i < len(tokens) and tokens[i].type != "heading_close": i += 1 + elif t.type in ("paragraph_open", "bullet_list_open", "ordered_list_open", "fence", "code_block", "blockquote_open", "table_open", "hr"): - kind = { - "paragraph_open": "paragraph", - "bullet_list_open": "list", - "ordered_list_open": "list", - "fence": "code", - "code_block": "code", - "blockquote_open": "blockquote", - "table_open": "table", - "hr": "thematic_break", - }[t.type] - + kind = t.type.replace("_open", "") + content = "" + if t.type in ("fence", "code_block"): - # Codeblock hat eigenen content im selben Token content = t.content or "" - push(kind, content, None) else: - # inline sammeln bis close - content = "" i += 1 - depth = 1 - while i < len(tokens) and depth > 0: + start_level = t.level + while i < len(tokens): tk = tokens[i] - if tk.type.endswith("_open"): - depth += 1 - elif tk.type.endswith("_close"): - depth -= 1 - elif tk.type == "inline": - content += tk.content + if tk.type.replace("_close", "") == kind and tk.level == start_level and tk.type.endswith("_close"): + break + if tk.type == "inline": content += tk.content + elif tk.type in ("fence", "code_block"): content += "\n" + tk.content + elif tk.type in ("softbreak", "hardbreak"): content += "\n" i += 1 - push(kind, content, None) - continue # wir sind schon auf nächstem Token + + if content.strip(): + current_sec_title = h3 if h3 else (h2 if h2 else None) + blocks.append(RawBlock(kind, content.strip(), None, section_path, current_sec_title)) + i += 1 + return blocks, h1_title - return blocks - -def assemble_chunks(note_id: str, md_text: str, note_type: str) -> List[Chunk]: - sizes = get_sizes(note_type) - target = sum(sizes["target"]) // 2 # mittlerer Zielwert - max_tokens = sizes["max"] - ov_min, ov_max = sizes["overlap"] - overlap = (ov_min + ov_max) // 2 - - blocks = parse_blocks(md_text) +# --- Strategien --- +def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]: + target = config.get("target", 400) + max_tokens = config.get("max", 600) + overlap_val = config.get("overlap", (50, 80)) + overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val + chunks: List[Chunk] = [] - buf: List[Tuple[str, str, str]] = [] # (text, section_title, section_path) - char_pos = 0 + buf: List[RawBlock] = [] + + def flush_buffer(): + nonlocal buf + if not buf: return + text_body = "\n\n".join([b.text for b in buf]) + sec_title = buf[-1].section_title + sec_path = buf[-1].section_path + window_body = f"{context_prefix}\n{text_body}".strip() if context_prefix else text_body - def flush_buffer(force=False): - nonlocal buf, chunks, char_pos - if not buf: - return - text = "\n\n".join([b[0] for b in buf]).strip() - if not text: - buf = [] - return - - # Wenn zu groß, satzbasiert weich umbrechen - toks = estimate_tokens(text) - if toks > max_tokens: - sentences = split_sentences(text) - cur = [] - cur_tokens = 0 + if estimate_tokens(text_body) > max_tokens: + sentences = split_sentences(text_body) + current_sents = [] + cur_toks = 0 for s in sentences: st = estimate_tokens(s) - if cur_tokens + st > target and cur: - _emit("\n".join(cur)) - # Overlap: letzte Sätze wiederverwenden - ov_text = " ".join(cur)[-overlap*4:] # 4 chars/token Heuristik - cur = [ov_text, s] if ov_text else [s] - cur_tokens = estimate_tokens(" ".join(cur)) + if cur_toks + st > target and current_sents: + txt = "\n".join(current_sents) + win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt + _add_chunk(txt, win, sec_title, sec_path) + ov_txt = " ".join(current_sents)[-overlap*4:] + current_sents = [ov_txt, s] if ov_txt else [s] + cur_toks = estimate_tokens(" ".join(current_sents)) else: - cur.append(s) - cur_tokens += st - if cur: - _emit("\n".join(cur)) + current_sents.append(s) + cur_toks += st + if current_sents: + txt = "\n".join(current_sents) + win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt + _add_chunk(txt, win, sec_title, sec_path) else: - _emit(text) + _add_chunk(text_body, window_body, sec_title, sec_path) buf = [] - def _emit(text_block: str): - nonlocal chunks, char_pos + def _add_chunk(txt, win, sec, path): idx = len(chunks) - chunk_id = f"{note_id}#c{idx:02d}" - token_count = estimate_tokens(text_block) - # section aus letztem buffer-entry ableiten - sec_title = buf[-1][1] if buf else None - sec_path = buf[-1][2] if buf else "/" - start = char_pos - end = start + len(text_block) chunks.append(Chunk( - id=chunk_id, - note_id=note_id, - index=idx, - text=text_block, - token_count=token_count, - section_title=sec_title, - section_path=sec_path, - neighbors_prev=None, - neighbors_next=None, - char_start=start, - char_end=end + id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, + text=txt, window=win, token_count=estimate_tokens(txt), + section_title=sec, section_path=path, + neighbors_prev=None, neighbors_next=None, char_start=0, char_end=0 )) - char_pos = end + 1 - # Blocks in Puffer sammeln; bei Überschreiten Zielbereich flushen - cur_sec_title = None for b in blocks: - if b.kind == "heading" and b.level in (2, 3): - # Sectionwechsel ⇒ Buffer flushen + if estimate_tokens("\n\n".join([x.text for x in buf] + [b.text])) >= target: flush_buffer() - cur_sec_title = b.text.strip() - # Heading selbst nicht als Chunk, aber als Kontexttitel nutzen - continue + buf.append(b) + flush_buffer() + return chunks - txt = b.text.strip() - if not txt: - continue +def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str) -> List[Chunk]: + chunks: List[Chunk] = [] + sections: Dict[str, List[RawBlock]] = {} + ordered = [] + + for b in blocks: + if b.kind == "heading": continue + if b.section_path not in sections: + sections[b.section_path] = [] + ordered.append(b.section_path) + sections[b.section_path].append(b) + + for path in ordered: + s_blocks = sections[path] + breadcrumbs = path.strip("/").replace("/", " > ") + context_header = f"# {doc_title}\n## {breadcrumbs}" + full_text = "\n\n".join([b.text for b in s_blocks]) + + if estimate_tokens(full_text) <= config.get("max", 600): + chunks.append(Chunk( + id=f"{note_id}#c{len(chunks):02d}", note_id=note_id, index=len(chunks), + text=full_text, window=f"{context_header}\n{full_text}", + token_count=estimate_tokens(full_text), + section_title=s_blocks[0].section_title, section_path=path, + neighbors_prev=None, neighbors_next=None, char_start=0, char_end=0 + )) + else: + sub = _strategy_sliding_window(s_blocks, config, note_id, context_prefix=context_header) + base = len(chunks) + for i, sc in enumerate(sub): + sc.index = base + i + sc.id = f"{note_id}#c{sc.index:02d}" + chunks.append(sc) + return chunks - tentative = "\n\n".join([*(x[0] for x in buf), txt]).strip() - if estimate_tokens(tentative) > max(get_sizes(note_type)["target"]): - # weicher Schnitt vor Hinzufügen - flush_buffer() - buf.append((txt, cur_sec_title, b.section_path)) +# --- Main Entry Point --- - # bei Erreichen ~Target flushen - if estimate_tokens("\n\n".join([x[0] for x in buf])) >= target: - flush_buffer() - - flush_buffer(force=True) - - # neighbors setzen +def assemble_chunks(note_id: str, md_text: str, note_type: str) -> List[Chunk]: + config = get_chunk_config(note_type) + strategy = config.get("strategy", "sliding_window") + blocks, doc_title = parse_blocks(md_text) + + if strategy == "by_heading": + chunks = _strategy_by_heading(blocks, config, note_id, doc_title) + else: + chunks = _strategy_sliding_window(blocks, config, note_id) + for i, ch in enumerate(chunks): ch.neighbors_prev = chunks[i-1].id if i > 0 else None ch.neighbors_next = chunks[i+1].id if i < len(chunks)-1 else None - return chunks + return chunks \ No newline at end of file diff --git a/config/types.yaml b/config/types.yaml index 5c3604b..e715ff5 100644 --- a/config/types.yaml +++ b/config/types.yaml @@ -1,86 +1,116 @@ -version: 1.1 # Update auf v1.1 für Mindnet v2.4 +version: 1.2 # Update für Smart Chunking Config + +# --- CHUNKING DEFINITIONEN --- +# Hier definieren wir die technischen Strategien zentral. +chunking_profiles: + # Standard für Fließtexte (Sliding Window) + sliding_short: + strategy: sliding_window + target: 200 + max: 350 + overlap: [30, 50] + + sliding_standard: + strategy: sliding_window + target: 400 + max: 600 + overlap: [50, 80] + + sliding_large: + strategy: sliding_window + target: 500 + max: 800 + overlap: [60, 100] + + # Smart Chunking für Strukturen (Harte Splits) + structured_strict: + strategy: by_heading + split_level: 2 + max: 600 # Fallback Limit + target: 400 # Fallback Target bei Sub-Chunking + overlap: [50, 80] # Overlap bei Sub-Chunking defaults: retriever_weight: 1.0 - chunk_profile: default + chunking_profile: sliding_standard # Fallback Profil edge_defaults: [] types: # --- WISSENSBAUSTEINE --- concept: - chunk_profile: medium + chunking_profile: sliding_standard retriever_weight: 0.60 edge_defaults: ["references", "related_to"] source: - chunk_profile: short + chunking_profile: sliding_standard retriever_weight: 0.50 - edge_defaults: [] # Quellen sind passiv + edge_defaults: [] glossary: - chunk_profile: short + chunking_profile: sliding_short retriever_weight: 0.40 edge_defaults: ["related_to"] - # --- IDENTITÄT & PERSÖNLICHKEIT (Decision Engine Core) --- + # --- IDENTITÄT & PERSÖNLICHKEIT --- profile: - chunk_profile: long + chunking_profile: structured_strict # H2 Split wichtig für Profile retriever_weight: 0.70 edge_defaults: ["references", "related_to"] value: - chunk_profile: short - retriever_weight: 1.00 # MAX: Werte stechen Fakten im Decision-Mode + chunking_profile: structured_strict + retriever_weight: 1.00 edge_defaults: ["related_to"] principle: - chunk_profile: short - retriever_weight: 0.95 # Sehr hoch: Handlungsleitlinien - edge_defaults: ["derived_from", "references"] # Prinzipien leiten sich oft woraus ab + chunking_profile: structured_strict + retriever_weight: 0.95 + edge_defaults: ["derived_from", "references"] - belief: # NEU: Glaubenssätze für Empathie-Modus - chunk_profile: short + belief: + chunking_profile: sliding_short retriever_weight: 0.90 edge_defaults: ["related_to"] experience: - chunk_profile: medium + chunking_profile: sliding_standard retriever_weight: 0.90 - edge_defaults: ["derived_from", "references"] # Erfahrungen haben einen Ursprung + edge_defaults: ["derived_from", "references"] # --- STRATEGIE & ENTSCHEIDUNG --- goal: - chunk_profile: medium + chunking_profile: sliding_standard retriever_weight: 0.95 edge_defaults: ["depends_on", "related_to"] - decision: # ADRs (Architecture Decision Records) - chunk_profile: long # Entscheidungen brauchen oft viel Kontext (Begründung) - retriever_weight: 1.00 # MAX: Getroffene Entscheidungen sind Gesetz - edge_defaults: ["caused_by", "references"] # Entscheidungen haben Gründe + decision: + chunking_profile: structured_strict # ADRs sind oft strukturiert + retriever_weight: 1.00 + edge_defaults: ["caused_by", "references"] - risk: # NEU: Risikomanagement - chunk_profile: short + risk: + chunking_profile: sliding_short retriever_weight: 0.85 - edge_defaults: ["related_to", "blocks"] # Risiken blockieren ggf. Projekte + edge_defaults: ["related_to", "blocks"] milestone: - chunk_profile: short + chunking_profile: sliding_short retriever_weight: 0.70 edge_defaults: ["related_to", "part_of"] # --- OPERATIV --- project: - chunk_profile: long - retriever_weight: 0.97 # Projekte sind der Kontext für alles + chunking_profile: sliding_large # Projekte haben viel Text + retriever_weight: 0.97 edge_defaults: ["references", "depends_on"] task: - chunk_profile: short + chunking_profile: sliding_short retriever_weight: 0.80 edge_defaults: ["depends_on", "part_of"] journal: - chunk_profile: medium + chunking_profile: sliding_standard retriever_weight: 0.80 edge_defaults: ["references", "related_to"] \ No newline at end of file