Update import_markdown.py to version 2.5.0: Implement global two-phase write strategy, enhance folder filtering to exclude system directories, and refine logging for improved clarity. Adjusted processing phases for better organization and error handling during markdown ingestion.

This commit is contained in:
Lars 2026-01-10 07:35:50 +01:00
parent 3f528f2184
commit 7953acf3ee

View File

@ -2,81 +2,32 @@
# -*- coding: utf-8 -*-
"""
FILE: scripts/import_markdown.py
VERSION: 2.4.1 (2025-12-15)
VERSION: 2.5.0 (2026-01-10)
STATUS: Active (Core)
COMPATIBILITY: v2.9.1 (Post-WP14/WP-15b)
COMPATIBILITY: IngestionProcessor v3.3.5+
Zweck:
-------
Hauptwerkzeug zum Importieren von Markdown-Dateien aus einem Vault in Qdrant.
Implementiert den Two-Pass Workflow (WP-15b) für robuste Edge-Validierung.
Implementiert die globale 2-Phasen-Schreibstrategie.
Funktionsweise:
---------------
1. PASS 1: Global Pre-Scan
- Scannt alle Markdown-Dateien im Vault
- Extrahiert Note-Kontext (ID, Titel, Dateiname)
- Füllt LocalBatchCache für semantische Edge-Validierung
- Indiziert nach ID, Titel und Dateiname für Link-Auflösung
2. PASS 2: Semantic Processing
- Verarbeitet Dateien in Batches (20 Dateien, max. 5 parallel)
- Nutzt gefüllten Cache für binäre Edge-Validierung
- Erzeugt Notes, Chunks und Edges in Qdrant
- Respektiert Hash-basierte Change Detection
Ergebnis-Interpretation:
------------------------
- Log-Ausgabe: Fortschritt und Statistiken
- Stats: processed, skipped, errors
- Exit-Code 0: Erfolgreich (auch wenn einzelne Dateien Fehler haben)
- Ohne --apply: Dry-Run (keine DB-Änderungen)
Verwendung:
-----------
- Regelmäßiger Import nach Vault-Änderungen
- Initial-Import eines neuen Vaults
- Re-Indexierung mit --force
Hinweise:
---------
- Two-Pass Workflow sorgt für robuste Edge-Validierung
- Change Detection verhindert unnötige Re-Indexierung
- Parallele Verarbeitung für Performance (max. 5 gleichzeitig)
- Cloud-Resilienz durch Semaphore-Limits
Aufruf:
-------
python3 -m scripts.import_markdown --vault ./vault --apply
python3 -m scripts.import_markdown --vault ./vault --prefix mindnet_dev --force --apply
Parameter:
----------
--vault PATH Pfad zum Vault-Verzeichnis (Default: ./vault)
--prefix TEXT Collection-Präfix (Default: ENV COLLECTION_PREFIX oder mindnet)
--force Erzwingt Re-Indexierung aller Dateien (ignoriert Hashes)
--apply Führt tatsächliche DB-Schreibvorgänge durch (sonst Dry-Run)
Änderungen:
-----------
v2.4.1 (2025-12-15): WP-15b Two-Pass Workflow
- Implementiert Pre-Scan für LocalBatchCache
- Indizierung nach ID, Titel und Dateiname
- Batch-Verarbeitung mit Semaphore-Limits
v2.0.0: Initial Release
Änderungen v2.5.0:
------------------
- Globale Phasentrennung: commit_vault_symmetries() wird erst am Ende aufgerufen.
- Erweiterter Ordner-Filter: Schließt .trash und andere Systemordner aus.
"""
import asyncio
import os
import argparse
import logging
import sys
from pathlib import Path
from dotenv import load_dotenv
# Setzt das Level global auf INFO, damit der Fortschritt im Log sichtbar ist
# Setzt das Level global auf INFO
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
# Importiere den neuen Async Service und stelle Python-Pfad sicher
import sys
# Stelle sicher, dass das Root-Verzeichnis im Python-Pfad ist
sys.path.append(os.getcwd())
from app.core.ingestion import IngestionService
@ -95,97 +46,84 @@ async def main_async(args):
service = IngestionService(collection_prefix=args.prefix)
logger.info(f"Scanning {vault_path}...")
files = list(vault_path.rglob("*.md"))
# Exclude .obsidian folder if present
files = [f for f in files if ".obsidian" not in str(f)]
files.sort()
all_files = list(vault_path.rglob("*.md"))
logger.info(f"Found {len(files)} markdown files.")
# --- ORDNER-FILTER ---
files = []
ignore_folders = [".trash", ".obsidian", ".sync", "templates", "_system"]
for f in all_files:
f_str = str(f)
if not any(folder in f_str for folder in ignore_folders) and not "/." in f_str:
files.append(f)
files.sort()
logger.info(f"Found {len(files)} relevant markdown files.")
# =========================================================================
# PASS 1: Global Pre-Scan (WP-15b Harvester)
# Füllt den LocalBatchCache für die semantische Kanten-Validierung.
# Nutzt ID, Titel und Filename für robusten Look-up.
# PASS 1: Global Pre-Scan
# =========================================================================
logger.info(f"🔍 [Pass 1] Pre-scanning {len(files)} files for global context cache...")
logger.info(f"🔍 [Pass 1] Pre-scanning files for global context cache...")
for f_path in files:
try:
ctx = pre_scan_markdown(str(f_path))
if ctx:
# 1. Look-up via Note ID (UUID oder Frontmatter ID)
service.batch_cache[ctx.note_id] = ctx
# 2. Look-up via Titel (Wichtig für Wikilinks [[Titel]])
service.batch_cache[ctx.title] = ctx
# 3. Look-up via Dateiname (Wichtig für Wikilinks [[Filename]])
fname = os.path.splitext(f_path.name)[0]
service.batch_cache[fname] = ctx
except Exception as e:
logger.warning(f"⚠️ Could not pre-scan {f_path.name}: {e}")
logger.info(f"✅ Context Cache populated for {len(files)} notes.")
except Exception: pass
# =========================================================================
# PASS 2: Processing (Semantic Batch-Verarbeitung)
# Nutzt den gefüllten Cache zur binären Validierung semantischer Kanten.
# PHASE 1: Batch-Import (Explicit Edges only)
# =========================================================================
stats = {"processed": 0, "skipped": 0, "errors": 0}
sem = asyncio.Semaphore(5) # Max 5 parallele Dateien für Cloud-Stabilität
sem = asyncio.Semaphore(5)
async def process_with_limit(f_path):
async with sem:
try:
# Nutzt den nun gefüllten Batch-Cache in der process_file Logik
res = await service.process_file(
file_path=str(f_path),
vault_root=str(vault_path),
force_replace=args.force,
apply=args.apply,
purge_before=True
return await service.process_file(
file_path=str(f_path), vault_root=str(vault_path),
force_replace=args.force, apply=args.apply, purge_before=True
)
return res
except Exception as e:
return {"status": "error", "error": str(e), "path": str(f_path)}
logger.info(f"🚀 [Pass 2] Starting semantic processing in batches...")
batch_size = 20
for i in range(0, len(files), batch_size):
batch = files[i:i+batch_size]
logger.info(f"Processing batch {i} to {i+len(batch)}...")
logger.info(f"--- Processing Batch {i//batch_size + 1} ---")
tasks = [process_with_limit(f) for f in batch]
results = await asyncio.gather(*tasks)
for res in results:
if res.get("status") == "success":
stats["processed"] += 1
elif res.get("status") == "error":
stats["errors"] += 1
logger.error(f"Error in {res.get('path')}: {res.get('error')}")
else:
stats["skipped"] += 1
if res.get("status") == "success": stats["processed"] += 1
elif res.get("status") == "error": stats["errors"] += 1
else: stats["skipped"] += 1
logger.info(f"Done. Stats: {stats}")
if not args.apply:
logger.info("DRY RUN. Use --apply to write to DB.")
# =========================================================================
# PHASE 2: Global Symmetry Injection
# =========================================================================
if args.apply:
logger.info(f"🔄 [Phase 2] Starting global symmetry injection...")
sym_res = await service.commit_vault_symmetries()
if sym_res.get("status") == "success":
logger.info(f"✅ Added {sym_res.get('added', 0)} protected symmetry edges.")
logger.info(f"Done. Final Stats: {stats}")
def main():
load_dotenv()
default_prefix = os.getenv("COLLECTION_PREFIX", "mindnet")
parser = argparse.ArgumentParser(description="Two-Pass Markdown Ingestion for Mindnet")
parser.add_argument("--vault", default="./vault", help="Path to vault root")
parser.add_argument("--prefix", default=default_prefix, help="Collection prefix")
parser.add_argument("--force", action="store_true", help="Force re-index all files")
parser.add_argument("--apply", action="store_true", help="Perform writes to Qdrant")
parser = argparse.ArgumentParser()
parser.add_argument("--vault", default="./vault")
parser.add_argument("--prefix", default=default_prefix)
parser.add_argument("--force", action="store_true")
parser.add_argument("--apply", action="store_true")
args = parser.parse_args()
# Starte den asynchronen Haupt-Loop
asyncio.run(main_async(args))
try:
asyncio.run(main_async(args))
except Exception as e:
logger.critical(f"FATAL ERROR: {e}")
if __name__ == "__main__":
main()