"""Markdown chunker: parse a note into raw blocks and assemble text chunks.

``parse_blocks`` turns Markdown into a flat list of ``RawBlock`` objects and
tracks the enclosing H2/H3 section. ``assemble_chunks`` groups those blocks
into ``Chunk`` objects sized according to ``get_sizes(note_type)``.
"""
# Provenance: recovered from the patch that creates app/core/chunker.py
# (commit edcb36958d6ea13265707631f10e1448ec176e7e, Lars, 2025-09-03,
# subject "app/core/chunker.py hinzugefügt").

from __future__ import annotations

import math
import re
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

from markdown_it import MarkdownIt
from markdown_it.token import Token

from .chunk_config import get_sizes

# --- Helpers ---
# Sentence boundary: whitespace that follows '.', '!' or '?' and precedes an
# upper-case ASCII letter, a German upper-case umlaut, a digit, '„' or '('.
_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])')
_WS = re.compile(r'\s+')

# Heuristic used for every size estimate in this module: 1 token ≈ 4 chars.
_CHARS_PER_TOKEN = 4

# Tokens that open a container; their text lives in nested tokens up to the
# matching "*_close" token.
_CONTAINER_KINDS = {
    "paragraph_open": "paragraph",
    "bullet_list_open": "list",
    "ordered_list_open": "list",
    "blockquote_open": "blockquote",
    "table_open": "table",
}

# Self-contained tokens: they carry their own content and have no close token.
_LEAF_KINDS = {
    "fence": "code",
    "code_block": "code",
    "hr": "thematic_break",
}

# Nested token types whose ``content`` is collected while scanning a container.
_TEXT_TOKEN_TYPES = ("inline", "fence", "code_block")


def estimate_tokens(text: str) -> int:
    """Return a cheap token estimate for *text* (about 4 characters per token).

    Surrounding whitespace is ignored. The result is never below 1, so an
    empty or whitespace-only string still counts as one token.
    """
    return max(1, math.ceil(len(text.strip()) / _CHARS_PER_TOKEN))


def split_sentences(text: str) -> list[str]:
    """Split *text* into sentences.

    All whitespace runs (including newlines) are collapsed to a single space
    first, then the text is cut at ``_SENT_SPLIT`` boundaries. Returns an empty
    list for empty or whitespace-only input.
    """
    text = _WS.sub(' ', text.strip())
    if not text:
        return []
    return [part.strip() for part in _SENT_SPLIT.split(text) if part.strip()]


@dataclass
class RawBlock:
    # "heading" | "paragraph" | "list" | "code" | "table" | "thematic_break"
    # | "blockquote"
    kind: str
    text: str
    level: Optional[int]  # heading level for headings, None for other blocks
    section_path: str  # e.g. "/H2 Title/H3 Subtitle"; "/" before any H2/H3


@dataclass
class Chunk:
    id: str  # "<note_id>#cNN", NN being the zero-padded chunk index
    note_id: str
    index: int
    text: str
    token_count: int  # estimate_tokens(text)
    section_title: Optional[str]  # text of the most recent H2/H3, if any
    section_path: str
    neighbors_prev: Optional[str]  # id of the previous chunk, None for the first
    neighbors_next: Optional[str]  # id of the next chunk, None for the last
    # Running offsets over the emitted chunk texts (each chunk starts one past
    # the previous chunk's end). They are NOT offsets into the source Markdown.
    char_start: int
    char_end: int


# --- Markdown to RawBlocks: H2/H3 define sections, other blocks are grouped ---
def parse_blocks(md_text: str) -> List[RawBlock]:
    """Parse Markdown into a flat list of top-level ``RawBlock`` objects.

    H2 and H3 headings update the current section path ("/H2" or "/H2/H3");
    headings of other levels are emitted as heading blocks but leave the path
    untouched. Every block records the section path that is current when it is
    emitted. Blocks whose text is empty after stripping are dropped, which
    means thematic breaks never produce a block.

    Args:
        md_text: Markdown source (CommonMark plus tables).

    Returns:
        The blocks in document order.
    """
    md = MarkdownIt("commonmark").enable("table")
    tokens: List[Token] = md.parse(md_text)

    blocks: List[RawBlock] = []
    h2: Optional[str] = None
    section_path = "/"

    def push(kind: str, txt: str, lvl: Optional[int]) -> None:
        # Reads the enclosing section_path at call time.
        txt = txt.strip()
        if txt:
            blocks.append(
                RawBlock(kind=kind, text=txt, level=lvl, section_path=section_path)
            )

    i = 0
    n = len(tokens)
    while i < n:
        tok = tokens[i]

        if tok.type == "heading_open":
            level = int(tok.tag[1:])  # tag is "h1".."h6"
            # Collect the heading's inline text up to heading_close.
            i += 1
            title_parts: List[str] = []
            while i < n and tokens[i].type != "heading_close":
                if tokens[i].type == "inline":
                    title_parts.append(tokens[i].content)
                i += 1
            title = "".join(title_parts).strip()
            # Update the section path before pushing so the heading block
            # already carries its own section.
            if level == 2:
                h2 = title
                section_path = f"/{h2}"
            elif level == 3:
                section_path = f"/{h2}/{title}" if h2 else f"/{title}"
            push("heading", title, level)
            i += 1  # step over heading_close

        elif tok.type in _LEAF_KINDS:
            # Leaf tokens have no matching close token. Running the container
            # scan on them (as was done for "hr") would consume the rest of
            # the document, so they are handled here from their own content.
            push(_LEAF_KINDS[tok.type], tok.content or "", None)
            i += 1

        elif tok.type in _CONTAINER_KINDS:
            # Gather nested text until the matching close token. Parts are
            # joined with newlines so list items / table cells do not fuse.
            parts: List[str] = []
            depth = 1
            i += 1
            while i < n and depth > 0:
                inner = tokens[i]
                if inner.type.endswith("_open"):
                    depth += 1
                elif inner.type.endswith("_close"):
                    depth -= 1
                elif inner.type in _TEXT_TOKEN_TYPES:
                    # Includes code nested in lists/blockquotes, which would
                    # otherwise be lost.
                    parts.append(inner.content)
                i += 1
            # i already points at the token after the close token.
            push(_CONTAINER_KINDS[tok.type], "\n".join(parts), None)

        else:
            i += 1

    return blocks


