WP15b #15

Merged
Lars merged 23 commits from WP15b into main 2025-12-27 22:15:27 +01:00
3 changed files with 200 additions and 57 deletions
Showing only changes of commit 386fa3ef0c - Show all commits

View File

@ -1,9 +1,14 @@
"""
FILE: app/core/chunking/chunking_processor.py
DESCRIPTION: Hauptlogik für das Zerlegen von Markdown in Chunks.
DESCRIPTION: Der zentrale Orchestrator für das Chunking-System.
AUDIT v3.3.3: Wiederherstellung der "Gold-Standard" Qualität.
- Integriert physikalische Kanten-Injektion (Propagierung).
- Stellt H1-Kontext-Fenster sicher.
- Baut den Candidate-Pool für die WP-15b Ingestion auf.
"""
import asyncio
import re
import logging
from typing import List, Dict, Optional
from .chunking_models import Chunk
from .chunking_utils import get_chunk_config, extract_frontmatter_from_text
@ -11,43 +16,79 @@ from .chunking_parser import parse_blocks, parse_edges_robust
from .chunking_strategies import strategy_sliding_window, strategy_by_heading
from .chunking_propagation import propagate_section_edges
logger = logging.getLogger(__name__)
async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]:
    """
    Split a note into chunks and build the candidate pool of every chunk.

    Steps:
      1. Resolve the chunk configuration and parse the markdown into blocks.
      2. Run the configured splitting strategy in a worker thread.
      3. Propagate section edges into the chunk text/window.
      4. Collect the edges found in each chunk's text as "explicit" candidates.
      5. Add the edges of the global pool section (if present) to every chunk
         as "global_pool" candidates.
      6. De-duplicate each pool and link neighbouring chunks.

    Args:
        note_id: Identifier of the note; used as prefix of the chunk ids.
        md_text: Full markdown text of the note (including frontmatter).
        note_type: Note type used to look up the chunk config when ``config``
            is not given.
        config: Optional chunk configuration; when ``None`` it is resolved via
            ``get_chunk_config(note_type)``.

    Returns:
        The list of chunks, or an empty list if the strategy produced none.
    """
    # 1. Configuration & parsing
    if config is None:
        config = get_chunk_config(note_type)

    # Only the body is needed here; the frontmatter itself is not used.
    _fm, body_text = extract_frontmatter_from_text(md_text)
    blocks, doc_title = parse_blocks(md_text)

    # H1 prefix that the sliding-window strategy puts in front of each window.
    h1_prefix = f"# {doc_title}" if doc_title else ""

    # 2. Splitting strategy (run in a thread so the event loop is not blocked)
    if config.get("strategy") == "by_heading":
        chunks = await asyncio.to_thread(strategy_by_heading, blocks, config, note_id, doc_title)
    else:
        chunks = await asyncio.to_thread(
            strategy_sliding_window, blocks, config, note_id, context_prefix=h1_prefix
        )

    if not chunks:
        return []

    # 3. Write section edges into chunk text/window.
    chunks = propagate_section_edges(chunks)

    # 4. Candidate pool: edges present in the (already enriched) chunk text.
    for ch in chunks:
        for e_str in parse_edges_robust(ch.text):
            # Edge strings have the form "kind:target"; skip anything without
            # a colon instead of failing on tuple unpacking.
            kind, sep, target = e_str.partition(":")
            if sep:
                ch.candidate_pool.append({"kind": kind, "to": target, "provenance": "explicit"})

    # 5. Global pool: edges listed under a dedicated heading in the note body.
    pool_match = re.search(
        r'###?\s*(?:Unzugeordnete Kanten|Edge Pool|Candidates)\s*\n(.*?)(?:\n#|$)',
        body_text,
        re.DOTALL | re.IGNORECASE
    )
    if pool_match:
        for e_str in parse_edges_robust(pool_match.group(1)):
            kind, sep, target = e_str.partition(":")
            if not sep:
                continue
            # Every chunk receives its own dict so later mutation of one
            # chunk's pool entry cannot leak into another chunk.
            for ch in chunks:
                ch.candidate_pool.append({"kind": kind, "to": target, "provenance": "global_pool"})

    # 6. De-duplicate per chunk; the same edge may stay once per provenance.
    for ch in chunks:
        seen = set()
        unique = []
        for cand in ch.candidate_pool:
            key = (cand["kind"], cand["to"], cand["provenance"])
            if key not in seen:
                seen.add(key)
                unique.append(cand)
        ch.candidate_pool = unique

    # Link each chunk to its predecessor and successor by id.
    last_index = len(chunks) - 1
    for i, ch in enumerate(chunks):
        ch.neighbors_prev = chunks[i - 1].id if i > 0 else None
        ch.neighbors_next = chunks[i + 1].id if i < last_index else None

    return chunks

View File

