mindnet/scripts/import_markdown.py
Lars 7cc823e2f4 NEUSTART von vorne mit frischer Codebasis
Update qdrant_points.py, graph_utils.py, ingestion_db.py, ingestion_processor.py, and import_markdown.py: Enhance UUID generation for edge IDs, improve error handling, and refine documentation for clarity. Implement atomic consistency in batch upserts and ensure strict phase separation in the ingestion workflow. Update versioning to reflect changes in functionality and maintain compatibility with the ingestion service.
2026-01-10 10:56:47 +01:00

240 lines
10 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FILE: scripts/import_markdown.py
VERSION: 2.6.0 (2026-01-10)
STATUS: Active (Core)
COMPATIBILITY: IngestionProcessor v3.4.1+
Zweck:
-------
Hauptwerkzeug zum Importieren von Markdown-Dateien aus einem lokalen Obsidian-Vault in die
Qdrant Vektor-Datenbank. Das Script ist darauf optimiert, die strukturelle Integrität des
Wissensgraphen zu wahren und die manuelle Nutzer-Autorität vor automatisierten System-Eingriffen
zu schützen.
Hintergrund der 2-Phasen-Strategie (Authority-First):
------------------------------------------------------
Um das Problem der "Ghost-IDs" und der asynchronen Überschreibungen zu lösen, implementiert
dieses Script eine strikte Trennung der Schreibvorgänge:
1. PHASE 1: Authority Processing (Batch-Modus)
- Alle Dateien werden gescannt und verarbeitet.
- Notizen, Chunks und explizite (vom Nutzer gesetzte) Kanten werden sofort geschrieben.
- Durch die Verwendung von 'wait=True' in der Datenbank-Layer wird sichergestellt,
dass diese Informationen physisch indiziert sind, bevor der nächste Schritt erfolgt.
- Symmetrische Gegenkanten werden während dieser Phase lediglich im Speicher gepuffert.
2. PHASE 2: Global Symmetry Commitment (Finaler Schritt)
- Erst nach Abschluss aller Batches wird die Methode commit_vault_symmetries() aufgerufen.
- Diese prüft die gepufferten Symmetrie-Vorschläge gegen die bereits existierende
Nutzer-Autorität in der Datenbank.
- Existiert bereits eine manuelle Kante für dieselbe Verbindung, wird die automatische
Symmetrie unterdrückt.
Detaillierte Funktionsweise:
----------------------------
1. PASS 1: Global Pre-Scan
- Scannt rekursiv alle Markdown-Dateien im Vault.
- Schließt System-Ordner wie .trash, .obsidian, .sync sowie Vorlagen konsequent aus.
- Extrahiert Note-Kontext (ID, Titel, Dateiname) ohne DB-Schreibzugriff.
- Füllt den LocalBatchCache im IngestionService, der als Single-Source-of-Truth für
die spätere Link-Auflösung (Kanonisierung) dient.
- Dies stellt sicher, dass Wikilinks wie [[Klaus]] korrekt zu Zeitstempel-IDs wie
202601031726-klaus aufgelöst werden, BEVOR eine UUID für die Kante berechnet wird.
2. PASS 2: Semantic Processing
- Verarbeitet Dateien in konfigurierten Batches (Standard: 20 Dateien).
- Implementiert Cloud-Resilienz durch Semaphoren (max. 5 parallele Zugriffe).
- Nutzt die Mixture of Experts (MoE) Architektur zur semantischen Validierung von Links.
- Führt eine Hash-basierte Change Detection durch, um unnötige Schreibvorgänge zu vermeiden.
- Schreibt die Ergebnisse (Notes, Chunks, Explicit Edges) konsistent nach Qdrant.
Ergebnis-Interpretation:
------------------------
- Log-Ausgabe: Liefert detaillierte Informationen über den Fortschritt, LLM-Entscheidungen
und die finale Symmetrie-Validierung.
- Statistiken: Gibt am Ende eine Zusammenfassung über verarbeitete, übersprungene und
fehlerhafte Dateien aus.
- Dry-Run: Ohne den Parameter --apply werden keine physischen Änderungen an der Datenbank
vorgenommen, der gesamte Workflow (inkl. LLM-Anfragen) wird jedoch simuliert.
Verwendung:
-----------
- Regelmäßiger Import nach Änderungen im Vault.
- Initialer Aufbau eines neuen Wissensgraphen.
- Erzwingung einer Re-Indizierung mittels --force.
"""
import asyncio
import os
import argparse
import logging
import sys
from pathlib import Path
from typing import List, Dict, Any
from dotenv import load_dotenv
# Root Logger Setup:INFO-Level für volle Transparenz der fachlichen Prozesse
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s'
)
# Sicherstellung, dass das Root-Verzeichnis im Python-Pfad liegt
sys.path.append(os.getcwd())
# App-spezifische Imports
from app.core.ingestion import IngestionService
from app.core.parser import pre_scan_markdown
logger = logging.getLogger("importer")
async def main_async(args):
"""
Haupt-Workflow der Ingestion. Koordiniert die zwei Durchläufe (Pass 1/2)
und die zwei Schreibphasen (Phase 1/2).
"""
vault_path = Path(args.vault).resolve()
if not vault_path.exists():
logger.error(f"Vault-Pfad existiert nicht: {vault_path}")
return
# 1. Initialisierung des zentralen Ingestion-Services
logger.info(f"Initializing IngestionService (Prefix: {args.prefix})")
service = IngestionService(collection_prefix=args.prefix)
logger.info(f"Scanning {vault_path}...")
all_files_raw = list(vault_path.rglob("*.md"))
# --- GLOBALER ORDNER-FILTER ---
# Diese Liste stellt sicher, dass keine System-Leichen oder temporäre Dateien
# den Graphen korrumpieren oder zu ID-Kollisionen führen.
files = []
ignore_list = [".trash", ".obsidian", ".sync", "templates", "_system", ".git"]
for f in all_files_raw:
f_str = str(f)
# Filtert Ordner aus der ignore_list und versteckte Verzeichnisse
if not any(folder in f_str for folder in ignore_list) and not "/." in f_str:
files.append(f)
files.sort()
logger.info(f"Found {len(files)} relevant markdown files (filtered trash/system/hidden).")
# =========================================================================
# PASS 1: Global Pre-Scan
# Ziel: Aufbau eines vollständigen Mappings von Bezeichnungen zu stabilen IDs.
# =========================================================================
logger.info(f"🔍 [Pass 1] Global Pre-Scan: Building context cache for {len(files)} files...")
for f_path in files:
try:
# Extrahiert Frontmatter und Metadaten ohne DB-Last
ctx = pre_scan_markdown(str(f_path))
if ctx:
# Mehrfache Indizierung für maximale Trefferrate bei Wikilinks
service.batch_cache[ctx.note_id] = ctx
service.batch_cache[ctx.title] = ctx
# Auch den Dateinamen ohne Endung als Alias hinterlegen
service.batch_cache[os.path.splitext(f_path.name)[0]] = ctx
except Exception as e:
logger.warning(f"⚠️ Pre-scan fehlgeschlagen für {f_path.name}: {e}")
# =========================================================================
# PHASE 1: Authority Processing (Batch-Lauf)
# Ziel: Verarbeitung der Dateiinhalte und Speicherung der Nutzer-Autorität.
# =========================================================================
stats = {"processed": 0, "skipped": 0, "errors": 0}
# Semaphore begrenzt die Parallelität zum Schutz der lokalen oder Cloud-API
sem = asyncio.Semaphore(5)
async def process_with_limit(f_path):
"""Kapselt den Prozess-Aufruf mit Ressourcen-Limitierung."""
async with sem:
try:
# Verwendet process_file (v3.4.1), das explizite Kanten sofort schreibt
# und Symmetrien für Phase 2 im Service-Puffer sammelt.
return await service.process_file(
file_path=str(f_path),
vault_root=str(vault_path),
force_replace=args.force,
apply=args.apply,
purge_before=True
)
except Exception as e:
return {"status": "error", "error": str(e), "path": str(f_path)}
logger.info(f"🚀 [Phase 1] Starting semantic processing in batches...")
batch_size = 20
for i in range(0, len(files), batch_size):
batch = files[i:i+batch_size]
logger.info(f"--- Processing Batch {i//batch_size + 1} ({len(batch)} files) ---")
# Parallelisierung innerhalb des Batches (begrenzt durch sem)
tasks = [process_with_limit(f) for f in batch]
results = await asyncio.gather(*tasks)
for res in results:
# Robuste Auswertung der Rückgabe-Dictionaries
if not isinstance(res, dict):
stats["errors"] += 1
continue
status = res.get("status")
if status == "success":
stats["processed"] += 1
elif status == "error":
stats["errors"] += 1
logger.error(f"❌ Fehler in {res.get('path')}: {res.get('error')}")
elif status == "unchanged":
stats["skipped"] += 1
else:
stats["skipped"] += 1
# =========================================================================
# PHASE 2: Global Symmetry Commitment
# Ziel: Finale Integrität. Triggert erst, wenn Phase 1 komplett indiziert ist.
# =========================================================================
if args.apply:
logger.info(f"🔄 [Phase 2] Starting global symmetry injection for the entire vault...")
try:
# Diese Methode prüft den Puffer gegen die nun vollständige Datenbank
sym_res = await service.commit_vault_symmetries()
if sym_res.get("status") == "success":
logger.info(f"✅ Phase 2 abgeschlossen. Hinzugefügt: {sym_res.get('added', 0)} geschützte Symmetrien.")
else:
logger.info(f"⏭️ Phase 2 übersprungen: {sym_res.get('reason', 'Keine Daten')}")
except Exception as e:
logger.error(f"❌ Fehler in Phase 2: {e}")
else:
logger.info("⏭️ [Phase 2] Dry-Run: Keine Symmetrie-Injektion durchgeführt.")
logger.info(f"--- Import beendet ---")
logger.info(f"Statistiken: {stats}")
def main():
"""Einstiegspunkt und Argument-Parsing."""
load_dotenv()
# Standard-Präfix aus Umgebungsvariable oder Fallback
default_prefix = os.getenv("COLLECTION_PREFIX", "mindnet")
parser = argparse.ArgumentParser(description="Mindnet Ingester: Two-Phase Markdown Import")
parser.add_argument("--vault", default="./vault", help="Pfad zum Obsidian Vault")
parser.add_argument("--prefix", default=default_prefix, help="Qdrant Collection Präfix")
parser.add_argument("--force", action="store_true", help="Erzwingt Neu-Indizierung aller Dateien")
parser.add_argument("--apply", action="store_true", help="Schreibt physisch in die Datenbank")
args = parser.parse_args()
try:
asyncio.run(main_async(args))
except KeyboardInterrupt:
logger.info("Import durch Nutzer abgebrochen.")
except Exception as e:
logger.critical(f"FATALER FEHLER: {e}", exc_info=True)
sys.exit(1)
if __name__ == "__main__":
main()