app/core/chunker.py hinzugefügt
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
This commit is contained in:
parent
9a3423f35f
commit
edcb36958d
226
app/core/chunker.py
Normal file
226
app/core/chunker.py
Normal file
|
|
@ -0,0 +1,226 @@
|
|||
from __future__ import annotations
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Dict, Optional, Tuple
|
||||
import re
|
||||
import math
|
||||
from markdown_it import MarkdownIt
|
||||
from markdown_it.token import Token
|
||||
from .chunk_config import get_sizes
|
||||
|
||||
# --- Helpers ---
# Sentence boundary: position after '.', '!' or '?' followed by whitespace and
# an uppercase letter (incl. German umlauts), a digit, a low quote „ or '('.
_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])')
# Any run of whitespace (collapsed to a single space before splitting).
_WS = re.compile(r'\s+')
|
||||
|
||||
def estimate_tokens(text: str) -> int:
    """Cheap token estimate: roughly one token per four characters.

    Whitespace around the text is ignored; the result is clamped to at
    least 1 so empty input still counts as one token.
    """
    n_chars = len(text.strip())
    # (n + 3) // 4 is integer ceil(n / 4)
    return max(1, (n_chars + 3) // 4)
|
||||
|
||||
def split_sentences(text: str) -> list[str]:
    """Collapse whitespace and split *text* into sentences.

    A split point is any whitespace run that follows '.', '!' or '?' and
    precedes an uppercase letter (incl. German umlauts), a digit, a low
    quote „ or an opening parenthesis. Empty pieces are dropped.
    """
    normalized = re.sub(r'\s+', ' ', text.strip())
    if not normalized:
        return []
    pieces = re.split(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])', normalized)
    return [piece.strip() for piece in pieces if piece.strip()]
|
||||
|
||||
@dataclass
class RawBlock:
    """One structural block extracted from a markdown note, tagged with the
    H2/H3 section path it appeared under (see parse_blocks)."""
    kind: str  # "heading" | "paragraph" | "list" | "code" | "table" | "thematic_break" | "blockquote"
    text: str
    level: Optional[int]  # heading level (2,3,...) or None
    section_path: str  # e.g., "/H2 Title/H3 Subtitle"
|
||||
|
||||
@dataclass
class Chunk:
    """A retrieval-ready chunk of a note, with section and neighbor metadata."""
    id: str  # formatted as "<note_id>#cNN" (zero-padded index)
    note_id: str  # id of the note this chunk was cut from
    index: int  # 0-based position of the chunk within the note
    text: str  # the chunk's text content
    token_count: int  # approximate size, computed via estimate_tokens()
    section_title: Optional[str]  # title of the enclosing H2/H3 section, if any
    section_path: str  # e.g. "/H2 Title/H3 Subtitle"; "/" when outside any section
    neighbors_prev: Optional[str]  # id of the previous chunk; filled in after assembly
    neighbors_next: Optional[str]  # id of the next chunk; filled in after assembly
    char_start: int  # offset within the concatenated chunk stream — NOT the raw markdown
    char_end: int  # exclusive end offset in the same stream
|
||||
|
||||
# --- Markdown to RawBlocks: H2/H3 define sections, other blocks are grouped ---
def parse_blocks(md_text: str) -> List[RawBlock]:
    """Parse markdown into a flat list of RawBlock entries.

    H2/H3 headings update a running section path ("/H2" or "/H2/H3") that is
    stamped onto every subsequent block. Headings themselves are also emitted
    (kind="heading"). Paragraphs, lists, blockquotes and tables collect their
    inline text up to the matching *_close token; fenced/indented code blocks
    carry their content on the token itself. Blocks with empty text are dropped.

    Args:
        md_text: raw markdown source of a note.
    Returns:
        Blocks in document order, each tagged with its section path.
    """
    md = MarkdownIt("commonmark").enable("table")
    tokens: List[Token] = md.parse(md_text)

    blocks: List[RawBlock] = []
    h2, h3 = None, None
    section_path = "/"

    def push(kind: str, txt: str, lvl: Optional[int]):
        # Record one block under the current section path; empty text is a no-op.
        txt = txt.strip()
        if txt:
            blocks.append(RawBlock(kind=kind, text=txt, level=lvl, section_path=section_path))

    i = 0
    while i < len(tokens):
        t = tokens[i]
        if t.type == "heading_open":
            lvl = int(t.tag[1])  # tag is "h1".."h6"
            # Collect the heading's inline text up to heading_close.
            i += 1
            title_txt = ""
            while i < len(tokens) and tokens[i].type != "heading_close":
                if tokens[i].type == "inline":
                    title_txt += tokens[i].content
                i += 1
            title_txt = title_txt.strip()
            # Update the section path (only H2/H3 contribute to it).
            if lvl == 2:
                h2, h3 = title_txt, None
                section_path = f"/{h2}"
            elif lvl == 3:
                h3 = title_txt
                section_path = f"/{h2}/{h3}" if h2 else f"/{h3}"
            push("heading", title_txt, lvl)
        elif t.type in ("paragraph_open", "bullet_list_open", "ordered_list_open",
                        "fence", "code_block", "blockquote_open", "table_open", "hr"):
            kind = {
                "paragraph_open": "paragraph",
                "bullet_list_open": "list",
                "ordered_list_open": "list",
                "fence": "code",
                "code_block": "code",
                "blockquote_open": "blockquote",
                "table_open": "table",
                "hr": "thematic_break",
            }[t.type]

            if t.type in ("fence", "code_block"):
                # Code blocks carry their content on the same token.
                push(kind, t.content or "", None)
            elif t.type == "hr":
                # BUGFIX: "hr" is self-closing — there is no hr_close token.
                # The previous code fell into the depth-scanning branch below
                # with depth=1, which never returned to 0 and swallowed the
                # rest of the document into one block. A thematic break has
                # no text, so nothing is pushed here.
                pass
            else:
                # Collect inline content until the matching *_close token,
                # tracking nesting depth so inner containers stay balanced.
                content = ""
                i += 1
                depth = 1
                while i < len(tokens) and depth > 0:
                    tk = tokens[i]
                    if tk.type.endswith("_open"):
                        depth += 1
                    elif tk.type.endswith("_close"):
                        depth -= 1
                    elif tk.type == "inline":
                        content += tk.content
                    i += 1
                push(kind, content, None)
                continue  # i already points past the *_close token
        i += 1

    return blocks
