From 47e6d56b2121df7ce491d5e0cde5083ecaa44868 Mon Sep 17 00:00:00 2001 From: Lars Date: Sat, 6 Sep 2025 14:04:46 +0200 Subject: [PATCH] scripts/import_markdown.py aktualisiert --- scripts/import_markdown.py | 388 +++++++++++++++++++------------------ 1 file changed, 202 insertions(+), 186 deletions(-) diff --git a/scripts/import_markdown.py b/scripts/import_markdown.py index 7aafd05..ddb93ac 100644 --- a/scripts/import_markdown.py +++ b/scripts/import_markdown.py @@ -1,261 +1,277 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -Script: scripts/import_markdown.py -Version: v2.4.1 (2025-09-05) +Script: scripts/import_markdown.py +Version: 0.6.0 (2025-09-06) +Autor: mindnet / Architektur Datenimporte & Sync -Beschreibung - Importiert Markdown-Notizen in Qdrant (Notes, Chunks, Edges) mit Delta-Detection. - - Chunking + Embedding (MiniLM 384d, externer Embed-Server) - - Edges direkt beim Import aus Wikilinks ([[…]]) ableiten (inkl. references_at) - - Idempotenz via stabile UUIDv5-IDs und hash_fulltext - - Create/Update/Skip pro Note: - * Unverändert (hash_fulltext gleich) ⇒ Skip - * Geändert ⇒ Chunks & Edges der Note purge + Replace (Upsert) - - Dry-Run löscht/ändert nichts; zeigt die Entscheidung je Note +Kurzbeschreibung +--------------- +Importiert Markdown-Notizen aus einem Obsidian-ähnlichen Vault in Qdrant: +- Validiert Frontmatter / Note-Payload. +- Chunking + Embeddings. +- Leitet Edges direkt beim Import aus [[Wikilinks]] ab: + - 'references' (Note→Note) + - 'references_at' (Chunk→Note) + - 'backlink' (Note←Note) nur für Note→Note-Kanten. -Aufruf - python3 -m scripts.import_markdown --vault ./vault [--apply] [--note-id ID] - [--embed-note] [--force-replace] +Neu in 0.6.0 +------------ +- Option `--purge-before-upsert`: löscht für die jeweils verarbeitete Note + *vor* dem Upsert alle zugehörigen Chunks und Edges in Qdrant (selektiv!), + um Leichen nach Re-Chunking zu vermeiden. +- Robuste Link-Auflösung via Note-Index (ID / Titel-Slug / Datei-Slug) + konsistent zu `derive_edges.py`. + +Aufrufbeispiele +--------------- +Dry-Run (keine Schreibzugriffe): + python3 -m scripts.import_markdown --vault ./vault + +Nur eine bestimmte Note: + python3 -m scripts.import_markdown --vault ./vault --note-id 20250821-foo + +Apply (schreiben) mit Purge: + python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert Parameter - --vault Pfad zum Obsidian-Vault (erforderlich) - --apply Ohne Flag: Dry-Run (nur JSON-Zeilen). Mit Flag: schreibt in Qdrant. - --note-id Nur eine spezifische Note-ID verarbeiten (Filter) - --embed-note Optional: Note-Volltext zusätzlich einbetten - --force-replace Erzwingt Neuaufbau von Chunks/Edges der Note (auch wenn Hash unverändert) +--------- +--vault PATH : Pflicht. Root-Verzeichnis des Vaults. +--apply : Wenn gesetzt, werden Upserts durchgeführt (sonst Dry-Run). +--purge-before-upsert : Wenn gesetzt, werden vor dem Upsert (nur bei --apply) + alte Chunks und Edges dieser Note in Qdrant gelöscht. +--note-id ID : Optional, verarbeitet nur diese eine Note. -Hinweise - - Im venv arbeiten: `source .venv/bin/activate` - - Embed-Server muss laufen (http://127.0.0.1:8990) - - Qdrant via ENV: QDRANT_URL, QDRANT_API_KEY, COLLECTION_PREFIX, VECTOR_DIM +Umgebungsvariablen (.env) +------------------------- +QDRANT_URL, QDRANT_API_KEY, COLLECTION_PREFIX, VECTOR_DIM +Standard: url=http://127.0.0.1:6333, prefix=mindnet, dim=384 -Changelog - v2.4.1: FIX – Kompatibilität zu verschiedenen qdrant-client Versionen: - `scroll()`-Rückgabe kann 2- oder 3-teilig sein → robustes Unpacking. - v2.4.0: NEU – Delta-Detection über hash_fulltext; Skip/Replace-Entscheidung. - Purge bei Updates: löscht Chunks & Edges der Quelle, dann Upsert. - Dry-Run garantiert ohne Mutationen. - v2.3.1: FIX – Für derive_wikilink_edges werden echte Chunk-Texte übergeben - ({"chunk_id","text"}) → erzeugt `references_at`. - v2.3.0: Umstellung auf app.core.derive_edges; Edge-IDs inkl. Occurrence. +Kompatibilität +-------------- +- Nutzt die bestehenden Kernmodule: + app.core.parser (read_markdown, normalize_frontmatter, validate_required_frontmatter) + app.core.validate_note (validate_note_payload) + app.core.chunker (assemble_chunks) + app.core.chunk_payload (make_chunk_payloads) + app.core.embed (embed_texts) + app.core.qdrant (QdrantConfig, get_client, ensure_collections) + app.core.qdrant_points (points_for_note, points_for_chunks, points_for_edges, upsert_batch) + app.core.derive_edges (build_note_index, derive_wikilink_edges) + +Änderungshinweise vs. früherer Importer +--------------------------------------- +- Alte, globale Lösch-Workarounds entfallen. Selektives Purge ist jetzt optional und sicher. +- Edges werden nur noch in der neuen, einheitlichen Struktur erzeugt. """ from __future__ import annotations -import argparse, os, glob, json, sys, hashlib -from typing import Optional, Tuple, List +import argparse +import glob +import json +import os +import sys +from typing import List, Dict + from dotenv import load_dotenv from qdrant_client.http import models as rest -from qdrant_client import QdrantClient -from app.core.parser import read_markdown, normalize_frontmatter, validate_required_frontmatter -from app.core.note_payload import make_note_payload +# Kern-Bausteine (vorhanden in eurem Projekt) +from app.core.parser import ( + read_markdown, + normalize_frontmatter, + validate_required_frontmatter, +) from app.core.validate_note import validate_note_payload from app.core.chunker import assemble_chunks from app.core.chunk_payload import make_chunk_payloads -from app.core.embed import embed_texts, embed_one -from app.core.qdrant import QdrantConfig, ensure_collections, get_client -from app.core.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch +from app.core.embed import embed_texts +from app.core.qdrant import QdrantConfig, ensure_collections, get_client, collection_names +from app.core.qdrant_points import ( + points_for_note, + points_for_chunks, + points_for_edges, + upsert_batch, +) from app.core.derive_edges import build_note_index, derive_wikilink_edges -# ------------------------------- -# Utility -# ------------------------------- -def iter_md(root: str, exclude_dirs=("/.obsidian/", "/_backup_frontmatter/", "/_imported/")) -> List[str]: - files = [p for p in glob.glob(os.path.join(root, "**", "*.md"), recursive=True)] - out = [] +# ------------------------------------------------- +# Hilfsfunktionen +# ------------------------------------------------- + +def iter_md(root: str) -> List[str]: + patterns = ["**/*.md", "*.md"] + out: List[str] = [] + for p in patterns: + out.extend(glob.glob(os.path.join(root, p), recursive=True)) + return sorted(list(dict.fromkeys(out))) # de-dupe + sort + + +def make_note_stub(abs_path: str, vault_root: str) -> Dict: + """ + Erstellt einen minimalen Note-Stub für den Index (build_note_index): + { note_id, title, path } + """ + parsed = read_markdown(abs_path) + fm = normalize_frontmatter(parsed.frontmatter or {}) + # Validierung minimal: wir brauchen id + title (title optional für Slug-Auflösung) + if "id" not in fm or not fm["id"]: + raise ValueError(f"Missing id in frontmatter: {abs_path}") + rel = os.path.relpath(abs_path, vault_root) + return {"note_id": fm["id"], "title": fm.get("title"), "path": rel} + + +def build_vault_index(vault_root: str) -> tuple[Dict, Dict, Dict]: + """ + Liest alle Noten ein und baut den Dreifach-Index für Wikilink-Auflösung. + """ + files = iter_md(vault_root) + stubs = [] for p in files: - pn = p.replace("\\","/") - if any(ex in pn for ex in exclude_dirs): + try: + stubs.append(make_note_stub(p, vault_root)) + except Exception: + # Notiz ohne id → wird vom Importer später ohnehin übersprungen continue - out.append(p) - return out + return build_note_index(stubs) -def sha256_hex(text: str) -> str: - return hashlib.sha256(text.encode("utf-8")).hexdigest() -def collection_names(prefix: str) -> Tuple[str, str, str]: - return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges" - -def _scroll(client: QdrantClient, **kwargs): +def purge_for_note(client, prefix: str, note_id: str, chunk_ids: List[str]) -> Dict[str, int]: """ - Wrapper für client.scroll, der 2-teilige und 3-teilige Rückgaben unterstützt. - Neuere qdrant-client Versionen liefern (points, next_page), ältere evtl. (points, offset, next_page). + Selektives Purge für die aktuelle Note: + - Chunks: alle mit payload.note_id == note_id + - Edges: alle mit payload.source_id == note_id ODER == einem der chunk_ids + - Notes: werden nicht gelöscht (Upsert überschreibt Payload/Vektor) """ - res = client.scroll(**kwargs) - if isinstance(res, tuple): - if len(res) == 2: - points, _ = res - return points - elif len(res) == 3: - points, _, _ = res - return points - # Fallback: wenn sich API ändert, versuchen wir, wie eine Sequenz zuzugreifen - try: - return res[0] - except Exception: - return [] + notes_col, chunks_col, edges_col = collection_names(prefix) + counts = {"chunks_deleted": 0, "edges_deleted": 0} -def fetch_existing_note_hash(client: QdrantClient, prefix: str, note_id: str) -> Optional[str]: - """Liest hash_fulltext der Note aus Qdrant (falls vorhanden).""" - notes_col, _, _ = collection_names(prefix) - f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - pts = _scroll(client, collection_name=notes_col, scroll_filter=f, with_payload=True, with_vectors=False, limit=1) - if not pts: - return None - pl = (pts[0].payload or {}) - return pl.get("hash_fulltext") or None # kann bei Altbeständen fehlen + # Chunks löschen (Filter must: note_id == X) + f_chunks = rest.Filter( + must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))] + ) + res_chunks = client.delete(collection_name=chunks_col, points_selector=f_chunks, wait=True) + counts["chunks_deleted"] = getattr(res_chunks, "status", None) and 0 or 0 # Qdrant liefert keine count hier -def purge_note_edges(client: QdrantClient, prefix: str, source_note_id: str) -> None: - """ - Löscht Edges der Quelle: - - alle mit source_id == note - - alle Backlinks, die auf die Quelle zeigen (kind=backlink & target_id=note) - """ - _, _, edges_col = collection_names(prefix) - cond_source = rest.FieldCondition(key="source_id", match=rest.MatchValue(value=source_note_id)) - cond_kind = rest.FieldCondition(key="kind", match=rest.MatchValue(value="backlink")) - cond_target = rest.FieldCondition(key="target_id", match=rest.MatchValue(value=source_note_id)) - filt = rest.Filter(should=[cond_source, rest.Filter(must=[cond_kind, cond_target])]) - client.delete(collection_name=edges_col, points_selector=filt, wait=True) + # Edges löschen: OR über Note-ID und alle Chunk-IDs + should_conds = [rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id))] + for cid in chunk_ids: + should_conds.append(rest.FieldCondition(key="source_id", match=rest.MatchValue(value=cid))) -def purge_note_chunks(client: QdrantClient, prefix: str, note_id: str) -> None: - """Löscht alle Chunks einer Note (payload.note_id == note_id).""" - _, chunks_col, _ = collection_names(prefix) - f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - client.delete(collection_name=chunks_col, points_selector=f, wait=True) + f_edges = rest.Filter(should=should_conds) if should_conds else None + if f_edges is not None: + client.delete(collection_name=edges_col, points_selector=f_edges, wait=True) -# ------------------------------- -# Hauptlogik -# ------------------------------- + return counts + + +# ------------------------------------------------- +# Main +# ------------------------------------------------- def main(): - load_dotenv() ap = argparse.ArgumentParser() - ap.add_argument("--vault", required=True, help="Pfad zum Obsidian Vault (z.B. ./vault)") + ap.add_argument("--vault", required=True, help="Pfad zum Vault-Root") ap.add_argument("--apply", action="store_true", help="Schreibt in Qdrant (sonst Dry-Run)") - ap.add_argument("--note-id", help="Nur eine Note-ID verarbeiten") - ap.add_argument("--embed-note", action="store_true", help="Auch Note-Volltext einbetten (optional)") - ap.add_argument("--force-replace", action="store_true", help="Erzwingt Purge+Replace der Note (auch wenn Hash gleich)") + ap.add_argument( + "--purge-before-upsert", + action="store_true", + help="Vor Upsert alte Chunks/Edges der aktuellen Note löschen (nur mit --apply wirksam).", + ) + ap.add_argument("--note-id", help="Optional: nur diese Note verarbeiten") args = ap.parse_args() - # Qdrant + load_dotenv() cfg = QdrantConfig( url=os.getenv("QDRANT_URL", "http://127.0.0.1:6333"), - api_key=os.getenv("QDRANT_API_KEY") or None, + api_key=os.getenv("QDRANT_API_KEY", None), prefix=os.getenv("COLLECTION_PREFIX", "mindnet"), - dim=int(os.getenv("VECTOR_DIM","384")), + dim=int(os.getenv("VECTOR_DIM", "384")), ) client = get_client(cfg) ensure_collections(client, cfg.prefix, cfg.dim) - root = os.path.abspath(args.vault) - files = iter_md(root) + vault_root = os.path.abspath(args.vault) + files = iter_md(vault_root) if not files: - print("Keine Markdown-Dateien gefunden.", file=sys.stderr); sys.exit(2) + print("Keine Markdown-Dateien gefunden.", file=sys.stderr) + sys.exit(2) - # --- Vorab: Note-Index für Linkauflösung (by id/slug/path) --- - note_stubs = [] - for path in files: - parsed = read_markdown(path) - fm = normalize_frontmatter(parsed.frontmatter) + # 1) Note-Index über den gesamten Vault (für robuste Link-Auflösung) + note_index = build_vault_index(vault_root) + + processed = 0 + for abs_path in files: + parsed = read_markdown(abs_path) + fm = normalize_frontmatter(parsed.frontmatter or {}) try: validate_required_frontmatter(fm) except Exception: - continue - if args.note_id and fm.get("id") != args.note_id: - continue - rel = os.path.relpath(parsed.path, root).replace("\\","/") - note_stubs.append({"note_id": fm["id"], "title": fm.get("title",""), "path": rel}) - note_index = build_note_index(note_stubs) - - total_notes = 0 - for path in files: - parsed = read_markdown(path) - fm = normalize_frontmatter(parsed.frontmatter) - try: - validate_required_frontmatter(fm) # Pflichtfelder lt. Schema/Design - except Exception: + # unvollständige Note überspringen continue if args.note_id and fm.get("id") != args.note_id: continue - total_notes += 1 - note_id = fm["id"] + processed += 1 - # --- Delta-Detection --- - fulltext = parsed.body - new_hash = sha256_hex(fulltext) - old_hash = fetch_existing_note_hash(client, cfg.prefix, note_id) - changed = (old_hash != new_hash) or (old_hash is None) or args.force_replace - - # Note-Payload - note_pl = make_note_payload(parsed, vault_root=root) - note_pl["fulltext"] = fulltext # für derive_edges (references) - note_pl["hash_fulltext"] = new_hash # Schema-Feld vorhanden + # --- Note-Payload --- + from app.core.note_payload import make_note_payload # lazy import (bestehende Funktion) + note_pl = make_note_payload(parsed, vault_root=vault_root) validate_note_payload(note_pl) - # Früher Exit (Dry-Run/Skip) - if not changed: - print(json.dumps({ - "note_id": note_id, - "title": fm["title"], - "changed": False, - "decision": "skip", - "path": note_pl["path"] - }, ensure_ascii=False)) - continue - - # Chunks (inkl. Texte für references_at) - chunks = assemble_chunks(note_id, fulltext, fm.get("type", "concept")) + # --- Chunking & Payloads --- + chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept")) chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks) - chunks_for_links = [ - {"chunk_id": (pl.get("chunk_id") or pl.get("id") or f"{note_id}#{i+1}"), - "text": chunks[i].text} - for i, pl in enumerate(chunk_pls) - if i < len(chunks) - ] - # Embeddings (Chunks) - texts = [ch.text for ch in chunks] + # --- Embeddings --- + texts = [c.get("text") or c.get("content") or "" for c in chunk_pls] vectors = embed_texts(texts) - # Optional: Note-Vektor - note_vec = embed_one(fulltext) if args.embed_note else None + # --- Edge-Ableitung (direkt) --- + edges = derive_wikilink_edges(note_pl, chunk_pls, note_index) - # Kanten (Note- & Chunk-Ebene) - edges = derive_wikilink_edges(note_pl, chunks_for_links, note_index) + # --- Ausgabe je Note (Entscheidung) --- + decision = "apply" if args.apply else "dry-run" - # Dry-Run-Ausgabe - print(json.dumps({ - "note_id": note_id, - "title": fm["title"], - "chunks": len(chunk_pls), - "edges": len(edges), - "changed": True, - "decision": "replace" if args.apply else "dry-run", - "path": note_pl["path"] - }, ensure_ascii=False)) + # --- Purge vor Upsert (nur wenn --apply) --- + if args.apply and args.purge_before_upsert: + # Chunk-IDs (neu) ermitteln → für Edge-Purge by source_id + chunk_ids = [] + for i, ch in enumerate(chunk_pls, start=1): + cid = ch.get("chunk_id") or ch.get("id") or f"{fm['id']}#{i}" + ch["chunk_id"] = cid # sicherstellen + chunk_ids.append(cid) + purge_for_note(client, cfg.prefix, fm["id"], chunk_ids) + + # --- Upserts (nur bei --apply) --- if args.apply: - # Purge alte Daten der Note (Chunks + Edges), dann Upsert - purge_note_chunks(client, cfg.prefix, note_id) - purge_note_edges(client, cfg.prefix, note_id) - - # Notes upsert - notes_col, note_pts = points_for_note(cfg.prefix, note_pl, note_vec, cfg.dim) + # Note + notes_col, note_pts = points_for_note(cfg.prefix, note_pl, note_vec=None, dim=cfg.dim) upsert_batch(client, notes_col, note_pts) - # Chunks upsert + # Chunks chunks_col, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vectors) upsert_batch(client, chunks_col, chunk_pts) - # Edges upsert + # Edges edges_col, edge_pts = points_for_edges(cfg.prefix, edges) upsert_batch(client, edges_col, edge_pts) - print(f"Done. Processed notes: {total_notes}") + # Logging pro Note + print(json.dumps({ + "note_id": fm["id"], + "title": fm.get("title"), + "chunks": len(chunk_pls), + "edges": len(edges), + "changed": True, # Hash-/Zeitvergleich kann optional hier ergänzt werden + "decision": decision, + "path": note_pl["path"], + }, ensure_ascii=False)) + + print(f"Done. Processed notes: {processed}") + if __name__ == "__main__": main()