chunker mit LLM initial
This commit is contained in:
parent
714763f92f
commit
13ab2b7d68
|
|
@ -8,15 +8,17 @@ from pathlib import Path
|
|||
from markdown_it import MarkdownIt
|
||||
from markdown_it.token import Token
|
||||
|
||||
# NEUE IMPORTS
|
||||
# Import des Semantic Analyzer Services
|
||||
from app.services.semantic_analyzer import get_semantic_analyzer
|
||||
import asyncio # Für den asynchronen Aufruf des Chunkers
|
||||
|
||||
# ==========================================
|
||||
# 1. CONFIGURATION LOADER (Updated for config/ dir)
|
||||
# 1. CONFIGURATION LOADER (Ehemals chunk_config.py)
|
||||
# ==========================================
|
||||
|
||||
# Pfad-Logik:
|
||||
# Wir gehen 3 Ebenen hoch: app/core/chunker.py -> app/core -> app -> root
|
||||
# Pfad-Logik: app/core/chunker.py -> app/core -> app -> root/config/types.yaml
|
||||
BASE_DIR = Path(__file__).resolve().parent.parent.parent
|
||||
|
||||
# KORREKTUR: types.yaml liegt im Unterordner "config"
|
||||
CONFIG_PATH = BASE_DIR / "config" / "types.yaml"
|
||||
|
||||
# Fallback Values
|
||||
|
|
@ -38,7 +40,6 @@ def _load_yaml_config() -> Dict[str, Any]:
|
|||
if not CONFIG_PATH.exists():
|
||||
# Debugging-Hilfe: Zeigt an, wo gesucht wurde
|
||||
print(f"WARNUNG: types.yaml nicht gefunden unter: {CONFIG_PATH}")
|
||||
print(f" (Basis-Verzeichnis war: {BASE_DIR})")
|
||||
return {}
|
||||
|
||||
try:
|
||||
|
|
@ -51,32 +52,24 @@ def _load_yaml_config() -> Dict[str, Any]:
|
|||
return {}
|
||||
|
||||
def get_chunk_config(note_type: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Löst Typ -> Profil -> Konfiguration auf.
|
||||
"""
|
||||
"""Löst Typ -> Profil -> Konfiguration auf."""
|
||||
full_config = _load_yaml_config()
|
||||
|
||||
# 1. Profile holen
|
||||
profiles = full_config.get("chunking_profiles", {})
|
||||
|
||||
# 2. Typ-Definition holen
|
||||
type_def = full_config.get("types", {}).get(note_type.lower(), {})
|
||||
|
||||
# 3. Profil-Namen ermitteln (Fallback auf defaults)
|
||||
profile_name = type_def.get("chunking_profile")
|
||||
|
||||
if not profile_name:
|
||||
profile_name = full_config.get("defaults", {}).get("chunking_profile", "sliding_standard")
|
||||
|
||||
# 4. Config bauen
|
||||
config = profiles.get(profile_name, DEFAULT_PROFILE).copy()
|
||||
|
||||
# Sicherstellen, dass Overlap ein Tuple ist
|
||||
if "overlap" in config and isinstance(config["overlap"], list):
|
||||
config["overlap"] = tuple(config["overlap"])
|
||||
|
||||
return config
|
||||
|
||||
# Legacy Support für alten Code
|
||||
# Legacy Support
|
||||
def get_sizes(note_type: str):
|
||||
cfg = get_chunk_config(note_type)
|
||||
return {
|
||||
|
|
@ -86,7 +79,7 @@ def get_sizes(note_type: str):
|
|||
}
|
||||
|
||||
# ==========================================
|
||||
# 2. CHUNKING LOGIC & PARSER
|
||||
# 2. DATA CLASSES & HELPERS
|
||||
# ==========================================
|
||||
|
||||
# --- Hilfen ---
|
||||
|
|
@ -94,7 +87,6 @@ _SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])')
|
|||
_WS = re.compile(r'\s+')
|
||||
|
||||
def estimate_tokens(text: str) -> int:
|
||||
# 1 Token ≈ 4 chars
|
||||
t = len(text.strip())
|
||||
return max(1, math.ceil(t / 4))
|
||||
|
||||
|
|
@ -117,8 +109,8 @@ class Chunk:
|
|||
id: str
|
||||
note_id: str
|
||||
index: int
|
||||
text: str # Reintext für Anzeige
|
||||
window: str # Text + Context für Embeddings
|
||||
text: str # Reintext für Anzeige (JETZT INKL. INJIZIERTER LINKS)
|
||||
window: str # Text + Context für Embeddings (WIE 'text' BEI LLM-CHUNK)
|
||||
token_count: int
|
||||
section_title: Optional[str]
|
||||
section_path: str
|
||||
|
|
@ -193,7 +185,9 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
|
|||
i += 1
|
||||
return blocks, h1_title
|
||||
|
||||
# --- Strategien ---
|
||||
# ==========================================
|
||||
# 3. STRATEGIES (SYNCHRON)
|
||||
# ==========================================
|
||||
|
||||
def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, context_prefix: str = "") -> List[Chunk]:
|
||||
target = config.get("target", 400)
|
||||
|
|
@ -266,6 +260,8 @@ def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id
|
|||
|
||||
for path in ordered:
|
||||
s_blocks = sections[path]
|
||||
if not s_blocks: continue
|
||||
|
||||
breadcrumbs = path.strip("/").replace("/", " > ")
|
||||
context_header = f"# {doc_title}\n## {breadcrumbs}"
|
||||
full_text = "\n\n".join([b.text for b in s_blocks])
|
||||
|
|
@ -279,6 +275,7 @@ def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id
|
|||
neighbors_prev=None, neighbors_next=None, char_start=0, char_end=0
|
||||
))
|
||||
else:
|
||||
# Fallback auf Sliding Window mit Context Injection
|
||||
sub = _strategy_sliding_window(s_blocks, config, note_id, context_prefix=context_header)
|
||||
base = len(chunks)
|
||||
for i, sc in enumerate(sub):
|
||||
|
|
@ -287,19 +284,79 @@ def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id
|
|||
chunks.append(sc)
|
||||
return chunks
|
||||
|
||||
# --- Main Entry Point ---
|
||||
# ==========================================
|
||||
# 4. STRATEGY (ASYNCHRON)
|
||||
# ==========================================
|
||||
|
||||
def assemble_chunks(note_id: str, md_text: str, note_type: str) -> List[Chunk]:
|
||||
async def _strategy_semantic_llm(md_text: str, config: Dict[str, Any], note_id: str, note_type: str) -> List[Chunk]:
|
||||
"""
|
||||
NEUE STRATEGIE: Delegiert die Zerlegung und Kanten-Extraktion an ein LLM.
|
||||
"""
|
||||
analyzer = get_semantic_analyzer()
|
||||
|
||||
# Text-Splitting wird hier vom LLM übernommen
|
||||
semantic_chunks = await analyzer.analyze_and_chunk(md_text, note_type)
|
||||
|
||||
chunks: List[Chunk] = []
|
||||
|
||||
for i, sc in enumerate(semantic_chunks):
|
||||
# 1. Edge Injection für derive_edges.py
|
||||
# Wir formatieren die LLM-generierten Kanten in die Inline-Syntax,
|
||||
# damit die bestehende derive_edges.py (Regex) sie findet.
|
||||
|
||||
injection_block = "\n"
|
||||
for edge_str in sc.suggested_edges:
|
||||
kind, target = edge_str.split(":", 1)
|
||||
# Nutzt die Syntax: [[rel:kind | Target]]
|
||||
injection_block += f"[[rel:{kind} | {target}]] "
|
||||
|
||||
full_text = sc.content + injection_block
|
||||
|
||||
# 2. Chunk Objekt bauen
|
||||
chunks.append(Chunk(
|
||||
id=f"{note_id}#sem{i:02d}",
|
||||
note_id=note_id,
|
||||
index=i,
|
||||
text=full_text.strip(), # Enthält die Links (für derive_edges)
|
||||
window=full_text.strip(), # Auch das Embedding "sieht" die Links (gut für Retrieval)
|
||||
token_count=estimate_tokens(full_text),
|
||||
section_title="Semantic Section",
|
||||
section_path="/LLM",
|
||||
neighbors_prev=None, neighbors_next=None,
|
||||
char_start=0, char_end=0
|
||||
))
|
||||
|
||||
return chunks
|
||||
|
||||
# ==========================================
|
||||
# 5. MAIN ENTRY POINT (ASYNC)
|
||||
# ==========================================
|
||||
|
||||
async def assemble_chunks(note_id: str, md_text: str, note_type: str) -> List[Chunk]:
|
||||
"""
|
||||
Hauptfunktion. Analysiert Config und wählt Strategie. MUSS ASYNC SEIN.
|
||||
"""
|
||||
config = get_chunk_config(note_type)
|
||||
strategy = config.get("strategy", "sliding_window")
|
||||
blocks, doc_title = parse_blocks(md_text)
|
||||
|
||||
if strategy == "by_heading":
|
||||
chunks = _strategy_by_heading(blocks, config, note_id, doc_title)
|
||||
else:
|
||||
chunks = _strategy_sliding_window(blocks, config, note_id)
|
||||
# Die beiden bestehenden Strategien rufen wir über einen Sync-Wrapper auf,
|
||||
# damit assemble_chunks ASYNC bleiben kann.
|
||||
if strategy == "semantic_llm":
|
||||
chunks = await _strategy_semantic_llm(md_text, config, note_id, note_type)
|
||||
|
||||
elif strategy == "by_heading":
|
||||
blocks, doc_title = parse_blocks(md_text)
|
||||
# Blockiert nur kurz für die sync-Rechenarbeit
|
||||
chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title)
|
||||
|
||||
else: # sliding_window (Default)
|
||||
blocks, doc_title = parse_blocks(md_text)
|
||||
# Blockiert nur kurz für die sync-Rechenarbeit
|
||||
chunks = await asyncio.to_thread(_strategy_sliding_window, blocks, config, note_id)
|
||||
|
||||
# Post-Process: Neighbors setzen
|
||||
for i, ch in enumerate(chunks):
|
||||
ch.neighbors_prev = chunks[i-1].id if i > 0 else None
|
||||
ch.neighbors_next = chunks[i+1].id if i < len(chunks)-1 else None
|
||||
|
||||
return chunks
|
||||
98
app/services/semantic_analyzer.py
Normal file
98
app/services/semantic_analyzer.py
Normal file
|
|
@ -0,0 +1,98 @@
|
|||
"""
|
||||
app/services/semantic_analyzer.py
|
||||
Kapselt die LLM-Strategie für Chunking und Kanten-Extraktion.
|
||||
Nutzt die Matrix-Logik aus DiscoveryService für konsistente Kanten-Typen.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from typing import List, Dict, Any, Optional
|
||||
from dataclasses import dataclass
|
||||
|
||||
from app.services.llm_service import LLMService
|
||||
from app.services.discovery import DiscoveryService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@dataclass
|
||||
class SemanticChunkResult:
|
||||
content: str
|
||||
suggested_edges: List[str] # Format: "kind:Target"
|
||||
|
||||
class SemanticAnalyzer:
|
||||
def __init__(self):
|
||||
self.llm = LLMService()
|
||||
self.discovery = DiscoveryService() # Wiederverwendung der Matrix-Logik
|
||||
|
||||
async def analyze_and_chunk(self, text: str, source_type: str) -> List[SemanticChunkResult]:
|
||||
"""
|
||||
Zerlegt Text mittels LLM in semantische Abschnitte und extrahiert Kanten.
|
||||
"""
|
||||
# 1. Prompt bauen
|
||||
system_prompt = (
|
||||
"Du bist ein Knowledge Graph Experte. Deine Aufgabe ist es, Rohtext in "
|
||||
"thematisch geschlossene Abschnitte (Chunks) zu zerlegen.\n"
|
||||
"Analysiere jeden Abschnitt auf Beziehungen zu anderen Konzepten.\n"
|
||||
"Antworte AUSSCHLIESSLICH mit validem JSON in diesem Format:\n"
|
||||
"[\n"
|
||||
" {\n"
|
||||
" \"content\": \"Der Text des Abschnitts...\",\n"
|
||||
" \"relations\": [{\"target\": \"Qdrant\", \"type\": \"depends_on\"}]\n"
|
||||
" }\n"
|
||||
"]\n"
|
||||
"Halte die Chunks mittellang (ca. 100-300 Wörter). Verändere den Inhalt nicht, nur die Struktur."
|
||||
)
|
||||
|
||||
user_prompt = f"Dokument-Typ: {source_type}\n\nTEXT:\n{text}"
|
||||
|
||||
try:
|
||||
# 2. LLM Call
|
||||
response_json = await self.llm.generate_raw_response(user_prompt, system=system_prompt)
|
||||
|
||||
# 3. JSON Parsing & Validierung
|
||||
# Markdown Code-Block entfernen falls vorhanden
|
||||
clean_json = response_json.replace("```json", "").replace("```", "").strip()
|
||||
data = json.loads(clean_json)
|
||||
|
||||
results = []
|
||||
for item in data:
|
||||
content = item.get("content", "").strip()
|
||||
if not content: continue
|
||||
|
||||
raw_rels = item.get("relations", [])
|
||||
refined_edges = []
|
||||
|
||||
for rel in raw_rels:
|
||||
target = rel.get("target")
|
||||
raw_type = rel.get("type", "related_to")
|
||||
|
||||
if target:
|
||||
# 4. Matrix-Logik anwenden (Active Intelligence)
|
||||
# Wir versuchen, den Typ des Ziels zu erraten oder nutzen Matrix blind
|
||||
# Hier vereinfacht: Wir nutzen Discovery Logic um den Edge-Typ zu validieren
|
||||
# (Wir nehmen an, Target Type ist unbekannt -> 'concept')
|
||||
final_kind = self.discovery._resolve_edge_type(source_type, "concept")
|
||||
|
||||
# Wenn LLM spezifischer war (z.B. 'blocks'), nehmen wir das LLM,
|
||||
# sonst den Matrix-Vorschlag
|
||||
if raw_type in ["related_to", "link"] and final_kind != "related_to":
|
||||
edge_str = f"{final_kind}:{target}"
|
||||
else:
|
||||
edge_str = f"{raw_type}:{target}"
|
||||
|
||||
refined_edges.append(edge_str)
|
||||
|
||||
results.append(SemanticChunkResult(content=content, suggested_edges=refined_edges))
|
||||
|
||||
return results
|
||||
|
||||
except json.JSONDecodeError:
|
||||
logger.warning("SemanticAnalyzer: LLM lieferte kein valides JSON. Fallback auf Raw Text.")
|
||||
return [SemanticChunkResult(content=text, suggested_edges=[])]
|
||||
except Exception as e:
|
||||
logger.error(f"SemanticAnalyzer Error: {e}")
|
||||
return [SemanticChunkResult(content=text, suggested_edges=[])]
|
||||
|
||||
async def close(self):
|
||||
await self.llm.close()
|
||||
|
|
@ -29,7 +29,14 @@ chunking_profiles:
|
|||
max: 600 # Fallback Limit
|
||||
target: 400 # Fallback Target bei Sub-Chunking
|
||||
overlap: [50, 80] # Overlap bei Sub-Chunking
|
||||
|
||||
|
||||
# NEU: LLM-basierte semantische Zerlegung (Chunker.py ruft semantic_analyzer.py)
|
||||
semantic_llm:
|
||||
strategy: semantic_llm
|
||||
# Da das LLM die Längensteuerung übernimmt, dienen diese als Fallback/Empfehlung
|
||||
target: 400
|
||||
max: 800
|
||||
|
||||
defaults:
|
||||
retriever_weight: 1.0
|
||||
chunking_profile: sliding_standard # Fallback Profil
|
||||
|
|
@ -54,7 +61,7 @@ types:
|
|||
|
||||
# --- IDENTITÄT & PERSÖNLICHKEIT ---
|
||||
profile:
|
||||
chunking_profile: structured_strict # H2 Split wichtig für Profile
|
||||
chunking_profile: structured_strict
|
||||
retriever_weight: 0.70
|
||||
edge_defaults: ["references", "related_to"]
|
||||
|
||||
|
|
@ -85,7 +92,7 @@ types:
|
|||
edge_defaults: ["depends_on", "related_to"]
|
||||
|
||||
decision:
|
||||
chunking_profile: structured_strict # ADRs sind oft strukturiert
|
||||
chunking_profile: structured_strict
|
||||
retriever_weight: 1.00
|
||||
edge_defaults: ["caused_by", "references"]
|
||||
|
||||
|
|
@ -101,7 +108,7 @@ types:
|
|||
|
||||
# --- OPERATIV ---
|
||||
project:
|
||||
chunking_profile: sliding_large # Projekte haben viel Text
|
||||
chunking_profile: sliding_large
|
||||
retriever_weight: 0.97
|
||||
edge_defaults: ["references", "depends_on"]
|
||||
|
||||
|
|
@ -111,6 +118,7 @@ types:
|
|||
edge_defaults: ["depends_on", "part_of"]
|
||||
|
||||
journal:
|
||||
chunking_profile: sliding_standard
|
||||
# NEUE ZUWEISUNG: Journale profitieren am meisten von der semantischen Analyse
|
||||
chunking_profile: semantic_llm
|
||||
retriever_weight: 0.80
|
||||
edge_defaults: ["references", "related_to"]
|
||||
Loading…
Reference in New Issue
Block a user