#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ FILE: scripts/import_markdown.py VERSION: 2.6.1 (2026-01-10) STATUS: Active (Core) COMPATIBILITY: IngestionProcessor v3.4.2+, graph_utils v1.6.2+ Zweck: ------- Hauptwerkzeug zum Importieren von Markdown-Dateien aus einem lokalen Obsidian-Vault in die Qdrant Vektor-Datenbank. Das Script ist darauf optimiert, die strukturelle Integrität des Wissensgraphen zu wahren und die manuelle Nutzer-Autorität vor automatisierten System-Eingriffen zu schützen. Hintergrund der 2-Phasen-Schreibstrategie (Authority-First): ------------------------------------------------------------ Um das Problem der "Ghost-IDs" (Links auf Titel statt IDs) und der asynchronen Überschreibungen (Symmetrien löschen manuelle Kanten) zu lösen, implementiert dieses Script eine strikte Trennung der Arbeitsabläufe: 1. PASS 1: Global Context Discovery (Pre-Scan) - Scannt den gesamten Vault, um ein Mapping von Titeln/Dateinamen zu Note-IDs aufzubauen. - Dieser Cache wird dem IngestionService übergeben, damit Wikilinks wie [[Klaus]] während der Verarbeitung sofort in die korrekte Zeitstempel-ID (z.B. 202601031726-klaus) aufgelöst werden können. - Dies verhindert die Erzeugung falscher UUIDs durch unaufgelöste Bezeichnungen. 2. PHASE 1: Authority Processing (Schreib-Durchlauf) - Alle validen Dateien werden in Batches verarbeitet. - Notizen, Chunks und explizite (vom Nutzer manuell gesetzte) Kanten werden sofort geschrieben. - Durch die Verwendung von 'wait=True' in der Datenbank-Layer (qdrant_points) wird sichergestellt, dass diese Informationen physisch indiziert sind, bevor Phase 2 startet. - Symmetrische Gegenkanten werden während dieser Phase lediglich im Speicher gepuffert. 3. PHASE 2: Global Symmetry Commitment (Integritäts-Sicherung) - Erst nach Abschluss aller Batches wird die Methode commit_vault_symmetries() aufgerufen. - Diese prüft die gepufferten Symmetrie-Vorschläge gegen die bereits existierende Nutzer-Autorität in der Datenbank. - Dank der in graph_utils v1.6.2 zentralisierten ID-Logik (_mk_edge_id) erkennt das System Kollisionen hunderprozentig: Existiert bereits eine manuelle Kante für dieselbe Verbindung, wird die automatische Symmetrie unterdrückt. Detaillierte Funktionsweise: ---------------------------- - Ordner-Filter: Schließt System-Ordner wie .trash, .obsidian, .sync sowie Vorlagen konsequent aus. - Cloud-Resilienz: Implementiert Semaphoren zur Begrenzung paralleler API-Zugriffe (max. 5). - Mixture of Experts (MoE): Nutzt LLM-Validierung zur intelligenten Zuweisung von Kanten. - Change Detection: Vergleicht Hashes, um redundante Schreibvorgänge zu vermeiden. Ergebnis-Interpretation: ------------------------ - Log-Ausgabe: Zeigt detailliert den Fortschritt, LLM-Entscheidungen (✅ OK / ❌ SKIP) und den Status der Symmetrie-Injektion. - Statistiken: Gibt am Ende eine Zusammenfassung über Erfolg, Übersprungene (Hash identisch) und Fehler (z.B. fehlendes Frontmatter). Verwendung: ----------- - Initialer Aufbau: python3 -m scripts.import_markdown --vault /pfad/zum/vault --apply - Update-Lauf: Das Script erkennt Änderungen automatisch via Change Detection. - Erzwingung: Mit --force wird die Hash-Prüfung ignoriert und alles neu indiziert. """ import asyncio import os import argparse import logging import sys from pathlib import Path from typing import List, Dict, Any from dotenv import load_dotenv # Root Logger Setup: INFO-Level für volle Transparenz der fachlichen Prozesse logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s' ) # Sicherstellung, dass das Root-Verzeichnis im Python-Pfad liegt sys.path.append(os.getcwd()) # App-spezifische Imports from app.core.ingestion import IngestionService from app.core.parser import pre_scan_markdown logger = logging.getLogger("importer") async def main_async(args): """ Haupt-Workflow der Ingestion. Koordiniert die zwei Durchläufe (Pass 1/2) und die zwei Schreibphasen (Phase 1/2). """ vault_path = Path(args.vault).resolve() if not vault_path.exists(): logger.error(f"Vault-Pfad existiert nicht: {vault_path}") return # 1. Initialisierung des zentralen Ingestion-Services # Nutzt IngestionProcessor v3.4.2 (initialisiert Registry mit .env Pfaden) logger.info(f"Initializing IngestionService (Prefix: {args.prefix})") service = IngestionService(collection_prefix=args.prefix) logger.info(f"Scanning {vault_path}...") all_files_raw = list(vault_path.rglob("*.md")) # --- GLOBALER ORDNER-FILTER --- # Diese Liste stellt sicher, dass keine System-Leichen oder temporäre Dateien # den Graphen korrumpieren oder zu ID-Kollisionen führen. files = [] ignore_list = [".trash", ".obsidian", ".sync", "templates", "_system", ".git"] for f in all_files_raw: f_str = str(f) # Filtert Ordner aus der ignore_list und versteckte Verzeichnisse if not any(folder in f_str for folder in ignore_list) and not "/." in f_str: files.append(f) files.sort() logger.info(f"Found {len(files)} relevant markdown files (filtered trash/system/hidden).") # ========================================================================= # PASS 1: Global Pre-Scan # Ziel: Aufbau eines vollständigen Mappings von Bezeichnungen zu stabilen IDs. # WICHTIG: Dies ist die Voraussetzung für die korrekte ID-Generierung in Phase 1. # ========================================================================= logger.info(f"🔍 [Pass 1] Global Pre-Scan: Building context cache for {len(files)} files...") for f_path in files: try: # Extrahiert Frontmatter und Metadaten ohne DB-Last # Nutzt service.registry zur Typ-Auflösung ctx = pre_scan_markdown(str(f_path), registry=service.registry) if ctx: # Mehrfache Indizierung für maximale Trefferrate bei Wikilinks service.batch_cache[ctx.note_id] = ctx service.batch_cache[ctx.title] = ctx # Auch den Dateinamen ohne Endung als Alias hinterlegen service.batch_cache[os.path.splitext(f_path.name)[0]] = ctx except Exception as e: logger.warning(f"⚠️ Pre-scan fehlgeschlagen für {f_path.name}: {e}") # ========================================================================= # PHASE 1: Authority Processing (Batch-Lauf) # Ziel: Verarbeitung der Dateiinhalte und Speicherung der Nutzer-Autorität. # ========================================================================= stats = {"processed": 0, "skipped": 0, "errors": 0} # Semaphore begrenzt die Parallelität zum Schutz der lokalen oder Cloud-API sem = asyncio.Semaphore(5) async def process_with_limit(f_path): """Kapselt den Prozess-Aufruf mit Ressourcen-Limitierung.""" async with sem: try: # Verwendet process_file (v3.4.2), das explizite Kanten sofort schreibt. # Symmetrien werden im Service-Puffer gesammelt und NICHT sofort geschrieben. return await service.process_file( file_path=str(f_path), vault_root=str(vault_path), force_replace=args.force, apply=args.apply, purge_before=True ) except Exception as e: return {"status": "error", "error": str(e), "path": str(f_path)} logger.info(f"🚀 [Phase 1] Starting semantic processing in batches...") batch_size = 20 for i in range(0, len(files), batch_size): batch = files[i:i+batch_size] logger.info(f"--- Processing Batch {i//batch_size + 1} ({len(batch)} files) ---") # Parallelisierung innerhalb des Batches (begrenzt durch sem) tasks = [process_with_limit(f) for f in batch] results = await asyncio.gather(*tasks) for res in results: # Robuste Auswertung der Rückgabe-Dictionaries if not isinstance(res, dict): stats["errors"] += 1 continue status = res.get("status") if status == "success": stats["processed"] += 1 elif status == "error": stats["errors"] += 1 logger.error(f"❌ Fehler in {res.get('path')}: {res.get('error')}") elif status == "unchanged": stats["skipped"] += 1 else: stats["skipped"] += 1 # ========================================================================= # PHASE 2: Global Symmetry Commitment # Ziel: Finale Integrität. Triggert erst, wenn Phase 1 komplett indiziert ist. # Verwendet die identische ID-Logik aus graph_utils v1.6.2. # ========================================================================= if args.apply: logger.info(f"🔄 [Phase 2] Starting global symmetry injection for the entire vault...") try: # Diese Methode prüft den Puffer gegen die nun vollständige Datenbank. # Verhindert Duplikate bei der 'Steinzeitaxt' durch Authority-Lookup. sym_res = await service.commit_vault_symmetries() if sym_res.get("status") == "success": logger.info(f"✅ Phase 2 abgeschlossen. Hinzugefügt: {sym_res.get('added', 0)} geschützte Symmetrien.") else: logger.info(f"⏭️ Phase 2 übersprungen: {sym_res.get('reason', 'Keine Daten oder bereits vorhanden')}") except Exception as e: logger.error(f"❌ Fehler in Phase 2: {e}") else: logger.info("⏭️ [Phase 2] Dry-Run: Keine Symmetrie-Injektion durchgeführt.") logger.info(f"--- Import beendet ---") logger.info(f"Statistiken: {stats}") def main(): """Einstiegspunkt und Argument-Parsing.""" load_dotenv() # Standard-Präfix aus Umgebungsvariable oder Fallback default_prefix = os.getenv("COLLECTION_PREFIX", "mindnet") # Optionaler Vault-Root aus .env default_vault = os.getenv("MINDNET_VAULT_ROOT", "./vault") parser = argparse.ArgumentParser(description="Mindnet Ingester: Two-Phase Markdown Import") parser.add_argument("--vault", default=default_vault, help="Pfad zum Obsidian Vault") parser.add_argument("--prefix", default=default_prefix, help="Qdrant Collection Präfix") parser.add_argument("--force", action="store_true", help="Erzwingt Neu-Indizierung aller Dateien") parser.add_argument("--apply", action="store_true", help="Schreibt physisch in die Datenbank") args = parser.parse_args() try: asyncio.run(main_async(args)) except KeyboardInterrupt: logger.info("Import durch Nutzer abgebrochen.") except Exception as e: logger.critical(f"FATALER FEHLER: {e}", exc_info=True) sys.exit(1) if __name__ == "__main__": main()