#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script:  scripts/import_markdown.py
Version: 0.7.1 (2025-09-06)
Author:  mindnet / Architecture, data imports & sync

Summary
-------
Imports Markdown notes from an Obsidian-like vault into Qdrant:
- Validates frontmatter / note payload (against note.schema.json).
- Chunking + embeddings.
- Derives edges from [[wikilinks]] directly during the import:
    - 'references'    (note→note)
    - 'references_at' (chunk→note)
    - 'backlink'      (note←note) (symmetric to 'references')
- Idempotent upserts (deterministic IDs via qdrant_points).

New in 0.7.1
------------
- Correct change detection via SHA-256 over the **complete file**
  (frontmatter + body):
    - Field: payload.hash_fulltext
    - Compares the new hash against the hash already stored in Qdrant.
    - Only on change → processing/upsert; otherwise "skip".
- `--purge-before-upsert` is **only** executed when the note has
  **actually changed**.
- Robust Qdrant scroll compatibility (2- or 3-tuple return values).

Usage examples
--------------
Dry run (check only, write nothing):
    python3 -m scripts.import_markdown --vault ./vault

Only one specific note:
    python3 -m scripts.import_markdown --vault ./vault --note-id 20250821-foo

Apply (write) with purge (only changed notes are cleaned up and rewritten):
    python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert

Parameters
----------
--vault PATH           : Required. Root directory of the vault.
--apply                : If set, upserts are performed (otherwise dry run).
--purge-before-upsert  : Delete old chunks/edges of the **changed** note before the upsert.
--note-id ID           : Optional, process only this one note.

Environment variables (.env)
----------------------------
QDRANT_URL, QDRANT_API_KEY, COLLECTION_PREFIX, VECTOR_DIM
Defaults: url=http://127.0.0.1:6333, prefix=mindnet, dim=384

Compatibility
-------------
- Existing core modules are reused:
    app.core.parser         (read_markdown, normalize_frontmatter, validate_required_frontmatter)
    app.core.validate_note  (validate_note_payload)
    app.core.chunker        (assemble_chunks)
    app.core.chunk_payload  (make_chunk_payloads)
    app.core.embed          (embed_texts)
    app.core.qdrant         (QdrantConfig, ensure_collections, get_client, collection_names)
    app.core.qdrant_points  (points_for_note, points_for_chunks, points_for_edges, upsert_batch)
    app.core.derive_edges   (build_note_index, derive_wikilink_edges)

Notes
-----
- Please run inside the activated venv: source .venv/bin/activate
"""
from __future__ import annotations

import argparse
import glob
import hashlib
import json
import os
import sys
from typing import List, Dict, Tuple, Optional

from dotenv import load_dotenv
from qdrant_client.http import models as rest
from qdrant_client import QdrantClient

# Core building blocks (from the project)
from app.core.parser import (
    read_markdown,
    normalize_frontmatter,
    validate_required_frontmatter,
)
from app.core.validate_note import validate_note_payload
from app.core.chunker import assemble_chunks
from app.core.chunk_payload import make_chunk_payloads
from app.core.embed import embed_texts
from app.core.qdrant import QdrantConfig, ensure_collections, get_client, collection_names
from app.core.qdrant_points import (
    points_for_note,
    points_for_chunks,
    points_for_edges,
    upsert_batch,
)
from app.core.derive_edges import build_note_index, derive_wikilink_edges


# -------------------------------------------------
# Helper functions
# -------------------------------------------------

def iter_md(root: str) -> List[str]:
    """Return every ``*.md`` path below *root* (recursive), de-duplicated and sorted."""
    # With recursive=True, "**" also matches zero directories, so this single
    # pattern already covers Markdown files that sit directly in *root*.
    matches = glob.glob(os.path.join(root, "**/*.md"), recursive=True)
    return sorted(set(matches))


def file_sha256(path: str) -> str:
    """Return the hex SHA-256 of the **raw file** (frontmatter + body)."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        # Read in fixed-size pieces so large files are never held in memory at once.
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()


def qdrant_scroll(client: QdrantClient, **kwargs) -> Tuple[List, Optional[str]]:
    """
    Call ``client.scroll(**kwargs)`` and normalise its return value.

    Covers both the ``(points, next_offset)`` and the
    ``(points, next_page, _)`` signatures: for any tuple with at least two
    elements the first two are returned. Anything else is passed through
    unchanged as the points value, paired with ``None`` as offset.
    """
    res = client.scroll(**kwargs)
    if isinstance(res, tuple) and len(res) >= 2:
        return res[0], res[1]
    return res, None


