chunker korrigiert zu scmart edges
This commit is contained in:
parent
7263fee4c7
commit
8fadec5c2c
|
|
@ -1,11 +1,11 @@
|
||||||
"""
|
"""
|
||||||
FILE: app/core/chunker.py
|
FILE: app/core/chunker.py
|
||||||
DESCRIPTION: Zerlegt Texte in Chunks (Sliding Window oder nach Headings). Orchestriert die Smart-Edge-Allocation via SemanticAnalyzer.
|
DESCRIPTION: Zerlegt Texte in Chunks (Sliding Window oder nach Headings). Orchestriert die Smart-Edge-Allocation via SemanticAnalyzer.
|
||||||
VERSION: 2.5.0
|
VERSION: 2.6.0 (Fix: Strict Heading Split & Header Retention)
|
||||||
STATUS: Active
|
STATUS: Active
|
||||||
DEPENDENCIES: app.services.semantic_analyzer, app.core.derive_edges, markdown_it, yaml, asyncio
|
DEPENDENCIES: app.services.semantic_analyzer, app.core.derive_edges, markdown_it, yaml, asyncio
|
||||||
EXTERNAL_CONFIG: config/types.yaml
|
EXTERNAL_CONFIG: config/types.yaml
|
||||||
LAST_ANALYSIS: 2025-12-15
|
LAST_ANALYSIS: 2025-12-16
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
@ -15,8 +15,6 @@ import re
|
||||||
import math
|
import math
|
||||||
import yaml
|
import yaml
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from markdown_it import MarkdownIt
|
|
||||||
from markdown_it.token import Token
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
|
@ -27,7 +25,7 @@ from app.services.semantic_analyzer import get_semantic_analyzer
|
||||||
try:
|
try:
|
||||||
from app.core.derive_edges import build_edges_for_note
|
from app.core.derive_edges import build_edges_for_note
|
||||||
except ImportError:
|
except ImportError:
|
||||||
# Mock für Tests
|
# Mock für Tests, falls Module fehlen
|
||||||
def build_edges_for_note(note_id, chunks, note_level_references=None, include_note_scope_refs=False): return []
|
def build_edges_for_note(note_id, chunks, note_level_references=None, include_note_scope_refs=False): return []
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
@ -103,7 +101,7 @@ class Chunk:
|
||||||
suggested_edges: Optional[List[str]] = None
|
suggested_edges: Optional[List[str]] = None
|
||||||
|
|
||||||
# ==========================================
|
# ==========================================
|
||||||
# 3. PARSING & STRATEGIES (SYNCHRON)
|
# 3. PARSING & STRATEGIES
|
||||||
# ==========================================
|
# ==========================================
|
||||||
|
|
||||||
def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
|
def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
|
||||||
|
|
@ -125,6 +123,8 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
|
||||||
for line in lines:
|
for line in lines:
|
||||||
stripped = line.strip()
|
stripped = line.strip()
|
||||||
if stripped.startswith('# '):
|
if stripped.startswith('# '):
|
||||||
|
# H1 wird für den Titel genutzt, aber nicht als Block für sliding window
|
||||||
|
# (Außer es ist H1 im Body, aber wir ignorieren H1 hier meist als Title)
|
||||||
continue
|
continue
|
||||||
elif stripped.startswith('## '):
|
elif stripped.startswith('## '):
|
||||||
if buffer:
|
if buffer:
|
||||||
|
|
@ -134,6 +134,7 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
|
||||||
buffer = []
|
buffer = []
|
||||||
current_h2 = stripped[3:].strip()
|
current_h2 = stripped[3:].strip()
|
||||||
section_path = f"/{current_h2}"
|
section_path = f"/{current_h2}"
|
||||||
|
# WICHTIG: Die Überschrift selbst als Block speichern!
|
||||||
blocks.append(RawBlock("heading", stripped, 2, section_path, current_h2))
|
blocks.append(RawBlock("heading", stripped, 2, section_path, current_h2))
|
||||||
elif not stripped:
|
elif not stripped:
|
||||||
if buffer:
|
if buffer:
|
||||||
|
|
@ -151,6 +152,15 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
|
||||||
|
|
||||||
return blocks, h1_title
|
return blocks, h1_title
|
||||||
|
|
||||||
|
def _create_chunk_obj(chunks_list: List[Chunk], note_id: str, txt: str, win: str, sec: Optional[str], path: str):
|
||||||
|
idx = len(chunks_list)
|
||||||
|
chunks_list.append(Chunk(
|
||||||
|
id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx,
|
||||||
|
text=txt, window=win, token_count=estimate_tokens(txt),
|
||||||
|
section_title=sec, section_path=path, neighbors_prev=None, neighbors_next=None,
|
||||||
|
suggested_edges=[]
|
||||||
|
))
|
||||||
|
|
||||||
def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "", context_prefix: str = "") -> List[Chunk]:
|
def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "", context_prefix: str = "") -> List[Chunk]:
|
||||||
target = config.get("target", 400)
|
target = config.get("target", 400)
|
||||||
max_tokens = config.get("max", 600)
|
max_tokens = config.get("max", 600)
|
||||||
|
|
@ -158,15 +168,6 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not
|
||||||
overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val
|
overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val
|
||||||
chunks = []; buf = []
|
chunks = []; buf = []
|
||||||
|
|
||||||
def _create_chunk(txt, win, sec, path):
|
|
||||||
idx = len(chunks)
|
|
||||||
chunks.append(Chunk(
|
|
||||||
id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx,
|
|
||||||
text=txt, window=win, token_count=estimate_tokens(txt),
|
|
||||||
section_title=sec, section_path=path, neighbors_prev=None, neighbors_next=None,
|
|
||||||
suggested_edges=[]
|
|
||||||
))
|
|
||||||
|
|
||||||
def flush_buffer():
|
def flush_buffer():
|
||||||
nonlocal buf
|
nonlocal buf
|
||||||
if not buf: return
|
if not buf: return
|
||||||
|
|
@ -175,18 +176,24 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not
|
||||||
win_body = f"{context_prefix}\n{text_body}".strip() if context_prefix else text_body
|
win_body = f"{context_prefix}\n{text_body}".strip() if context_prefix else text_body
|
||||||
|
|
||||||
if estimate_tokens(text_body) <= max_tokens:
|
if estimate_tokens(text_body) <= max_tokens:
|
||||||
_create_chunk(text_body, win_body, buf[-1].section_title, buf[-1].section_path)
|
sec = buf[0].section_title if buf else None
|
||||||
|
path = buf[0].section_path if buf else "/"
|
||||||
|
_create_chunk_obj(chunks, note_id, text_body, win_body, sec, path)
|
||||||
else:
|
else:
|
||||||
sentences = split_sentences(text_body)
|
sentences = split_sentences(text_body)
|
||||||
current_chunk_sents = []
|
current_chunk_sents = []
|
||||||
current_len = 0
|
current_len = 0
|
||||||
|
|
||||||
|
# Basis-Info vom ersten Block im Buffer
|
||||||
|
sec = buf[0].section_title if buf else None
|
||||||
|
path = buf[0].section_path if buf else "/"
|
||||||
|
|
||||||
for sent in sentences:
|
for sent in sentences:
|
||||||
sent_len = estimate_tokens(sent)
|
sent_len = estimate_tokens(sent)
|
||||||
if current_len + sent_len > target and current_chunk_sents:
|
if current_len + sent_len > target and current_chunk_sents:
|
||||||
c_txt = " ".join(current_chunk_sents)
|
c_txt = " ".join(current_chunk_sents)
|
||||||
c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt
|
c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt
|
||||||
_create_chunk(c_txt, c_win, buf[-1].section_title, buf[-1].section_path)
|
_create_chunk_obj(chunks, note_id, c_txt, c_win, sec, path)
|
||||||
|
|
||||||
overlap_sents = []
|
overlap_sents = []
|
||||||
ov_len = 0
|
ov_len = 0
|
||||||
|
|
@ -207,27 +214,81 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not
|
||||||
if current_chunk_sents:
|
if current_chunk_sents:
|
||||||
c_txt = " ".join(current_chunk_sents)
|
c_txt = " ".join(current_chunk_sents)
|
||||||
c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt
|
c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt
|
||||||
_create_chunk(c_txt, c_win, buf[-1].section_title, buf[-1].section_path)
|
_create_chunk_obj(chunks, note_id, c_txt, c_win, sec, path)
|
||||||
|
|
||||||
buf = []
|
buf = []
|
||||||
|
|
||||||
for b in blocks:
|
for b in blocks:
|
||||||
if b.kind == "heading": continue
|
# Bei Sliding Window ignorieren wir Heading-Blocks als Split-Trigger NICHT zwingend,
|
||||||
|
# aber wir wollen Headings oft nicht "allein" stehen haben.
|
||||||
|
# Hier einfache Logik:
|
||||||
|
if b.kind == "heading":
|
||||||
|
# Optional: Buffer flushen bei neuem Header, um Kontextwechsel sauberer zu machen
|
||||||
|
flush_buffer()
|
||||||
|
|
||||||
current_buf_text = "\n\n".join([x.text for x in buf])
|
current_buf_text = "\n\n".join([x.text for x in buf])
|
||||||
if estimate_tokens(current_buf_text) + estimate_tokens(b.text) >= target:
|
if buf and (estimate_tokens(current_buf_text) + estimate_tokens(b.text) >= target):
|
||||||
flush_buffer()
|
flush_buffer()
|
||||||
|
|
||||||
buf.append(b)
|
buf.append(b)
|
||||||
if estimate_tokens(b.text) >= target:
|
|
||||||
flush_buffer()
|
|
||||||
|
|
||||||
flush_buffer()
|
flush_buffer()
|
||||||
return chunks
|
return chunks
|
||||||
|
|
||||||
def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]:
|
def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]:
|
||||||
return _strategy_sliding_window(blocks, config, note_id, doc_title, context_prefix=f"# {doc_title}")
|
"""
|
||||||
|
STRICT HEADING SPLIT (Fix v2.6.0):
|
||||||
|
Trennt den Text konsequent an jeder Überschrift der definierten Ebene.
|
||||||
|
Behält Überschriften als Teil (erste Zeile) des Chunks bei.
|
||||||
|
Kein Merging kleiner Abschnitte über Header-Grenzen hinweg.
|
||||||
|
"""
|
||||||
|
split_level = config.get("split_level", 2)
|
||||||
|
chunks = []
|
||||||
|
|
||||||
|
# Temporärer Speicher für den aktuellen Chunk
|
||||||
|
current_chunk_blocks = []
|
||||||
|
|
||||||
|
context_prefix = f"# {doc_title}"
|
||||||
|
|
||||||
|
def flush_current_chunk():
|
||||||
|
nonlocal current_chunk_blocks
|
||||||
|
if not current_chunk_blocks:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Text zusammenbauen
|
||||||
|
text_body = "\n\n".join([b.text for b in current_chunk_blocks])
|
||||||
|
# Window bauen (hier einfach Text, da Kontext via Header implizit ist)
|
||||||
|
win_body = f"{context_prefix}\n{text_body}".strip()
|
||||||
|
|
||||||
|
# Metadaten vom ersten Block (üblicherweise der Header) nehmen
|
||||||
|
first_b = current_chunk_blocks[0]
|
||||||
|
sec = first_b.section_title
|
||||||
|
path = first_b.section_path
|
||||||
|
|
||||||
|
_create_chunk_obj(chunks, note_id, text_body, win_body, sec, path)
|
||||||
|
current_chunk_blocks = []
|
||||||
|
|
||||||
|
for b in blocks:
|
||||||
|
# Prüfen, ob dieser Block ein Trenner (Header auf Split-Level) ist
|
||||||
|
is_splitter = (b.kind == "heading" and b.level == split_level)
|
||||||
|
|
||||||
|
if is_splitter:
|
||||||
|
# 1. Den bisherigen Chunk abschließen (falls vorhanden)
|
||||||
|
flush_current_chunk()
|
||||||
|
|
||||||
|
# 2. Den neuen Chunk mit diesem Header beginnen
|
||||||
|
current_chunk_blocks.append(b)
|
||||||
|
else:
|
||||||
|
# Einfach anhängen
|
||||||
|
current_chunk_blocks.append(b)
|
||||||
|
|
||||||
|
# Letzten Rest flushen
|
||||||
|
flush_current_chunk()
|
||||||
|
|
||||||
|
return chunks
|
||||||
|
|
||||||
# ==========================================
|
# ==========================================
|
||||||
# 4. ORCHESTRATION (ASYNC) - WP-15 CORE
|
# 4. ORCHESTRATION (ASYNC)
|
||||||
# ==========================================
|
# ==========================================
|
||||||
|
|
||||||
async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]:
|
async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]:
|
||||||
|
|
@ -240,12 +301,14 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op
|
||||||
primary_strategy = config.get("strategy", "sliding_window")
|
primary_strategy = config.get("strategy", "sliding_window")
|
||||||
enable_smart_edges = config.get("enable_smart_edge_allocation", False)
|
enable_smart_edges = config.get("enable_smart_edge_allocation", False)
|
||||||
|
|
||||||
|
# Performance/Cost-Guard: Bei Entwürfen keine Smart Edges
|
||||||
if enable_smart_edges and note_status in ["draft", "initial_gen"]:
|
if enable_smart_edges and note_status in ["draft", "initial_gen"]:
|
||||||
logger.info(f"Chunker: Skipping Smart Edges for draft '{note_id}'.")
|
logger.info(f"Chunker: Skipping Smart Edges for draft '{note_id}'.")
|
||||||
enable_smart_edges = False
|
enable_smart_edges = False
|
||||||
|
|
||||||
blocks, doc_title = parse_blocks(md_text)
|
blocks, doc_title = parse_blocks(md_text)
|
||||||
|
|
||||||
|
# Strategie-Auswahl
|
||||||
if primary_strategy == "by_heading":
|
if primary_strategy == "by_heading":
|
||||||
chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title)
|
chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title)
|
||||||
else:
|
else:
|
||||||
|
|
@ -254,10 +317,11 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op
|
||||||
if not chunks:
|
if not chunks:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
# Smart Edge Allocation (WP-15)
|
||||||
if enable_smart_edges:
|
if enable_smart_edges:
|
||||||
# Hier rufen wir nun die Smart Edge Allocation auf
|
|
||||||
chunks = await _run_smart_edge_allocation(chunks, md_text, note_id, note_type)
|
chunks = await _run_smart_edge_allocation(chunks, md_text, note_id, note_type)
|
||||||
|
|
||||||
|
# Verkettung der Chunks (next/prev)
|
||||||
for i, ch in enumerate(chunks):
|
for i, ch in enumerate(chunks):
|
||||||
ch.neighbors_prev = chunks[i-1].id if i > 0 else None
|
ch.neighbors_prev = chunks[i-1].id if i > 0 else None
|
||||||
ch.neighbors_next = chunks[i+1].id if i < len(chunks)-1 else None
|
ch.neighbors_next = chunks[i+1].id if i < len(chunks)-1 else None
|
||||||
|
|
@ -269,30 +333,25 @@ def _extract_all_edges_from_md(md_text: str, note_id: str, note_type: str) -> Li
|
||||||
Hilfsfunktion: Erstellt einen Dummy-Chunk für den gesamten Text und ruft
|
Hilfsfunktion: Erstellt einen Dummy-Chunk für den gesamten Text und ruft
|
||||||
den Edge-Parser auf, um ALLE Kanten der Notiz zu finden.
|
den Edge-Parser auf, um ALLE Kanten der Notiz zu finden.
|
||||||
"""
|
"""
|
||||||
# 1. Dummy Chunk erstellen, der den gesamten Text enthält
|
|
||||||
# Das ist notwendig, da build_edges_for_note Kanten nur aus Chunks extrahiert.
|
|
||||||
dummy_chunk = {
|
dummy_chunk = {
|
||||||
"chunk_id": f"{note_id}#full",
|
"chunk_id": f"{note_id}#full",
|
||||||
"text": md_text,
|
"text": md_text,
|
||||||
"content": md_text, # Sicherstellen, dass der Parser Text findet
|
"content": md_text,
|
||||||
"window": md_text,
|
"window": md_text,
|
||||||
"type": note_type
|
"type": note_type
|
||||||
}
|
}
|
||||||
|
# Parsing aller Kanten (Inline, Wikilinks, Callouts)
|
||||||
# 2. Aufruf des Parsers (Signatur-Fix!)
|
|
||||||
# derive_edges.py: build_edges_for_note(note_id, chunks, note_level_references=None, include_note_scope_refs=False)
|
|
||||||
raw_edges = build_edges_for_note(
|
raw_edges = build_edges_for_note(
|
||||||
note_id,
|
note_id,
|
||||||
[dummy_chunk],
|
[dummy_chunk],
|
||||||
note_level_references=None,
|
note_level_references=None,
|
||||||
include_note_scope_refs=False
|
include_note_scope_refs=False
|
||||||
)
|
)
|
||||||
|
|
||||||
# 3. Kanten extrahieren
|
|
||||||
all_candidates = set()
|
all_candidates = set()
|
||||||
for e in raw_edges:
|
for e in raw_edges:
|
||||||
kind = e.get("kind")
|
kind = e.get("kind")
|
||||||
target = e.get("target_id")
|
target = e.get("target_id")
|
||||||
|
# Struktur-Kanten ignorieren wir für die Verteilung
|
||||||
if target and kind not in ["belongs_to", "next", "prev", "backlink"]:
|
if target and kind not in ["belongs_to", "next", "prev", "backlink"]:
|
||||||
all_candidates.add(f"{kind}:{target}")
|
all_candidates.add(f"{kind}:{target}")
|
||||||
|
|
||||||
|
|
@ -301,7 +360,7 @@ def _extract_all_edges_from_md(md_text: str, note_id: str, note_type: str) -> Li
|
||||||
async def _run_smart_edge_allocation(chunks: List[Chunk], full_text: str, note_id: str, note_type: str) -> List[Chunk]:
|
async def _run_smart_edge_allocation(chunks: List[Chunk], full_text: str, note_id: str, note_type: str) -> List[Chunk]:
|
||||||
analyzer = get_semantic_analyzer()
|
analyzer = get_semantic_analyzer()
|
||||||
|
|
||||||
# A. Alle potenziellen Kanten der Notiz sammeln (über den Dummy-Chunk Trick)
|
# A. Alle potenziellen Kanten der Notiz sammeln
|
||||||
candidate_list = _extract_all_edges_from_md(full_text, note_id, note_type)
|
candidate_list = _extract_all_edges_from_md(full_text, note_id, note_type)
|
||||||
|
|
||||||
if not candidate_list:
|
if not candidate_list:
|
||||||
|
|
@ -314,7 +373,7 @@ async def _run_smart_edge_allocation(chunks: List[Chunk], full_text: str, note_i
|
||||||
|
|
||||||
results_per_chunk = await asyncio.gather(*tasks)
|
results_per_chunk = await asyncio.gather(*tasks)
|
||||||
|
|
||||||
# C. Injection & Fallback
|
# C. Injection & Fallback Tracking
|
||||||
assigned_edges_global = set()
|
assigned_edges_global = set()
|
||||||
|
|
||||||
for i, confirmed_edges in enumerate(results_per_chunk):
|
for i, confirmed_edges in enumerate(results_per_chunk):
|
||||||
|
|
@ -322,14 +381,18 @@ async def _run_smart_edge_allocation(chunks: List[Chunk], full_text: str, note_i
|
||||||
chunk.suggested_edges = confirmed_edges
|
chunk.suggested_edges = confirmed_edges
|
||||||
assigned_edges_global.update(confirmed_edges)
|
assigned_edges_global.update(confirmed_edges)
|
||||||
|
|
||||||
|
# Injection: Wir hängen die bestätigten Edges unsichtbar (fürs Embedding) oder sichtbar an
|
||||||
|
# Hier als "Pseudo-Code" im Text, damit sie embedded werden.
|
||||||
if confirmed_edges:
|
if confirmed_edges:
|
||||||
|
# Format: [[rel:kind|target]]
|
||||||
injection_str = "\n" + " ".join([f"[[rel:{e.split(':')[0]}|{e.split(':')[1]}]]" for e in confirmed_edges if ':' in e])
|
injection_str = "\n" + " ".join([f"[[rel:{e.split(':')[0]}|{e.split(':')[1]}]]" for e in confirmed_edges if ':' in e])
|
||||||
chunk.text += injection_str
|
chunk.text += injection_str
|
||||||
chunk.window += injection_str
|
chunk.window += injection_str
|
||||||
|
|
||||||
# D. Fallback: Unassigned Kanten überall hin
|
# D. Fallback: Kanten, die NIRGENDS zugewiesen wurden, werden JEDEM Chunk angehängt (Sicherheit)
|
||||||
unassigned = set(candidate_list) - assigned_edges_global
|
unassigned = set(candidate_list) - assigned_edges_global
|
||||||
if unassigned:
|
if unassigned:
|
||||||
|
logger.info(f"Chunker: {len(unassigned)} unassigned edges in {note_id}. Distributing to all chunks.")
|
||||||
fallback_str = "\n" + " ".join([f"[[rel:{e.split(':')[0]}|{e.split(':')[1]}]]" for e in unassigned if ':' in e])
|
fallback_str = "\n" + " ".join([f"[[rel:{e.split(':')[0]}|{e.split(':')[1]}]]" for e in unassigned if ':' in e])
|
||||||
for chunk in chunks:
|
for chunk in chunks:
|
||||||
chunk.text += fallback_str
|
chunk.text += fallback_str
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user