|
||||
|
||||
def assemble_chunks(note_id: str, md_text: str, note_type: str) -> List[Chunk]:
    """Split a markdown note into size-bounded Chunk objects.

    Blocks from parse_blocks() are accumulated into a buffer; the buffer is
    flushed into a chunk when an H2/H3 section starts, when adding the next
    block would exceed the upper target size, or when the mid-target size is
    reached. Oversized flushes are re-split on sentence boundaries with a
    token overlap between the pieces. Finally prev/next neighbor ids are
    linked.

    Args:
        note_id: id used to derive chunk ids ("<note_id>#cNN").
        md_text: raw markdown source of the note.
        note_type: key passed to get_sizes() to pick size limits.
    Returns:
        Chunks in document order with neighbor links set.
    """
    sizes = get_sizes(note_type)
    target = sum(sizes["target"]) // 2          # midpoint of the target range
    target_max = max(sizes["target"])           # hoisted: loop-invariant upper bound
    max_tokens = sizes["max"]
    ov_min, ov_max = sizes["overlap"]
    overlap = (ov_min + ov_max) // 2

    blocks = parse_blocks(md_text)

    chunks: List[Chunk] = []
    # (text, section_title, section_path); section_title may be None before
    # the first H2/H3 is seen — the original annotation claimed plain str.
    buf: List[Tuple[str, Optional[str], str]] = []
    char_pos = 0

    def _emit(text_block: str):
        # Append one finished chunk. Section metadata is taken from the last
        # buffered entry — buf is only cleared after flushing completes, so
        # it is still populated here. char_start/char_end are offsets in the
        # emitted chunk stream (one separator char apart), not in md_text.
        nonlocal char_pos
        idx = len(chunks)
        sec_title = buf[-1][1] if buf else None
        sec_path = buf[-1][2] if buf else "/"
        start = char_pos
        end = start + len(text_block)
        chunks.append(Chunk(
            id=f"{note_id}#c{idx:02d}",
            note_id=note_id,
            index=idx,
            text=text_block,
            token_count=estimate_tokens(text_block),
            section_title=sec_title,
            section_path=sec_path,
            neighbors_prev=None,
            neighbors_next=None,
            char_start=start,
            char_end=end,
        ))
        char_pos = end + 1

    def flush_buffer():
        # Emit the buffered blocks as one chunk. If the joined text exceeds
        # max_tokens, re-split softly on sentence boundaries, carrying an
        # overlap tail between consecutive pieces.
        # (The old `force` parameter was never read and has been removed.)
        nonlocal buf
        if not buf:
            return
        text = "\n\n".join(b[0] for b in buf).strip()
        if not text:
            buf = []
            return

        if estimate_tokens(text) > max_tokens:
            cur: List[str] = []
            cur_tokens = 0
            for s in split_sentences(text):
                st = estimate_tokens(s)
                if cur_tokens + st > target and cur:
                    _emit("\n".join(cur))
                    # Overlap: reuse the tail of the previous piece
                    # (4 chars/token heuristic, matching estimate_tokens).
                    ov_text = " ".join(cur)[-overlap * 4:]
                    cur = [ov_text, s] if ov_text else [s]
                    cur_tokens = estimate_tokens(" ".join(cur))
                else:
                    cur.append(s)
                    cur_tokens += st
            if cur:
                _emit("\n".join(cur))
        else:
            _emit(text)
        buf = []

    # Accumulate blocks; flush on section changes and size thresholds.
    cur_sec_title = None
    for b in blocks:
        if b.kind == "heading" and b.level in (2, 3):
            # Section change: flush the buffer. The heading itself is not
            # chunk content — it only provides the context title.
            flush_buffer()
            cur_sec_title = b.text.strip()
            continue

        txt = b.text.strip()
        if not txt:
            continue

        # Soft cut before adding a block that would overshoot the upper
        # target (previously re-called get_sizes() on every iteration).
        tentative = "\n\n".join([*(x[0] for x in buf), txt]).strip()
        if estimate_tokens(tentative) > target_max:
            flush_buffer()
        buf.append((txt, cur_sec_title, b.section_path))

        # Flush once roughly the target size is reached.
        if estimate_tokens("\n\n".join(x[0] for x in buf)) >= target:
            flush_buffer()

    flush_buffer()

    # Link neighbors now that all chunk ids exist.
    for i, ch in enumerate(chunks):
        ch.neighbors_prev = chunks[i - 1].id if i > 0 else None
        ch.neighbors_next = chunks[i + 1].id if i < len(chunks) - 1 else None
    return chunks
|
||||
Loading…
Reference in New Issue
Block a user