This commit is contained in:
Lars 2025-12-12 11:56:44 +01:00
parent 3c19e192bc
commit e27b1f4621
2 changed files with 30 additions and 55 deletions

View File

@ -17,8 +17,8 @@ from app.services.semantic_analyzer import get_semantic_analyzer
try:
from app.core.derive_edges import build_edges_for_note
except ImportError:
# Mock für Tests
def build_edges_for_note(md_text, note_id, note_type, chunks=[], references=[]): return []
# Mock für Tests: Signatur muss mit dem Aufruf übereinstimmen
def build_edges_for_note(text, note_id, note_type, chunks=[], references=[]): return []
logger = logging.getLogger(__name__)
@ -70,8 +70,7 @@ def extract_frontmatter_from_text(md_text: str) -> Tuple[Dict[str, Any], str]:
# 2. DATA CLASSES
# ==========================================
_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])')
_WS = re.compile(r'\s+')
_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])'); _WS = re.compile(r'\s+')
def estimate_tokens(text: str) -> int:
return max(1, math.ceil(len(text.strip()) / 4))
@ -98,10 +97,7 @@ class Chunk:
# ==========================================
def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
"""
Zerlegt Text in logische Blöcke (Absätze, Header).
Verbesserte Version: Splittet auch reine Absätze.
"""
"""Zerlegt Text in logische Blöcke (Absätze, Header)."""
blocks = []
h1_title = "Dokument"
section_path = "/"
@ -114,19 +110,14 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
if h1_match:
h1_title = h1_match.group(1).strip()
# Rudimentäres Parsing (Markdown-It ist komplex einzubinden ohne vollen Visitor)
# Wir splitten hier einfach an Doppel-Newlines für Paragraphen, wenn keine Header da sind.
# Zuerst Header-Struktur bewahren
lines = text_without_fm.split('\n')
buffer = []
for line in lines:
stripped = line.strip()
if stripped.startswith('# '): # H1 ignorieren wir im Body meist
if stripped.startswith('# '):
continue
elif stripped.startswith('## '):
# Flush buffer
if buffer:
content = "\n".join(buffer).strip()
if content:
@ -136,7 +127,6 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
section_path = f"/{current_h2}"
blocks.append(RawBlock("heading", stripped, 2, section_path, current_h2))
elif not stripped:
# Leere Zeile -> Absatzende
if buffer:
content = "\n".join(buffer).strip()
if content:
@ -157,9 +147,7 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not
max_tokens = config.get("max", 600)
overlap_val = config.get("overlap", (50, 80))
overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val
chunks = []
buf = [] # Buffer für Blöcke
chunks = []; buf = []
def _create_chunk(txt, win, sec, path):
idx = len(chunks)
@ -174,17 +162,12 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not
nonlocal buf
if not buf: return
# Kombiniere Blöcke im Buffer
text_body = "\n\n".join([b.text for b in buf])
sec_title = buf[-1].section_title if buf else None
sec_path = buf[-1].section_path if buf else "/"
# Check Größe
if estimate_tokens(text_body) <= max_tokens:
win_body = f"{context_prefix}\n{text_body}".strip() if context_prefix else text_body
_create_chunk(text_body, win_body, sec_title, sec_path)
if estimate_tokens(text_body) <= max_tokens:
_create_chunk(text_body, win_body, buf[-1].section_title, buf[-1].section_path)
else:
# Text ist zu groß -> Splitte nach Sätzen
sentences = split_sentences(text_body)
current_chunk_sents = []
current_len = 0
@ -192,13 +175,10 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not
for sent in sentences:
sent_len = estimate_tokens(sent)
if current_len + sent_len > target and current_chunk_sents:
# Chunk abschließen
c_txt = " ".join(current_chunk_sents)
c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt
_create_chunk(c_txt, c_win, sec_title, sec_path)
_create_chunk(c_txt, c_win, buf[-1].section_title, buf[-1].section_path)
# Overlap für nächsten Chunk
# Wir nehmen die letzten Sätze, die in den Overlap passen
overlap_sents = []
ov_len = 0
for s in reversed(current_chunk_sents):
@ -215,25 +195,19 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not
current_chunk_sents.append(sent)
current_len += sent_len
# Rest verarbeiten
if current_chunk_sents:
c_txt = " ".join(current_chunk_sents)
c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt
_create_chunk(c_txt, c_win, sec_title, sec_path)
_create_chunk(c_txt, c_win, buf[-1].section_title, buf[-1].section_path)
buf = []
for b in blocks:
if b.kind == "heading": continue # Header nicht direkt in Text mischen, dienen nur Struktur
# Wenn Buffer + neuer Block zu groß -> Flush
if b.kind == "heading": continue
current_buf_text = "\n\n".join([x.text for x in buf])
if estimate_tokens(current_buf_text) + estimate_tokens(b.text) >= target:
flush_buffer()
buf.append(b)
# Wenn der Block selbst riesig ist (größer als Target), sofort flushen und splitten
if estimate_tokens(b.text) >= target:
flush_buffer()
@ -244,7 +218,7 @@ def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id
return _strategy_sliding_window(blocks, config, note_id, doc_title, context_prefix=f"# {doc_title}")
# ==========================================
# 4. ORCHESTRATION (ASYNC) - WP-15 CORE
# 4. ORCHESTRATION (ASYNC)
# ==========================================
async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]:
@ -283,17 +257,17 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op
async def _run_smart_edge_allocation(chunks: List[Chunk], full_text: str, note_id: str, note_type: str) -> List[Chunk]:
analyzer = get_semantic_analyzer()
# FIX: Positional Argument für text übergeben, um TypeError zu vermeiden
# FIX: Nutzung von positional arguments für die ersten 3 Parameter
# Dies verhindert den "multiple values for argument" Fehler
raw_edges = build_edges_for_note(
full_text,
note_id=note_id,
note_type=note_type,
note_id,
note_type,
chunks=[],
references=[] # Falls die Signatur references erwartet
references=[]
)
all_candidates = set()
# Robustheit: raw_edges könnte None sein, falls der Mock schlecht ist
if raw_edges:
for e in raw_edges:
if e.get("target_id") and e.get("kind") not in ["next", "prev", "belongs_to"]:

