#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
scripts/import_markdown.py

Purpose
-------
Import Markdown notes from a vault into Qdrant with idempotent upserts.

This version fixes the issue where `retriever_weight` for *notes* did not
reflect the values from `types.yaml`. It does so by building the note payload
from a dict containing the normalized frontmatter, and by ensuring the type
registry is loaded via ENV (`MINDNET_TYPES_FILE`, default:
./config/types.yaml).

Key behaviors
-------------
- Deterministic, idempotent upserts for notes / chunks / edges
- Optional embeddings for chunks (computed only when chunks are written)
- Optional sync-deletes: notes present in Qdrant but missing from the vault
  are deleted together with their chunks/edges
- Ensures collections and payload indices exist
- Honors `retriever_weight` and `chunk_profile` from types.yaml for both
  notes and chunks. A change of the note type, `retriever_weight` or
  `chunk_profile` counts as a note change, so existing notes pick up new
  registry values without --force-replace.

CLI examples
------------
  # Apply + purge
  python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert --prefix "$COLLECTION_PREFIX"

  # Sync-deletes (dry-run, then apply)
  python3 -m scripts.import_markdown --vault ./vault --sync-deletes
  python3 -m scripts.import_markdown --vault ./vault --sync-deletes --apply

Environment
-----------
- QDRANT_URL | QDRANT_HOST/QDRANT_PORT | QDRANT_API_KEY
- COLLECTION_PREFIX (default: mindnet); overridable via --prefix
- VECTOR_DIM (default: 384)
- MINDNET_TYPES_FILE (default: ./config/types.yaml)
- MINDNET_NOTE_SCOPE_REFS=true|false (default: false)

