""" FILE: app/core/chunking/chunking_processor.py DESCRIPTION: Der zentrale Orchestrator für das Chunking-System. AUDIT v3.3.4: Wiederherstellung der "Gold-Standard" Qualität. - Fix: Synchronisierung der Parameter (context_prefix) für alle Strategien. - Integriert physikalische Kanten-Injektion (Propagierung). - Stellt H1-Kontext-Fenster sicher. - Baut den Candidate-Pool für die WP-15b Ingestion auf. """ import asyncio import re import logging from typing import List, Dict, Optional from .chunking_models import Chunk from .chunking_utils import get_chunk_config, extract_frontmatter_from_text from .chunking_parser import parse_blocks, parse_edges_robust from .chunking_strategies import strategy_sliding_window, strategy_by_heading from .chunking_propagation import propagate_section_edges logger = logging.getLogger(__name__) async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]: """ Hauptfunktion zur Zerlegung einer Note. Verbindet Strategien mit physikalischer Kontext-Anreicherung. """ # 1. Konfiguration & Parsing if config is None: config = get_chunk_config(note_type) fm, body_text = extract_frontmatter_from_text(md_text) blocks, doc_title = parse_blocks(md_text) # Vorbereitung des H1-Präfix für die Embedding-Fenster (Breadcrumbs) h1_prefix = f"# {doc_title}" if doc_title else "" # 2. Anwendung der Splitting-Strategie # Alle Strategien nutzen nun einheitlich context_prefix für die Window-Bildung. if config.get("strategy") == "by_heading": chunks = await asyncio.to_thread( strategy_by_heading, blocks, config, note_id, context_prefix=h1_prefix ) else: chunks = await asyncio.to_thread( strategy_sliding_window, blocks, config, note_id, context_prefix=h1_prefix ) if not chunks: return [] # 3. Physikalische Kontext-Anreicherung (Der Qualitäts-Fix) # Schreibt Kanten aus Callouts/Inlines hart in den Text für Qdrant. chunks = propagate_section_edges(chunks) # 4. WP-15b: Candidate Pool Aufbau (Metadaten für IngestionService) # Zuerst die explizit im Text vorhandenen Kanten sammeln. for ch in chunks: # Wir extrahieren aus dem bereits (durch Propagation) angereicherten Text. # ch.candidate_pool wird im Modell-Konstruktor als leere Liste initialisiert. for e_str in parse_edges_robust(ch.text): parts = e_str.split(':', 1) if len(parts) == 2: k, t = parts ch.candidate_pool.append({"kind": k, "to": t, "provenance": "explicit"}) # 5. Global Pool (Unzugeordnete Kanten aus dem Dokument-Ende) # Sucht nach dem Edge-Pool Block im Original-Markdown. pool_match = re.search( r'###?\s*(?:Unzugeordnete Kanten|Edge Pool|Candidates)\s*\n(.*?)(?:\n#|$)', body_text, re.DOTALL | re.IGNORECASE ) if pool_match: global_edges = parse_edges_robust(pool_match.group(1)) for e_str in global_edges: parts = e_str.split(':', 1) if len(parts) == 2: k, t = parts # Diese Kanten werden als "global_pool" markiert für die spätere KI-Prüfung. for ch in chunks: ch.candidate_pool.append({"kind": k, "to": t, "provenance": "global_pool"}) # 6. De-Duplikation des Pools & Linking for ch in chunks: seen = set() unique = [] for c in ch.candidate_pool: # Eindeutigkeit über Typ, Ziel und Herkunft (Provenance) key = (c["kind"], c["to"], c["provenance"]) if key not in seen: seen.add(key) unique.append(c) ch.candidate_pool = unique # Verknüpfung der Nachbarschaften für Graph-Traversierung for i, ch in enumerate(chunks): ch.neighbors_prev = chunks[i-1].id if i > 0 else None ch.neighbors_next = chunks[i+1].id if i < len(chunks)-1 else None return chunks