modified: tests/test_smart_chunking_integration.py

This commit is contained in:
Lars 2025-12-12 09:16:25 +01:00
parent 69ad7bc823
commit 94dbaafc72

View File

@ -1,4 +1,4 @@
# tests/test_smart_chunking_integration.py
# tests/test_smart_chunking_integration.py (Final für Stabilität und Cleanup)
import asyncio
import unittest
@ -6,8 +6,6 @@ import os
import sys
from pathlib import Path
from typing import List, Dict
import time
import threading # Import für die Thread-basierte Schließung
# --- PFAD-KORREKTUR ---
ROOT_DIR = Path(__file__).resolve().parent.parent
@ -17,7 +15,6 @@ sys.path.insert(0, str(ROOT_DIR))
# Import der Kernkomponenten
from app.core import chunker
from app.core import derive_edges
# Import der benötigten Klasse SemanticAnalyzer
from app.services.semantic_analyzer import SemanticAnalyzer
@ -43,17 +40,26 @@ Am Nachmittag gab es einen Konflikt bei der Karate-Trainer-Ausbildung. Ein Schü
Abends habe ich den wöchentlichen Load-Check mit meinem Partner gemacht. Das Paar-Ritual [[leitbild-rituale-system#R5]] hilft, das Ziel [[leitbild-ziele-portfolio#Nordstern Partner]] aktiv zu verfolgen. Es ist der operative Rhythmus für uns beide.
"""
# --- HILFSFUNKTION FÜR DAS ASYNCHRONE SCHLIESSEN IN SYNCHRONER UMGEBUNG ---
# Dies löst den Event Loop is closed Fehler in Python 3.12+
def run_async_in_new_loop(coro):
"""Führt eine Koroutine in einem neuen, temporären asyncio-Loop aus."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# --- HILFSFUNKTION FÜR DAS ASYNCHRONE SCHLIESSEN ---
# Führt die asynchrone Koroutine in einem temporären, dedizierten Loop aus.
def _teardown_sync_async_client(coro):
"""Führt eine async Koroutine in einem eigenen, temporären Loop aus."""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
if loop.is_running():
# Wenn der Loop bereits läuft (z.B. durch andere async-Tasks im Hintergrund),
# nutzen wir run_coroutine_threadsafe.
future = asyncio.run_coroutine_threadsafe(coro, loop)
try:
return future.result(timeout=5)
except Exception:
future.cancel()
else:
# Führe im aktuellen Thread aus, wenn kein Loop läuft (typischer teardown-Fall)
return loop.run_until_complete(coro)
finally:
loop.close()
asyncio.set_event_loop(asyncio.new_event_loop()) # Setzt den Loop zurück
class TestSemanticChunking(unittest.TestCase):
@ -63,7 +69,6 @@ class TestSemanticChunking(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Initialisiert den SemanticAnalyzer einmalig."""
# WICHTIG: Die Instanz muss erzeugt werden, bevor der erste async-Call läuft.
cls._analyzer_instance = SemanticAnalyzer()
# Stellt sicher, dass der Chunker diese Singleton-Instanz verwendet
chunker._semantic_analyzer_instance = cls._analyzer_instance
@ -72,8 +77,8 @@ class TestSemanticChunking(unittest.TestCase):
def tearDownClass(cls):
"""Schließt den httpx.AsyncClient nach allen Tests (Löst Loop-Konflikt)."""
if cls._analyzer_instance:
# Wir führen die async close Methode in einem neuen Loop aus
run_async_in_new_loop(cls._analyzer_instance.close())
# Nutzt die temporäre Loop-Lösung
_teardown_sync_async_client(cls._analyzer_instance.close())
def setUp(self):
self.config = chunker.get_chunk_config(TEST_NOTE_TYPE)
@ -86,6 +91,7 @@ class TestSemanticChunking(unittest.TestCase):
def test_b_llm_chunking_and_injection(self):
"""
Prüft den gesamten End-to-End-Flow: 1. LLM-Chunking, 2. Kanten-Injektion, 3. Kanten-Erkennung.
(Diese Tests setzen voraus, dass das LLM JSON liefert)
"""
# --- 1. Chunking (Asynchron) ---
@ -97,8 +103,7 @@ class TestSemanticChunking(unittest.TestCase):
print(f"\n--- LLM Chunker Output: {len(chunks)} Chunks ---")
# Assertion B1: Zerlegung (Das LLM muss mehr als 1 Chunk liefern)
# Die LLM-Stabilität ist das Problem. Wir prüfen auf erfolgreiche Zerlegung.
# Assertion B1: Zerlegung (Wenn das LLM erfolgreich war, muss > 1 Chunk geliefert werden)
self.assertTrue(len(chunks) > 1,
"Assertion B1 Fehler: LLM hat nicht zerlegt (Fallback aktiv). Prüfe LLM-Stabilität.")