#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
=====================================================================
scripts/import_markdown.py — mindnet · WP-03 (Version 3.9.0)
=====================================================================
Purpose:
  - Imports Obsidian Markdown files (vault) into Qdrant:
      * Notes  (with optional schema validation, hash-based change detection)
      * Chunks (window & text, overlap metadata)
      * Edges  (belongs_to, prev/next, references, optional backlink,
                depends_on/assigned_to)
  - Idempotence via stable IDs (note_id, chunk_id) & hash signatures (Option C).
  - **Optional**: embeddings for note/chunks via an HTTP endpoint (/embed).
  - **Optional**: JSON schema validation against the supplied schemas.
  - **Optional**: note-scope references in addition to chunk references.

Highlights compared to the minimal variant:
  - Hash Option C (body/frontmatter/full × parsed/raw × normalize)
  - Baseline mode (write missing signatures initially)
  - Purge before upsert (changed note only: delete old chunks/edges)
  - UTF-8 fault-tolerant parser (fallback Latin-1 → re-encode)
  - Type registry: dynamic chunk profiles (optional)
  - Include/exclude & single-file import (--path) & skip rules
  - embedding_exclude is respected
  - NDJSON logging & final statistics

Invocations (examples):
  # Dry run (shows decisions)
  python3 -m scripts.import_markdown --vault ./vault --prefix mindnet

  # Apply + purge for changed notes
  python3 -m scripts.import_markdown --vault ./vault --prefix mindnet --apply --purge-before-upsert

  # Additionally create note-scope refs
  python3 -m scripts.import_markdown --vault ./vault --apply --note-scope-refs

  # Enable embeddings (endpoint can be overridden via ENV)
  python3 -m scripts.import_markdown --vault ./vault --apply --with-embeddings

  # Schema validation (uses the *.schema.json files)
  python3 -m scripts.import_markdown --vault ./vault --apply --validate-schemas \
      --note-schema ./schemas/note.schema.json \
      --chunk-schema ./schemas/chunk.schema.json \
      --edge-schema ./schemas/edge.schema.json

  # Import a single file only
  python3 -m scripts.import_markdown --path ./vault/40_concepts/concept-alpha.md --apply

  # Show version
  python3 -m scripts.import_markdown --version

ENV (excerpt):
  COLLECTION_PREFIX            Prefix of the Qdrant collections (default: mindnet)
  QDRANT_URL / QDRANT_API_KEY  Qdrant connection

  # Hash control
  MINDNET_HASH_COMPARE         body | frontmatter | full          (default: body)
  MINDNET_HASH_SOURCE          parsed | raw                       (default: parsed)
  MINDNET_HASH_NORMALIZE       canonical | whitespace | none      (default: canonical)

  # Embeddings (only with --with-embeddings)
  EMBED_URL                    e.g. http://127.0.0.1:8000/embed
  EMBED_MODEL                  free text (logging only)
  EMBED_BATCH                  batch size (default: 16)

Backward compatibility:
  - Fields & flows from v3.7.x are preserved.
  - New features are optional (default OFF).
  - Existing IDs/signatures are unchanged.

License: MIT (project-internal)
"""

__version__ = "3.9.0"

import os
import sys
import re
import json
import argparse
import pathlib
from typing import Any, Dict, List, Optional, Iterable, Tuple

# Core building blocks (existing)
from app.core.parser import read_markdown
from app.core.note_payload import make_note_payload
from app.core.chunk_payload import make_chunk_payloads
from app.core.derive_edges import build_edges_for_note
from app.core.qdrant import get_client, QdrantConfig
from app.core.qdrant_points import (
    ensure_collections_for_prefix,
    upsert_notes,
    upsert_chunks,
    upsert_edges,
    delete_chunks_of_note,
    delete_edges_of_note,
    fetch_note_hash_signature,
    store_note_hashes_signature,
)
from app.core.type_registry import load_type_registry  # optional

# ---------------------------
# Hash Option C control
# ---------------------------
DEFAULT_COMPARE = os.environ.get("MINDNET_HASH_COMPARE", "body").lower()
DEFAULT_SOURCE = os.environ.get("MINDNET_HASH_SOURCE", "parsed").lower()
DEFAULT_NORM = os.environ.get("MINDNET_HASH_NORMALIZE", "canonical").lower()

VALID_COMPARE = {"body", "frontmatter", "full"}
VALID_SOURCE = {"parsed", "raw"}
VALID_NORM = {"canonical", "whitespace", "none"}


def _active_hash_key(compare: str, source: str, normalize: str) -> str:
    """Build the ``compare:source:normalize`` key of the active hash mode.

    Any component that is not in its VALID_* set is replaced by its default
    ("body", "parsed", "canonical").
    """
    c = compare if compare in VALID_COMPARE else "body"
    s = source if source in VALID_SOURCE else "parsed"
    n = normalize if normalize in VALID_NORM else "canonical"
    return f"{c}:{s}:{n}"


# ---------------------------
# Schema validation (optional)
# ---------------------------
def _load_json(path: Optional[str]) -> Optional[Dict[str, Any]]:
    """Load a JSON file; return None when no path is given or the file is missing."""
    if not path:
        return None
    p = pathlib.Path(path)
    if not p.exists():
        return None
    with p.open("r", encoding="utf-8") as f:
        return json.load(f)


def _validate(obj: Dict[str, Any], schema: Optional[Dict[str, Any]], kind: str) -> List[str]:
    """Lightweight validation without a hard dependency on jsonschema.

    Only checks that every key listed under the schema's top-level
    ``required`` is present in ``obj``. Type checks are deliberately omitted.

    Returns:
        A list of human-readable error strings prefixed with ``kind``;
        empty when ``schema`` is falsy or nothing is missing.
    """
    if not schema:
        return []
    return [
        f"{kind}: missing required '{k}'"
        for k in schema.get("required", [])
        if k not in obj
    ]


# ---------------------------
# Embedding (optional)
# ---------------------------
def _post_json(url: str, payload: Any, timeout: float = 60.0) -> Any:
    """POST ``payload`` as JSON to ``url`` and return the decoded JSON response.

    Simple HTTP client without external dependencies.

    Raises:
        RuntimeError: on ``urllib.error.URLError`` (which includes HTTPError);
            the original exception is chained as ``__cause__``.
    """
    import urllib.request
    import urllib.error

    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.URLError as e:
        raise RuntimeError(f"embed http error: {e}") from e


def _embed_texts(url: str, texts: List[str], batch: int = 16) -> List[List[float]]:
    """Request embeddings for ``texts`` in batches of ``batch`` items.

    Each batch is posted as ``{"inputs": [...]}``. The response may be a dict
    carrying the vectors under ``"embeddings"`` or ``"data"``, or a bare list.

    Raises:
        RuntimeError: if a response does not yield a list, or yields a
            different number of entries than texts were sent.
    """
    step = max(1, int(batch))  # range() rejects a step of 0
    out: List[List[float]] = []
    for i in range(0, len(texts), step):
        chunk = texts[i:i + step]
        resp = _post_json(url, {"inputs": chunk})
        # Only dicts have .get(); a bare list response is used as-is.
        if isinstance(resp, dict):
            vectors = resp.get("embeddings") or resp.get("data") or resp
        else:
            vectors = resp
        if not isinstance(vectors, list):
            raise RuntimeError("embed response malformed")
        if len(vectors) != len(chunk):
            # Callers zip() texts with vectors; a mismatch would silently
            # leave items without (or with the wrong) embedding.
            raise RuntimeError(
                f"embed response count mismatch: expected {len(chunk)}, got {len(vectors)}"
            )
        out.extend(vectors)
    return out


# ---------------------------
# Skip rules & file selection
# ---------------------------
SILVERBULLET_BASENAMES = {"CONFIG.md", "index.md"}  # are skipped explicitly


def _should_skip_md(path: str) -> bool:
    """Return True if the file's basename (case-insensitive) is on the skip list."""
    base = os.path.basename(path).lower()
    return base in {b.lower() for b in SILVERBULLET_BASENAMES}


