All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
227 lines
7.4 KiB
Python
227 lines
7.4 KiB
Python
from __future__ import annotations
|
|
from dataclasses import dataclass
|
|
from typing import List, Dict, Optional, Tuple
|
|
import re
|
|
import math
|
|
from markdown_it import MarkdownIt
|
|
from markdown_it.token import Token
|
|
from .chunk_config import get_sizes
|
|
|
|
# --- Helpers ---

# Sentence boundary: terminal punctuation, then whitespace, then an uppercase
# letter (incl. German umlauts), a digit, a German low quote, or an open paren.
_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])')

# Any run of whitespace (used to collapse to a single space).
_WS = re.compile(r'\s+')
|
|
|
|
def estimate_tokens(text: str) -> int:
    """Cheap token estimate for *text*: ~4 characters per token, at least 1.

    A deliberate approximation (no tokenizer dependency) — fast and robust.
    """
    return max(1, math.ceil(len(text.strip()) / 4))
|
|
|
|
def split_sentences(text: str) -> list[str]:
    """Split *text* into sentences on `_SENT_SPLIT` boundaries.

    Whitespace is normalized to single spaces first; empty input yields [].
    """
    normalized = _WS.sub(' ', text.strip())
    if not normalized:
        return []
    return [part.strip() for part in _SENT_SPLIT.split(normalized) if part.strip()]
|
|
|
|
@dataclass
class RawBlock:
    """One structural markdown block together with its section context."""
    kind: str  # "heading" | "paragraph" | "list" | "code" | "table" | "thematic_break" | "blockquote"
    text: str  # stripped plain-text content of the block
    level: Optional[int]  # heading level (2,3,...) or None for non-headings
    section_path: str  # e.g., "/H2 Title/H3 Subtitle"
|
|
|
|
@dataclass
class Chunk:
    """A retrieval chunk of a note, with position info and neighbor links."""
    id: str  # "<note_id>#cNN" (zero-padded index)
    note_id: str  # id of the note this chunk was cut from
    index: int  # 0-based position within the note
    text: str  # chunk body
    token_count: int  # approximate, via estimate_tokens()
    section_title: Optional[str]  # nearest H2/H3 title, if any
    section_path: str  # e.g., "/H2 Title/H3 Subtitle"
    neighbors_prev: Optional[str]  # id of the preceding chunk, if any
    neighbors_next: Optional[str]  # id of the following chunk, if any
    char_start: int  # offset in the emitted chunk stream (not the source markdown)
    char_end: int  # exclusive end offset in that same stream
|
|
|
|
# --- Markdown zu RawBlocks: H2/H3 als Sections, andere Blöcke gruppiert ---
|
|
# --- Markdown to RawBlocks: H2/H3 act as sections, other blocks are grouped ---

def parse_blocks(md_text: str) -> List[RawBlock]:
    """Parse markdown into a flat list of RawBlock.

    H2/H3 headings update the running section path; every emitted block
    carries the section path that is active when it is pushed.
    """
    md = MarkdownIt("commonmark").enable("table")
    tokens: List[Token] = md.parse(md_text)

    blocks: List[RawBlock] = []
    h2, h3 = None, None
    section_path = "/"

    def push(kind: str, txt: str, lvl: Optional[int]) -> None:
        # Blocks with no visible text are dropped entirely.
        txt = txt.strip()
        if not txt:
            return
        blocks.append(RawBlock(kind=kind, text=txt, level=lvl, section_path=section_path))

    kind_by_token = {
        "paragraph_open": "paragraph",
        "bullet_list_open": "list",
        "ordered_list_open": "list",
        "fence": "code",
        "code_block": "code",
        "blockquote_open": "blockquote",
        "table_open": "table",
        "hr": "thematic_break",
    }

    i = 0
    while i < len(tokens):
        t = tokens[i]
        if t.type == "heading_open":
            lvl = int(t.tag[1])  # tag is "h1".."h6"
            # Collect the heading's inline text up to heading_close.
            i += 1
            title_txt = ""
            while i < len(tokens) and tokens[i].type != "heading_close":
                if tokens[i].type == "inline":
                    title_txt += tokens[i].content
                i += 1
            title_txt = title_txt.strip()
            # Update the section path first, so the heading block itself is
            # attributed to the section it opens.
            if lvl == 2:
                h2, h3 = title_txt, None
                section_path = f"/{h2}"
            elif lvl == 3:
                h3 = title_txt
                section_path = f"/{h2}/{h3}" if h2 else f"/{h3}"
            push("heading", title_txt, lvl)
        elif t.type in kind_by_token:
            kind = kind_by_token[t.type]
            if t.type in ("fence", "code_block"):
                # Code blocks carry their content on the same token.
                push(kind, t.content or "", None)
            elif t.type == "hr":
                # BUGFIX: "hr" is self-closing — it has no matching *_close
                # token. The previous code sent it through the depth-matching
                # loop below, which then swallowed the remainder of the token
                # stream into one giant bogus block. A thematic break carries
                # no text, so nothing is emitted; we just consume the token.
                pass
            else:
                # Collect inline content until the matching *_close token.
                content = ""
                i += 1
                depth = 1
                while i < len(tokens) and depth > 0:
                    tk = tokens[i]
                    if tk.type.endswith("_open"):
                        depth += 1
                    elif tk.type.endswith("_close"):
                        depth -= 1
                    elif tk.type == "inline":
                        content += tk.content
                    i += 1
                push(kind, content, None)
                continue  # i already points at the next unconsumed token
        i += 1

    return blocks
