mindnet/app/core/chunking/chunking_processor.py

121 lines
5.4 KiB
Python

"""
FILE: app/core/chunking/chunking_processor.py
DESCRIPTION: Der zentrale Orchestrator für das Chunking-System.
AUDIT v3.3.4: Wiederherstellung der "Gold-Standard" Qualität.
- Fix: Synchronisierung der Parameter (context_prefix) für alle Strategien.
- Integriert physikalische Kanten-Injektion (Propagierung).
- Stellt H1-Kontext-Fenster sicher.
- Baut den Candidate-Pool für die WP-15b Ingestion auf.
WP-24c v4.2.0: Konfigurierbare Header-Namen für LLM-Validierung.
"""
import asyncio
import re
import os
import logging
from typing import List, Dict, Optional
from .chunking_models import Chunk
from .chunking_utils import get_chunk_config, extract_frontmatter_from_text
from .chunking_parser import parse_blocks, parse_edges_robust
from .chunking_strategies import strategy_sliding_window, strategy_by_heading
from .chunking_propagation import propagate_section_edges
logger = logging.getLogger(__name__)
async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]:
"""
Hauptfunktion zur Zerlegung einer Note.
Verbindet Strategien mit physikalischer Kontext-Anreicherung.
"""
# 1. Konfiguration & Parsing
if config is None:
config = get_chunk_config(note_type)
fm, body_text = extract_frontmatter_from_text(md_text)
blocks, doc_title = parse_blocks(md_text)
# WP-24c v4.2.0: Filtere Blöcke aus Edge-Zonen (LLM-Validierung & Note-Scope)
# Diese Bereiche sollen nicht als Chunks angelegt werden, sondern nur die Kanten extrahiert werden
blocks_for_chunking = [b for b in blocks if not getattr(b, 'exclude_from_chunking', False)]
# Vorbereitung des H1-Präfix für die Embedding-Fenster (Breadcrumbs)
h1_prefix = f"# {doc_title}" if doc_title else ""
# 2. Anwendung der Splitting-Strategie
# Alle Strategien nutzen nun einheitlich context_prefix für die Window-Bildung.
if config.get("strategy") == "by_heading":
chunks = await asyncio.to_thread(
strategy_by_heading, blocks_for_chunking, config, note_id, context_prefix=h1_prefix
)
else:
chunks = await asyncio.to_thread(
strategy_sliding_window, blocks_for_chunking, config, note_id, context_prefix=h1_prefix
)
if not chunks:
return []
# 3. Physikalische Kontext-Anreicherung (Der Qualitäts-Fix)
# Schreibt Kanten aus Callouts/Inlines hart in den Text für Qdrant.
chunks = propagate_section_edges(chunks)
# 4. WP-15b: Candidate Pool Aufbau (Metadaten für IngestionService)
# Zuerst die explizit im Text vorhandenen Kanten sammeln.
for ch in chunks:
# Wir extrahieren aus dem bereits (durch Propagation) angereicherten Text.
# ch.candidate_pool wird im Modell-Konstruktor als leere Liste initialisiert.
for e_str in parse_edges_robust(ch.text):
parts = e_str.split(':', 1)
if len(parts) == 2:
k, t = parts
ch.candidate_pool.append({"kind": k, "to": t, "provenance": "explicit"})
# 5. Global Pool (Unzugeordnete Kanten - kann mitten im Dokument oder am Ende stehen)
# WP-24c v4.2.0: Konfigurierbare Header-Namen und -Ebene via .env
# Sucht nach ALLEN Edge-Pool Blöcken im Original-Markdown (nicht nur am Ende).
llm_validation_headers = os.getenv(
"MINDNET_LLM_VALIDATION_HEADERS",
"Unzugeordnete Kanten,Edge Pool,Candidates"
)
header_list = [h.strip() for h in llm_validation_headers.split(",") if h.strip()]
# Fallback auf Defaults, falls leer
if not header_list:
header_list = ["Unzugeordnete Kanten", "Edge Pool", "Candidates"]
# Header-Ebene konfigurierbar (Default: 3 für ###)
llm_validation_level = int(os.getenv("MINDNET_LLM_VALIDATION_HEADER_LEVEL", "3"))
header_level_pattern = "#" * llm_validation_level
# Regex-Pattern mit konfigurierbaren Headern und Ebene
# WP-24c v4.2.0: finditer statt search, um ALLE Zonen zu finden (auch mitten im Dokument)
# Zone endet bei einem neuen Header (jeder Ebene) oder am Dokument-Ende
header_pattern = "|".join(re.escape(h) for h in header_list)
zone_pattern = rf'^{re.escape(header_level_pattern)}\s*(?:{header_pattern})\s*\n(.*?)(?=\n#|$)'
for pool_match in re.finditer(zone_pattern, body_text, re.DOTALL | re.IGNORECASE | re.MULTILINE):
global_edges = parse_edges_robust(pool_match.group(1))
for e_str in global_edges:
parts = e_str.split(':', 1)
if len(parts) == 2:
k, t = parts
# Diese Kanten werden als "global_pool" markiert für die spätere KI-Prüfung.
for ch in chunks:
ch.candidate_pool.append({"kind": k, "to": t, "provenance": "global_pool"})
# 6. De-Duplikation des Pools & Linking
for ch in chunks:
seen = set()
unique = []
for c in ch.candidate_pool:
# Eindeutigkeit über Typ, Ziel und Herkunft (Provenance)
key = (c["kind"], c["to"], c["provenance"])
if key not in seen:
seen.add(key)
unique.append(c)
ch.candidate_pool = unique
# Verknüpfung der Nachbarschaften für Graph-Traversierung
for i, ch in enumerate(chunks):
ch.neighbors_prev = chunks[i-1].id if i > 0 else None
ch.neighbors_next = chunks[i+1].id if i < len(chunks)-1 else None
return chunks