def _list_md_files(root: str, include: Optional[str] = None, exclude: Optional[str] = None) -> List[str]:
    """Collect all ``*.md`` files below ``root``, sorted by full path.

    ``include`` / ``exclude`` are regular expressions searched against the
    forward-slash relative path. Files matching the skip rules are omitted.
    """
    files: List[str] = []
    inc_re = re.compile(include) if include else None
    exc_re = re.compile(exclude) if exclude else None
    for dirpath, _, filenames in os.walk(root):
        for fn in filenames:
            if not fn.lower().endswith(".md"):
                continue
            full = os.path.join(dirpath, fn)
            rel = os.path.relpath(full, root).replace("\\", "/")
            if _should_skip_md(full):
                continue
            if inc_re and not inc_re.search(rel):
                continue
            if exc_re and exc_re.search(rel):
                continue
            files.append(full)
    files.sort()
    return files


# ---------------------------
# CLI
# ---------------------------
def _args() -> argparse.Namespace:
    """Parse the command line.

    ``--version`` uses argparse's built-in version action so that it prints
    and exits before the required ``--vault``/``--path`` group is enforced.
    """
    ap = argparse.ArgumentParser(description="Import Obsidian Markdown → Qdrant (Notes/Chunks/Edges).")
    gsrc = ap.add_mutually_exclusive_group(required=True)
    gsrc.add_argument("--vault", help="Root-Verzeichnis des Vaults")
    gsrc.add_argument("--path", help="Nur eine einzelne Markdown-Datei importieren")

    ap.add_argument("--prefix", help="Collection-Prefix (ENV: COLLECTION_PREFIX, Default: mindnet)")
    ap.add_argument("--apply", action="store_true", help="Änderungen in Qdrant schreiben (sonst Dry-Run)")
    ap.add_argument("--purge-before-upsert", action="store_true",
                    help="Bei geänderter Note: alte Chunks/Edges löschen (nur diese Note)")
    ap.add_argument("--note-scope-refs", action="store_true",
                    help="Auch Note-Scope 'references' + 'backlink' erzeugen")
    ap.add_argument("--baseline-modes", action="store_true",
                    help="Fehlende Hash-Signaturen initial speichern")

    # Filters
    ap.add_argument("--include", help="Regex auf Relativpfad (nur passende Dateien)")
    ap.add_argument("--exclude", help="Regex auf Relativpfad (diese Dateien überspringen)")

    # Validation
    ap.add_argument("--validate-schemas", action="store_true", help="JSON-Schemata prüfen (leichtgewichtig)")
    ap.add_argument("--note-schema", help="Pfad zu note.schema.json")
    ap.add_argument("--chunk-schema", help="Pfad zu chunk.schema.json")
    ap.add_argument("--edge-schema", help="Pfad zu edge.schema.json")

    # Embeddings (optional)
    ap.add_argument("--with-embeddings", action="store_true", help="Embeddings für Note & Chunks erzeugen")
    ap.add_argument("--embed-url", help="Override EMBED_URL (Default aus ENV)")
    ap.add_argument("--embed-batch", type=int, default=int(os.environ.get("EMBED_BATCH", "16")),
                    help="Embedding-Batchgröße")

    ap.add_argument("--version", action="version", version=f"import_markdown.py {__version__}",
                    help="Version anzeigen und beenden")
    return ap.parse_args()


# ---------------------------
# Main logic
# ---------------------------
def _resolve_files(args: argparse.Namespace) -> Tuple[List[str], str]:
    """Determine the files to import and the vault root from the CLI args.

    Exits the process with status 1 on an invalid ``--path``/``--vault`` and
    with status 0 when a single ``--path`` file is excluded by the skip rules.

    Returns:
        ``(files, vault_root)`` — absolute file paths and the directory that
        relative paths are computed against.
    """
    if args.path:
        if not os.path.isfile(args.path):
            print(json.dumps({"path": args.path, "error": "not a file"}))
            sys.exit(1)
        if _should_skip_md(args.path):
            print(json.dumps({"path": args.path, "skipped": "by rule"}))
            sys.exit(0)
        abs_path = os.path.abspath(args.path)
        return [abs_path], os.path.dirname(abs_path)

    if not os.path.isdir(args.vault):
        print(json.dumps({"vault": args.vault, "error": "not a directory"}))
        sys.exit(1)
    vault_root = os.path.abspath(args.vault)
    return _list_md_files(vault_root, include=args.include, exclude=args.exclude), vault_root


def main() -> None:
    """Run the import: parse, chunk, derive edges, optionally embed and upsert.

    Prints one JSON line per note plus a final statistics line.
    """
    # --version is handled inside argparse (prints and exits).
    args = _args()

    # Qdrant
    prefix = args.prefix or os.environ.get("COLLECTION_PREFIX", "mindnet")
    qc = QdrantConfig.from_env_or_default()
    client = get_client(qc)
    notes_col, chunks_col, edges_col = ensure_collections_for_prefix(client, prefix)

    # Type registry (optional); guard against a falsy return value.
    type_reg = load_type_registry(silent=True) or {}

    # Active hash mode. The logged components are taken from the normalized
    # key so the log reflects the mode actually used, even when an ENV value
    # was invalid and fell back to its default.
    active_key = _active_hash_key(DEFAULT_COMPARE, DEFAULT_SOURCE, DEFAULT_NORM)
    compare, source, norm = active_key.split(":")

    # Schemas (optional)
    note_schema = _load_json(args.note_schema) if args.validate_schemas else None
    chunk_schema = _load_json(args.chunk_schema) if args.validate_schemas else None
    edge_schema = _load_json(args.edge_schema) if args.validate_schemas else None

    # Embeddings (optional)
    embed_enabled = bool(args.with_embeddings)
    embed_url = args.embed_url or os.environ.get("EMBED_URL", "").strip()
    embed_batch = max(1, int(args.embed_batch))
    if embed_enabled and not embed_url:
        print(json.dumps({"warn": "with-embeddings active, but EMBED_URL not configured — embeddings skipped"}))
        embed_enabled = False

    # File list
    files, vault_root = _resolve_files(args)

    processed = 0
    stats = {"notes": 0, "chunks": 0, "edges": 0, "changed": 0, "skipped": 0, "embedded": 0}

    for path in files:
        rel_path = os.path.relpath(path, vault_root).replace("\\", "/")
        parsed = read_markdown(path)

        # Note payload (incl. fulltext, hashes[...] etc.)
        note_pl = make_note_payload(parsed, vault_root=vault_root)
        if not isinstance(note_pl, dict):
            print(json.dumps({
                "path": path,
                "note_id": getattr(parsed, "id", ""),
                "error": "make_note_payload returned non-dict",
                "returned_type": type(note_pl).__name__
            }))
            stats["skipped"] += 1
            continue

        note_id = note_pl["note_id"]

        # Excluded via frontmatter? Note/chunks/edges are still imported,
        # but without embeddings.
        exclude_flag = str(note_pl.get("embedding_exclude", "false")).lower()
        embedding_allowed = exclude_flag not in {"1", "true", "yes"}

        # Type profile
        note_type = str(note_pl.get("type", "concept") or "concept")
        profile = type_reg.get("types", {}).get(note_type, {}).get("chunk_profile", None)

        # Build chunks
        chunks = make_chunk_payloads(
            note_id=note_id,
            body=note_pl.get("fulltext", ""),
            note_type=note_type,
            profile=profile
        )

        # Edges (best effort: a failure is logged and the note continues without edges)
        edges: List[Dict[str, Any]] = []
        try:
            edges = build_edges_for_note(note_payload=note_pl, chunks=chunks,
                                         add_note_scope_refs=args.note_scope_refs)
        except Exception as e:
            # e.args may be empty (e.g. ``raise ValueError()``); indexing it
            # unconditionally would raise inside this handler.
            detail = e.args[0] if e.args else type(e).__name__
            print(json.dumps({
                "path": path,
                "note_id": note_id,
                "error": f"build_edges_for_note failed: {detail}"
            }))
            edges = []

        # Schema checks (soft: warnings only)
        if args.validate_schemas:
            schema_errs = _validate(note_pl, note_schema, "note")
            for c in chunks:
                schema_errs += _validate(c, chunk_schema, "chunk")
            for ed in edges:
                schema_errs += _validate(ed, edge_schema, "edge")
            if schema_errs:
                print(json.dumps({"note_id": note_id, "schema_warnings": schema_errs}, ensure_ascii=False))

        # Hash comparison
        prev_sig = fetch_note_hash_signature(client, notes_col, note_id, active_key)
        curr_sig = note_pl.get("hashes", {}).get(active_key, "")
        is_changed = (prev_sig != curr_sig)

        # Baseline: store a missing active signature
        if args.baseline_modes and not prev_sig and curr_sig and args.apply:
            store_note_hashes_signature(client, notes_col, note_id, active_key, curr_sig)

        # Embeddings (optional).
        # NOTE(review): this runs regardless of is_changed and --apply, although
        # the original comment said it is placed after change detection to save
        # unnecessary calls — confirm the intended gating.
        if embed_enabled and embedding_allowed:
            try:
                note_vecs = _embed_texts(embed_url, [note_pl.get("fulltext", "")], batch=embed_batch)
                note_pl["embedding"] = note_vecs[0] if note_vecs else None
                # Chunk embeddings
                chunk_texts = [c.get("window") or c.get("text") or "" for c in chunks]
                if chunk_texts:
                    chunk_vecs = _embed_texts(embed_url, chunk_texts, batch=embed_batch)
                    for c, v in zip(chunks, chunk_vecs):
                        c["embedding"] = v
                # NOTE(review): counted once per embedded note; the original
                # nesting of this statement was lost in formatting — confirm.
                stats["embedded"] += 1
            except Exception as e:
                print(json.dumps({"note_id": note_id, "warn": f"embedding failed: {e}"}))

        # Apply/upsert
        decision = "dry-run"
        if args.apply:
            if is_changed and args.purge_before_upsert:
                delete_chunks_of_note(client, chunks_col, note_id)
                delete_edges_of_note(client, edges_col, note_id)
            upsert_notes(client, notes_col, [note_pl])
            if chunks:
                upsert_chunks(client, chunks_col, chunks)
            if edges:
                upsert_edges(client, edges_col, edges)
            if curr_sig:
                store_note_hashes_signature(client, notes_col, note_id, active_key, curr_sig)
            decision = "apply" if is_changed else "apply-skip-unchanged"

        # Log
        print(json.dumps({
            "note_id": note_id,
            "title": note_pl.get("title"),
            "chunks": len(chunks),
            "edges": len(edges),
            "changed": bool(is_changed),
            "decision": decision,
            "path": rel_path,
            "hash_mode": compare,
            "hash_normalize": norm,
            "hash_source": source,
            "prefix": prefix
        }, ensure_ascii=False))

        stats["notes"] += 1
        stats["chunks"] += len(chunks)
        stats["edges"] += len(edges)
        if is_changed:
            stats["changed"] += 1
        processed += 1

    print(f"Done. Processed notes: {processed}")
    print(json.dumps({"stats": stats}, ensure_ascii=False))


if __name__ == "__main__":
    main()