Change detection compares the note fulltext plus the type-registry fields
mentioned above. The hash options (--hash-mode, --hash-source,
--hash-normalize, --compare-text, --baseline-modes and the ENV variables
MINDNET_HASH_COMPARE, MINDNET_HASH_SOURCE, MINDNET_HASH_NORMALIZE,
MINDNET_COMPARE_TEXT) are accepted for compatibility but currently have no
effect.
"""
from __future__ import annotations

import argparse
import json
import os
import sys
from typing import Any, Dict, List, Optional, Set, Tuple

from dotenv import load_dotenv
from qdrant_client.http import models as rest

# --- Project imports ---
from app.core.parser import (
    read_markdown,
    normalize_frontmatter,
    validate_required_frontmatter,
)
from app.core.note_payload import make_note_payload
from app.core.chunk_payload import make_chunk_payloads

try:
    from app.core.derive_edges import build_edges_for_note
except Exception:  # pragma: no cover
    from app.core.edges import build_edges_for_note  # type: ignore

from app.core.qdrant import (
    QdrantConfig,
    get_client,
    ensure_collections,
    ensure_payload_indexes,
)
from app.core.qdrant_points import (
    points_for_chunks,
    points_for_note,
    points_for_edges,
    upsert_batch,
)

# Embeddings are optional; without them chunks are handed to
# points_for_chunks with vecs=None.
try:
    from app.core.embed import embed_texts
except Exception:  # pragma: no cover
    embed_texts = None

# Note-payload fields derived from the type registry. A difference in any of
# them marks the note as changed even when its text is identical, so that
# edits to types.yaml reach notes that already exist in Qdrant.
_REGISTRY_FIELDS: Tuple[str, ...] = ("type", "retriever_weight", "chunk_profile")


# ---------------------- helpers ----------------------

def _env(name: str, default: Optional[str] = None) -> str:
    """Return environment variable *name*, or *default* ("" if None) when unset."""
    return os.getenv(name, "" if default is None else default)


def _env_flag(name: str) -> bool:
    """Return True when env var *name* equals "true" (case/whitespace-insensitive)."""
    return _env(name, "false").strip().lower() == "true"


def _resolve_mode(val: Optional[str]) -> str:
    """Resolve the hash-compare mode: CLI value first, then MINDNET_HASH_COMPARE.

    Unknown values fall back to "body". Not consulted by ``main`` at the moment
    (see the module docstring).
    """
    v = (val or _env("MINDNET_HASH_COMPARE", "body")).strip().lower()
    return v if v in ("body", "frontmatter", "full") else "body"


def _iter_md(root: str) -> List[str]:
    """Return the sorted paths of all ``*.md`` files below *root* (suffix case-insensitive)."""
    files = [
        os.path.join(dirpath, fn)
        for dirpath, _, filenames in os.walk(root)
        for fn in filenames
        if fn.lower().endswith(".md")
    ]
    files.sort()
    return files


def _types_file_default() -> str:
    """Return $MINDNET_TYPES_FILE, or ./config/types.yaml resolved against the CWD.

    The relative default means the script is expected to run from the
    project root.
    """
    return _env("MINDNET_TYPES_FILE", os.path.abspath("./config/types.yaml"))


def load_type_registry() -> Dict[str, Any]:
    """Load the ``types`` mapping from the type-registry YAML file.

    Returns an empty dict when the file cannot be read or parsed, or has no
    mapping under ``types``. Read/parse failures are reported on stderr
    rather than swallowed, because without a registry no retriever_weight /
    chunk_profile overrides are applied.
    """
    import yaml  # local import: only needed here

    path = _types_file_default()
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
    except Exception as e:
        print(
            f"[import_markdown] type registry not loaded from {path!r}: {type(e).__name__}: {e}",
            file=sys.stderr,
        )
        return {}
    types = data.get("types") if isinstance(data, dict) else None
    return types if isinstance(types, dict) else {}


def resolve_note_type(note_type: Optional[str], reg: Dict[str, Any]) -> str:
    """Return the stripped note type, defaulting to "concept" when empty/blank.

    Types that are not configured in *reg* are allowed and returned as-is;
    *reg* is kept in the signature for callers.
    """
    s = str(note_type).strip() if note_type else ""
    return s or "concept"


def _type_entry(note_type: str, reg: Dict[str, Any]) -> Dict[str, Any]:
    """Return the registry entry for *note_type*, or {} if absent or not a mapping."""
    if not isinstance(reg, dict):
        return {}
    entry = reg.get(note_type)
    return entry if isinstance(entry, dict) else {}


def effective_chunk_profile(note_type: str, reg: Dict[str, Any]) -> Optional[str]:
    """Return ``types.<note_type>.chunk_profile`` as str, or None if not configured."""
    v = _type_entry(note_type, reg).get("chunk_profile")
    return str(v) if v is not None else None


def effective_retriever_weight(note_type: str, reg: Dict[str, Any]) -> Optional[float]:
    """Return ``types.<note_type>.retriever_weight`` as float, or None if unset/non-numeric."""
    v = _type_entry(note_type, reg).get("retriever_weight")
    if v is None:
        return None
    try:
        return float(v)
    except (TypeError, ValueError):
        return None


# ---------------------- qdrant helpers ----------------------

def _unpack_scroll(res: Any) -> Tuple[List[Any], Any]:
    """Normalize a ``client.scroll`` result to ``(points, next_page_offset)``.

    qdrant_client returns a ``(points, next_offset)`` tuple; objects exposing
    ``.points`` / ``.next_page_offset`` attributes are accepted as well.
    """
    if hasattr(res, "points"):
        return list(res.points or []), getattr(res, "next_page_offset", None)
    return list(res[0] or []), res[1]


def _match_any(keys: Tuple[str, ...], value: str) -> rest.Filter:
    """Filter matching points where any payload field in *keys* equals *value*."""
    return rest.Filter(
        should=[rest.FieldCondition(key=k, match=rest.MatchValue(value=value)) for k in keys]
    )


def _note_filter(note_id: str) -> rest.Filter:
    """Filter matching points whose payload ``note_id`` equals *note_id*."""
    return rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )


def _delete_by_filter(client: Any, collection: str, flt: rest.Filter) -> None:
    """Delete all points of *collection* matching *flt*, waiting for completion.

    Tries ``client.delete_points`` first and falls back to ``client.delete`` on
    any error (compatibility shim for different client APIs).
    """
    try:
        client.delete_points(collection_name=collection, points_selector=flt, wait=True)
    except Exception:
        client.delete(collection_name=collection, points_selector=flt, wait=True)


def _points_exist(client: Any, collection: str, flt: rest.Filter) -> bool:
    """Return True if at least one point of *collection* matches *flt*.

    Returns False when the check itself fails, so callers fall back to
    re-writing the artifacts.
    """
    try:
        res = client.scroll(
            collection_name=collection,
            with_payload=False,
            with_vectors=False,
            limit=1,
            scroll_filter=flt,
        )
        pts, _ = _unpack_scroll(res)
    except Exception:
        return False
    return bool(pts)


def list_qdrant_note_ids(client, prefix: str) -> Set[str]:
    """Collect all note ids (payload ``note_id``, else ``id``) from ``<prefix>_notes``."""
    notes = f"{prefix}_notes"
    out: Set[str] = set()
    offset = None
    while True:
        res = client.scroll(
            collection_name=notes,
            with_payload=True,
            with_vectors=False,
            limit=2048,
            offset=offset,
        )
        pts, next_off = _unpack_scroll(res)
        for p in pts:
            pl = getattr(p, "payload", None) or {}
            nid = pl.get("note_id") or pl.get("id")
            if isinstance(nid, str):
                out.add(nid)
        # Stop at the last page; the equality check guards against a client
        # that would hand back the same offset forever.
        if next_off is None or next_off == offset:
            break
        offset = next_off
    return out


def fetch_existing_note_payload(client, prefix: str, note_id: str) -> Optional[Dict[str, Any]]:
    """Return the stored payload of note *note_id*, or None if it is not in Qdrant."""
    res = client.scroll(
        collection_name=f"{prefix}_notes",
        with_payload=True,
        with_vectors=False,
        limit=1,
        scroll_filter=_note_filter(note_id),
    )
    pts, _ = _unpack_scroll(res)
    if not pts:
        return None
    return getattr(pts[0], "payload", None) or None


def purge_note_artifacts(client, prefix: str, note_id: str) -> None:
    """Delete the chunks and edges of a note (idempotent).

    Chunks are matched by ``note_id``; edges by ``source_id``, ``target_id``
    or ``note_id``.
    """
    _delete_by_filter(client, f"{prefix}_chunks", _note_filter(note_id))
    # NOTE(review): matching target_id also removes edges that *other* notes
    # point at this note; those are only rebuilt when the other notes are
    # re-imported. Confirm this is intended for --purge-before-upsert.
    _delete_by_filter(
        client,
        f"{prefix}_edges",
        _match_any(("source_id", "target_id", "note_id"), note_id),
    )


def delete_note_everywhere(client, prefix: str, note_id: str) -> None:
    """Delete a note together with its chunks and edges."""
    purge_note_artifacts(client, prefix, note_id)
    _delete_by_filter(client, f"{prefix}_notes", _note_filter(note_id))


# ---------------------- import pipeline ----------------------

def _build_arg_parser() -> argparse.ArgumentParser:
    """Return the argument parser for the importer CLI."""
    ap = argparse.ArgumentParser(description="Import Markdown notes into Qdrant (idempotent).")
    ap.add_argument("--vault", required=True, help="Path to the vault (folder with .md files)")
    ap.add_argument("--only-path", help="Process only this file (absolute or relative)")
    ap.add_argument("--apply", action="store_true", help="Write to Qdrant (otherwise dry-run)")
    ap.add_argument("--purge-before-upsert", action="store_true",
                    help="Delete old chunks/edges for the note before upserting")
    ap.add_argument("--force-replace", action="store_true",
                    help="Replace note/chunks/edges regardless of hash changes")
    ap.add_argument("--note-id", help="Process only notes with this id")
    ap.add_argument("--note-scope-refs", action="store_true",
                    help="Create note-scope references/backlinks")
    # NOTE: the five hash/compare options below are parsed for CLI compatibility
    # but are not consulted by the current change detection.
    ap.add_argument("--hash-mode", help="body|frontmatter|full (default body)")
    ap.add_argument("--hash-source", help="parsed|raw (default parsed)")
    ap.add_argument("--hash-normalize", help="canonical|none (default canonical)")
    ap.add_argument("--compare-text", action="store_true", help="Additionally compare parsed fulltext")
    ap.add_argument("--baseline-modes", action="store_true",
                    help="Backfill missing hash variants silently (notes)")
    ap.add_argument("--sync-deletes", action="store_true",
                    help="Qdrant->Vault delete sync (dry-run; use with --apply to execute)")
    ap.add_argument("--prefix", help="Collection prefix (overrides ENV COLLECTION_PREFIX)")
    return ap


def _collect_vault_note_ids(files: List[str]) -> Tuple[Set[str], int]:
    """Return ``(note ids found in *files*, number of files that failed to parse)``.

    Files for which ``read_markdown`` returns a falsy value, or whose
    frontmatter has no string ``id``, are skipped without counting as errors.
    """
    ids: Set[str] = set()
    errors = 0
    for path in files:
        try:
            parsed = read_markdown(path)
            if not parsed:
                continue
            nid = normalize_frontmatter(parsed.frontmatter).get("id")
        except Exception:
            errors += 1
            continue
        if isinstance(nid, str):
            ids.add(nid)
    return ids, errors


def _run_sync_deletes(client: Any, prefix: str, vault_files: List[str], apply: bool) -> None:
    """Report (and with *apply*, delete) Qdrant notes that are absent from the vault.

    *vault_files* must list the whole vault; a subset would mark every other
    note as deleted. Deletion is refused when the vault is empty or some
    files failed to parse, because the ids of those notes are unknown and
    their Qdrant entries would otherwise be removed by mistake.
    """
    vault_note_ids, read_errors = _collect_vault_note_ids(vault_files)
    qdrant_note_ids = list_qdrant_note_ids(client, prefix)
    to_delete = sorted(qdrant_note_ids - vault_note_ids)
    print(json.dumps({
        "action": "sync-deletes",
        "prefix": prefix,
        "qdrant_total": len(qdrant_note_ids),
        "vault_total": len(vault_note_ids),
        "to_delete_count": len(to_delete),
        "to_delete": to_delete[:50] + (["…"] if len(to_delete) > 50 else []),
        "vault_read_errors": read_errors,
    }, ensure_ascii=False))
    if not (apply and to_delete):
        return
    if read_errors or not vault_files:
        reason = (
            f"{read_errors} vault file(s) could not be parsed"
            if read_errors
            else "no Markdown files found in vault"
        )
        print(json.dumps({"action": "sync-deletes", "decision": "skipped", "reason": reason}))
        return
    for nid in to_delete:
        print(json.dumps({"action": "delete", "note_id": nid, "decision": "apply"}))
        delete_note_everywhere(client, prefix, nid)


def _apply_type_registry(fm: Dict[str, Any], reg: Dict[str, Any]) -> None:
    """Write type, chunk_profile and retriever_weight from *reg* into *fm* in place.

    Registry values override frontmatter values; settings a type does not
    configure leave the frontmatter untouched.
    """
    note_type = resolve_note_type(fm.get("type"), reg)
    fm["type"] = note_type
    profile = effective_chunk_profile(note_type, reg)
    if profile:
        fm["chunk_profile"] = profile
    weight = effective_retriever_weight(note_type, reg)
    if weight is not None:
        fm["retriever_weight"] = weight


def _note_input(fm: Dict[str, Any], path: str, body: str) -> Dict[str, Any]:
    """Build the note dict handed to the payload builders (a fresh dict per call)."""
    return {
        "frontmatter": fm,
        "id": fm.get("id"),
        "title": fm.get("title"),
        "type": fm.get("type"),
        "path": path,
        "body": body,
    }


def _note_changed(old: Dict[str, Any], new: Dict[str, Any]) -> bool:
    """Return True if fulltext or any type-registry field differs between payloads."""
    if (old.get("fulltext") or "") != (new.get("fulltext") or ""):
        return True
    return any(old.get(k) != new.get(k) for k in _REGISTRY_FIELDS)


def _embed_chunks(chunk_pls: List[Dict[str, Any]], path: str, note_id: str) -> Optional[Any]:
    """Embed each chunk's ``window`` (else ``text``); None if unavailable or failing."""
    if not embed_texts:
        return None
    try:
        texts = [c.get("window") or c.get("text") or "" for c in chunk_pls]
        return embed_texts(texts) if texts else None
    except Exception as e:
        print(json.dumps({"path": path, "note_id": note_id, "warn": f"embed failed: {e}"}))
        return None


def _import_file(
    path: str,
    args: argparse.Namespace,
    client: Any,
    cfg: QdrantConfig,
    reg: Dict[str, Any],
    note_scope_refs: bool,
) -> bool:
    """Import one Markdown file; return True if it counts as a processed note.

    Files that cannot be read, have invalid frontmatter, or are filtered out
    by --note-id are not counted. Per-file errors are printed as JSON lines
    and skip the file; Qdrant write errors propagate.
    """
    try:
        parsed = read_markdown(path)
    except Exception as e:
        print(json.dumps({"path": path, "error": f"read_markdown failed: {type(e).__name__}: {e}"}))
        return False
    if not parsed:
        return False

    try:
        fm = normalize_frontmatter(parsed.frontmatter)
        validate_required_frontmatter(fm)
    except Exception as e:
        print(json.dumps({"path": path, "error": f"Frontmatter invalid: {type(e).__name__}: {e}"}))
        return False

    if args.note_id and not args.only_path and fm.get("id") != args.note_id:
        return False

    # From here on the file counts as processed, even if a later step fails.
    _apply_type_registry(fm, reg)
    body = getattr(parsed, "body", "") or ""

    # --- NOTE payload (built from a dict so the FM overrides above are used) ---
    try:
        note_pl = make_note_payload(_note_input(fm, path, body), file_path=path)
    except Exception as e:
        print(json.dumps({"path": path, "error": f"make_note_payload failed: {type(e).__name__}: {e}"}))
        return True

    if not note_pl.get("fulltext"):
        note_pl["fulltext"] = body
    # Ensure retriever_weight is present on the note payload (from FM/types).
    if "retriever_weight" not in note_pl and fm.get("retriever_weight") is not None:
        try:
            note_pl["retriever_weight"] = float(fm["retriever_weight"])
        except (TypeError, ValueError):
            pass

    note_id = note_pl.get("note_id") or fm.get("id")
    if not note_id:
        print(json.dumps({"path": path, "error": "Missing note_id after payload build"}))
        return True

    # Always look up the stored payload, even with --force-replace, so that
    # --purge-before-upsert can remove stale chunks/edges of existing notes.
    old_payload = fetch_existing_note_payload(client, cfg.prefix, note_id)
    has_old = old_payload is not None
    changed = args.force_replace or not has_old or _note_changed(old_payload, note_pl)

    # --- CHUNKS (same note dict; the chunker sees the FM chunk_profile) ---
    try:
        chunk_pls: List[Dict[str, Any]] = make_chunk_payloads(_note_input(fm, path, body), file_path=path)
    except Exception as e:
        print(json.dumps({"path": path, "note_id": note_id, "error": f"chunk build failed: {type(e).__name__}: {e}"}))
        return True

    # --- EDGES ---
    edges: List[Dict[str, Any]] = []
    try:
        edges = build_edges_for_note(note_id, chunk_pls, None, bool(note_scope_refs)) or []
    except Exception as e:
        print(json.dumps({"path": path, "note_id": note_id, "warn": f"edges failed: {e}"}))

    # --- Summary (dry-run log) ---
    print(json.dumps({
        "note_id": note_id,
        "title": fm.get("title"),
        "type": fm.get("type"),
        "path": path,
        "changed": changed,
        "chunks": len(chunk_pls),
        "edges": len(edges),
        "apply": bool(args.apply),
    }, ensure_ascii=False))

    # --- Writes ---
    if not args.apply:
        return True

    # For unchanged notes, re-write artifacts that are missing in Qdrant.
    chunks_missing = edges_missing = False
    if has_old and not changed:
        chunks_missing = bool(chunk_pls) and not _points_exist(
            client, f"{cfg.prefix}_chunks", _note_filter(note_id)
        )
        # Assumes edges built for a note carry its id in note_id or source_id
        # (TODO confirm against derive_edges); if not, edges are simply
        # re-upserted, which is harmless for idempotent upserts.
        edges_missing = bool(edges) and not _points_exist(
            client, f"{cfg.prefix}_edges", _match_any(("note_id", "source_id"), note_id)
        )

    if args.purge_before_upsert and has_old and changed:
        try:
            purge_note_artifacts(client, cfg.prefix, note_id)
        except Exception as e:
            print(json.dumps({"path": path, "note_id": note_id, "warn": f"purge failed: {e}"}))

    # `changed` is already True for notes not yet stored.
    if changed:
        notes_name, note_pts = points_for_note(cfg.prefix, note_pl, None, cfg.dim)
        upsert_batch(client, notes_name, note_pts)

    if chunk_pls and (changed or chunks_missing):
        vecs = _embed_chunks(chunk_pls, path, note_id)
        chunks_name, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vecs)
        upsert_batch(client, chunks_name, chunk_pts)

    if edges and (changed or edges_missing):
        edges_name, edge_pts = points_for_edges(cfg.prefix, edges)
        upsert_batch(client, edges_name, edge_pts)

    return True