|
|
|
|
def assemble_chunks(note_id: str, md_text: str, note_type: str) -> List[Chunk]:
    """Assemble a note's markdown into token-bounded chunks.

    Blocks from parse_blocks() are buffered until roughly ``target`` tokens
    and flushed at H2/H3 section boundaries. Buffers that exceed ``max``
    tokens are soft-wrapped on sentence boundaries, carrying a small
    character overlap between consecutive chunks. Finally, prev/next
    neighbor ids are linked on each chunk.
    """
    sizes = get_sizes(note_type)
    target = sum(sizes["target"]) // 2   # midpoint of the target range
    target_max = max(sizes["target"])    # hoisted: was recomputed via get_sizes() per block
    max_tokens = sizes["max"]
    ov_min, ov_max = sizes["overlap"]
    overlap = (ov_min + ov_max) // 2     # mean overlap, in tokens

    blocks = parse_blocks(md_text)

    chunks: List[Chunk] = []
    buf: List[Tuple[str, str, str]] = []  # (text, section_title, section_path)
    char_pos = 0  # running offset in the emitted chunk stream

    def _emit(text_block: str) -> None:
        """Append one Chunk built from *text_block* and advance char_pos."""
        nonlocal char_pos
        idx = len(chunks)
        chunk_id = f"{note_id}#c{idx:02d}"
        token_count = estimate_tokens(text_block)
        # Section metadata comes from the last buffered entry — buf is still
        # populated while flush_buffer() is emitting and cleared afterwards.
        sec_title = buf[-1][1] if buf else None
        sec_path = buf[-1][2] if buf else "/"
        start = char_pos
        end = start + len(text_block)
        chunks.append(Chunk(
            id=chunk_id,
            note_id=note_id,
            index=idx,
            text=text_block,
            token_count=token_count,
            section_title=sec_title,
            section_path=sec_path,
            neighbors_prev=None,  # linked after all chunks exist
            neighbors_next=None,
            char_start=start,
            char_end=end,
        ))
        char_pos = end + 1

    def flush_buffer() -> None:
        """Emit the buffered blocks as one or more chunks, then clear the buffer.

        (The old unused ``force`` parameter was dropped — it had no effect.)
        """
        nonlocal buf
        if not buf:
            return
        text = "\n\n".join([b[0] for b in buf]).strip()
        if not text:
            buf = []
            return

        if estimate_tokens(text) > max_tokens:
            # Too large for one chunk: soft-wrap on sentence boundaries.
            sentences = split_sentences(text)
            cur: List[str] = []
            cur_tokens = 0
            for s in sentences:
                st = estimate_tokens(s)
                if cur_tokens + st > target and cur:
                    _emit("\n".join(cur))
                    # Overlap: reuse the tail of the previous chunk
                    # (4 chars/token heuristic, matching estimate_tokens()).
                    ov_text = " ".join(cur)[-overlap * 4:]
                    cur = [ov_text, s] if ov_text else [s]
                    cur_tokens = estimate_tokens(" ".join(cur))
                else:
                    cur.append(s)
                    cur_tokens += st
            if cur:
                _emit("\n".join(cur))
        else:
            _emit(text)
        buf = []

    # Collect blocks into the buffer; flush on section changes and targets.
    cur_sec_title = None
    for b in blocks:
        if b.kind == "heading" and b.level in (2, 3):
            # Section change => flush. The heading itself is not chunked;
            # it only supplies the context title for the following blocks.
            flush_buffer()
            cur_sec_title = b.text.strip()
            continue

        txt = b.text.strip()
        if not txt:
            continue

        # Would adding this block overshoot the upper target bound?
        tentative = "\n\n".join([*(x[0] for x in buf), txt]).strip()
        if estimate_tokens(tentative) > target_max:
            flush_buffer()  # soft cut before adding
        buf.append((txt, cur_sec_title, b.section_path))

        # Reached ~target => flush.
        if estimate_tokens("\n\n".join([x[0] for x in buf])) >= target:
            flush_buffer()

    flush_buffer()

    # Link neighbors now that all chunk ids are known.
    for i, ch in enumerate(chunks):
        ch.neighbors_prev = chunks[i - 1].id if i > 0 else None
        ch.neighbors_next = chunks[i + 1].id if i < len(chunks) - 1 else None
    return chunks
|