def fetch_existing_note_hash(client: QdrantClient, prefix: str, note_id: str) -> Optional[str]:
    """
    Read ``hash_fulltext`` from the notes collection payload of one note.

    Returns ``None`` when no point with ``note_id`` exists or when the stored
    payload carries no ``hash_fulltext`` key.
    """
    notes_col, _, _ = collection_names(prefix)
    f = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    pts, _ = qdrant_scroll(
        client,
        collection_name=notes_col,
        scroll_filter=f,
        with_payload=True,
        with_vectors=False,
        limit=1,
    )
    if not pts:
        return None
    # A point may come back with payload=None; treat that like an empty payload.
    pl = getattr(pts[0], "payload", {}) or {}
    return pl.get("hash_fulltext")


def make_note_stub(abs_path: str, vault_root: str) -> Dict:
    """
    Build a minimal stub (note_id / title / path) for ``build_note_index``.

    Raises:
        ValueError: if the frontmatter has no (non-empty) ``id``.
    """
    parsed = read_markdown(abs_path)
    fm = normalize_frontmatter(parsed.frontmatter or {})
    if not fm.get("id"):
        raise ValueError(f"Missing id in frontmatter: {abs_path}")
    rel = os.path.relpath(abs_path, vault_root)
    return {"note_id": fm["id"], "title": fm.get("title"), "path": rel}


def build_vault_index(vault_root: str) -> tuple[Dict, Dict, Dict]:
    """Build the index used for wikilink resolution across all notes in the vault."""
    stubs = []
    for p in iter_md(vault_root):
        try:
            stubs.append(make_note_stub(p, vault_root))
        except Exception:
            # Deliberate best effort: files that cannot be parsed or that have
            # no id are simply left out of the index instead of aborting the run.
            continue
    return build_note_index(stubs)


def purge_for_note(client: QdrantClient, prefix: str, note_id: str, chunk_ids: List[str]) -> None:
    """
    Selectively purge the old data of **this** note.

    - Deletes chunks whose payload ``note_id`` equals *note_id*.
    - Deletes edges whose ``source_id`` equals *note_id* OR is one of *chunk_ids*.

    The note point itself is not deleted (the upsert overwrites it).

    NOTE(review): edges are matched only by the IDs passed in. Edges whose
    ``source_id`` is a chunk ID that is not in *chunk_ids* are not matched by
    this filter — confirm whether stale chunk edges can remain after a note
    shrinks.
    """
    _, chunks_col, edges_col = collection_names(prefix)

    # Delete chunks
    f_chunks = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    client.delete(collection_name=chunks_col, points_selector=f_chunks, wait=True)

    # Delete edges (OR over all source_id values: the note itself plus its chunks)
    source_ids = [note_id, *chunk_ids]
    f_edges = rest.Filter(
        should=[
            rest.FieldCondition(key="source_id", match=rest.MatchValue(value=sid))
            for sid in source_ids
        ]
    )
    client.delete(collection_name=edges_col, points_selector=f_edges, wait=True)


def _ensure_chunk_ids(note_id: str, chunk_pls: List[Dict]) -> List[str]:
    """
    Make sure every chunk payload carries ``chunk_id`` and return the IDs in order.

    An existing ``chunk_id`` wins, then ``id``, otherwise ``"<note_id>#<n>"``
    with a 1-based position is used. The payload dicts are updated in place.
    """
    chunk_ids: List[str] = []
    for i, ch in enumerate(chunk_pls, start=1):
        cid = ch.get("chunk_id") or ch.get("id") or f"{note_id}#{i}"
        ch["chunk_id"] = cid
        chunk_ids.append(cid)
    return chunk_ids


# -------------------------------------------------
# Main
# -------------------------------------------------