# ---------------------- main ----------------------

def main() -> None:
    """CLI entry point: set up Qdrant, optionally sync deletes, import the notes."""
    load_dotenv()
    args = _build_arg_parser().parse_args()

    # Export the registry path so other components reading MINDNET_TYPES_FILE
    # (presumably the payload builders) use the same file.
    if not os.getenv("MINDNET_TYPES_FILE"):
        os.environ["MINDNET_TYPES_FILE"] = _types_file_default()

    note_scope_refs = args.note_scope_refs or _env_flag("MINDNET_NOTE_SCOPE_REFS")

    # Qdrant
    cfg = QdrantConfig.from_env()
    if args.prefix:
        cfg.prefix = args.prefix.strip()
    client = get_client(cfg)
    ensure_collections(client, cfg.prefix, cfg.dim)
    ensure_payload_indexes(client, cfg.prefix)

    # Type registry
    reg = load_type_registry()

    root = os.path.abspath(args.vault)
    files = [os.path.abspath(args.only_path)] if args.only_path else _iter_md(root)
    if not files:
        print("No Markdown files found.", file=sys.stderr)
        sys.exit(2)

    if args.sync_deletes:
        # Always compare against the whole vault, never the --only-path subset.
        vault_files = _iter_md(root) if args.only_path else files
        _run_sync_deletes(client, cfg.prefix, vault_files, args.apply)

    processed = 0
    for path in files:
        if _import_file(path, args, client, cfg, reg, note_scope_refs):
            processed += 1

    print(f"Done. Processed notes: {processed}")


if __name__ == "__main__":
    main()