mindnet/app/core/chunker.py

133 lines
5.6 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple, Any, Set
import re
import math
import yaml
from pathlib import Path
from markdown_it import MarkdownIt
from markdown_it.token import Token
import asyncio
# NEUE IMPORTS
from app.services.semantic_analyzer import get_semantic_analyzer
from app.core.note_payload import extract_frontmatter_from_text
# WICHTIG: Import der Edge Derivations Logik
from app.core.derive_edges import build_edges_for_note # <-- Muss importiert werden
# ... bestehender Code (Konfiguration, Hilfsfunktionen, RawBlock, Chunk)
# --- NEUE STRATEGIE: SMART EDGE ALLOCATION (Ersetzt _strategy_semantic_llm) ---
async def _strategy_smart_edge_allocation(md_text: str, config: Dict, note_id: str, note_type: str) -> List[Chunk]:
"""
[WP-15, Neue Logik] Zerlegt Note deterministisch und nutzt LLM zur Zuweisung von Kanten (Schritte 1-5).
"""
# 0. Initialisierung
analyzer = get_semantic_analyzer()
# 1. [Schritt 2 des Workflows] Sammeln ALLER Kanten (Inline & Defaults)
# Führt die Edge-Derivation für die gesamte Notiz aus, basierend auf Text und Typ.
raw_edges: List[Dict] = build_edges_for_note(
text=md_text,
note_id=note_id,
note_type=note_type,
# Leere Listen übergeben, da wir noch keine Chunks haben und nur die Note selbst analysieren.
chunks=[],
references=[]
)
# Kanten im Format "kind:Target" sammeln (ohne Duplikate)
all_note_edges = set()
for edge in raw_edges:
# Extrahiere nur Kanten, die relevant für das Chunking sind (Explizite oder Defaults)
if edge.get("target_id") and edge.get("kind"):
# Nutze target_id, da dies der Notiz-ID entspricht
all_note_edges.add(f"{edge['kind']}:{edge['target_id']}")
all_note_edges_list = list(all_note_edges)
# 2. [Schritt 3 des Workflows] Deterministic Chunking
# Nutzt die in der Config angegebene deterministische Strategie (z.B. by_heading)
blocks, doc_title = parse_blocks(md_text)
# Nutze _strategy_by_heading (oder _strategy_sliding_window, je nach Config-Intent),
# da dies die robusteste deterministische Strategie ist. Die Konfiguration kommt
# vom "structured_strict" oder ähnlichem Profil.
chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title)
# Fallback, falls by_heading nur einen Chunk liefert oder fehlschlägt
if not chunks or len(chunks) <= 1:
# Erhöht die Robustheit bei unstrukturierten Texten
chunks = await asyncio.to_thread(_strategy_sliding_window, blocks, config, note_id, doc_title)
if not chunks:
# Absoluter Fallback: Ganzer Text ist 1 Chunk.
text = " ".join([b.text for b in blocks if b.kind not in ("heading", "code")]).strip()
if text:
chunks = [Chunk(id=f"{note_id}-c0", note_id=note_id, index=0, text=text, token_count=estimate_tokens(text), section_title=doc_title, section_path="", neighbors_prev=None, neighbors_next=None, char_start=0, char_end=len(text))]
else:
return []
# 3. [Schritt 4 des Workflows] Kanten pro Chunk zuweisen/filtern
unassigned_edges: Set[str] = set(all_note_edges_list)
llm_tasks = []
for chunk in chunks:
# Starte den LLM-Filter-Call für jeden Chunk parallel
task = analyzer.filter_edges_for_chunk(chunk.text, all_note_edges_list, note_type)
llm_tasks.append(task)
# Warte auf alle LLM-Antworten (Batch-Processing)
filtered_edges_results: List[List[str]] = await asyncio.gather(*llm_tasks)
# 4. Ergebnisse zuweisen und Unassigned Edges sammeln
for i, filtered_edges_list in enumerate(filtered_edges_results):
chunk = chunks[i]
# Lege die vom LLM gefilterten Edges in den Chunk-Payload
# Die Chunk-Klasse muss ein Feld 'suggested_edges' haben (wie im alten SemanticChunkResult)
chunk.suggested_edges = filtered_edges_list
# Unassigned Edges: Subtrahiere alle Edges, die in diesem Chunk gefunden wurden
unassigned_edges.difference_update(set(filtered_edges_list))
# 5. [Schritt 5 des Workflows] Fallback: Nicht zugeordnete Kanten zuweisen
# Alle Kanten, die in KEINEM Chunk explizit zugewiesen wurden, werden JEDEM Chunk zugewiesen.
unassigned_edges_list = list(unassigned_edges)
if unassigned_edges_list:
logger.info(f"Adding {len(unassigned_edges_list)} unassigned edges as fallback to all chunks for note {note_id}")
for chunk in chunks:
# Füge die unassigned Edges hinzu (Set-Operation für Duplikat-Schutz)
existing_edges = set(chunk.suggested_edges)
chunk.suggested_edges = list(existing_edges.union(unassigned_edges_list))
# 6. Return Chunks
return chunks
# --- UPDATE DISPATCHER: chunk_note_async ---
async def chunk_note_async(md_text: str, note_id: str, note_type: str, note_status: str, path_arg: str = None) -> List[Chunk]:
# ... bestehender Code (Frontmatter, Config, etc.)
# 3. Execution (Dispatcher)
# Update: Rufe die NEUE Strategie auf, wenn 'semantic_llm' konfiguriert ist.
if strategy == "semantic_llm":
chunks = await _strategy_smart_edge_allocation(md_text, config, note_id, note_type)
elif strategy == "by_heading":
blocks, doc_title = parse_blocks(md_text)
# ... bestehender Code
else: # sliding_window (Default)
blocks, doc_title = parse_blocks(md_text)
# ... bestehender Code
# ... bestehender Code (Post-Processing)