mindnet/app/core/chunking/chunking_processor.py

94 lines
3.8 KiB
Python

"""
FILE: app/core/chunking/chunking_processor.py
DESCRIPTION: Der zentrale Orchestrator für das Chunking-System.
AUDIT v3.3.3: Wiederherstellung der "Gold-Standard" Qualität.
- Integriert physikalische Kanten-Injektion (Propagierung).
- Stellt H1-Kontext-Fenster sicher.
- Baut den Candidate-Pool für die WP-15b Ingestion auf.
"""
import asyncio
import re
import logging
from typing import List, Dict, Optional
from .chunking_models import Chunk
from .chunking_utils import get_chunk_config, extract_frontmatter_from_text
from .chunking_parser import parse_blocks, parse_edges_robust
from .chunking_strategies import strategy_sliding_window, strategy_by_heading
from .chunking_propagation import propagate_section_edges
logger = logging.getLogger(__name__)
async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]:
"""
Hauptfunktion zur Zerlegung einer Note.
Verbindet Strategien mit physikalischer Kontext-Anreicherung.
"""
# 1. Konfiguration & Parsing
if config is None:
config = get_chunk_config(note_type)
fm, body_text = extract_frontmatter_from_text(md_text)
blocks, doc_title = parse_blocks(md_text)
# Vorbereitung des H1-Präfix für die Embedding-Fenster
h1_prefix = f"# {doc_title}" if doc_title else ""
# 2. Anwendung der Splitting-Strategie
# Wir übergeben den Dokument-Titel/Präfix für die Window-Bildung.
if config.get("strategy") == "by_heading":
chunks = await asyncio.to_thread(strategy_by_heading, blocks, config, note_id, doc_title)
else:
# sliding_window nutzt nun den context_prefix für das Window-Feld.
chunks = await asyncio.to_thread(strategy_sliding_window, blocks, config, note_id, context_prefix=h1_prefix)
if not chunks:
return []
# 3. Physikalische Kontext-Anreicherung (Der Qualitäts-Fix)
# Schreibt Kanten aus Callouts/Inlines hart in den Text für Qdrant.
chunks = propagate_section_edges(chunks)
# 4. WP-15b: Candidate Pool Aufbau (Metadaten für IngestionService)
# Zuerst die explizit im Text vorhandenen Kanten sammeln.
for ch in chunks:
# Wir extrahieren aus dem bereits (durch Propagation) angereicherten Text.
for e_str in parse_edges_robust(ch.text):
parts = e_str.split(':', 1)
if len(parts) == 2:
k, t = parts
ch.candidate_pool.append({"kind": k, "to": t, "provenance": "explicit"})
# 5. Global Pool (Unzugeordnete Kanten aus dem Dokument-Ende)
# Sucht nach dem Edge-Pool Block im Original-Markdown.
pool_match = re.search(
r'###?\s*(?:Unzugeordnete Kanten|Edge Pool|Candidates)\s*\n(.*?)(?:\n#|$)',
body_text,
re.DOTALL | re.IGNORECASE
)
if pool_match:
global_edges = parse_edges_robust(pool_match.group(1))
for e_str in global_edges:
parts = e_str.split(':', 1)
if len(parts) == 2:
k, t = parts
# Diese Kanten werden als "Global Pool" markiert für die spätere KI-Prüfung.
for ch in chunks:
ch.candidate_pool.append({"kind": k, "to": t, "provenance": "global_pool"})
# 6. De-Duplikation des Pools & Linking
for ch in chunks:
seen = set()
unique = []
for c in ch.candidate_pool:
key = (c["kind"], c["to"], c["provenance"])
if key not in seen:
seen.add(key)
unique.append(c)
ch.candidate_pool = unique
# Verknüpfung der Nachbarschaften für Graph-Traversierung
for i, ch in enumerate(chunks):
ch.neighbors_prev = chunks[i-1].id if i > 0 else None
ch.neighbors_next = chunks[i+1].id if i < len(chunks)-1 else None
return chunks