From 7953acf3ee7cd58966a09bcf6bfff1b188b4b61d Mon Sep 17 00:00:00 2001 From: Lars Date: Sat, 10 Jan 2026 07:35:50 +0100 Subject: [PATCH] Update import_markdown.py to version 2.5.0: Implement global two-phase write strategy, enhance folder filtering to exclude system directories, and refine logging for improved clarity. Adjusted processing phases for better organization and error handling during markdown ingestion. --- scripts/import_markdown.py | 166 ++++++++++++------------------------- 1 file changed, 52 insertions(+), 114 deletions(-) diff --git a/scripts/import_markdown.py b/scripts/import_markdown.py index ebf4914..d616f3a 100644 --- a/scripts/import_markdown.py +++ b/scripts/import_markdown.py @@ -2,81 +2,32 @@ # -*- coding: utf-8 -*- """ FILE: scripts/import_markdown.py -VERSION: 2.4.1 (2025-12-15) +VERSION: 2.5.0 (2026-01-10) STATUS: Active (Core) -COMPATIBILITY: v2.9.1 (Post-WP14/WP-15b) +COMPATIBILITY: IngestionProcessor v3.3.5+ Zweck: ------- Hauptwerkzeug zum Importieren von Markdown-Dateien aus einem Vault in Qdrant. -Implementiert den Two-Pass Workflow (WP-15b) für robuste Edge-Validierung. +Implementiert die globale 2-Phasen-Schreibstrategie. -Funktionsweise: ---------------- -1. PASS 1: Global Pre-Scan - - Scannt alle Markdown-Dateien im Vault - - Extrahiert Note-Kontext (ID, Titel, Dateiname) - - Füllt LocalBatchCache für semantische Edge-Validierung - - Indiziert nach ID, Titel und Dateiname für Link-Auflösung - -2. PASS 2: Semantic Processing - - Verarbeitet Dateien in Batches (20 Dateien, max. 5 parallel) - - Nutzt gefüllten Cache für binäre Edge-Validierung - - Erzeugt Notes, Chunks und Edges in Qdrant - - Respektiert Hash-basierte Change Detection - -Ergebnis-Interpretation: ------------------------- -- Log-Ausgabe: Fortschritt und Statistiken -- Stats: processed, skipped, errors -- Exit-Code 0: Erfolgreich (auch wenn einzelne Dateien Fehler haben) -- Ohne --apply: Dry-Run (keine DB-Änderungen) - -Verwendung: ------------ -- Regelmäßiger Import nach Vault-Änderungen -- Initial-Import eines neuen Vaults -- Re-Indexierung mit --force - -Hinweise: ---------- -- Two-Pass Workflow sorgt für robuste Edge-Validierung -- Change Detection verhindert unnötige Re-Indexierung -- Parallele Verarbeitung für Performance (max. 5 gleichzeitig) -- Cloud-Resilienz durch Semaphore-Limits - -Aufruf: -------- -python3 -m scripts.import_markdown --vault ./vault --apply -python3 -m scripts.import_markdown --vault ./vault --prefix mindnet_dev --force --apply - -Parameter: ----------- ---vault PATH Pfad zum Vault-Verzeichnis (Default: ./vault) ---prefix TEXT Collection-Präfix (Default: ENV COLLECTION_PREFIX oder mindnet) ---force Erzwingt Re-Indexierung aller Dateien (ignoriert Hashes) ---apply Führt tatsächliche DB-Schreibvorgänge durch (sonst Dry-Run) - -Änderungen: ------------ -v2.4.1 (2025-12-15): WP-15b Two-Pass Workflow - - Implementiert Pre-Scan für LocalBatchCache - - Indizierung nach ID, Titel und Dateiname - - Batch-Verarbeitung mit Semaphore-Limits -v2.0.0: Initial Release +Änderungen v2.5.0: +------------------ +- Globale Phasentrennung: commit_vault_symmetries() wird erst am Ende aufgerufen. +- Erweiterter Ordner-Filter: Schließt .trash und andere Systemordner aus. """ import asyncio import os import argparse import logging +import sys from pathlib import Path from dotenv import load_dotenv -# Setzt das Level global auf INFO, damit der Fortschritt im Log sichtbar ist +# Setzt das Level global auf INFO logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') -# Importiere den neuen Async Service und stelle Python-Pfad sicher -import sys +# Stelle sicher, dass das Root-Verzeichnis im Python-Pfad ist sys.path.append(os.getcwd()) from app.core.ingestion import IngestionService @@ -95,97 +46,84 @@ async def main_async(args): service = IngestionService(collection_prefix=args.prefix) logger.info(f"Scanning {vault_path}...") - files = list(vault_path.rglob("*.md")) - # Exclude .obsidian folder if present - files = [f for f in files if ".obsidian" not in str(f)] - files.sort() + all_files = list(vault_path.rglob("*.md")) - logger.info(f"Found {len(files)} markdown files.") + # --- ORDNER-FILTER --- + files = [] + ignore_folders = [".trash", ".obsidian", ".sync", "templates", "_system"] + for f in all_files: + f_str = str(f) + if not any(folder in f_str for folder in ignore_folders) and not "/." in f_str: + files.append(f) + + files.sort() + logger.info(f"Found {len(files)} relevant markdown files.") # ========================================================================= - # PASS 1: Global Pre-Scan (WP-15b Harvester) - # Füllt den LocalBatchCache für die semantische Kanten-Validierung. - # Nutzt ID, Titel und Filename für robusten Look-up. + # PASS 1: Global Pre-Scan # ========================================================================= - logger.info(f"🔍 [Pass 1] Pre-scanning {len(files)} files for global context cache...") + logger.info(f"🔍 [Pass 1] Pre-scanning files for global context cache...") for f_path in files: try: ctx = pre_scan_markdown(str(f_path)) if ctx: - # 1. Look-up via Note ID (UUID oder Frontmatter ID) service.batch_cache[ctx.note_id] = ctx - - # 2. Look-up via Titel (Wichtig für Wikilinks [[Titel]]) service.batch_cache[ctx.title] = ctx - - # 3. Look-up via Dateiname (Wichtig für Wikilinks [[Filename]]) fname = os.path.splitext(f_path.name)[0] service.batch_cache[fname] = ctx - - except Exception as e: - logger.warning(f"⚠️ Could not pre-scan {f_path.name}: {e}") - - logger.info(f"✅ Context Cache populated for {len(files)} notes.") + except Exception: pass # ========================================================================= - # PASS 2: Processing (Semantic Batch-Verarbeitung) - # Nutzt den gefüllten Cache zur binären Validierung semantischer Kanten. + # PHASE 1: Batch-Import (Explicit Edges only) # ========================================================================= stats = {"processed": 0, "skipped": 0, "errors": 0} - sem = asyncio.Semaphore(5) # Max 5 parallele Dateien für Cloud-Stabilität + sem = asyncio.Semaphore(5) async def process_with_limit(f_path): async with sem: try: - # Nutzt den nun gefüllten Batch-Cache in der process_file Logik - res = await service.process_file( - file_path=str(f_path), - vault_root=str(vault_path), - force_replace=args.force, - apply=args.apply, - purge_before=True + return await service.process_file( + file_path=str(f_path), vault_root=str(vault_path), + force_replace=args.force, apply=args.apply, purge_before=True ) - return res except Exception as e: return {"status": "error", "error": str(e), "path": str(f_path)} - logger.info(f"🚀 [Pass 2] Starting semantic processing in batches...") - batch_size = 20 for i in range(0, len(files), batch_size): batch = files[i:i+batch_size] - logger.info(f"Processing batch {i} to {i+len(batch)}...") - + logger.info(f"--- Processing Batch {i//batch_size + 1} ---") tasks = [process_with_limit(f) for f in batch] results = await asyncio.gather(*tasks) - for res in results: - if res.get("status") == "success": - stats["processed"] += 1 - elif res.get("status") == "error": - stats["errors"] += 1 - logger.error(f"Error in {res.get('path')}: {res.get('error')}") - else: - stats["skipped"] += 1 + if res.get("status") == "success": stats["processed"] += 1 + elif res.get("status") == "error": stats["errors"] += 1 + else: stats["skipped"] += 1 - logger.info(f"Done. Stats: {stats}") - if not args.apply: - logger.info("DRY RUN. Use --apply to write to DB.") + # ========================================================================= + # PHASE 2: Global Symmetry Injection + # ========================================================================= + if args.apply: + logger.info(f"🔄 [Phase 2] Starting global symmetry injection...") + sym_res = await service.commit_vault_symmetries() + if sym_res.get("status") == "success": + logger.info(f"✅ Added {sym_res.get('added', 0)} protected symmetry edges.") + + logger.info(f"Done. Final Stats: {stats}") def main(): load_dotenv() default_prefix = os.getenv("COLLECTION_PREFIX", "mindnet") - - parser = argparse.ArgumentParser(description="Two-Pass Markdown Ingestion for Mindnet") - parser.add_argument("--vault", default="./vault", help="Path to vault root") - parser.add_argument("--prefix", default=default_prefix, help="Collection prefix") - parser.add_argument("--force", action="store_true", help="Force re-index all files") - parser.add_argument("--apply", action="store_true", help="Perform writes to Qdrant") - + parser = argparse.ArgumentParser() + parser.add_argument("--vault", default="./vault") + parser.add_argument("--prefix", default=default_prefix) + parser.add_argument("--force", action="store_true") + parser.add_argument("--apply", action="store_true") args = parser.parse_args() - - # Starte den asynchronen Haupt-Loop - asyncio.run(main_async(args)) + try: + asyncio.run(main_async(args)) + except Exception as e: + logger.critical(f"FATAL ERROR: {e}") if __name__ == "__main__": main() \ No newline at end of file