Fix bugs in the chunker (reformat condensed statements, correct call signatures, allow config injection)

This commit is contained in:
Lars 2025-12-12 10:27:57 +01:00
parent 135c02bc9a
commit 69617802c3

View File

@ -39,21 +39,14 @@ logger = logging.getLogger(__name__)
def extract_frontmatter_from_text(md_text: str) -> Tuple[Dict[str, Any], str]:
    """Split a markdown document into its YAML frontmatter and body.

    Args:
        md_text: Raw markdown, optionally starting with a ``--- ... ---``
            frontmatter block.

    Returns:
        Tuple of (frontmatter dict, body text). The dict is empty when no
        frontmatter is present, the YAML is invalid, or it does not parse
        to a mapping. When frontmatter was found the body is stripped of
        surrounding whitespace; otherwise ``md_text`` is returned unchanged.
    """
    fm_match = re.match(r'^\s*---\s*\n(.*?)\n---', md_text, re.DOTALL)
    if not fm_match:
        return {}, md_text
    try:
        frontmatter = yaml.safe_load(fm_match.group(1))
        if not isinstance(frontmatter, dict):
            # Scalar or list frontmatter is ignored rather than propagated.
            frontmatter = {}
    except yaml.YAMLError:
        frontmatter = {}
    # Reuse the match span instead of re-running the same regex via re.sub,
    # which scanned the whole document a second time.
    return frontmatter, md_text[fm_match.end():].strip()
@ -65,43 +58,22 @@ BASE_DIR = Path(__file__).resolve().parent.parent.parent
CONFIG_PATH = BASE_DIR / "config" / "types.yaml"
DEFAULT_PROFILE = {"strategy": "sliding_window", "target": 400, "max": 600, "overlap": (50, 80)}
_CONFIG_CACHE = None
def _load_yaml_config() -> Dict[str, Any]:
    """Load and memoize the chunking configuration from CONFIG_PATH.

    Returns:
        The parsed YAML mapping, or {} when the file is missing, empty,
        or unreadable. A successful parse is cached in the module-global
        _CONFIG_CACHE so the file is read at most once per process.
    """
    global _CONFIG_CACHE
    if _CONFIG_CACHE is not None:
        return _CONFIG_CACHE
    if not CONFIG_PATH.exists():
        return {}
    try:
        with open(CONFIG_PATH, "r", encoding="utf-8") as f:
            # An empty YAML file parses to None; coerce to {} so callers
            # can safely chain .get() on the result.
            data = yaml.safe_load(f) or {}
    except Exception:
        # Best-effort: a broken config must not take the caller down,
        # but the failure should not be silent either.
        logger.warning("Could not read chunking config at %s", CONFIG_PATH, exc_info=True)
        return {}
    _CONFIG_CACHE = data
    return data
def get_chunk_config(note_type: str) -> Dict[str, Any]:
    """Resolve the chunking profile for a note type.

    Looks up the profile name declared for the type, falls back to the
    configured default name ("sliding_standard"), and finally to
    DEFAULT_PROFILE when the name is unknown in the profile table.
    """
    full_config = _load_yaml_config()
    profiles = full_config.get("chunking_profiles", {})
    type_def = full_config.get("types", {}).get(note_type.lower(), {})

    profile_name = type_def.get("chunking_profile")
    if not profile_name:
        profile_name = full_config.get("defaults", {}).get("chunking_profile", "sliding_standard")

    config = profiles.get(profile_name, DEFAULT_PROFILE).copy()
    # YAML has no tuple type, so a configured overlap arrives as a list;
    # normalize it to the tuple form the strategies expect.
    if isinstance(config.get("overlap"), list):
        config["overlap"] = tuple(config["overlap"])
    return config
def get_sizes(note_type: str):
    """Return target/max/overlap sizing for the note type's chunk profile."""
    cfg = get_chunk_config(note_type)
    return {
        "target": (cfg["target"], cfg["target"]),
        "max": cfg["max"],
        "overlap": cfg["overlap"],
    }
# ==========================================
@ -112,12 +84,12 @@ _SENT_SPLIT = re.compile(r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9„(])')
_WS = re.compile(r'\s+')
def estimate_tokens(text: str) -> int:
    """Rough token estimate: roughly 4 characters per token, minimum 1."""
    stripped_len = len(text.strip())
    return max(1, math.ceil(stripped_len / 4))