def _split_oversized(text: str, target: int, overlap: int) -> List[str]:
    """Cut *text* into sentence-based pieces of roughly *target* tokens.

    Each piece after the first starts with the trailing ``overlap`` tokens
    (as characters, via the 4-chars-per-token heuristic) of the previous piece.
    A single sentence larger than *target* is kept whole.
    """
    pieces: List[str] = []
    cur: List[str] = []
    cur_tokens = 0
    for sentence in split_sentences(text):
        sentence_tokens = estimate_tokens(sentence)
        if cur and cur_tokens + sentence_tokens > target:
            pieces.append("\n".join(cur))
            # Guard overlap <= 0: s[-0:] is the whole string, which would
            # carry the entire previous piece into the next one.
            tail = (
                " ".join(cur)[-overlap * _CHARS_PER_TOKEN:] if overlap > 0 else ""
            )
            cur = [tail, sentence] if tail else [sentence]
            cur_tokens = estimate_tokens(" ".join(cur))
        else:
            cur.append(sentence)
            cur_tokens += sentence_tokens
    if cur:
        pieces.append("\n".join(cur))
    return pieces


def assemble_chunks(note_id: str, md_text: str, note_type: str) -> List[Chunk]:
    """Split a Markdown note into linked ``Chunk`` objects.

    Blocks from ``parse_blocks`` are buffered and joined with blank lines. The
    buffer is flushed when an H2/H3 heading starts a new section, when adding
    the next block would exceed the upper target bound, and when the buffer
    reaches the mid-point target. H2/H3 headings are not emitted themselves;
    their text becomes ``section_title`` of the following chunks. A flushed
    text larger than the configured maximum is split by sentences with overlap.

    Args:
        note_id: Identifier of the note; prefix of every chunk id.
        md_text: Markdown source of the note.
        note_type: Key passed to ``get_sizes``. The returned mapping is read as
            ``"target"`` (pair of token counts), ``"max"`` (token limit) and
            ``"overlap"`` (pair of token counts).

    Returns:
        Chunks in document order with ``neighbors_prev``/``neighbors_next`` set.
    """
    sizes = get_sizes(note_type)
    target = sum(sizes["target"]) // 2  # mid-point of the target range
    target_max = max(sizes["target"])
    max_tokens = sizes["max"]
    ov_min, ov_max = sizes["overlap"]
    overlap = (ov_min + ov_max) // 2

    chunks: List[Chunk] = []
    # (text, section_title, section_path)
    buf: List[Tuple[str, Optional[str], str]] = []
    char_pos = 0

    def emit(text_block: str, sec_title: Optional[str], sec_path: str) -> None:
        nonlocal char_pos
        idx = len(chunks)
        start = char_pos
        end = start + len(text_block)
        chunks.append(Chunk(
            id=f"{note_id}#c{idx:02d}",
            note_id=note_id,
            index=idx,
            text=text_block,
            token_count=estimate_tokens(text_block),
            section_title=sec_title,
            section_path=sec_path,
            neighbors_prev=None,
            neighbors_next=None,
            char_start=start,
            char_end=end,
        ))
        char_pos = end + 1

    def flush_buffer() -> None:
        if not buf:
            return
        text = "\n\n".join(entry[0] for entry in buf).strip()
        # Section info comes from the most recent buffered block.
        _, sec_title, sec_path = buf[-1]
        buf.clear()
        if not text:
            return
        if estimate_tokens(text) > max_tokens:
            # Too large: soft-wrap on sentence boundaries.
            pieces = _split_oversized(text, target, overlap)
        else:
            pieces = [text]
        for piece in pieces:
            emit(piece, sec_title, sec_path)

    cur_sec_title: Optional[str] = None
    for block in blocks_iter(md_text) if False else parse_blocks(md_text):
        if block.kind == "heading" and block.level in (2, 3):
            # Section change: close the running chunk; the heading is used
            # as context title only, not as chunk text.
            flush_buffer()
            cur_sec_title = block.text.strip()
            continue

        txt = block.text.strip()
        if not txt:
            continue

        tentative = "\n\n".join([*(entry[0] for entry in buf), txt]).strip()
        if estimate_tokens(tentative) > target_max:
            # Soft cut before adding the block.
            flush_buffer()
        buf.append((txt, cur_sec_title, block.section_path))

        # Flush once the buffer reaches roughly the target size.
        if estimate_tokens("\n\n".join(entry[0] for entry in buf)) >= target:
            flush_buffer()

    flush_buffer()

    # Link neighbours; the outer ends keep the None set at creation.
    for prev_chunk, next_chunk in zip(chunks, chunks[1:]):
        prev_chunk.neighbors_next = next_chunk.id
        next_chunk.neighbors_prev = prev_chunk.id
    return chunks