Trainer_LLM/scripts/chunking_utils.py

156 lines
5.0 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# --------------------------------------------------
# chunking_utils.py
#
# Enthält robuste Text-Chunking-Logik:
# 1. Absatzbasiertes Chunking
# 2. Satzbasiertes Chunking per Regex (kein NLTK)
# 3. Satz-Overlap-Chunking
# --------------------------------------------------
import re
# --------------------------------------------------
# Hilfsfunktion: split_sentences
# Zweck:
# - Teilt Text in Sätze auf, basierend auf Punkt, Ausrufe- und Fragezeichen
# - Trennt bei ".!? " (Satzzeichen gefolgt von Leerraum)
# Parameter:
# text : Volltext als String
# Rückgabe:
# Liste von Satz-Strings
# --------------------------------------------------
def split_sentences(text: str) -> list[str]:
# Regex: lookbehind für . ! oder ?, dann ein oder mehrere Whitespace-Zeichen
return re.split(r'(?<=[\.!?])\s+', text.strip())
# --------------------------------------------------
# Funktion: chunk_text_paragraphs
# Zweck:
# - Trennt Text absatzweise in Chunks mit bis zu max_length Zeichen
# - Absätze werden an doppelten Zeilenumbrüchen getrennt
# - Zu große Absätze werden hart in max_length-Teile gesplittet
# Parameter:
# text : Volltext als String
# max_length : Maximale Länge eines Chunks (Standard 500)
# Rückgabe:
# Liste von Strings (Chunks)
# --------------------------------------------------
def chunk_text_paragraphs(text: str, max_length: int = 500) -> list[str]:
paragraphs = re.split(r'\n\s*\n', text.strip())
chunks: list[str] = []
current_chunk = ""
for para in paragraphs:
para = para.strip()
if not para:
continue
# Prüfen, ob Absatz noch in aktuellen Chunk passt (+2 für "\n\n")
if len(current_chunk) + len(para) + 2 <= max_length:
if current_chunk:
current_chunk += "\n\n" + para
else:
current_chunk = para
else:
# Bislang gesammelten Chunk speichern
if current_chunk:
chunks.append(current_chunk)
# Absatz hart splitten, wenn er allein zu groß ist
if len(para) > max_length:
for i in range(0, len(para), max_length):
part = para[i:i + max_length]
chunks.append(part)
current_chunk = ""
else:
# Neuer Chunk beginnt mit diesem Absatz
current_chunk = para
# Letzten Chunk nicht vergessen
if current_chunk:
chunks.append(current_chunk)
return chunks
# --------------------------------------------------
# Funktion: chunk_by_sentences
# Zweck:
# - Zerlegt Text in Sätze per Regex-Split
# - Baut daraus Chunks mit ganzen Sätzen bis max_length
# Parameter:
# text : Volltext als String
# max_length : Maximale Länge eines Chunks (Standard 500)
# Rückgabe:
# Liste von Strings (Chunks)
# --------------------------------------------------
def chunk_by_sentences(text: str, max_length: int = 500) -> list[str]:
sentences = split_sentences(text)
chunks: list[str] = []
current_chunk = ""
for sent in sentences:
sent = sent.strip()
if not sent:
continue
# Prüfen, ob Satz noch in aktuellen Chunk passt (+1 für Leerzeichen)
if len(current_chunk) + len(sent) + 1 <= max_length:
if current_chunk:
current_chunk += " " + sent
else:
current_chunk = sent
else:
# Bisher gesammelten Chunk speichern
if current_chunk:
chunks.append(current_chunk)
# Einzelnen Satz hart splitten, falls er zu lang ist
if len(sent) > max_length:
for i in range(0, len(sent), max_length):
chunks.append(sent[i:i + max_length])
current_chunk = ""
else:
current_chunk = sent
# Letzten Chunk nicht vergessen
if current_chunk:
chunks.append(current_chunk)
return chunks
# --------------------------------------------------
# Funktion: chunk_with_sentence_overlap
# Zweck:
# - Baut zunächst sentence-basierte Chunks
# - Fügt vom vorherigen Chunk overlap_sents Sätze vorne an
# Parameter:
# text : Volltext als String
# max_length : Maximale Länge eines Chunks (Standard 500)
# overlap_sents : Anzahl Sätze, die überlappend übernommen werden (Standard 1)
# Rückgabe:
# Liste von Strings (Chunks mit Kontext-Overlap)
# --------------------------------------------------
def chunk_with_sentence_overlap(
text: str,
max_length: int = 500,
overlap_sents: int = 1
) -> list[str]:
base_chunks = chunk_by_sentences(text, max_length)
overlapped: list[str] = []
for idx, chunk in enumerate(base_chunks):
if idx == 0 or overlap_sents <= 0:
overlapped.append(chunk)
else:
prev = base_chunks[idx - 1]
prev_sents = split_sentences(prev)
context = " ".join(prev_sents[-overlap_sents:])
overlapped.append((context + " " + chunk).strip())
return overlapped