# FIX: expanded shorthand notation to avoid linter errors
def split_sentences(text: str) -> list[str]:
text = _WS.sub(' ', text.strip())
# FEHLER BEHOBEN: Zeilenumbruch eingefügt
if not text:
return []
@ -135,18 +107,11 @@ class Chunk:
def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
    """Parse markdown into a flat list of RawBlocks plus the document title."""
    md = MarkdownIt("commonmark").enable("table")
    # NOTE(review): the parsed tokens are currently unused — blocks are
    # built from the raw text below; confirm this is intentional.
    tokens: List[Token] = md.parse(md_text)

    blocks: List[RawBlock] = []
    h1_title = "Dokument"
    h2, h3 = None, None
    section_path = "/"

    fm, text_without_fm = extract_frontmatter_from_text(md_text)
    body = text_without_fm.strip()
    if body:
        # The whole body becomes a single paragraph block; finer splitting
        # is left to the chunking strategies.
        blocks.append(RawBlock(kind="paragraph", text=body, level=None, section_path=section_path, section_title=h2))

    h1_match = re.search(r'^#\s+(.*)', text_without_fm, re.MULTILINE)
    if h1_match:
        h1_title = h1_match.group(1).strip()
    return blocks, h1_title
# ==========================================
@ -155,10 +120,7 @@ def parse_blocks(md_text: str) -> Tuple[List[RawBlock], str]:
def _strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note_id: str, doc_title: str = "", context_prefix: str = "") -> List[Chunk]:
"""Klassisches Sliding Window."""
target = config.get("target", 400)
max_tokens = config.get("max", 600)
overlap_val = config.get("overlap", (50, 80))
overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val
target = config.get("target", 400); max_tokens = config.get("max", 600); overlap_val = config.get("overlap", (50, 80)); overlap = sum(overlap_val) // 2 if isinstance(overlap_val, tuple) else overlap_val
chunks: List[Chunk] = []; buf: List[RawBlock] = []
def flush_buffer():
@ -212,15 +174,12 @@ def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id
for b in blocks:
if b.kind == "heading": continue
if b.section_path not in sections:
sections[b.section_path] = []
ordered.append(b.section_path)
sections[b.section_path] = []; ordered.append(b.section_path)
sections[b.section_path].append(b)
for path in ordered:
s_blocks = sections[path]
# FEHLER BEHOBEN: Zeilenumbruch eingefügt
if not s_blocks:
continue
if not s_blocks: continue
breadcrumbs = path.strip("/").replace("/", " > ")
context_header = f"# {doc_title}\n## {breadcrumbs}"
@ -246,7 +205,6 @@ def _strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id
_semantic_analyzer_instance = None

def _get_semantic_analyzer_instance() -> SemanticAnalyzer:
    """Return the module-wide SemanticAnalyzer, constructing it on first call."""
    global _semantic_analyzer_instance
    if _semantic_analyzer_instance is not None:
        return _semantic_analyzer_instance
    _semantic_analyzer_instance = SemanticAnalyzer()
    return _semantic_analyzer_instance
@ -257,7 +215,6 @@ def _extract_all_edges_from_md(md_text: str, note_id: str, note_type: str) -> Li
Ruft die Edge-Derivation auf Note-Ebene auf und gibt die Kanten im Format "kind:Target" zurück.
"""
# Korrigierte Argumentreihenfolge
raw_edges: List[Dict] = build_edges_for_note(
md_text,
note_id,
@ -303,9 +260,9 @@ async def _strategy_smart_edge_allocation(md_text: str, config: Dict, note_id: s
for chunk in chunks:
# Starte den LLM-Filter-Call für jeden Chunk parallel
task = analyzer.analyze_and_chunk(
chunk_text=chunk.text,
text=chunk.text,
source_type=note_type,
all_note_edges=all_note_edges_list,
note_type=note_type,
)
llm_tasks.append(task)
@ -353,22 +310,24 @@ async def _strategy_smart_edge_allocation(md_text: str, config: Dict, note_id: s
# 6. MAIN ENTRY POINT (ASYNC)
# ==========================================
async def assemble_chunks(note_id: str, md_text: str, note_type: str) -> List[Chunk]:
async def assemble_chunks(note_id: str, md_text: str, note_type: str, config: Optional[Dict] = None) -> List[Chunk]:
"""
Hauptfunktion. Analysiert Config und wählt Strategie (MUSS ASYNC SEIN).
Akzeptiert optional 'config' zur Überschreibung der Laufzeitkonfiguration (für Tests).
"""
# 1. Frontmatter prüfen (Double-LLM-Prevention)
# 1. Konfiguration laden (überschreiben, falls im Test injiziert)
if config is None:
config = get_chunk_config(note_type)
# 2. Frontmatter prüfen (Double-LLM-Prevention)
fm, body = extract_frontmatter_from_text(md_text)
note_status = fm.get("status", "").lower()
config = get_chunk_config(note_type)
strategy = config.get("strategy", "sliding_window")
# Neue Konfigurationsprüfung
enable_smart_edge = config.get("enable_smart_edge_allocation", False)
# 2. Strategie-Auswahl
# 3. Strategie-Auswahl
# A. Override bei Draft-Status
if enable_smart_edge and note_status in ["draft", "initial_gen"]: