# Trainer_LLM/scripts/archiv/import_documents.py
import os
import sys
import shutil
import requests
from tqdm import tqdm
# 📌 Configuration
API_URL = "http://localhost:8000/embed"  # local embedding service endpoint
CHUNK_SIZE = 500  # maximum characters per chunk
OVERLAP = 100  # characters shared between consecutive chunks
BATCH_SIZE = 20  # chunks sent per API request

# 📁 Evaluate command-line parameters: exactly one argument (the category) is required.
if len(sys.argv) != 2:
    print("❌ Bitte gib eine Kategorie an, z.B.: python index_documents_advanced.py karatetrainer")
    sys.exit(1)
CATEGORY = sys.argv[1]
# Source documents live under ~/knowledge/<category>; successfully imported
# files are moved into its "_imported" subfolder. The category name doubles
# as the target collection name on the API side.
SOURCE_DIR = os.path.expanduser(f"~/knowledge/{CATEGORY}")
ARCHIVE_DIR = os.path.join(SOURCE_DIR, "_imported")
COLLECTION = CATEGORY
if not os.path.exists(SOURCE_DIR):
    print(f"❌ Der Ordner '{SOURCE_DIR}' existiert nicht.")
    sys.exit(1)
os.makedirs(ARCHIVE_DIR, exist_ok=True)
print(f"📁 Lese Dokumente aus: {SOURCE_DIR}")
print(f"📂 Archivierte Dateien: {ARCHIVE_DIR}")
print(f"🎯 Ziel-Collection: {COLLECTION}")
# 🔧 Split text into overlapping chunks
def chunk_text(text, size=CHUNK_SIZE, overlap=OVERLAP):
    """Split *text* into chunks of at most *size* characters.

    Consecutive chunks share *overlap* characters so that sentences cut at a
    chunk boundary still appear intact in the next chunk.

    Args:
        text: The string to split. An empty string yields an empty list.
        size: Maximum chunk length in characters.
        overlap: Characters shared between consecutive chunks; must be < size.

    Returns:
        list[str]: Chunks that together cover the whole text.

    Raises:
        ValueError: If ``overlap >= size`` (the window would never advance).
    """
    if overlap >= size:
        # size - overlap <= 0 would make `start` stall or move backwards,
        # looping forever.
        raise ValueError("overlap must be smaller than size")
    chunks = []
    start = 0
    while start < len(text):
        end = min(start + size, len(text))
        chunks.append(text[start:end])
        if end == len(text):
            # Tail reached. Advancing further would emit one more chunk that
            # is entirely contained in this one (wasted embedding calls).
            break
        start += size - overlap
    return chunks
# 📚 Read every .txt file in a folder
def read_all_text_files(folder):
    """Read all ``.txt`` files in *folder* and chunk their contents.

    Args:
        folder: Path of the directory to scan (non-recursive).

    Returns:
        dict[str, list[str]]: Mapping of filename to its list of text chunks.
    """
    chunks_by_file = {}
    for entry in os.listdir(folder):
        if not entry.endswith(".txt"):
            continue  # skip anything that is not a plain-text document
        full_path = os.path.join(folder, entry)
        with open(full_path, "r", encoding="utf-8") as handle:
            contents = handle.read()
        chunks_by_file[entry] = chunk_text(contents)
    return chunks_by_file
# 📤 Send chunks to the embedding API
def embed_chunks_in_batches(chunks, collection, *, timeout=60):
    """POST *chunks* to the embedding API in batches of ``BATCH_SIZE``.

    Args:
        chunks: List of text chunks to embed.
        collection: Name of the target collection passed to the API.
        timeout: Per-request timeout in seconds (keyword-only, new with a
            default so existing callers are unaffected).

    Returns:
        list: One parsed JSON response per batch, in request order.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If a single request exceeds *timeout* seconds.
    """
    results = []
    for i in tqdm(range(0, len(chunks), BATCH_SIZE), desc="📡 Embedding"):
        batch = chunks[i:i + BATCH_SIZE]
        # Without an explicit timeout, requests.post can block indefinitely
        # if the embedding service hangs.
        response = requests.post(
            API_URL,
            json={"texts": batch, "collection": collection},
            timeout=timeout,
        )
        response.raise_for_status()
        results.append(response.json())
    return results
# 🚀 Main logic
if __name__ == "__main__":
    file_chunk_map = read_all_text_files(SOURCE_DIR)
    all_chunks = []
    processed_files = []
    # Only files that actually produced chunks are embedded and archived.
    for filename, chunks in file_chunk_map.items():
        if chunks:
            all_chunks.extend(chunks)
            processed_files.append(filename)
    if not all_chunks:
        print("⚠️ Keine Textabschnitte gefunden.")
        sys.exit(0)
    print(f"📦 {len(all_chunks)} Textabschnitte aus {len(processed_files)} Dateien gefunden.")
    try:
        result = embed_chunks_in_batches(all_chunks, COLLECTION)
        print(f"\n✅ Embedding abgeschlossen: {len(result)} API-Antwort(en) erhalten.")
        # 🗃️ Archive processed files so a re-run does not re-embed them.
        for filename in processed_files:
            src = os.path.join(SOURCE_DIR, filename)
            dst = os.path.join(ARCHIVE_DIR, filename)
            shutil.move(src, dst)
        print(f"📁 {len(processed_files)} Dateien verschoben nach _imported.")
    except Exception as e:
        # NOTE(review): this broad catch also covers shutil.move failures,
        # which are then reported as a send error — consider narrowing.
        print(f"❌ Fehler beim Senden: {e}")
        # Exit non-zero so shells/cron jobs can detect the failure
        # (previously the script fell through and exited with status 0).
        sys.exit(1)