View File

@ -18,8 +18,9 @@ def get_config_for_test(strategy: str, enable_smart_edge: bool) -> Dict[str, Any
cfg = chunker.get_chunk_config("concept")
cfg['strategy'] = strategy
cfg['enable_smart_edge_allocation'] = enable_smart_edge
cfg['target'] = 150 # Kleineres Target für sicherere Splits im Test
cfg['max'] = 300
# WICHTIG: Setze sehr kleine Werte, um Split bei kurzem Text zu erzwingen
cfg['target'] = 50
cfg['max'] = 100
return cfg
TEST_NOTE_ID_SMART = "20251212-test-smart"
@ -41,7 +42,7 @@ Der Konflikt zwischen [[leitbild-rollen#Vater]] und [[leitbild-rollen#Beruf]].
Lösung: [[rel:depends_on leitbild-review#Weekly Review]].
"""
# Text mit klaren Absätzen für Sliding Window Test
# Verlängerter Text, um Split > 1 zu erzwingen (bei Target 50)
TEST_MARKDOWN_SLIDING = """
---
id: 20251212-test-legacy
@ -49,9 +50,13 @@ title: Fließtext Protokoll
type: journal
status: active
---
Dies ist der erste lange Absatz. Er enthält viel Text über allgemeine Dinge und Rituale wie [[leitbild-rituale-system]]. Wir schreiben hier viel, damit der Token-Zähler anschlägt. Das ist wichtig für den Test.
Dies ist der erste Absatz. Er muss lang genug sein, damit der Chunker ihn schneidet.
Wir schreiben hier über Rituale wie [[leitbild-rituale-system]] und viele andere Dinge.
Das Wetter ist schön und die Programmierung läuft gut. Dies sind Füllsätze für Länge.
Dies ist der zweite Absatz, der durch eine Leerzeile getrennt ist. Er sollte idealerweise in einem neuen Chunk landen oder zumindest den Split erzwingen, wenn das Target klein genug ist (150 Tokens). Hier steht noch mehr Text.
Dies ist der zweite Absatz. Er ist durch eine Leerzeile getrennt und sollte einen neuen Kontext bilden.
Auch hier schreiben wir viel Text, damit die Token-Anzahl die Grenze von 50 Tokens überschreitet.
Das System muss hier splitten.
"""
class TestFinalWP15Integration(unittest.TestCase):
@ -80,12 +85,7 @@ class TestFinalWP15Integration(unittest.TestCase):
))
self.assertTrue(len(chunks) >= 2, f"A1 Fehler: Erwartete >= 2 Chunks, bekam {len(chunks)}")
# Prüfen auf Injektion (Text muss [[rel:...]] enthalten)
# Hinweis: Da wir keine echte LLM-Antwort garantieren können (Mock fehlt hier),
# prüfen wir zumindest, ob der Code durchlief.
# Wenn LLM fehlschlägt/leer ist, läuft der Code durch (Robustheit).
print(f" -> Chunks generiert: {len(chunks)}")
print(f" -> Chunks generiert (Smart): {len(chunks)}")
def test_b_backward_compatibility(self):
"""B: Prüft Sliding Window (Legacy)."""
@ -98,12 +98,13 @@ class TestFinalWP15Integration(unittest.TestCase):
config=config
))
# Sliding Window muss bei 2 Absätzen und kleinem Target > 1 Chunk liefern
# Sliding Window muss bei diesem langen Text > 1 Chunk liefern
self.assertTrue(len(chunks) >= 2, f"B1 Fehler: Sliding Window lieferte nur {len(chunks)} Chunk(s). Split defekt.")
# Check: Keine LLM Kanten (da deaktiviert)
injected = re.search(r'\[\[rel:', chunks[0].text)
self.assertIsNone(injected, "B2 Fehler: LLM-Kanten trotz Deaktivierung gefunden!")
print(f" -> Chunks generiert (Legacy): {len(chunks)}")
if __name__ == '__main__':
unittest.main()