@ -1,25 +1,59 @@
"""
FILE: app/core/chunking/chunking_propagation.py
DESCRIPTION: Vererbung von Kanten (Inheritance) über Sektions-Pfade.
DESCRIPTION: Injiziert Sektions-Kanten physisch in den Text (Embedding-Enrichment).
Stellt die "Gold-Standard"-Qualität von v3.1.0 wieder her.
VERSION: 3.3.1
STATUS: Active
"""
from typing import List, Dict, Set
from .chunking_models import Chunk, RawBlock
from .chunking_models import Chunk
from .chunking_parser import parse_edges_robust
def propagate_section_edges(chunks: List[Chunk], blocks: List[RawBlock]) -> List[Chunk]:
"""WP-15b: Kanten aus Headings werden an Sub-Chunks vererbt."""
section_inheritance: Dict[str, Set[str]] = {}
for b in blocks:
if b.kind == "heading":
edges = parse_edges_robust(b.text)
if edges:
if b.section_path not in section_inheritance:
section_inheritance[b.section_path] = set()
section_inheritance[b.section_path].update(edges)
def propagate_section_edges(chunks: List[Chunk]) -> List[Chunk]:
    """
    Collect edges per section and append them to the text and window of
    every chunk that belongs to the same section.

    Args:
        chunks: Chunks to enrich. They are modified in place.

    Returns:
        The same list that was passed in.
    """
    # 1. Collect: aggregate all edges found in chunk texts per section path.
    section_map: Dict[str, Set[str]] = {}  # path -> set("kind:target")
    for ch in chunks:
        # Chunks without a section path or at root level "/" do not contribute.
        if not ch.section_path or ch.section_path == "/":
            continue
        edges = parse_edges_robust(ch.text)
        if edges:
            section_map.setdefault(ch.section_path, set()).update(edges)

    # 2. Inject: write the collected edges back into each chunk of the section.
    for ch in chunks:
        edges_to_add = section_map.get(ch.section_path)
        if not edges_to_add:
            continue

        injections = []
        # Sorted so the injected text does not depend on set iteration order,
        # which varies between interpreter runs for strings.
        for e_str in sorted(edges_to_add):
            kind, sep, target = e_str.partition(":")
            if not sep:
                # Not of the form "kind:target"; nothing sensible to inject.
                continue
            token = f"[[rel:{kind}|{target}]]"
            # Only inject edges whose token is not already part of the text.
            if token not in ch.text:
                injections.append(token)

        if injections:
            # Triple newline separates the injected tokens from the content.
            block = "\n\n\n" + " ".join(injections)
            ch.text += block
            # NOTE(review): token_count is not recomputed after the text grows
            # - confirm whether consumers rely on it being exact.
            if ch.window:
                ch.window += block
            else:
                ch.window = ch.text

    return chunks

View File

