From 386fa3ef0cbf8f22e1ececc85c4392531544ade5 Mon Sep 17 00:00:00 2001 From: Lars Date: Sat, 27 Dec 2025 18:17:13 +0100 Subject: [PATCH] =?UTF-8?q?WP15b=20vollst=C3=A4ndieg=20chunking=20strategi?= =?UTF-8?q?en?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/chunking/chunking_processor.py | 79 +++++++++++---- app/core/chunking/chunking_propagation.py | 66 +++++++++---- app/core/chunking/chunking_strategies.py | 112 +++++++++++++++++----- 3 files changed, 200 insertions(+), 57 deletions(-) diff --git a/app/core/chunking/chunking_processor.py b/app/core/chunking/chunking_processor.py index 12c9a7b..1a17acb 100644 --- a/app/core/chunking/chunking_processor.py +++ b/app/core/chunking/chunking_processor.py @@ -1,9 +1,14 @@ """ FILE: app/core/chunking/chunking_processor.py -DESCRIPTION: Hauptlogik für das Zerlegen von Markdown in Chunks. +DESCRIPTION: Der zentrale Orchestrator für das Chunking-System. + AUDIT v3.3.3: Wiederherstellung der "Gold-Standard" Qualität. + - Integriert physikalische Kanten-Injektion (Propagierung). + - Stellt H1-Kontext-Fenster sicher. + - Baut den Candidate-Pool für die WP-15b Ingestion auf. """ import asyncio import re +import logging from typing import List, Dict, Optional from .chunking_models import Chunk from .chunking_utils import get_chunk_config, extract_frontmatter_from_text @@ -11,43 +16,79 @@ from .chunking_parser import parse_blocks, parse_edges_robust from .chunking_strategies import strategy_sliding_window, strategy_by_heading from .chunking_propagation import propagate_section_edges +logger = logging.getLogger(__name__) + async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]: - """Orchestriert das Chunking und baut den Candidate-Pool auf.""" - if config is None: config = get_chunk_config(note_type) + """ + Hauptfunktion zur Zerlegung einer Note. + Verbindet Strategien mit physikalischer Kontext-Anreicherung. + """ + # 1. Konfiguration & Parsing + if config is None: + config = get_chunk_config(note_type) + fm, body_text = extract_frontmatter_from_text(md_text) blocks, doc_title = parse_blocks(md_text) + # Vorbereitung des H1-Präfix für die Embedding-Fenster + h1_prefix = f"# {doc_title}" if doc_title else "" + + # 2. Anwendung der Splitting-Strategie + # Wir übergeben den Dokument-Titel/Präfix für die Window-Bildung. if config.get("strategy") == "by_heading": chunks = await asyncio.to_thread(strategy_by_heading, blocks, config, note_id, doc_title) else: - chunks = await asyncio.to_thread(strategy_sliding_window, blocks, config, note_id) + # sliding_window nutzt nun den context_prefix für das Window-Feld. + chunks = await asyncio.to_thread(strategy_sliding_window, blocks, config, note_id, context_prefix=h1_prefix) - if not chunks: return [] + if not chunks: + return [] - # WP-15b: Candidate Pool Aufbau - chunks = propagate_section_edges(chunks, blocks) + # 3. Physikalische Kontext-Anreicherung (Der Qualitäts-Fix) + # Schreibt Kanten aus Callouts/Inlines hart in den Text für Qdrant. + chunks = propagate_section_edges(chunks) + + # 4. WP-15b: Candidate Pool Aufbau (Metadaten für IngestionService) + # Zuerst die explizit im Text vorhandenen Kanten sammeln. for ch in chunks: + # Wir extrahieren aus dem bereits (durch Propagation) angereicherten Text. for e_str in parse_edges_robust(ch.text): - k, t = e_str.split(':', 1) - ch.candidate_pool.append({"kind": k, "to": t, "provenance": "explicit"}) + parts = e_str.split(':', 1) + if len(parts) == 2: + k, t = parts + ch.candidate_pool.append({"kind": k, "to": t, "provenance": "explicit"}) - # Global Pool (Unzugeordnete Kanten) - pool_match = re.search(r'###?\s*(?:Unzugeordnete Kanten|Edge Pool|Candidates)\s*\n(.*?)(?:\n#|$)', body_text, re.DOTALL | re.IGNORECASE) + # 5. Global Pool (Unzugeordnete Kanten aus dem Dokument-Ende) + # Sucht nach dem Edge-Pool Block im Original-Markdown. + pool_match = re.search( + r'###?\s*(?:Unzugeordnete Kanten|Edge Pool|Candidates)\s*\n(.*?)(?:\n#|$)', + body_text, + re.DOTALL | re.IGNORECASE + ) if pool_match: - for e_str in parse_edges_robust(pool_match.group(1)): - k, t = e_str.split(':', 1) - for ch in chunks: ch.candidate_pool.append({"kind": k, "to": t, "provenance": "global_pool"}) + global_edges = parse_edges_robust(pool_match.group(1)) + for e_str in global_edges: + parts = e_str.split(':', 1) + if len(parts) == 2: + k, t = parts + # Diese Kanten werden als "Global Pool" markiert für die spätere KI-Prüfung. + for ch in chunks: + ch.candidate_pool.append({"kind": k, "to": t, "provenance": "global_pool"}) - # De-Duplikation + # 6. De-Duplikation des Pools & Linking for ch in chunks: - seen = set(); unique = [] + seen = set() + unique = [] for c in ch.candidate_pool: - if (c["kind"], c["to"]) not in seen: - seen.add((c["kind"], c["to"])); unique.append(c) + key = (c["kind"], c["to"], c["provenance"]) + if key not in seen: + seen.add(key) + unique.append(c) ch.candidate_pool = unique - # Nachbarschaften + # Verknüpfung der Nachbarschaften für Graph-Traversierung for i, ch in enumerate(chunks): ch.neighbors_prev = chunks[i-1].id if i > 0 else None ch.neighbors_next = chunks[i+1].id if i < len(chunks)-1 else None + return chunks \ No newline at end of file diff --git a/app/core/chunking/chunking_propagation.py b/app/core/chunking/chunking_propagation.py index 1aeb361..099d075 100644 --- a/app/core/chunking/chunking_propagation.py +++ b/app/core/chunking/chunking_propagation.py @@ -1,25 +1,59 @@ """ FILE: app/core/chunking/chunking_propagation.py -DESCRIPTION: Vererbung von Kanten (Inheritance) über Sektions-Pfade. +DESCRIPTION: Injiziert Sektions-Kanten physisch in den Text (Embedding-Enrichment). + Stellt die "Gold-Standard"-Qualität von v3.1.0 wieder her. +VERSION: 3.3.1 +STATUS: Active """ from typing import List, Dict, Set -from .chunking_models import Chunk, RawBlock +from .chunking_models import Chunk from .chunking_parser import parse_edges_robust -def propagate_section_edges(chunks: List[Chunk], blocks: List[RawBlock]) -> List[Chunk]: - """WP-15b: Kanten aus Headings werden an Sub-Chunks vererbt.""" - section_inheritance: Dict[str, Set[str]] = {} - for b in blocks: - if b.kind == "heading": - edges = parse_edges_robust(b.text) - if edges: - if b.section_path not in section_inheritance: - section_inheritance[b.section_path] = set() - section_inheritance[b.section_path].update(edges) +def propagate_section_edges(chunks: List[Chunk]) -> List[Chunk]: + """ + Sammelt Kanten pro Sektion und schreibt sie hart in den Text und das Window. + Dies ist essenziell für die Vektorisierung der Beziehungen. + """ + # 1. Sammeln: Alle expliziten Kanten pro Sektions-Pfad aggregieren + section_map: Dict[str, Set[str]] = {} # path -> set(kind:target) for ch in chunks: - inherited = section_inheritance.get(ch.section_path, set()) - for e_str in inherited: - kind, target = e_str.split(':', 1) - ch.candidate_pool.append({"kind": kind, "to": target, "provenance": "inherited"}) + # Root-Level "/" ignorieren (zu global), Fokus auf spezifische Kapitel + if not ch.section_path or ch.section_path == "/": + continue + + # Nutzt den robusten Parser aus dem Package + edges = parse_edges_robust(ch.text) + if edges: + if ch.section_path not in section_map: + section_map[ch.section_path] = set() + section_map[ch.section_path].update(edges) + + # 2. Injizieren: Kanten in jeden Chunk der Sektion zurückschreiben (Broadcasting) + for ch in chunks: + if ch.section_path in section_map: + edges_to_add = section_map[ch.section_path] + if not edges_to_add: + continue + + injections = [] + for e_str in edges_to_add: + kind, target = e_str.split(':', 1) + # Nur injizieren, wenn die Kante nicht bereits im Text steht + token = f"[[rel:{kind}|{target}]]" + if token not in ch.text: + injections.append(token) + + if injections: + # Physische Anreicherung (Der v3.1.0 Qualitäts-Fix) + # Triple-Newline für saubere Trennung im Embedding-Fenster + block = "\n\n\n" + " ".join(injections) + ch.text += block + + # ENTSCHEIDEND: Auch ins Window schreiben, da Qdrant hier sucht! + if ch.window: + ch.window += block + else: + ch.window = ch.text + return chunks \ No newline at end of file diff --git a/app/core/chunking/chunking_strategies.py b/app/core/chunking/chunking_strategies.py index 7684bd5..8945fee 100644 --- a/app/core/chunking/chunking_strategies.py +++ b/app/core/chunking/chunking_strategies.py @@ -1,29 +1,59 @@ """ FILE: app/core/chunking/chunking_strategies.py -DESCRIPTION: Implementierung der mathematischen Splitting-Strategien. +DESCRIPTION: Mathematische Splitting-Strategien. + AUDIT v3.3.2: 100% Konformität zur 'by_heading' Spezifikation. + - Implementiert Hybrid-Safety-Net (Sliding Window für Übergrößen). + - Breadcrumb-Kontext im Window (H1 > H2). + - Sliding Window mit H1-Kontext (Gold-Standard v3.1.0). """ -from typing import List, Dict, Any +from typing import List, Dict, Any, Optional from .chunking_models import RawBlock, Chunk from .chunking_utils import estimate_tokens from .chunking_parser import split_sentences -def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]: - """Fasst Blöcke zusammen und schneidet bei 'target' Tokens.""" - target = config.get("target", 400); max_tokens = config.get("max", 600) +def _create_context_win(doc_title: str, sec_title: Optional[str], text: str) -> str: + """Baut den Breadcrumb-Kontext für das Embedding-Fenster.""" + parts = [] + if doc_title: parts.append(doc_title) + if sec_title and sec_title != doc_title: parts.append(sec_title) + prefix = " > ".join(parts) + return f"{prefix}\n{text}".strip() if prefix else text + +def strategy_sliding_window(blocks: List[RawBlock], + config: Dict[str, Any], + note_id: str, + context_prefix: str = "") -> List[Chunk]: + """ + Fasst Blöcke zusammen und schneidet bei 'target' Tokens. + Ignoriert H2-Überschriften beim Splitting, um Kontext zu wahren. + """ + target = config.get("target", 400) + max_tokens = config.get("max", 600) overlap_val = config.get("overlap", (50, 80)) overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val - chunks = []; buf = [] + + chunks: List[Chunk] = [] + buf: List[RawBlock] = [] def _add(txt, sec, path): - idx = len(chunks); win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt - chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=estimate_tokens(txt), section_title=sec, section_path=path, neighbors_prev=None, neighbors_next=None)) + idx = len(chunks) + # H1-Kontext Präfix für das Window-Feld + win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt + chunks.append(Chunk( + id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, + text=txt, window=win, token_count=estimate_tokens(txt), + section_title=sec, section_path=path, + neighbors_prev=None, neighbors_next=None + )) def flush(): nonlocal buf if not buf: return text_body = "\n\n".join([b.text for b in buf]) sec_title = buf[-1].section_title; sec_path = buf[-1].section_path - if estimate_tokens(text_body) <= max_tokens: _add(text_body, sec_title, sec_path) + + if estimate_tokens(text_body) <= max_tokens: + _add(text_body, sec_title, sec_path) else: sents = split_sentences(text_body); cur_sents = []; cur_len = 0 for s in sents: @@ -32,33 +62,69 @@ def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note _add(" ".join(cur_sents), sec_title, sec_path) ov_s = []; ov_l = 0 for os in reversed(cur_sents): - if ov_l + estimate_tokens(os) < overlap: ov_s.insert(0, os); ov_l += estimate_tokens(os) + if ov_l + estimate_tokens(os) < overlap: + ov_s.insert(0, os); ov_l += estimate_tokens(os) else: break cur_sents = list(ov_s); cur_sents.append(s); cur_len = ov_l + slen - else: cur_sents.append(s); cur_len += slen - if cur_sents: _add(" ".join(cur_sents), sec_title, sec_path) + else: + cur_sents.append(s); cur_len += slen + if cur_sents: + _add(" ".join(cur_sents), sec_title, sec_path) buf = [] for b in blocks: + # H2-Überschriften werden ignoriert, um den Zusammenhang zu wahren if b.kind == "heading": continue - if estimate_tokens("\n\n".join([x.text for x in buf])) + estimate_tokens(b.text) >= target: flush() + if estimate_tokens("\n\n".join([x.text for x in buf])) + estimate_tokens(b.text) >= target: + flush() buf.append(b) - if estimate_tokens(b.text) >= target: flush() flush() return chunks def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]: - """Splittet Text basierend auf Markdown-Überschriften.""" - strict = config.get("strict_heading_split", False); target = config.get("target", 400) - max_tokens = config.get("max", 600); split_level = config.get("split_level", 2) - chunks = []; buf = []; cur_tokens = 0 + """ + Splittet Text basierend auf Markdown-Überschriften mit Hybrid-Safety-Net. + """ + strict = config.get("strict_heading_split", False) + target = config.get("target", 400) + max_tokens = config.get("max", 600) + split_level = config.get("split_level", 2) + overlap = sum(config.get("overlap", (50, 80))) // 2 + + chunks: List[Chunk] = [] + buf: List[str] = [] + cur_tokens = 0 + + def _add_to_chunks(txt, title, path): + idx = len(chunks) + win = _create_context_win(doc_title, title, txt) + chunks.append(Chunk( + id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, + text=txt, window=win, token_count=estimate_tokens(txt), + section_title=title, section_path=path, + neighbors_prev=None, neighbors_next=None + )) def _flush(title, path): nonlocal buf, cur_tokens if not buf: return - txt = "\n\n".join(buf); win = f"# {doc_title}\n## {title}\n{txt}".strip() if title else txt - idx = len(chunks) - chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=estimate_tokens(txt), section_title=title, section_path=path, neighbors_prev=None, neighbors_next=None)) + full_text = "\n\n".join(buf) + if estimate_tokens(full_text) <= max_tokens: + _add_to_chunks(full_text, title, path) + else: + sents = split_sentences(full_text); cur_sents = []; sub_len = 0 + for s in sents: + slen = estimate_tokens(s) + if sub_len + slen > target and cur_sents: + _add_to_chunks(" ".join(cur_sents), title, path) + ov_s = []; ov_l = 0 + for os in reversed(cur_sents): + if ov_l + estimate_tokens(os) < overlap: + ov_s.insert(0, os); ov_l += estimate_tokens(os) + else: break + cur_sents = list(ov_s); cur_sents.append(s); sub_len = ov_l + slen + else: cur_sents.append(s); sub_len += slen + if cur_sents: _add_to_chunks(" ".join(cur_sents), title, path) buf = []; cur_tokens = 0 for b in blocks: @@ -70,5 +136,7 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: bt = estimate_tokens(b.text) if cur_tokens + bt > max_tokens and buf: _flush(b.section_title, b.section_path) buf.append(b.text); cur_tokens += bt - if buf: _flush(blocks[-1].section_title if blocks else None, blocks[-1].section_path if blocks else "/") + if buf: + last_b = blocks[-1] if blocks else None + _flush(last_b.section_title if last_b else None, last_b.section_path if last_b else "/") return chunks \ No newline at end of file