# Trainer_LLM/scripts/import_txtdocuments.py
#
# Imports all .txt documents for a given category from ~/knowledge/<category>,
# splits them into paragraph-based chunks, sends the chunks to a local
# embedding API, and archives successfully processed files into _imported/.
import os
import sys
import shutil
import requests
import re
from tqdm import tqdm
from datetime import datetime
# 📌 Konfiguration
API_URL = "http://localhost:8000/embed"
CHUNK_SIZE = 500
OVERLAP = 100
BATCH_SIZE = 20
# 📁 Kommandozeilenparameter auswerten
if len(sys.argv) != 2:
print("❌ Bitte gib eine Kategorie an, z.B.: python import_txtdocuments.py karatetrainer")
sys.exit(1)
CATEGORY = sys.argv[1]
SOURCE_DIR = os.path.expanduser(f"~/knowledge/{CATEGORY}")
ARCHIVE_DIR = os.path.join(SOURCE_DIR, "_imported")
COLLECTION = CATEGORY
if not os.path.exists(SOURCE_DIR):
print(f"❌ Der Ordner '{SOURCE_DIR}' existiert nicht.")
sys.exit(1)
os.makedirs(ARCHIVE_DIR, exist_ok=True)
print(f"📁 Lese Dokumente aus: {SOURCE_DIR}")
print(f"📂 Archivierte Dateien: {ARCHIVE_DIR}")
print(f"🎯 Ziel-Collection: {COLLECTION}")
# 🔧 Split text into chunks (paragraph-based; the old fixed-size
# sliding-window chunker was replaced by chunk_text_paragraphs below).
def chunk_text_paragraphs(text, max_length=500):
paragraphs = re.split(r'\n\s*\n', text.strip()) # Absatztrennung
chunks = []
current_chunk = ""
for para in paragraphs:
if len(current_chunk) + len(para) + 2 <= max_length:
current_chunk += para + "\n\n"
else:
if current_chunk:
chunks.append(current_chunk.strip())
# Falls einzelner Absatz zu groß ist, hart splitten
if len(para) > max_length:
for i in range(0, len(para), max_length):
chunks.append(para[i:i+max_length].strip())
current_chunk = ""
else:
current_chunk = para + "\n\n"
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
# 📚 Alle .txt-Dateien im Ordner lesen
def read_all_text_files(folder):
file_chunk_map = {} # Map: filename → chunks
for filename in os.listdir(folder):
if filename.endswith(".txt"):
path = os.path.join(folder, filename)
with open(path, "r", encoding="utf-8") as f:
text = f.read()
file_chunk_map[filename] = chunk_text_paragraphs(text)
return file_chunk_map
# 🧱 Strukturierte Payloads vorbereiten
def prepare_payloads(file_chunk_map, collection):
payloads = []
imported_at = datetime.now().isoformat()
for filename, chunks in file_chunk_map.items():
for local_index, chunk in enumerate(chunks):
payload = {
"text": chunk,
"source": filename,
"type": "file",
"category": collection,
"imported_at": imported_at,
"chunk_index": local_index
}
payloads.append(payload)
for p in payloads:
print(f"{p['source']}: chunk_index={p['chunk_index']}")
return payloads
# 📤 An API senden
def embed_chunks_in_batches(payloads, collection):
results = []
for i in tqdm(range(0, len(payloads), BATCH_SIZE), desc="📡 Embedding"):
batch = payloads[i:i + BATCH_SIZE]
response = requests.post(API_URL, json={"chunks": batch, "collection": collection})
response.raise_for_status()
results.append(response.json())
return results
# 🚀 Hauptlogik
if __name__ == "__main__":
file_chunk_map = read_all_text_files(SOURCE_DIR)
processed_files = list(file_chunk_map.keys())
payloads = prepare_payloads(file_chunk_map, COLLECTION)
if not payloads:
print("⚠️ Keine Textabschnitte gefunden.")
sys.exit(0)
print(f"📦 {len(payloads)} Textabschnitte aus {len(processed_files)} Dateien gefunden. Sende an API...")
try:
result = embed_chunks_in_batches(payloads, COLLECTION)
print(f"\n✅ Embedding abgeschlossen: {len(result)} API-Antwort(en) erhalten.")
# 🗃️ Verarbeitete Dateien archivieren
for filename in processed_files:
src = os.path.join(SOURCE_DIR, filename)
dst = os.path.join(ARCHIVE_DIR, filename)
shutil.move(src, dst)
print(f"📁 {len(processed_files)} Dateien verschoben nach _imported.")
except Exception as e:
print(f"❌ Fehler beim Senden: {e}")