def main() -> None:
    """
    CLI entry point: scan the vault and import changed notes into Qdrant.

    For every Markdown file with valid required frontmatter the SHA-256 of the
    raw file is compared with ``hash_fulltext`` stored in Qdrant. Unchanged
    notes are reported as "skip". Changed notes are chunked, embedded and
    their wikilink edges derived; with ``--apply`` the results are upserted
    (optionally after a purge). One JSON line per processed note is printed
    to stdout. Exits with status 2 when the vault contains no Markdown files.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--vault", required=True, help="Pfad zum Vault-Root")
    ap.add_argument("--apply", action="store_true", help="Schreibt in Qdrant (sonst Dry-Run)")
    ap.add_argument(
        "--purge-before-upsert",
        action="store_true",
        help="Vor Upsert alte Chunks/Edges **nur für geänderte** Noten löschen.",
    )
    ap.add_argument("--note-id", help="Optional: nur diese Note verarbeiten")
    args = ap.parse_args()

    load_dotenv()
    cfg = QdrantConfig(
        url=os.getenv("QDRANT_URL", "http://127.0.0.1:6333"),
        api_key=os.getenv("QDRANT_API_KEY", None),
        prefix=os.getenv("COLLECTION_PREFIX", "mindnet"),
        dim=int(os.getenv("VECTOR_DIM", "384")),
    )
    client = get_client(cfg)
    ensure_collections(client, cfg.prefix, cfg.dim)

    vault_root = os.path.abspath(args.vault)
    files = iter_md(vault_root)
    if not files:
        print("Keine Markdown-Dateien gefunden.", file=sys.stderr)
        sys.exit(2)

    # Build the index once (used for link resolution of changed notes)
    note_index = build_vault_index(vault_root)

    processed = 0
    for abs_path in files:
        parsed = read_markdown(abs_path)
        fm = normalize_frontmatter(parsed.frontmatter or {})

        try:
            validate_required_frontmatter(fm)
        except Exception:
            # Skip incomplete notes
            continue

        if args.note_id and fm.get("id") != args.note_id:
            continue

        processed += 1

        # Change detection (file hash vs. Qdrant)
        new_hash = file_sha256(abs_path)
        old_hash = fetch_existing_note_hash(client, cfg.prefix, fm["id"])

        # On "skip" no expensive chunking/embedding/edge derivation is needed
        if old_hash == new_hash:
            print(json.dumps({
                "note_id": fm["id"],
                "title": fm.get("title"),
                "changed": False,
                "decision": "skip",
                "path": os.path.relpath(abs_path, vault_root),
            }, ensure_ascii=False))
            continue

        # --- From here on: changed notes only ---
        decision = "apply" if args.apply else "dry-run"

        # Build the note payload
        from app.core.note_payload import make_note_payload  # lazy import
        note_pl = make_note_payload(parsed, vault_root=vault_root)
        # Store the hash in the payload (schema allows hash_fulltext)
        note_pl["hash_fulltext"] = new_hash
        validate_note_payload(note_pl)

        # Chunking & payloads
        chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept"))
        chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks)
        # Normalise chunk IDs up front so that edge derivation, purge and
        # upsert all see the same payloads, independent of the CLI flags.
        chunk_ids = _ensure_chunk_ids(fm["id"], chunk_pls)

        # Embeddings
        # NOTE(review): these are also computed in dry-run although only the
        # apply branch below consumes them — confirm whether that is intended.
        texts = [c.get("text") or c.get("content") or "" for c in chunk_pls]
        vectors = embed_texts(texts)

        # Derive edges (note/chunk level)
        edges = derive_wikilink_edges(note_pl, chunk_pls, note_index)

        # Purge (only with apply + option set)
        if args.apply and args.purge_before_upsert:
            purge_for_note(client, cfg.prefix, fm["id"], chunk_ids)

        # Upserts (apply only)
        if args.apply:
            notes_col, note_pts = points_for_note(cfg.prefix, note_pl, note_vec=None, dim=cfg.dim)
            upsert_batch(client, notes_col, note_pts)

            chunks_col, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vectors)
            upsert_batch(client, chunks_col, chunk_pts)

            edges_col, edge_pts = points_for_edges(cfg.prefix, edges)
            upsert_batch(client, edges_col, edge_pts)

        # Log the changed note
        print(json.dumps({
            "note_id": fm["id"],
            "title": fm.get("title"),
            "chunks": len(chunk_pls),
            "edges": len(edges),
            "changed": True,
            "decision": decision,
            "path": note_pl["path"],
        }, ensure_ascii=False))

    print(f"Done. Processed notes: {processed}")


if __name__ == "__main__":
    main()