from __future__ import annotations from dataclasses import dataclass from typing import List, Dict, Optional, Tuple import re import math from markdown_it import MarkdownIt from markdown_it.token import Token from .chunk_config import get_sizes # --- Hilfen --- _SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])') _WS = re.compile(r'\s+') def estimate_tokens(text: str) -> int: # leichte Approximation: 1 Token ≈ 4 Zeichen; robust + schnell t = len(text.strip()) return max(1, math.ceil(t / 4)) def split_sentences(text: str) -> list[str]: text = _WS.sub(' ', text.strip()) if not text: return [] parts = _SENT_SPLIT.split(text) return [p.strip() for p in parts if p.strip()] @dataclass class RawBlock: kind: str # "heading" | "paragraph" | "list" | "code" | "table" | "thematic_break" | "blockquote" text: str level: Optional[int] # heading level (2,3,...) or None section_path: str # e.g., "/H2 Title/H3 Subtitle" @dataclass class Chunk: id: str note_id: str index: int text: str token_count: int section_title: Optional[str] section_path: str neighbors_prev: Optional[str] neighbors_next: Optional[str] char_start: int char_end: int # --- Markdown zu RawBlocks: H2/H3 als Sections, andere Blöcke gruppiert --- def parse_blocks(md_text: str) -> List[RawBlock]: md = MarkdownIt("commonmark").enable("table") tokens: List[Token] = md.parse(md_text) blocks: List[RawBlock] = [] h2, h3 = None, None section_path = "/" cur_text = [] cur_kind = None def push(kind: str, txt: str, lvl: Optional[int]): nonlocal section_path txt = txt.strip() if not txt: return title = None if kind == "heading" and lvl: title = txt blocks.append(RawBlock(kind=kind, text=txt, level=lvl, section_path=section_path)) i = 0 while i < len(tokens): t = tokens[i] if t.type == "heading_open": lvl = int(t.tag[1]) # Sammle heading inline i += 1 title_txt = "" while i < len(tokens) and tokens[i].type != "heading_close": if tokens[i].type == "inline": title_txt += tokens[i].content i += 1 title_txt = title_txt.strip() # Section-Pfad aktualisieren if lvl == 2: h2, h3 = title_txt, None section_path = f"/{h2}" elif lvl == 3: h3 = title_txt section_path = f"/{h2}/{h3}" if h2 else f"/{h3}" push("heading", title_txt, lvl) elif t.type in ("paragraph_open", "bullet_list_open", "ordered_list_open", "fence", "code_block", "blockquote_open", "table_open", "hr"): kind = { "paragraph_open": "paragraph", "bullet_list_open": "list", "ordered_list_open": "list", "fence": "code", "code_block": "code", "blockquote_open": "blockquote", "table_open": "table", "hr": "thematic_break", }[t.type] if t.type in ("fence", "code_block"): # Codeblock hat eigenen content im selben Token content = t.content or "" push(kind, content, None) else: # inline sammeln bis close content = "" i += 1 depth = 1 while i < len(tokens) and depth > 0: tk = tokens[i] if tk.type.endswith("_open"): depth += 1 elif tk.type.endswith("_close"): depth -= 1 elif tk.type == "inline": content += tk.content i += 1 push(kind, content, None) continue # wir sind schon auf nächstem Token i += 1 return blocks def assemble_chunks(note_id: str, md_text: str, note_type: str) -> List[Chunk]: sizes = get_sizes(note_type) target = sum(sizes["target"]) // 2 # mittlerer Zielwert max_tokens = sizes["max"] ov_min, ov_max = sizes["overlap"] overlap = (ov_min + ov_max) // 2 blocks = parse_blocks(md_text) chunks: List[Chunk] = [] buf: List[Tuple[str, str, str]] = [] # (text, section_title, section_path) char_pos = 0 def flush_buffer(force=False): nonlocal buf, chunks, char_pos if not buf: return text = "\n\n".join([b[0] for b in buf]).strip() if not text: buf = [] return # Wenn zu groß, satzbasiert weich umbrechen toks = estimate_tokens(text) if toks > max_tokens: sentences = split_sentences(text) cur = [] cur_tokens = 0 for s in sentences: st = estimate_tokens(s) if cur_tokens + st > target and cur: _emit("\n".join(cur)) # Overlap: letzte Sätze wiederverwenden ov_text = " ".join(cur)[-overlap*4:] # 4 chars/token Heuristik cur = [ov_text, s] if ov_text else [s] cur_tokens = estimate_tokens(" ".join(cur)) else: cur.append(s) cur_tokens += st if cur: _emit("\n".join(cur)) else: _emit(text) buf = [] def _emit(text_block: str): nonlocal chunks, char_pos idx = len(chunks) chunk_id = f"{note_id}#c{idx:02d}" token_count = estimate_tokens(text_block) # section aus letztem buffer-entry ableiten sec_title = buf[-1][1] if buf else None sec_path = buf[-1][2] if buf else "/" start = char_pos end = start + len(text_block) chunks.append(Chunk( id=chunk_id, note_id=note_id, index=idx, text=text_block, token_count=token_count, section_title=sec_title, section_path=sec_path, neighbors_prev=None, neighbors_next=None, char_start=start, char_end=end )) char_pos = end + 1 # Blocks in Puffer sammeln; bei Überschreiten Zielbereich flushen cur_sec_title = None for b in blocks: if b.kind == "heading" and b.level in (2, 3): # Sectionwechsel ⇒ Buffer flushen flush_buffer() cur_sec_title = b.text.strip() # Heading selbst nicht als Chunk, aber als Kontexttitel nutzen continue txt = b.text.strip() if not txt: continue tentative = "\n\n".join([*(x[0] for x in buf), txt]).strip() if estimate_tokens(tentative) > max(get_sizes(note_type)["target"]): # weicher Schnitt vor Hinzufügen flush_buffer() buf.append((txt, cur_sec_title, b.section_path)) # bei Erreichen ~Target flushen if estimate_tokens("\n\n".join([x[0] for x in buf])) >= target: flush_buffer() flush_buffer(force=True) # neighbors setzen for i, ch in enumerate(chunks): ch.neighbors_prev = chunks[i-1].id if i > 0 else None ch.neighbors_next = chunks[i+1].id if i < len(chunks)-1 else None return chunks