From e27b1f4621704a902f214d931a425cd6d65faf71 Mon Sep 17 00:00:00 2001 From: Lars Date: Fri, 12 Dec 2025 11:56:44 +0100 Subject: [PATCH] bug fix --- app/core/chunker.py | 60 ++++++++--------------------- tests/test_final_wp15_validation.py | 25 ++++++------ 2 files changed, 30 insertions(+), 55 deletions(-) diff --git a/app/core/chunker.py b/app/core/chunker.py index 95be924..e93bceb 100644 --- a/app/core/chunker.py +++ b/app/core/chunker.py @@ -17,8 +17,8 @@ from app.services.semantic_analyzer import get_semantic_analyzer try: from app.core.derive_edges import build_edges_for_note except ImportError: - # Mock für Tests - def build_edges_for_note(md_text, note_id, note_type, chunks=[], references=[]): return [] + # Mock für Tests: Signatur muss mit dem Aufruf übereinstimmen + def build_edges_for_note(text, note_id, note_type, chunks=[], references=[]): return [] logger = logging.getLogger(__name__) @@ -70,8 +70,7 @@ def extract_frontmatter_from_text(md_text: str) -> Tuple[Dict[str, Any], str]: # 2. DATA CLASSES # ========================================== -_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])') -_WS = re.compile(r'\s+') +_SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])'); _WS = re.compile(r'\s+') def estimate_tokens(text: str) -> int: return max(1, math.ceil(len(text.strip()) / 4)) @@ -98,10 +97,7 @@ class Chunk: # ========================================== def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: - """ - Zerlegt Text in logische Blöcke (Absätze, Header). - Verbesserte Version: Splittet auch reine Absätze. - """ + """Zerlegt Text in logische Blöcke (Absätze, Header).""" blocks = [] h1_title = "Dokument" section_path = "/" @@ -114,19 +110,14 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: if h1_match: h1_title = h1_match.group(1).strip() - # Rudimentäres Parsing (Markdown-It ist komplex einzubinden ohne vollen Visitor) - # Wir splitten hier einfach an Doppel-Newlines für Paragraphen, wenn keine Header da sind. - - # Zuerst Header-Struktur bewahren lines = text_without_fm.split('\n') buffer = [] for line in lines: stripped = line.strip() - if stripped.startswith('# '): # H1 ignorieren wir im Body meist + if stripped.startswith('# '): continue elif stripped.startswith('## '): - # Flush buffer if buffer: content = "\n".join(buffer).strip() if content: @@ -136,7 +127,6 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]: section_path = f"/{current_h2}" blocks.append(RawBlock("heading", stripped, 2, section_path, current_h2)) elif not stripped: - # Leere Zeile -> Absatzende if buffer: content = "\n".join(buffer).strip() if content: @@ -157,9 +147,7 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not max_tokens = config.get("max", 600) overlap_val = config.get("overlap", (50, 80)) overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val - - chunks = [] - buf = [] # Buffer für Blöcke + chunks = []; buf = [] def _create_chunk(txt, win, sec, path): idx = len(chunks) @@ -174,17 +162,12 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not nonlocal buf if not buf: return - # Kombiniere Blöcke im Buffer text_body = "\n\n".join([b.text for b in buf]) - sec_title = buf[-1].section_title if buf else None - sec_path = buf[-1].section_path if buf else "/" + win_body = f"{context_prefix}\n{text_body}".strip() if context_prefix else text_body - # Check Größe if estimate_tokens(text_body) <= max_tokens: - win_body = f"{context_prefix}\n{text_body}".strip() if context_prefix else text_body - _create_chunk(text_body, win_body, sec_title, sec_path) + _create_chunk(text_body, win_body, buf[-1].section_title, buf[-1].section_path) else: - # Text ist zu groß -> Splitte nach Sätzen sentences = split_sentences(text_body) current_chunk_sents = [] current_len = 0 @@ -192,13 +175,10 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not for sent in sentences: sent_len = estimate_tokens(sent) if current_len + sent_len > target and current_chunk_sents: - # Chunk abschließen c_txt = " ".join(current_chunk_sents) c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt - _create_chunk(c_txt, c_win, sec_title, sec_path) + _create_chunk(c_txt, c_win, buf[-1].section_title, buf[-1].section_path) - # Overlap für nächsten Chunk - # Wir nehmen die letzten Sätze, die in den Overlap passen overlap_sents = [] ov_len = 0 for s in reversed(current_chunk_sents): @@ -215,25 +195,19 @@ def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], not current_chunk_sents.append(sent) current_len += sent_len - # Rest verarbeiten if current_chunk_sents: c_txt = " ".join(current_chunk_sents) c_win = f"{context_prefix}\n{c_txt}".strip() if context_prefix else c_txt - _create_chunk(c_txt, c_win, sec_title, sec_path) + _create_chunk(c_txt, c_win, buf[-1].section_title, buf[-1].section_path) buf = [] for b in blocks: - if b.kind == "heading": continue # Header nicht direkt in Text mischen, dienen nur Struktur - - # Wenn Buffer + neuer Block zu groß -> Flush + if b.kind == "heading": continue current_buf_text = "\n\n".join([x.text for x in buf]) if estimate_tokens(current_buf_text) + estimate_tokens(b.text) >= target: flush_buffer() - buf.append(b) - - # Wenn der Block selbst riesig ist (größer als Target), sofort flushen und splitten if estimate_tokens(b.text) >= target: flush_buffer() @@ -244,7 +218,7 @@ def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id return _strategy_sliding_window(blocks, config, note_id, doc_title, context_prefix=f"# {doc_title}") # ========================================== -# 4. ORCHESTRATION (ASYNC) - WP-15 CORE +# 4. ORCHESTRATION (ASYNC) # ========================================== async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]: @@ -283,17 +257,17 @@ async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Op async def _run_smart_edge_allocation(chunks: List[Chunk], full_text: str, note_id: str, note_type: str) -> List[Chunk]: analyzer = get_semantic_analyzer() - # FIX: Positional Argument für text übergeben, um TypeError zu vermeiden + # FIX: Nutzung von positional arguments für die ersten 3 Parameter + # Dies verhindert den "multiple values for argument" Fehler raw_edges = build_edges_for_note( full_text, - note_id=note_id, - note_type=note_type, + note_id, + note_type, chunks=[], - references=[] # Falls die Signatur references erwartet + references=[] ) all_candidates = set() - # Robustheit: raw_edges könnte None sein, falls der Mock schlecht ist if raw_edges: for e in raw_edges: if e.get("target_id") and e.get("kind") not in ["next", "prev", "belongs_to"]: diff --git a/tests/test_final_wp15_validation.py b/tests/test_final_wp15_validation.py index e7b6363..def57c0 100644 --- a/tests/test_final_wp15_validation.py +++ b/tests/test_final_wp15_validation.py @@ -18,8 +18,9 @@ def get_config_for_test(strategy: str, enable_smart_edge: bool) -> Dict[str, Any cfg = chunker.get_chunk_config("concept") cfg['strategy'] = strategy cfg['enable_smart_edge_allocation'] = enable_smart_edge - cfg['target'] = 150 # Kleineres Target für sicherere Splits im Test - cfg['max'] = 300 + # WICHTIG: Setze sehr kleine Werte, um Split bei kurzem Text zu erzwingen + cfg['target'] = 50 + cfg['max'] = 100 return cfg TEST_NOTE_ID_SMART = "20251212-test-smart" @@ -41,7 +42,7 @@ Der Konflikt zwischen [[leitbild-rollen#Vater]] und [[leitbild-rollen#Beruf]]. Lösung: [[rel:depends_on leitbild-review#Weekly Review]]. """ -# Text mit klaren Absätzen für Sliding Window Test +# Verlängerter Text, um Split > 1 zu erzwingen (bei Target 50) TEST_MARKDOWN_SLIDING = """ --- id: 20251212-test-legacy @@ -49,9 +50,13 @@ title: Fließtext Protokoll type: journal status: active --- -Dies ist der erste lange Absatz. Er enthält viel Text über allgemeine Dinge und Rituale wie [[leitbild-rituale-system]]. Wir schreiben hier viel, damit der Token-Zähler anschlägt. Das ist wichtig für den Test. +Dies ist der erste Absatz. Er muss lang genug sein, damit der Chunker ihn schneidet. +Wir schreiben hier über Rituale wie [[leitbild-rituale-system]] und viele andere Dinge. +Das Wetter ist schön und die Programmierung läuft gut. Dies sind Füllsätze für Länge. -Dies ist der zweite Absatz, der durch eine Leerzeile getrennt ist. Er sollte idealerweise in einem neuen Chunk landen oder zumindest den Split erzwingen, wenn das Target klein genug ist (150 Tokens). Hier steht noch mehr Text. +Dies ist der zweite Absatz. Er ist durch eine Leerzeile getrennt und sollte einen neuen Kontext bilden. +Auch hier schreiben wir viel Text, damit die Token-Anzahl die Grenze von 50 Tokens überschreitet. +Das System muss hier splitten. """ class TestFinalWP15Integration(unittest.TestCase): @@ -80,12 +85,7 @@ class TestFinalWP15Integration(unittest.TestCase): )) self.assertTrue(len(chunks) >= 2, f"A1 Fehler: Erwartete >= 2 Chunks, bekam {len(chunks)}") - - # Prüfen auf Injektion (Text muss [[rel:...]] enthalten) - # Hinweis: Da wir keine echte LLM-Antwort garantieren können (Mock fehlt hier), - # prüfen wir zumindest, ob der Code durchlief. - # Wenn LLM fehlschlägt/leer ist, läuft der Code durch (Robustheit). - print(f" -> Chunks generiert: {len(chunks)}") + print(f" -> Chunks generiert (Smart): {len(chunks)}") def test_b_backward_compatibility(self): """B: Prüft Sliding Window (Legacy).""" @@ -98,12 +98,13 @@ class TestFinalWP15Integration(unittest.TestCase): config=config )) - # Sliding Window muss bei 2 Absätzen und kleinem Target > 1 Chunk liefern + # Sliding Window muss bei diesem langen Text > 1 Chunk liefern self.assertTrue(len(chunks) >= 2, f"B1 Fehler: Sliding Window lieferte nur {len(chunks)} Chunk(s). Split defekt.") # Check: Keine LLM Kanten (da deaktiviert) injected = re.search(r'\[\[rel:', chunks[0].text) self.assertIsNone(injected, "B2 Fehler: LLM-Kanten trotz Deaktivierung gefunden!") + print(f" -> Chunks generiert (Legacy): {len(chunks)}") if __name__ == '__main__': unittest.main() \ No newline at end of file