#!/usr/bin/env python3
"""
scripts/import_markdown.py

CLI tool for importing Markdown files from a vault into Qdrant.
Updated for Mindnet v2.3.6 (async ingestion support).

Runs as a dry run by default; pass --apply to actually write to the DB:
    python scripts/import_markdown.py --vault ./vault --apply
"""
import argparse
import asyncio
import logging
import os
import sys
from pathlib import Path

from dotenv import load_dotenv

# Make the project package (`app`) importable. The current working directory is
# what gets appended, so the script must be launched from the project root.
sys.path.append(os.getcwd())

from app.core.ingestion import IngestionService  # noqa: E402  (needs the sys.path tweak above)

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("importer")

# Max files handed to the ingestion service at once (avoid OOM / rate limits).
MAX_CONCURRENT_FILES = 5
# Files gathered per step; one progress log line is emitted per batch.
BATCH_SIZE = 20
# Directory names (matched as whole path components) whose contents are skipped.
EXCLUDED_DIRS = frozenset({".obsidian"})


def _collect_markdown_files(vault_path):
    """Return all ``*.md`` files below *vault_path*, sorted, minus excluded dirs.

    Only path components *relative to the vault* are checked. A vault that
    itself lives under a directory whose name contains ".obsidian", or a note
    whose filename merely contains that text, is therefore still imported.
    """
    files = [
        f
        for f in vault_path.rglob("*.md")
        if EXCLUDED_DIRS.isdisjoint(f.relative_to(vault_path).parts)
    ]
    files.sort()
    return files


async def main_async(args):
    """Import every Markdown file of the vault given by ``args.vault``.

    Args:
        args: Parsed CLI namespace with ``vault``, ``prefix``, ``force`` and
            ``apply`` attributes (see ``main``).

    Returns:
        0 if every file was processed or skipped; 1 if the vault path is
        unusable or at least one file ended with status "error".
    """
    vault_path = Path(args.vault).resolve()
    if not vault_path.exists():
        logger.error(f"Vault path does not exist: {vault_path}")
        return 1
    if not vault_path.is_dir():
        logger.error(f"Vault path is not a directory: {vault_path}")
        return 1

    # Initialising the service starts its async clients.
    logger.info(f"Initializing IngestionService (Prefix: {args.prefix})")
    service = IngestionService(collection_prefix=args.prefix)

    logger.info(f"Scanning {vault_path}...")
    files = _collect_markdown_files(vault_path)
    logger.info(f"Found {len(files)} markdown files.")

    stats = {"processed": 0, "skipped": 0, "errors": 0}

    # Bound how many files are opened/embedded concurrently.
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_FILES)

    async def process_with_limit(f_path):
        """Process one file under the semaphore, turning exceptions into error results."""
        async with semaphore:
            try:
                return await service.process_file(
                    file_path=str(f_path),
                    vault_root=str(vault_path),
                    force_replace=args.force,
                    apply=args.apply,
                    purge_before=True,
                )
            except Exception as e:
                # Keep one bad file from aborting the whole gather(). Only the
                # message survives in the result, so keep the traceback at debug level.
                logger.debug("Traceback for %s", f_path, exc_info=True)
                return {"status": "error", "error": str(e), "path": str(f_path)}

    # Process in batches so progress is visible in the log.
    for start in range(0, len(files), BATCH_SIZE):
        batch = files[start:start + BATCH_SIZE]
        logger.info(f"Processing batch {start} to {start + len(batch)}...")

        results = await asyncio.gather(*(process_with_limit(f) for f in batch))

        # gather() preserves order, so results line up with batch entries.
        for f_path, res in zip(batch, results):
            status = res.get("status")
            if status == "success":
                stats["processed"] += 1
            elif status == "error":
                stats["errors"] += 1
                # Fall back to the known file path if the result omits it.
                logger.error(f"Error in {res.get('path') or f_path}: {res.get('error')}")
            else:
                stats["skipped"] += 1

    logger.info(f"Done. Stats: {stats}")
    if not args.apply:
        logger.info("DRY RUN. Use --apply to write to DB.")
    return 1 if stats["errors"] else 0


def main():
    """Parse CLI arguments, run the async import and return its exit code."""
    load_dotenv()
    default_prefix = os.getenv("COLLECTION_PREFIX", "mindnet")

    parser = argparse.ArgumentParser(description="Import Vault to Qdrant (Async)")
    parser.add_argument("--vault", default="./vault", help="Path to vault root")
    parser.add_argument("--prefix", default=default_prefix, help="Collection prefix")
    parser.add_argument("--force", action="store_true", help="Force re-index all files")
    parser.add_argument("--apply", action="store_true", help="Perform writes to Qdrant")
    args = parser.parse_args()

    # Start the asyncio event loop.
    return asyncio.run(main_async(args))


if __name__ == "__main__":
    sys.exit(main())