@ -1,29 +1,59 @@
"""
FILE: app/core/chunking/chunking_strategies.py
DESCRIPTION: Implementierung der mathematischen Splitting-Strategien.
DESCRIPTION: Mathematische Splitting-Strategien.
AUDIT v3.3.2: 100% Konformität zur 'by_heading' Spezifikation.
- Implementiert Hybrid-Safety-Net (Sliding Window für Übergrößen).
- Breadcrumb-Kontext im Window (H1 > H2).
- Sliding Window mit H1-Kontext (Gold-Standard v3.1.0).
"""
from typing import List, Dict, Any
from typing import List, Dict, Any, Optional
from .chunking_models import RawBlock, Chunk
from .chunking_utils import estimate_tokens
from .chunking_parser import split_sentences
def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]:
"""Fasst Blöcke zusammen und schneidet bei 'target' Tokens."""
target = config.get("target", 400); max_tokens = config.get("max", 600)
def _create_context_win(doc_title: str, sec_title: Optional[str], text: str) -> str:
"""Baut den Breadcrumb-Kontext für das Embedding-Fenster."""
parts = []
if doc_title: parts.append(doc_title)
if sec_title and sec_title != doc_title: parts.append(sec_title)
prefix = " > ".join(parts)
return f"{prefix}\n{text}".strip() if prefix else text
def strategy_sliding_window(blocks: List[RawBlock],
config: Dict[str, Any],
note_id: str,
context_prefix: str = "") -> List[Chunk]:
"""
Fasst Blöcke zusammen und schneidet bei 'target' Tokens.
Ignoriert H2-Überschriften beim Splitting, um Kontext zu wahren.
"""
target = config.get("target", 400)
max_tokens = config.get("max", 600)
overlap_val = config.get("overlap", (50, 80))
overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val
chunks = []; buf = []
chunks: List[Chunk] = []
buf: List[RawBlock] = []
def _add(txt, sec, path):
idx = len(chunks); win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt
chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=estimate_tokens(txt), section_title=sec, section_path=path, neighbors_prev=None, neighbors_next=None))
idx = len(chunks)
# H1-Kontext Präfix für das Window-Feld
win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt
chunks.append(Chunk(
id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx,
text=txt, window=win, token_count=estimate_tokens(txt),
section_title=sec, section_path=path,
neighbors_prev=None, neighbors_next=None
))
def flush():
nonlocal buf
if not buf: return
text_body = "\n\n".join([b.text for b in buf])
sec_title = buf[-1].section_title; sec_path = buf[-1].section_path
if estimate_tokens(text_body) <= max_tokens: _add(text_body, sec_title, sec_path)
if estimate_tokens(text_body) <= max_tokens:
_add(text_body, sec_title, sec_path)
else:
sents = split_sentences(text_body); cur_sents = []; cur_len = 0
for s in sents:
@ -32,33 +62,69 @@ def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note
_add(" ".join(cur_sents), sec_title, sec_path)
ov_s = []; ov_l = 0
for os in reversed(cur_sents):
if ov_l + estimate_tokens(os) < overlap: ov_s.insert(0, os); ov_l += estimate_tokens(os)
if ov_l + estimate_tokens(os) < overlap:
ov_s.insert(0, os); ov_l += estimate_tokens(os)
else: break
cur_sents = list(ov_s); cur_sents.append(s); cur_len = ov_l + slen
else: cur_sents.append(s); cur_len += slen
if cur_sents: _add(" ".join(cur_sents), sec_title, sec_path)
else:
cur_sents.append(s); cur_len += slen
if cur_sents:
_add(" ".join(cur_sents), sec_title, sec_path)
buf = []
for b in blocks:
# H2-Überschriften werden ignoriert, um den Zusammenhang zu wahren
if b.kind == "heading": continue
if estimate_tokens("\n\n".join([x.text for x in buf])) + estimate_tokens(b.text) >= target: flush()
if estimate_tokens("\n\n".join([x.text for x in buf])) + estimate_tokens(b.text) >= target:
flush()
buf.append(b)
if estimate_tokens(b.text) >= target: flush()
flush()
return chunks
def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]:
"""Splittet Text basierend auf Markdown-Überschriften."""
strict = config.get("strict_heading_split", False); target = config.get("target", 400)
max_tokens = config.get("max", 600); split_level = config.get("split_level", 2)
chunks = []; buf = []; cur_tokens = 0
"""
Splittet Text basierend auf Markdown-Überschriften mit Hybrid-Safety-Net.
"""
strict = config.get("strict_heading_split", False)
target = config.get("target", 400)
max_tokens = config.get("max", 600)
split_level = config.get("split_level", 2)
overlap = sum(config.get("overlap", (50, 80))) // 2
chunks: List[Chunk] = []
buf: List[str] = []
cur_tokens = 0
def _add_to_chunks(txt, title, path):
idx = len(chunks)
win = _create_context_win(doc_title, title, txt)
chunks.append(Chunk(
id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx,
text=txt, window=win, token_count=estimate_tokens(txt),
section_title=title, section_path=path,
neighbors_prev=None, neighbors_next=None
))
def _flush(title, path):
nonlocal buf, cur_tokens
if not buf: return
txt = "\n\n".join(buf); win = f"# {doc_title}\n## {title}\n{txt}".strip() if title else txt
idx = len(chunks)
chunks.append(Chunk(id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, text=txt, window=win, token_count=estimate_tokens(txt), section_title=title, section_path=path, neighbors_prev=None, neighbors_next=None))
full_text = "\n\n".join(buf)
if estimate_tokens(full_text) <= max_tokens:
_add_to_chunks(full_text, title, path)
else:
sents = split_sentences(full_text); cur_sents = []; sub_len = 0
for s in sents:
slen = estimate_tokens(s)
if sub_len + slen > target and cur_sents:
_add_to_chunks(" ".join(cur_sents), title, path)
ov_s = []; ov_l = 0
for os in reversed(cur_sents):
if ov_l + estimate_tokens(os) < overlap:
ov_s.insert(0, os); ov_l += estimate_tokens(os)
else: break
cur_sents = list(ov_s); cur_sents.append(s); sub_len = ov_l + slen
else: cur_sents.append(s); sub_len += slen
if cur_sents: _add_to_chunks(" ".join(cur_sents), title, path)
buf = []; cur_tokens = 0
for b in blocks:
@ -70,5 +136,7 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id:
bt = estimate_tokens(b.text)
if cur_tokens + bt > max_tokens and buf: _flush(b.section_title, b.section_path)
buf.append(b.text); cur_tokens += bt
if buf: _flush(blocks[-1].section_title if blocks else None, blocks[-1].section_path if blocks else "/")
if buf:
last_b = blocks[-1] if blocks else None
_flush(last_b.section_title if last_b else None, last_b.section_path if last_b else "/")
return chunks