WP15 #9

Merged
Lars merged 54 commits from WP15 into main 2025-12-13 06:39:48 +01:00
2 changed files with 452 additions and 132 deletions
Showing only changes of commit 135c02bc9a - Show all commits

View File

@ -8,126 +8,393 @@ from pathlib import Path
from markdown_it import MarkdownIt from markdown_it import MarkdownIt
from markdown_it.token import Token from markdown_it.token import Token
import asyncio import asyncio
import logging
# NEUE IMPORTS # NEUE IMPORTS
from app.services.semantic_analyzer import get_semantic_analyzer try:
from app.core.note_payload import extract_frontmatter_from_text from app.services.semantic_analyzer import SemanticAnalyzer, SemanticChunkResult
# WICHTIG: Import der Edge Derivations Logik except ImportError:
from app.core.derive_edges import build_edges_for_note # <-- Muss importiert werden # Fallback für Tests, wenn der Service noch nicht auf dem Pfad ist
print("WARNUNG: SemanticAnalyzer Service nicht gefunden.")
class SemanticAnalyzer:
async def analyze_and_chunk(self, text, type): return [SemanticChunkResult(content=text, suggested_edges=[])]
@dataclass
class SemanticChunkResult:
content: str
suggested_edges: List[str] # Format: "kind:Target"
# ... bestehender Code (Konfiguration, Hilfsfunktionen, RawBlock, Chunk) # Import des Edge Parsers
try:
# --- NEUE STRATEGIE: SMART EDGE ALLOCATION (Ersetzt _strategy_semantic_llm) --- from app.core.derive_edges import build_edges_for_note
async def _strategy_smart_edge_allocation(md_text: str, config: Dict, note_id: str, note_type: str) -> List[Chunk]: except ImportError:
""" print("WARNUNG: derive_edges.py nicht gefunden. Kanten-Parsing simuliert.")
[WP-15, Neue Logik] Zerlegt Note deterministisch und nutzt LLM zur Zuweisung von Kanten (Schritte 1-5). def build_edges_for_note(md_text, note_id, note_type, chunks=[], note_level_references=[], include_note_scope_refs=False):
"""
# 0. Initialisierung
analyzer = get_semantic_analyzer()
# 1. [Schritt 2 des Workflows] Sammeln ALLER Kanten (Inline & Defaults)
# Führt die Edge-Derivation für die gesamte Notiz aus, basierend auf Text und Typ.
raw_edges: List[Dict] = build_edges_for_note(
text=md_text,
note_id=note_id,
note_type=note_type,
# Leere Listen übergeben, da wir noch keine Chunks haben und nur die Note selbst analysieren.
chunks=[],
references=[]
)
# Kanten im Format "kind:Target" sammeln (ohne Duplikate)
all_note_edges = set()
for edge in raw_edges:
# Extrahiere nur Kanten, die relevant für das Chunking sind (Explizite oder Defaults)
if edge.get("target_id") and edge.get("kind"):
# Nutze target_id, da dies der Notiz-ID entspricht
all_note_edges.add(f"{edge['kind']}:{edge['target_id']}")
all_note_edges_list = list(all_note_edges)
# 2. [Schritt 3 des Workflows] Deterministic Chunking
# Nutzt die in der Config angegebene deterministische Strategie (z.B. by_heading)
blocks, doc_title = parse_blocks(md_text)
# Nutze _strategy_by_heading (oder _strategy_sliding_window, je nach Config-Intent),
# da dies die robusteste deterministische Strategie ist. Die Konfiguration kommt
# vom "structured_strict" oder ähnlichem Profil.
chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title)
# Fallback, falls by_heading nur einen Chunk liefert oder fehlschlägt
if not chunks or len(chunks) <= 1:
# Erhöht die Robustheit bei unstrukturierten Texten
chunks = await asyncio.to_thread(_strategy_sliding_window, blocks, config, note_id, doc_title)
if not chunks:
# Absoluter Fallback: Ganzer Text ist 1 Chunk.
text = " ".join([b.text for b in blocks if b.kind not in ("heading", "code")]).strip()
if text:
chunks = [Chunk(id=f"{note_id}-c0", note_id=note_id, index=0, text=text, token_count=estimate_tokens(text), section_title=doc_title, section_path="", neighbors_prev=None, neighbors_next=None, char_start=0, char_end=len(text))]
else:
return [] return []
logger = logging.getLogger(__name__)
# 3. [Schritt 4 des Workflows] Kanten pro Chunk zuweisen/filtern # ==========================================
# 1. FUNKTION ZUM AUSLESEN DES FRONTMATTERS
# ==========================================
def extract_frontmatter_from_text(md_text: str) -> Tuple[Dict[str, Any], str]:
fm_match = re.match(r'^\s*---\s*\n(.*?)\n---', md_text, re.DOTALL)
if not fm_match:
return {}, md_text
frontmatter_yaml = fm_match.group(1)
try:
frontmatter = yaml.safe_load(frontmatter_yaml)
if not isinstance(frontmatter, dict):
frontmatter = {}
except yaml.YAMLError:
frontmatter = {}
text_without_fm = re.sub(r'^\s*---\s*\n(.*?)\n---', '', md_text, flags=re.DOTALL)
return frontmatter, text_without_fm.strip()
# ==========================================
# 2. CONFIGURATION LOADER
# ==========================================
BASE_DIR = Path(__file__).resolve().parent.parent.parent
CONFIG_PATH = BASE_DIR / "config" / "types.yaml"
DEFAULT_PROFILE = {"strategy": "sliding_window", "target": 400, "max": 600, "overlap": (50, 80)}
_CONFIG_CACHE = None
def _load_yaml_config() -> Dict[str, Any]:
global _CONFIG_CACHE
# FEHLER BEHOBEN: Zeilenumbruch eingefügt
if _CONFIG_CACHE is not None:
return _CONFIG_CACHE
if not CONFIG_PATH.exists():
return {}
try:
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
_CONFIG_CACHE = data
return data
except Exception as e:
return {}
def get_chunk_config(note_type: str) -> Dict[str, Any]:
full_config = _load_yaml_config()
profiles = full_config.get("chunking_profiles", {})
type_def = full_config.get("types", {}).get(note_type.lower(), {})
profile_name = type_def.get("chunking_profile")
if not profile_name:
profile_name = full_config.get("defaults", {}).get("chunking_profile", "sliding_standard")
config = profiles.get(profile_name, DEFAULT_PROFILE).copy()
if "overlap" in config and isinstance(config["overlap"], list):
config["overlap"] = tuple(config["overlap"])
return config
def get_sizes(note_type: str):
cfg = get_chunk_config(note_type)
return {"target": (cfg["target"], cfg["target"]), "max": cfg["max"], "overlap": cfg["overlap"]}
# ==========================================
# 3. DATA CLASSES & HELPERS
# ==========================================
_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])')
_WS = re.compile(r'\s+')
def estimate_tokens(text: str) -> int:
t = len(text.strip())
return max(1, math.ceil(t / 4))
def split_sentences(text: str) -> list[str]:
text = _WS.sub(' ', text.strip())
# FEHLER BEHOBEN: Zeilenumbruch eingefügt
if not text:
return []
parts = _SENT_SPLIT.split(text)
return [p.strip() for p in parts if p.strip()]
@dataclass
class RawBlock:
kind: str; text: str; level: Optional[int]; section_path: str; section_title: Optional[str]
@dataclass
class Chunk:
id: str; note_id: str; index: int; text: str; window: str; token_count: int; section_title: Optional[str]; section_path: str; neighbors_prev: Optional[str]; neighbors_next: Optional[str]; char_start: int; char_end: int
def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
md = MarkdownIt("commonmark").enable("table")
tokens: List[Token] = md.parse(md_text)
blocks: List[RawBlock] = []
h1_title = "Dokument"; h2, h3 = None, None; section_path = "/"
fm, text_without_fm = extract_frontmatter_from_text(md_text)
if text_without_fm.strip():
blocks.append(RawBlock(kind="paragraph", text=text_without_fm.strip(), level=None, section_path=section_path, section_title=h2))
h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE)
if h1_match:
h1_title = h1_match.group(1).strip()
return blocks, h1_title
# ==========================================
# 4. STRATEGIES (SYNCHRON)
# ==========================================
def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "", context_prefix: str = "") -> List[Chunk]:
"""Klassisches Sliding Window."""
target = config.get("target", 400)
max_tokens = config.get("max", 600)
overlap_val = config.get("overlap", (50, 80))
overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val
chunks: List[Chunk] = []; buf: List[RawBlock] = []
def flush_buffer():
nonlocal buf
if not buf: return
text_body = "\n\n".join([b.text for b in buf])
sec_title = buf[-1].section_title if buf else None
sec_path = buf[-1].section_path if buf else "/"
window_body = f"{context_prefix}\n{text_body}".strip() if context_prefix else text_body
if estimate_tokens(text_body) > max_tokens:
sentences = split_sentences(text_body)
current_sents = []
cur_toks = 0
for s in sentences:
st = estimate_tokens(s)
if cur_toks + st > target and current_sents:
txt = "\n".join(current_sents)
win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt
_add_chunk(txt, win, sec_title, sec_path)
ov_txt = " ".join(current_sents)[-overlap*4:]
current_sents = [ov_txt, s] if ov_txt else [s]
cur_toks = estimate_tokens(" ".join(current_sents))
else:
current_sents.append(s)
cur_toks += st
if current_sents:
txt = "\n".join(current_sents)
win = f"{context_prefix}\n{txt}".strip() if context_prefix else txt
_add_chunk(txt, win, sec_title, sec_path)
else:
_add_chunk(text_body, window_body, sec_title, sec_path)
buf = []
def _add_chunk(txt, win, sec, path):
chunks.append(Chunk(id=f"{note_id}#c{len(chunks):02d}", note_id=note_id, index=len(chunks), text=txt, window=win, token_count=estimate_tokens(txt), section_title=sec, section_path=path, neighbors_prev=None, neighbors_next=None, char_start=0, char_end=0))
for b in blocks:
if estimate_tokens("\n\n".join([x.text for x in buf] + [b.text])) >= target:
flush_buffer()
buf.append(b)
flush_buffer()
return chunks
def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "") -> List[Chunk]:
"""Harter Split an Überschriften mit Context Injection."""
chunks: List[Chunk] = []
sections: Dict[str, List[RawBlock]] = {}
ordered = []
for b in blocks:
if b.kind == "heading": continue
if b.section_path not in sections:
sections[b.section_path] = []
ordered.append(b.section_path)
sections[b.section_path].append(b)
for path in ordered:
s_blocks = sections[path]
# FEHLER BEHOBEN: Zeilenumbruch eingefügt
if not s_blocks:
continue
breadcrumbs = path.strip("/").replace("/", " > ")
context_header = f"# {doc_title}\n## {breadcrumbs}"
full_text = "\n\n".join([b.text for b in s_blocks])
if estimate_tokens(full_text) <= config.get("max", 600):
chunks.append(Chunk(id=f"{note_id}#c{len(chunks):02d}", note_id=note_id, index=len(chunks), text=full_text, window=f"{context_header}\n{full_text}", token_count=estimate_tokens(full_text), section_title=s_blocks[0].section_title if s_blocks else None, section_path=path, neighbors_prev=None, neighbors_next=None, char_start=0, char_end=0))
else:
# Fallback auf Sliding Window mit Context Injection
sub = _strategy_sliding_window(s_blocks, config, note_id, doc_title, context_prefix=context_header)
base = len(chunks)
for i, sc in enumerate(sub):
sc.index = base + i
sc.id = f"{note_id}#c{sc.index:02d}"
chunks.append(sc)
return chunks
# ==========================================
# 5. ORCHESTRATION STRATEGY (ASYNC)
# ==========================================
_semantic_analyzer_instance = None
def _get_semantic_analyzer_instance() -> SemanticAnalyzer:
global _semantic_analyzer_instance
# FEHLER BEHOBEN: Zeilenumbruch eingefügt
if _semantic_analyzer_instance is None:
_semantic_analyzer_instance = SemanticAnalyzer()
return _semantic_analyzer_instance
# NEU: Abstrakte Funktion zum Extrahieren der Kanten (ersetzt die Simulation)
def _extract_all_edges_from_md(md_text: str, note_id: str, note_type: str) -> List[str]:
"""
Ruft die Edge-Derivation auf Note-Ebene auf und gibt die Kanten im Format "kind:Target" zurück.
"""
# Korrigierte Argumentreihenfolge
raw_edges: List[Dict] = build_edges_for_note(
md_text,
note_id,
note_type,
chunks=[],
note_level_references=[],
include_note_scope_refs=False
)
# Filtert die Kanten auf das Format "kind:Target"
all_note_edges = set()
for edge in raw_edges:
if edge.get("target_id") and edge.get("kind") not in ["belongs_to", "next", "prev"]:
all_note_edges.add(f"{edge['kind']}:{edge['target_id']}")
return list(all_note_edges)
async def _strategy_smart_edge_allocation(md_text: str, config: Dict, note_id: str, note_type: str) -> List[Chunk]:
"""
Führt den 5-Schritte-Workflow zur intelligenten Kantenzuweisung aus.
"""
analyzer = _get_semantic_analyzer_instance()
# 1. [Schritt 2] Kanten sammeln (vom gesamten MD-Text)
all_note_edges_list = _extract_all_edges_from_md(md_text, note_id, note_type)
# 2. [Schritt 3] Deterministic Chunking (Primärzerlegung)
primary_strategy = config.get("strategy", "sliding_window")
blocks, doc_title = parse_blocks(md_text)
if primary_strategy == "by_heading":
chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title)
else:
chunks = await asyncio.to_thread(_strategy_sliding_window, blocks, config, note_id, doc_title)
# 3. [Schritt 4] Kanten pro Chunk zuweisen/filtern (LLM-Call pro Chunk)
unassigned_edges: Set[str] = set(all_note_edges_list) unassigned_edges: Set[str] = set(all_note_edges_list)
llm_tasks = [] llm_tasks = []
if all_note_edges_list:
for chunk in chunks: for chunk in chunks:
# Starte den LLM-Filter-Call für jeden Chunk parallel # Starte den LLM-Filter-Call für jeden Chunk parallel
task = analyzer.filter_edges_for_chunk(chunk.text, all_note_edges_list, note_type) task = analyzer.analyze_and_chunk(
chunk_text=chunk.text,
all_note_edges=all_note_edges_list,
note_type=note_type,
)
llm_tasks.append(task) llm_tasks.append(task)
# Warte auf alle LLM-Antworten (Batch-Processing)
filtered_edges_results: List[List[str]] = await asyncio.gather(*llm_tasks) filtered_edges_results: List[List[str]] = await asyncio.gather(*llm_tasks)
# 4. Ergebnisse zuweisen und Unassigned Edges sammeln
for i, filtered_edges_list in enumerate(filtered_edges_results): for i, filtered_edges_list in enumerate(filtered_edges_results):
chunk = chunks[i] chunk = chunks[i]
# Lege die vom LLM gefilterten Edges in den Chunk-Payload # 4. Ergebnisse zuweisen und Unassigned Edges sammeln
# Die Chunk-Klasse muss ein Feld 'suggested_edges' haben (wie im alten SemanticChunkResult)
chunk.suggested_edges = filtered_edges_list chunk.suggested_edges = filtered_edges_list
# Unassigned Edges: Subtrahiere alle Edges, die in diesem Chunk gefunden wurden
unassigned_edges.difference_update(set(filtered_edges_list)) unassigned_edges.difference_update(set(filtered_edges_list))
# 5. Kanten in den Text injizieren (für derive_edges.py)
injection_block = "\n"
for edge_str in chunk.suggested_edges:
if ":" in edge_str:
kind, target = edge_str.split(":", 1)
injection_block += f"[[rel:{kind} | {target}]] "
# 5. [Schritt 5 des Workflows] Fallback: Nicht zugeordnete Kanten zuweisen chunk.text = chunk.text + injection_block
# Alle Kanten, die in KEINEM Chunk explizit zugewiesen wurden, werden JEDEM Chunk zugewiesen. chunk.window = chunk.window + injection_block
# 6. Fallback: Nicht zugeordnete Kanten JEDEM Chunk zuweisen (Schritt 5)
unassigned_edges_list = list(unassigned_edges) unassigned_edges_list = list(unassigned_edges)
if unassigned_edges_list: if unassigned_edges_list:
logger.info(f"Adding {len(unassigned_edges_list)} unassigned edges as fallback to all chunks for note {note_id}") logger.info(f"Adding {len(unassigned_edges_list)} unassigned edges as fallback to all chunks for note {note_id}")
for chunk in chunks: for chunk in chunks:
# Füge die unassigned Edges hinzu (Set-Operation für Duplikat-Schutz) # Füge die Kanten in den Text des Chunks ein (für den Edge-Parser)
existing_edges = set(chunk.suggested_edges) injection_block = "\n"
chunk.suggested_edges = list(existing_edges.union(unassigned_edges_list)) for edge_str in unassigned_edges_list:
if ":" in edge_str:
kind, target = edge_str.split(":", 1)
injection_block += f"[[rel:{kind} | {target}]] "
chunk.text = chunk.text + injection_block
chunk.window = chunk.window + injection_block
# 6. Return Chunks
return chunks return chunks
# --- UPDATE DISPATCHER: chunk_note_async --- # ==========================================
async def chunk_note_async(md_text: str, note_id: str, note_type: str, note_status: str, path_arg: str = None) -> List[Chunk]: # 6. MAIN ENTRY POINT (ASYNC)
# ==========================================
# ... bestehender Code (Frontmatter, Config, etc.) async def assemble_chunks(note_id: str, md_text: str, note_type: str) -> List[Chunk]:
"""
Hauptfunktion. Analysiert Config und wählt Strategie (MUSS ASYNC SEIN).
"""
# 3. Execution (Dispatcher) # 1. Frontmatter prüfen (Double-LLM-Prevention)
fm, body = extract_frontmatter_from_text(md_text)
note_status = fm.get("status", "").lower()
# Update: Rufe die NEUE Strategie auf, wenn 'semantic_llm' konfiguriert ist. config = get_chunk_config(note_type)
if strategy == "semantic_llm": strategy = config.get("strategy", "sliding_window")
# Neue Konfigurationsprüfung
enable_smart_edge = config.get("enable_smart_edge_allocation", False)
# 2. Strategie-Auswahl
# A. Override bei Draft-Status
if enable_smart_edge and note_status in ["draft", "initial_gen"]:
logger.info(f"Overriding Smart Edge Allocation for draft status. Using 'by_heading' for deterministic chunking.")
enable_smart_edge = False
strategy = "by_heading"
# B. Execution (Dispatcher)
blocks, doc_title = parse_blocks(md_text)
if enable_smart_edge:
# Führt die neue Orchestrierung aus (Smart Edge Allocation)
chunks = await _strategy_smart_edge_allocation(md_text, config, note_id, note_type) chunks = await _strategy_smart_edge_allocation(md_text, config, note_id, note_type)
elif strategy == "by_heading": elif strategy == "by_heading":
blocks, doc_title = parse_blocks(md_text) # Synchronen Code in einem Thread ausführen
# ... bestehender Code chunks = await asyncio.to_thread(_strategy_by_heading, blocks, config, note_id, doc_title)
else: # sliding_window (Default) else: # sliding_window (Default)
blocks, doc_title = parse_blocks(md_text) # Synchronen Code in einem Thread ausführen
# ... bestehender Code chunks = await asyncio.to_thread(_strategy_sliding_window, blocks, config, note_id, doc_title)
# ... bestehender Code (Post-Processing) # 4. Post-Process: Neighbors setzen
for i, ch in enumerate(chunks):
ch.neighbors_prev = chunks[i-1].id if i > 0 else None
ch.neighbors_next = chunks[i+1].id if i < len(chunks)-1 else None
return chunks

View File

@ -1,87 +1,140 @@
""" """
app/services/semantic_analyzer.py Edge Validation & Filtering app/services/semantic_analyzer.py Edge Validation & Filtering
Der Service ist nun primär dafür zuständig, Kanten aus einer Liste dem gegebenen Chunk zuzuordnen. Version: Final (Entkoppelt von internen Typ-Simulationen)
""" """
import json import json
import logging import logging
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
from dataclasses import dataclass
# Import der benötigten Services (Annahme: llm_service ist verfügbar.) # Import der benötigten Services (Annahme: llm_service und discovery sind verfügbar.)
from app.services.llm_service import LLMService from app.services.llm_service import LLMService
# ANNAHME: DiscoveryService ist für die Matrix-Logik verfügbar.
from app.services.discovery import DiscoveryService
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Ein Singleton-Muster für den Analyzer (wie zuvor) @dataclass
_analyzer_instance: Optional['SemanticAnalyzer'] = None class SemanticChunkResult:
content: str
suggested_edges: List[str] # Format: "kind:Target"
def get_semantic_analyzer(): # Die Klasse muss den TargetTypeResolver als DI-Abhängigkeit erhalten, um flexibel zu sein.
global _analyzer_instance # Da dies aber im Mindnet-System noch nicht etabliert ist, muss der Aufrufer den Resolver bereitstellen.
if _analyzer_instance is None:
_analyzer_instance = SemanticAnalyzer()
return _analyzer_instance
class SemanticAnalyzer: class SemanticAnalyzer:
def __init__(self): def __init__(self):
# Der DiscoveryService wird hier nicht mehr direkt benötigt.
self.llm = LLMService() self.llm = LLMService()
self.discovery = DiscoveryService()
self.MAX_CONTEXT_TOKENS = 3000
async def filter_edges_for_chunk(self, chunk_text: str, all_note_edges: List[str], note_type: str) -> List[str]: async def analyze_and_chunk(
self,
text: str,
source_type: str,
# NEU: Erfordert die Auflösungsfunktion als Eingabe (DI-Prinzip)
target_type_resolver: Optional[callable] = None
) -> List[SemanticChunkResult]:
""" """
[Schritt 4 des Workflows] Sendet Chunk und alle Kanten an LLM, um die relevanten Kanten für diesen Chunk zu filtern. Zerlegt Text mittels LLM in semantische Abschnitte und extrahiert Kanten.
:param chunk_text: Der Text des Chunks zur Analyse.
:param all_note_edges: Alle für die gesamte Notiz gefundenen Kanten (Format: "kind:Target").
:param note_type: Der Typ der Notiz.
:return: Liste der relevanten Kanten für diesen Chunk.
""" """
if not all_note_edges:
return []
edge_list_str = "\n".join([f"- {e}" for e in all_note_edges]) # Standard-Resolver verwenden, wenn keiner übergeben wird
if target_type_resolver is None:
target_type_resolver = self._default_target_type_resolver
system_prompt = ( system_prompt = (
"Du bist ein Edge Filter Agent. Deine Aufgabe ist es, aus einer gegebenen Liste von potentiellen " "Du bist ein Knowledge Graph Experte. Deine Aufgabe ist es, Rohtext in "
"Knowledge Graph Kanten (Edges) jene auszuwählen, die *semantisch relevant* für den vorliegenden " "thematisch geschlossene Abschnitte (Chunks) zu zerlegen.\n"
"Textausschnitt sind. Alle Kanten beziehen sich auf die Hauptnotiz.\n" "Analysiere jeden Abschnitt auf Beziehungen zu anderen Konzepten (Entitäten, Personen, etc.).\n"
"Antworte AUSSCHLIESSLICH mit einer validen JSON-Liste von Kanten-Strings, die im Text direkt erwähnt oder " "Antworte AUSSCHLIESSLICH mit validem JSON in diesem Format:\n"
"klar impliziert werden. Es ist KEIN Array von Objekten, sondern ein Array von Strings.\n" "[\n"
"Format: [\"kind:Target\", \"kind:Target\", ...]\n" " {\n"
"Wähle nur Kanten, die der Chunk *aktiv* benötigt oder referenziert." " \"content\": \"Der Text des Abschnitts...\",\n"
" \"relations\": [{\"target\": \"Entität X\", \"type\": \"related_to\"}]\n"
" }\n"
"]\n"
"Halte die Chunks mittellang (ca. 100-300 Wörter). Verändere den Inhalt nicht, nur die Struktur."
) )
user_prompt = ( user_prompt = f"Dokument-Typ: {source_type}\n\nTEXT:\n{text}"
f"Notiz-Typ: {note_type}\n"
f"Textausschnitt:\n---\n{chunk_text}\n---\n\n"
f"Gesamte Kanten der Notiz (AUSWAHL):\n{edge_list_str}\n\n"
"Welche der oben genannten Kanten sind für diesen Textabschnitt relevant? Liste sie im JSON-Array auf."
)
try: try:
# 1. LLM Call
response_json = await self.llm.generate_raw_response( response_json = await self.llm.generate_raw_response(
user_prompt, user_prompt,
system=system_prompt, system=system_prompt,
force_json=True force_json=True
) )
# 2. Robustes JSON Parsing
clean_json = response_json.replace("```json", "").replace("```", "").strip() clean_json = response_json.replace("```json", "").replace("```", "").strip()
data = json.loads(clean_json) data = json.loads(clean_json)
if isinstance(data, list): if isinstance(data, dict):
# Filtere nach Strings, die den Doppelpunkt enthalten, um das Format "kind:Target" zu garantieren. data = [data]
return [s for s in data if isinstance(s, str) and ":" in s] elif not isinstance(data, list):
logger.error("SemanticAnalyzer: JSON root ist weder Array noch Objekt. Fehlerhafte LLM-Antwort.")
raise ValueError("Root element is not a list or dictionary.")
logger.warning(f"SemanticAnalyzer: LLM lieferte non-list beim Edge-Filtern: {data}") results = []
return [] for item in data:
if not isinstance(item, dict):
logger.warning(f"SemanticAnalyzer: Ungültiges Chunk-Element ignoriert: {item}")
continue
except json.JSONDecodeError as e: content = item.get("content", "").strip()
logger.error(f"SemanticAnalyzer: LLM lieferte KEIN valides JSON beim Edge-Filtern: {e}") if not content: continue
return []
raw_rels = item.get("relations", [])
refined_edges = []
for rel in raw_rels:
if not isinstance(rel, dict):
logger.warning(f"SemanticAnalyzer: Ignoriere ungültige Relation: {rel}")
continue
target = rel.get("target")
raw_type = rel.get("type", "related_to")
if target:
# 1. Typ-Auflösung über die injizierte Funktion
target_entity_type = target_type_resolver(target) # <--- NUTZT DEN INJIZIERTEN RESOLVER
# 2. Matrix-Logik anwenden:
final_kind = self.discovery._resolve_edge_type(source_type, target_entity_type)
# 3. Priorisierung: Wählt den Matrix-Vorschlag, wenn er spezifischer ist.
if final_kind not in ["related_to", "references"] and target_entity_type != "concept":
edge_str = f"{final_kind}:{target}"
else:
edge_str = f"{raw_type}:{target}"
refined_edges.append(edge_str)
results.append(SemanticChunkResult(content=content, suggested_edges=refined_edges))
return results
except json.JSONDecodeError:
logger.error("SemanticAnalyzer: LLM lieferte KEIN valides JSON. Fallback auf Raw Text.")
return [SemanticChunkResult(content=text, suggested_edges=[])]
except Exception as e: except Exception as e:
logger.error(f"SemanticAnalyzer Unbehandelter Fehler beim Edge-Filtern: {e}") logger.error(f"SemanticAnalyzer Unbehandelter Fehler: {e}")
return [] return [SemanticChunkResult(content=text, suggested_edges=[])]
# NEU: Abstrakter Fallback-Resolver (muss außerhalb des Kernmoduls verbleiben)
def _default_target_type_resolver(self, title: str) -> str:
"""Standard-Fallback, wenn kein Resolver übergeben wird (immer 'concept')."""
return "concept"
async def close(self): async def close(self):
# Stellt sicher, dass der AsyncClient geschlossen wird (gute Praxis)
if self.llm: if self.llm:
await self.llm.close() await self.llm.close()
# Export des Singleton-Helpers
_analyzer_instance = None
def get_semantic_analyzer():
global _analyzer_instance
if _analyzer_instance is None:
_analyzer_instance = SemanticAnalyzer()
return _analyzer_instance